[C#] .NET에서 블로킹 큐 <T>를 작성 하시겠습니까?

대기열에 여러 스레드를 추가하고 동일한 대기열에서 여러 스레드를 읽는 시나리오가 있습니다. 대기열이 특정 크기에 도달 하면 대기열에서 항목을 제거 할 때까지 대기열을 채우는 모든 스레드 가 추가시 차단됩니다.

아래 해결책은 현재 사용중인 것이며 내 질문은 : 어떻게 개선 할 수 있습니까? 사용해야하는 BCL에서이 동작을 이미 활성화 한 개체가 있습니까?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}



답변

그것은 매우 안전하지 않은 것처럼 보입니다 (매우 작은 동기화). 어떻습니까 :

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(편집하다)

실제로, 독자는 부울 플래그와 같은 독자가 깨끗하게 종료하기 시작하여 대기열이 막히지 않고 빈 대기열이 반환되도록 대기열을 닫는 방법을 원합니다.

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}


답변

.net 4 BlockingCollection을 사용하여 Add ()를 대기열에 넣고 Take ()를 대기열에 넣습니다. 내부적으로 비 차단 ConcurrentQueue를 사용합니다. 자세한 내용은 여기를 클릭하십시오. 최고 및 빠른 생산자 / 소비자 대기열 기술 BlockingCollection vs 동시 대기열


답변

“어떻게 개선 할 수 있습니까?”

글쎄, 클래스의 모든 메소드를 살펴보고 다른 스레드가 해당 메소드 또는 다른 메소드를 동시에 호출하면 어떻게 될지 고려해야합니다. 예를 들어, Add 메서드가 아닌 Remove 메서드에는 잠금을 설정합니다. 한 스레드가 다른 스레드 제거와 동시에 추가되면 어떻게됩니까? 나쁜 것들.

또한 메소드가 첫 번째 오브젝트의 내부 데이터 (예 : GetEnumerator)에 대한 액세스를 제공하는 두 번째 오브젝트를 리턴 할 수 있음을 고려하십시오. 한 스레드가 해당 열거자를 통과하고 다른 스레드가 목록을 동시에 수정한다고 가정하십시오. 안좋다.

경험상 가장 좋은 방법은 클래스의 메소드 수를 절대 최소값으로 줄여서 더 간단하게 만드는 것입니다.

특히 다른 컨테이너 클래스를 상속하지 마십시오. 클래스의 모든 메소드를 노출시켜 호출자가 내부 데이터를 손상 시키거나 데이터가 부분적으로 완전히 변경된 것을 볼 수있는 방법을 제공하기 때문에 해당 시점에 손상된 것으로 표시됨). 모든 세부 사항을 숨기고 액세스를 허용하는 방법에 대해 완전히 무자비하십시오.

기성품 솔루션을 사용하는 것이 좋습니다. 스레딩에 대한 책을 얻거나 타사 라이브러리를 사용하십시오. 그렇지 않으면 시도중인 것을 감안할 때 오랫동안 코드를 디버깅 할 것입니다.

또한 Remove가 호출자가 특정 항목을 선택하는 대신 항목 (예 : 대기열이므로 처음에 추가 된 항목)을 반환하는 것이 더 합리적이지 않습니까? 대기열이 비어 있으면 Remove도 차단해야합니다.

업데이트 : Marc의 답변은 실제로 이러한 모든 제안을 구현합니다! 🙂 그러나 그의 버전이 왜 그렇게 개선되었는지 이해하는 것이 도움이 될 수 있으므로 여기에 남겨 두겠습니다.


답변

System.Collections.Concurrent 네임 스페이스에서 BlockingCollectionConcurrentQueue 를 사용할 수 있습니다.

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}


답변

방금 Reactive Extensions를 사용 하여이 문제를 해결 했으며이 질문을 기억했습니다.

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

반드시 완전히 안전 할 필요는 없지만 매우 간단합니다.


답변

이것이 스레드 안전 경계 블로킹 대기열을 선택했습니다.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}


답변

나는 TPL을 완전히 탐구하지 않았다 하지는 않았지만, 그들은 당신의 요구에 맞는 것이거나 최소한 영감을 얻기 위해 어떤 리플렉터 사료를 가질 수도 있습니다.

희망이 도움이됩니다.