[c#] 동시 비동기 I / O 작업의 양을 제한하는 방법은 무엇입니까?

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

여기에 문제가 있습니다. 1000 개 이상의 동시 웹 요청을 시작합니다. 이러한 비동기 http 요청의 동시 양을 제한하는 쉬운 방법이 있습니까? 따라서 한 번에 20 개 이상의 웹 페이지가 다운로드되지 않도록합니다. 가장 효율적인 방법으로 수행하는 방법은 무엇입니까?



답변

.NET 4.5 베타를 사용하여 .NET 용 비동기 최신 버전에서 확실히이 작업을 수행 할 수 있습니다. ‘usr’의 이전 게시물은 Stephen Toub가 작성한 좋은 기사를 가리 키지 만 덜 알려진 소식은 비동기 세마포어가 실제로 .NET 4.5의 베타 릴리스에 포함되었다는 것입니다.

우리가 사랑하는 SemaphoreSlim클래스 (원래보다 성능이 뛰어 나기 때문에 사용해야 함)를 살펴보면 Semaphore, 이제 WaitAsync(...)예상되는 모든 인수 (시간 초과 간격, 취소 토큰, 모든 일반적인 스케줄링 친구)와 함께 일련의 오버로드를 자랑합니다 . )

Stephen은 또한 베타와 함께 출시 된 새로운 .NET 4.5 기능에 대한 최신 블로그 게시물을 작성했습니다. What ‘s New for Parallelism in .NET 4.5 Beta를 참조하십시오 .

마지막으로, 비동기 메서드 조절에 SemaphoreSlim을 사용하는 방법에 대한 몇 가지 샘플 코드가 있습니다.

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

마지막으로 TPL 기반 스케줄링을 사용하는 솔루션을 언급 할 가치가있을 것입니다. 아직 시작되지 않은 TPL에서 위임 바인딩 작업을 생성하고 사용자 지정 작업 스케줄러가 동시성을 제한하도록 허용 할 수 있습니다. 실제로 여기에 MSDN 샘플이 있습니다.

TaskScheduler를 참조하십시오 .


답변

IEnumerable (즉, URL 문자열)이 있고 이들 각각에 대해 I / O 바운드 작업을 동시에 수행하고 (즉, 비동기 http 요청 만들기) 선택적으로 최대 동시 수를 설정하려는 경우 실시간 I / O 요청을 수행하는 방법은 다음과 같습니다. 이렇게하면 스레드 풀 등을 사용하지 않고이 메서드는 세마포어 슬림을 사용하여 하나의 요청이 완료되고 세마포어를 떠나고 다음 요청이 들어오는 슬라이딩 창 패턴과 유사한 최대 동시 I / O 요청을 제어합니다.

사용법 : await ForEachAsync (urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }


답변

불행히도 .NET Framework에는 병렬 비동기 작업을 조정하기위한 가장 중요한 결합자가 없습니다. 그런 것은 내장되어 있지 않습니다.

가장 존경받는 Stephen Toub가 만든 AsyncSemaphore 클래스를 살펴보십시오 . 원하는 것은 세마포어라고하며 비동기 버전이 필요합니다.


답변

많은 함정이 있으며 오류의 경우 세마포어를 직접 사용하는 것이 까다로울 수 있으므로 바퀴를 다시 발명하는 대신 AsyncEnumerator NuGet 패키지 를 사용하는 것이 좋습니다 .

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);


답변

Theo Yaung 예제는 좋지만 대기 작업 목록이없는 변형이 있습니다.

 class SomeChecker
 {
    private const int ThreadCount=20;
    private CountdownEvent _countdownEvent;
    private SemaphoreSlim _throttler;

    public Task Check(IList<string> urls)
    {
        _countdownEvent = new CountdownEvent(urls.Count);
        _throttler = new SemaphoreSlim(ThreadCount);

        return Task.Run( // prevent UI thread lock
            async  () =>{
                foreach (var url in urls)
                {
                    // do an async wait until we can schedule again
                    await _throttler.WaitAsync();
                    ProccessUrl(url); // NOT await
                }
                //instead of await Task.WhenAll(allTasks);
                _countdownEvent.Wait();
            });
    }

    private async Task ProccessUrl(string url)
    {
        try
        {
            var page = await new WebClient()
                       .DownloadStringTaskAsync(new Uri(url));
            ProccessResult(page);
        }
        finally
        {
            _throttler.Release();
            _countdownEvent.Signal();
        }
    }

    private void ProccessResult(string page){/*....*/}
}


답변

SemaphoreSlim은 여기에서 매우 유용 할 수 있습니다. 내가 만든 확장 방법은 다음과 같습니다.

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max numbers of the actions to run in parallel,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

샘플 사용법 :

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);


답변

오래된 질문, 새로운 답변. @vitidev에는 내가 검토 한 프로젝트에서 거의 그대로 재사용 된 코드 블록이 있습니다. 몇몇 동료들과 논의한 후 “내장 된 TPL 방법을 사용하지 않는 이유는 무엇입니까?” ActionBlock이 승자처럼 보입니다. https://msdn.microsoft.com/en-us/library/hh194773(v=vs.110).aspx . 아마도 기존 코드를 변경하지 않을 것이지만 확실히이 너겟을 채택하고 조절 된 병렬 처리에 대해 Mr. Softy의 모범 사례를 재사용 할 것입니다.