[c#] 비동기 BlockingCollection <T>과 같은 것이 있습니까?

나는 싶습니다 await의 결과에 BlockingCollection<T>.Take()비동기, 그래서 스레드를 차단하지 않습니다. 다음과 같은 것을 찾고 있습니다.

var item = await blockingCollection.TakeAsync();

나는 이것을 할 수 있다는 것을 안다.

var item = await Task.Run(() => blockingCollection.Take());

그러나 그것은 다른 스레드 (의 ThreadPool)가 대신 차단 되기 때문에 전체 아이디어를 죽 입니다.

대안이 있습니까?



답변

내가 아는 네 가지 대안이 있습니다.

첫 번째는 비동기 및 작업 을 지원하는 스레드 세이프 대기열을 제공하는 Channels 입니다. 채널은 고도로 최적화되어 있으며 임계 값에 도달하면 일부 항목을 삭제할 수 있습니다.ReadWrite

다음은 TPL DataflowBufferBlock<T> 에서 가져온 것 입니다. 당신은 단지 하나의 소비자가있는 경우 사용 하거나 , 또는 단지는 링크 . 자세한 내용 은 내 블로그를 참조하십시오 .OutputAvailableAsyncReceiveAsyncActionBlock<T>

마지막 두 가지는 내가 만든 유형이며 AsyncEx 라이브러리 에서 사용할 수 있습니다 .

AsyncCollection<T>async거의 동일 BlockingCollection<T>하며 ConcurrentQueue<T>또는 같은 동시 생산자 / 소비자 컬렉션을 래핑 할 수 ConcurrentBag<T>있습니다. TakeAsync컬렉션의 항목을 비동기 적으로 소비 하는 데 사용할 수 있습니다 . 자세한 내용 은 내 블로그를 참조하십시오 .

AsyncProducerConsumerQueue<T>더 이식 가능하고 async호환되는 생산자 / 소비자 대기열입니다. 를 사용 DequeueAsync하여 큐에서 항목을 비동기 적으로 사용할 수 있습니다 . 자세한 내용 은 내 블로그를 참조하십시오 .

마지막 세 가지 대안은 동기 및 비동기 풋 앤 테이크를 허용합니다.


답변

… 또는 다음과 같이 할 수 있습니다.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        _sem.Release(n);
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

간단하고 완벽하게 작동하는 비동기식 FIFO 대기열.

참고 : SemaphoreSlim.WaitAsync그 전에 .NET 4.5에 추가되었는데, 이것이 그렇게 간단하지는 않았습니다.


답변

다음은 BlockingCollection많은 누락 된 기능과 함께 대기를 지원 하는의 매우 기본적인 구현입니다 . AsyncEnumerable8.0 이전의 C # 버전에 대해 비동기 열거를 가능하게 하는 라이브러리를 사용합니다 .

public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
    private Queue<T> _queue = new Queue<T>();
    private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private int _consumersCount = 0;
    private bool _isAddingCompleted;

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_isAddingCompleted) throw new InvalidOperationException();
            _queue.Enqueue(item);
        }
        _semaphore.Release();
    }

    public void CompleteAdding()
    {
        lock (_queue)
        {
            if (_isAddingCompleted) return;
            _isAddingCompleted = true;
            if (_consumersCount > 0) _semaphore.Release(_consumersCount);
        }
    }

    public IAsyncEnumerable<T> GetConsumingEnumerable()
    {
        lock (_queue) _consumersCount++;
        return new AsyncEnumerable<T>(async yield =>
        {
            while (true)
            {
                lock (_queue)
                {
                    if (_queue.Count == 0 && _isAddingCompleted) break;
                }
                await _semaphore.WaitAsync();
                bool hasItem;
                T item = default;
                lock (_queue)
                {
                    hasItem = _queue.Count > 0;
                    if (hasItem) item = _queue.Dequeue();
                }
                if (hasItem) await yield.ReturnAsync(item);
            }
        });
    }
}

사용 예 :

var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(100);
        abc.Add(i);
    }
    abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
    await abc.GetConsumingEnumerable().ForEachAsync(async item =>
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    });
});
await Task.WhenAll(producer, consumer);

산출:

12 34 5678 9 10


업데이트 : C # 8이 출시되면서 비동기 열거 가 기본 제공 언어 기능이되었습니다. 필수 클래스 ( IAsyncEnumerable, IAsyncEnumerator)는 .NET Core 3.0에 포함되며 .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces ) 용 패키지로 제공됩니다 .

다음은 GetConsumingEnumerable새로운 C # 8 구문을 특징으로 하는 대체 구현입니다.

public async IAsyncEnumerable<T> GetConsumingEnumerable()
{
    lock (_queue) _consumersCount++;
    while (true)
    {
        lock (_queue)
        {
            if (_queue.Count == 0 && _isAddingCompleted) break;
        }
        await _semaphore.WaitAsync();
        bool hasItem;
        T item = default;
        lock (_queue)
        {
            hasItem = _queue.Count > 0;
            if (hasItem) item = _queue.Dequeue();
        }
        if (hasItem) yield return item;
    }
}

동일한 방법 으로 await및 의 공존에 유의하십시오 yield.

사용 예 (C # 8) :

var consumer = Task.Run(async () =>
{
    await foreach (var item in abc.GetConsumingEnumerable())
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    }
});

메모 await전과를 foreach.


답변

약간의 해킹이 괜찮다면이 확장 프로그램을 사용해 볼 수 있습니다.

public static async Task AddAsync<TEntity>(
    this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt)
{
    while (true)
    {
        try
        {
            if (Bc.TryAdd(item, 0, abortCt))
                return;
            else
                await Task.Delay(100, abortCt);
        }
        catch (Exception)
        {
            throw;
        }
    }
}

public static async Task<TEntity> TakeAsync<TEntity>(
    this BlockingCollection<TEntity> Bc, CancellationToken abortCt)
{
    while (true)
    {
        try
        {
            TEntity item;

            if (Bc.TryTake(out item, 0, abortCt))
                return item;
            else
                await Task.Delay(100, abortCt);
        }
        catch (Exception)
        {
            throw;
        }
    }
}


답변