메트로 앱에서는 여러 WCF 호출을 실행해야합니다. 많은 호출이 이루어져야하므로 병렬 루프에서 호출해야합니다. 문제는 WCF 호출이 모두 완료되기 전에 병렬 루프가 종료된다는 것입니다.
예상대로 작동하도록 이것을 리팩토링 하시겠습니까?
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
foreach ( var customer in customers )
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
답변
배후의 전체 아이디어 Parallel.ForEach()
는 스레드 세트가 있고 각 스레드가 콜렉션의 일부를 처리한다는 것입니다. 알다시피, 이것은 비동기 호출 중에 스레드를 해제하려는 async
– 와 함께 작동하지 않습니다 await
.
ForEach()
쓰레드 를 막아서“수정”할 수는 있지만 async
– 의 전체 요점을 무너 뜨 await
립니다.
할 수있는 일은 대신 비동기식을 지원 하는 TPL Dataflow 를 사용 Parallel.ForEach()
하는 Task
것입니다.
특히, TransformBlock
각 ID를 Customer
사용하는 async
람다 로 변환 하는 을 사용하여 코드를 작성할 수 있습니다 . 이 블록은 병렬로 실행되도록 구성 할 수 있습니다. 해당 블록을 콘솔에 ActionBlock
쓰는 블록에 연결합니다 Customer
. 블록 네트워크를 설정 한 후 Post()
각 ID를에 연결할 수 있습니다 TransformBlock
.
코드에서 :
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
비록 당신은 아마도 TransformBlock
약간의 상수로 의 병렬성을 제한하고 싶을 것입니다 . 또한 예를 들어 컬렉션이 너무 큰 TransformBlock
경우을 사용하여 의 용량을 제한하고 항목을 비동기식으로 추가 할 수 있습니다 SendAsync()
.
코드와 비교할 때 추가 이점으로 (작동 한 경우) 단일 항목이 완료되는 즉시 쓰기가 시작되고 모든 처리가 완료 될 때까지 기다리지 않는다는 것입니다.
답변
svick의 답변 은 (평소대로) 훌륭합니다.
그러나 실제로 대량의 데이터를 전송할 때 Dataflow가 더 유용하다는 것을 알았습니다. 또는 async
호환 가능한 대기열 이 필요할 때 .
귀하의 경우 더 간단한 해결책은 async
-style 병렬 처리를 사용하는 것입니다 .
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
{
ICustomerRepo repo = new CustomerRepo();
return repo.GetCustomer(i);
});
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
답변
svick이 제안한대로 DataFlow를 사용하는 것은 과도 할 수 있으며 Stephen의 답변은 작업의 동시성을 제어하는 수단을 제공하지 않습니다. 그러나 그것은 간단하게 달성 할 수 있습니다.
public static async Task RunWithMaxDegreeOfConcurrency<T>(
int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
var activeTasks = new List<Task>(maxDegreeOfConcurrency);
foreach (var task in collection.Select(taskFactory))
{
activeTasks.Add(task);
if (activeTasks.Count == maxDegreeOfConcurrency)
{
await Task.WhenAny(activeTasks.ToArray());
//observe exceptions here
activeTasks.RemoveAll(t => t.IsCompleted);
}
}
await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
{
//observe exceptions in a manner consistent with the above
});
}
ToArray()
호출은 배열 대신 목록을 사용하여 완료된 작업을 대체하여 최적화 할 수 있습니다,하지만 난 그것을 훨씬 대부분의 경우에서 차이 만들 것이라고 의심한다. OP 질문에 따른 샘플 사용량 :
RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
편집 Fellow SO 사용자와 TPL wiz Eli Arbel 은 Stephen Toub 의 관련 기사를 알려주었습니다 . 평소와 같이, 그의 구현은 우아하고 효율적입니다.
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
답변
질문이 처음 게시되었을 때 4 년 전에 존재하지 않았던 새로운 AsyncEnumerator NuGet 패키지를 사용하면 노력을 절약 할 수 있습니다 . 병렬 처리 수준을 제어 할 수 있습니다.
using System.Collections.Async;
...
await ids.ParallelForEachAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
},
maxDegreeOfParallelism: 10);
면책 조항 : 저는 공개 소스이며 MIT에 따라 라이센스가 부여 된 AsyncEnumerator 라이브러리의 저자이며 커뮤니티를 돕기 위해이 메시지를 게시하고 있습니다.
답변
랩 Parallel.Foreach
로 Task.Run()
대신의 await
키워드 사용[yourasyncmethod].Result
(UI 스레드를 차단하지 않으려면 Task.Run 작업을 수행해야합니다)
이 같은:
var yourForeachTask = Task.Run(() =>
{
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = repo.GetCustomer(i).Result;
customers.Add(cust);
});
});
await yourForeachTask;
답변
이것은 전체 TPL Dataflow를 작동시키는 것보다 매우 효율적이고 쉬워야합니다.
var customers = await ids.SelectAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
});
...
public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
var results = new List<TResult>();
var activeTasks = new HashSet<Task<TResult>>();
foreach (var item in source)
{
activeTasks.Add(selector(item));
if (activeTasks.Count >= maxDegreesOfParallelism)
{
var completed = await Task.WhenAny(activeTasks);
activeTasks.Remove(completed);
results.Add(completed.Result);
}
}
results.AddRange(await Task.WhenAll(activeTasks));
return results;
}
답변
나는 파티에 조금 늦었지만 동기화 컨텍스트에서 비동기 코드를 실행하기 위해 GetAwaiter.GetResult () 사용을 고려할 수도 있지만 아래와 같이 병렬화됩니다.
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
// Run this in thread which Parallel library occupied.
var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
customers.Add(cust);
});