컬렉션을 병렬로 처리하고 싶지만 구현하는 데 문제가있어서 도움이 필요합니다.
병렬 루프의 람다 내에서 C #에서 비동기로 표시된 메소드를 호출하려는 경우 문제가 발생합니다. 예를 들면 다음과 같습니다.
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
생성 된 모든 스레드가 사실상 백그라운드 스레드이고 Parallel.ForEach
호출이 완료를 기다리지 않기 때문에 카운트가 0 인 문제가 발생합니다 . async 키워드를 제거하면 메소드는 다음과 같습니다.
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
작동하지만 대기 영리를 완전히 비활성화하고 수동 예외 처리를 수행해야합니다. (간결하게 제거).
Parallel.ForEach
람다 내에서 await 키워드를 사용 하는 루프를 어떻게 구현할 수 있습니까? 가능합니까?
Parallel.ForEach 메서드의 프로토 타입은 Action<T>
매개 변수로 사용되지만 비동기 람다를 기다립니다.
답변
단순한 병렬 처리를 원한다면 다음과 같이하십시오.
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
더 복잡한 것이 필요하다면 Stephen Toub의 ForEachAsync
게시물을 확인하십시오 .
답변
AsyncEnumerator NuGet 패키지 의 ParallelForEachAsync
확장 메소드를 사용할 수 있습니다 .
using Dasync.Collections;
var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
답변
SemaphoreSlim
당신 과 함께 병렬 처리를 달성 할 수 있습니다.
var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
try
{
await throttler.WaitAsync();
var response = await GetData(item);
bag.Add(response);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
var count = bag.Count;
답변
ParallelForEach 비동기의 경량 구현.
풍모:
- 조절 (최대 병렬 처리 수준).
- 예외 처리 (완료시 집계 예외가 발생 함)
- 메모리 효율적 (작업 목록을 저장할 필요가 없음)
public static class AsyncEx
{
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
{
var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
var tcs = new TaskCompletionSource<object>();
var exceptions = new ConcurrentBag<Exception>();
bool addingCompleted = false;
foreach (T item in source)
{
await semaphoreSlim.WaitAsync();
asyncAction(item).ContinueWith(t =>
{
semaphoreSlim.Release();
if (t.Exception != null)
{
exceptions.Add(t.Exception);
}
if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
{
tcs.SetResult(null);
}
});
}
Volatile.Write(ref addingCompleted, true);
await tcs.Task;
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
}
}
사용 예 :
await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
답변
SemaphoreSlim을 사용하고 최대 병렬 처리 수준을 설정할 수있는 확장 방법을 만들었습니다.
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item).ContinueWith(res =>
{
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
});
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
샘플 사용법 :
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);