[C#] 병렬로 중첩이 대기합니다.

메트로 앱에서는 여러 WCF 호출을 실행해야합니다. 많은 호출이 이루어져야하므로 병렬 루프에서 호출해야합니다. 문제는 WCF 호출이 모두 완료되기 전에 병렬 루프가 종료된다는 것입니다.

예상대로 작동하도록 이것을 리팩토링 하시겠습니까?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();



답변

배후의 전체 아이디어 Parallel.ForEach()는 스레드 세트가 있고 각 스레드가 콜렉션의 일부를 처리한다는 것입니다. 알다시피, 이것은 비동기 호출 중에 스레드를 해제하려는 async– 와 함께 작동하지 않습니다 await.

ForEach()쓰레드 를 막아서“수정”할 수는 있지만 async– 의 전체 요점을 무너 뜨 await립니다.

할 수있는 일은 대신 비동기식을 지원 하는 TPL Dataflow 를 사용 Parallel.ForEach()하는 Task것입니다.

특히, TransformBlock각 ID를 Customer사용하는 async람다 로 변환 하는 을 사용하여 코드를 작성할 수 있습니다 . 이 블록은 병렬로 실행되도록 구성 할 수 있습니다. 해당 블록을 콘솔에 ActionBlock쓰는 블록에 연결합니다 Customer. 블록 네트워크를 설정 한 후 Post()각 ID를에 연결할 수 있습니다 TransformBlock.

코드에서 :

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

비록 당신은 아마도 TransformBlock약간의 상수로 의 병렬성을 제한하고 싶을 것입니다 . 또한 예를 들어 컬렉션이 너무 큰 TransformBlock경우을 사용하여 의 용량을 제한하고 항목을 비동기식으로 추가 할 수 있습니다 SendAsync().

코드와 비교할 때 추가 이점으로 (작동 한 경우) 단일 항목이 완료되는 즉시 쓰기가 시작되고 모든 처리가 완료 될 때까지 기다리지 않는다는 것입니다.


답변

svick의 답변 은 (평소대로) 훌륭합니다.

그러나 실제로 대량의 데이터를 전송할 때 Dataflow가 더 유용하다는 것을 알았습니다. 또는 async호환 가능한 대기열 이 필요할 때 .

귀하의 경우 더 간단한 해결책은 async-style 병렬 처리를 사용하는 것입니다 .

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();


답변

svick이 제안한대로 DataFlow를 사용하는 것은 과도 할 수 있으며 Stephen의 답변은 작업의 동시성을 제어하는 ​​수단을 제공하지 않습니다. 그러나 그것은 간단하게 달성 할 수 있습니다.

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted);
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
    {
        //observe exceptions in a manner consistent with the above   
    });
}

ToArray()호출은 배열 대신 목록을 사용하여 완료된 작업을 대체하여 최적화 할 수 있습니다,하지만 난 그것을 훨씬 대부분의 경우에서 차이 만들 것이라고 의심한다. OP 질문에 따른 샘플 사용량 :

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

편집 Fellow SO 사용자와 TPL wiz Eli ArbelStephen Toub관련 기사를 알려주었습니다 . 평소와 같이, 그의 구현은 우아하고 효율적입니다.

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current).ContinueWith(t =>
                          {
                              //observe exceptions
                          });

        }));
}


답변

질문이 처음 게시되었을 때 4 년 전에 존재하지 않았던 새로운 AsyncEnumerator NuGet 패키지를 사용하면 노력을 절약 할 수 있습니다 . 병렬 처리 수준을 제어 할 수 있습니다.

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

면책 조항 : 저는 공개 소스이며 MIT에 따라 라이센스가 부여 된 AsyncEnumerator 라이브러리의 저자이며 커뮤니티를 돕기 위해이 메시지를 게시하고 있습니다.


답변

Parallel.ForeachTask.Run()대신의 await키워드 사용[yourasyncmethod].Result

(UI 스레드를 차단하지 않으려면 Task.Run 작업을 수행해야합니다)

이 같은:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;


답변

이것은 전체 TPL Dataflow를 작동시키는 것보다 매우 효율적이고 쉬워야합니다.

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}


답변

나는 파티에 조금 늦었지만 동기화 컨텍스트에서 비동기 코드를 실행하기 위해 GetAwaiter.GetResult () 사용을 고려할 수도 있지만 아래와 같이 병렬화됩니다.

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});