나는 싶습니다 await
의 결과에 BlockingCollection<T>.Take()
비동기, 그래서 스레드를 차단하지 않습니다. 다음과 같은 것을 찾고 있습니다.
var item = await blockingCollection.TakeAsync();
나는 이것을 할 수 있다는 것을 안다.
var item = await Task.Run(() => blockingCollection.Take());
그러나 그것은 다른 스레드 (의 ThreadPool
)가 대신 차단 되기 때문에 전체 아이디어를 죽 입니다.
대안이 있습니까?
답변
내가 아는 네 가지 대안이 있습니다.
첫 번째는 비동기 및 작업 을 지원하는 스레드 세이프 대기열을 제공하는 Channels 입니다. 채널은 고도로 최적화되어 있으며 임계 값에 도달하면 일부 항목을 삭제할 수 있습니다.Read
Write
다음은 TPL DataflowBufferBlock<T>
에서 가져온 것 입니다. 당신은 단지 하나의 소비자가있는 경우 사용 하거나 , 또는 단지는 링크 . 자세한 내용 은 내 블로그를 참조하십시오 .OutputAvailableAsync
ReceiveAsync
ActionBlock<T>
마지막 두 가지는 내가 만든 유형이며 AsyncEx 라이브러리 에서 사용할 수 있습니다 .
AsyncCollection<T>
과 async
거의 동일 BlockingCollection<T>
하며 ConcurrentQueue<T>
또는 같은 동시 생산자 / 소비자 컬렉션을 래핑 할 수 ConcurrentBag<T>
있습니다. TakeAsync
컬렉션의 항목을 비동기 적으로 소비 하는 데 사용할 수 있습니다 . 자세한 내용 은 내 블로그를 참조하십시오 .
AsyncProducerConsumerQueue<T>
더 이식 가능하고 async
호환되는 생산자 / 소비자 대기열입니다. 를 사용 DequeueAsync
하여 큐에서 항목을 비동기 적으로 사용할 수 있습니다 . 자세한 내용 은 내 블로그를 참조하십시오 .
마지막 세 가지 대안은 동기 및 비동기 풋 앤 테이크를 허용합니다.
답변
… 또는 다음과 같이 할 수 있습니다.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class AsyncQueue<T>
{
private readonly SemaphoreSlim _sem;
private readonly ConcurrentQueue<T> _que;
public AsyncQueue()
{
_sem = new SemaphoreSlim(0);
_que = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
_que.Enqueue(item);
_sem.Release();
}
public void EnqueueRange(IEnumerable<T> source)
{
var n = 0;
foreach (var item in source)
{
_que.Enqueue(item);
n++;
}
_sem.Release(n);
}
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
{
for (; ; )
{
await _sem.WaitAsync(cancellationToken);
T item;
if (_que.TryDequeue(out item))
{
return item;
}
}
}
}
간단하고 완벽하게 작동하는 비동기식 FIFO 대기열.
참고 :
SemaphoreSlim.WaitAsync
그 전에 .NET 4.5에 추가되었는데, 이것이 그렇게 간단하지는 않았습니다.
답변
다음은 BlockingCollection
많은 누락 된 기능과 함께 대기를 지원 하는의 매우 기본적인 구현입니다 . AsyncEnumerable
8.0 이전의 C # 버전에 대해 비동기 열거를 가능하게 하는 라이브러리를 사용합니다 .
public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
private Queue<T> _queue = new Queue<T>();
private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
private int _consumersCount = 0;
private bool _isAddingCompleted;
public void Add(T item)
{
lock (_queue)
{
if (_isAddingCompleted) throw new InvalidOperationException();
_queue.Enqueue(item);
}
_semaphore.Release();
}
public void CompleteAdding()
{
lock (_queue)
{
if (_isAddingCompleted) return;
_isAddingCompleted = true;
if (_consumersCount > 0) _semaphore.Release(_consumersCount);
}
}
public IAsyncEnumerable<T> GetConsumingEnumerable()
{
lock (_queue) _consumersCount++;
return new AsyncEnumerable<T>(async yield =>
{
while (true)
{
lock (_queue)
{
if (_queue.Count == 0 && _isAddingCompleted) break;
}
await _semaphore.WaitAsync();
bool hasItem;
T item = default;
lock (_queue)
{
hasItem = _queue.Count > 0;
if (hasItem) item = _queue.Dequeue();
}
if (hasItem) await yield.ReturnAsync(item);
}
});
}
}
사용 예 :
var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(100);
abc.Add(i);
}
abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
await abc.GetConsumingEnumerable().ForEachAsync(async item =>
{
await Task.Delay(200);
await Console.Out.WriteAsync(item + " ");
});
});
await Task.WhenAll(producer, consumer);
산출:
12 34 5678 9 10
업데이트 : C # 8이 출시되면서 비동기 열거 가 기본 제공 언어 기능이되었습니다. 필수 클래스 ( IAsyncEnumerable
, IAsyncEnumerator
)는 .NET Core 3.0에 포함되며 .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces ) 용 패키지로 제공됩니다 .
다음은 GetConsumingEnumerable
새로운 C # 8 구문을 특징으로 하는 대체 구현입니다.
public async IAsyncEnumerable<T> GetConsumingEnumerable()
{
lock (_queue) _consumersCount++;
while (true)
{
lock (_queue)
{
if (_queue.Count == 0 && _isAddingCompleted) break;
}
await _semaphore.WaitAsync();
bool hasItem;
T item = default;
lock (_queue)
{
hasItem = _queue.Count > 0;
if (hasItem) item = _queue.Dequeue();
}
if (hasItem) yield return item;
}
}
동일한 방법 으로 await
및 의 공존에 유의하십시오 yield
.
사용 예 (C # 8) :
var consumer = Task.Run(async () =>
{
await foreach (var item in abc.GetConsumingEnumerable())
{
await Task.Delay(200);
await Console.Out.WriteAsync(item + " ");
}
});
메모 await
전과를 foreach
.
답변
약간의 해킹이 괜찮다면이 확장 프로그램을 사용해 볼 수 있습니다.
public static async Task AddAsync<TEntity>(
this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt)
{
while (true)
{
try
{
if (Bc.TryAdd(item, 0, abortCt))
return;
else
await Task.Delay(100, abortCt);
}
catch (Exception)
{
throw;
}
}
}
public static async Task<TEntity> TakeAsync<TEntity>(
this BlockingCollection<TEntity> Bc, CancellationToken abortCt)
{
while (true)
{
try
{
TEntity item;
if (Bc.TryTake(out item, 0, abortCt))
return item;
else
await Task.Delay(100, abortCt);
}
catch (Exception)
{
throw;
}
}
}