[c#] 태스크에서 동기식 연속을 방지하려면 어떻게해야합니까?

Task.NET을 기반으로 요청에 대한 보류중인 응답에 대한 기반 API를 제공하는 라이브러리 (소켓 네트워킹) 코드가 TaskCompletionSource<T>있습니다. 그러나 동기식 연속을 방지하는 것이 불가능 해 보인다는 점에서 TPL에는 성가심이 있습니다. 내가 것 처럼 할 수 있도록하는 중입니다 :

  • TaskCompletionSource<T>발신자가을 (를) 첨부하는 것을 허용해서는 안된다고 알리 TaskContinuationOptions.ExecuteSynchronously거나
  • 대신 풀을 사용하여 무시해야 함 을 지정하는 방식으로 결과 ( SetResult/ TrySetResult)를 설정합니다.TaskContinuationOptions.ExecuteSynchronously

특히, 내가 가진 문제는 들어오는 데이터가 전용 리더에 의해 처리되고 있으며 호출자가 첨부 TaskContinuationOptions.ExecuteSynchronously할 수있는 경우 리더를 지연시킬 수 있다는 것입니다 (단지 그 이상에 영향을 미침). 이전에, 내가 여부를 감지 몇 가지 해커에 의해이 문제를 해결 일한 어떤 연속성을가 존재하고, 그들이하는 경우는 위에 완료를 밀어 ThreadPool완료 처리되지되므로 발신자가 자신의 작업 대기열을 포화 경우, 그러나이 상당한 영향을 적시에. 그들이 Task.Wait()(또는 유사한) 사용하고 있다면 본질적으로 교착 상태가 될 것입니다. 마찬가지로 독자가 워커를 사용하는 대신 전용 스레드에있는 이유입니다.

그래서; TPL 팀에 잔소리를하기 전에 옵션이 없나요?

키 포인트:

  • 외부 발신자가 내 대화 목록을 탈취하는 것을 원하지 않습니다.
  • ThreadPool풀이 포화 상태 일 때 작동해야하므로 구현으로 사용할 수 없습니다.

아래 예는 출력을 생성합니다 (순서는 타이밍에 따라 다를 수 있음).

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

문제는 임의의 호출자가 “메인 스레드”에서 계속 될 수 있다는 사실입니다. 실제 코드에서 이것은 기본 판독기를 방해합니다. 나쁜 것들!

암호:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}



답변

.NET 4.6의 새로운 기능 :

.NET 4.6는 새로운 포함되어 있습니다 TaskCreationOptions: RunContinuationsAsynchronously.


Reflection을 사용하여 비공개 필드에 액세스 할 의향이 있기 때문에 …

TCS의 태스크를 TASK_STATE_THREAD_WAS_ABORTED플래그로 표시하면 모든 연속이 인라인되지 않습니다.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

편집하다:

Reflection emit를 사용하는 대신 표현식을 사용하는 것이 좋습니다. 이것은 훨씬 더 읽기 쉽고 PCL과 호환된다는 장점이 있습니다.

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Reflection을 사용하지 않고 :

관심이 있다면 리플렉션없이이 작업을 수행 할 수있는 방법을 찾았지만 약간 “더럽다”며 물론 무시할 수없는 성능 페널티가 부과됩니다.

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}


답변

연속에 대한 명시적인 API 제어를 제공하는 TPL에는 아무것도 없다고 생각합니다 TaskCompletionSource.SetResult. 이 동작을 제어 하기 위해 초기 답변 을 유지하기로 결정했습니다.async/await시나리오 .

다음은 호출 된 동일한 스레드에서 트리거 된 연속이 발생하는 ContinueWith경우에 비동기를 부과하는 또 다른 솔루션입니다 .tcs.SetResultSetResult

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

댓글을 해결하도록 업데이트되었습니다.

나는 발신자를 통제하지 않는다-그들에게 특정한 continue-with 변형을 사용하도록 할 수 없다 : 가능하다면, 문제는 처음에 존재하지 않을 것이다.

나는 당신이 발신자를 통제하지 않는다는 것을 몰랐습니다. 그럼에도 불구하고, 당신이 그것을 제어하지 않는다면, 당신은 아마도 TaskCompletionSource객체 를 호출자에게 직접 전달하지 않을 것입니다 . 논리적으로, 당신 은 그것 의 토큰 부분, 즉 tcs.Task. 이 경우 위의 다른 확장 메서드를 추가하면 솔루션이 더 쉬울 수 있습니다.

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t,
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

사용하다:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

이것은 실제로 ( 바이올린 ) 모두에서 작동하며awaitContinueWith 반사 해킹이 없습니다.


답변

하는 대신 어떨까요

var task = source.Task;

대신 이것을한다

var task = source.Task.ContinueWith<Int32>( x => x.Result );

따라서 항상 비동기 적으로 실행되는 하나의 연속을 추가하고 구독자가 동일한 컨텍스트에서 연속을 원하는지 여부는 중요하지 않습니다. 그것은 일을 카레 링하는 것입니다.


답변

리플렉션을 사용할 수 있고 사용할 준비가 되었으면이 작업을 수행해야합니다.

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }
}


답변

업데이트 , 내가 게시 된 별도의 답변을 다루는 ContinueWith반대를 await(때문에 ContinueWith현재 동기화 컨텍스트에 대해 상관하지 않는다).

당신은 계속 호출에 의해 트리거에 비동기를 부과하는 바보 동기화 컨텍스트를 사용할 수 SetResult/SetCancelled/SetExceptionTaskCompletionSource. 현재 동기화 컨텍스트 (시점 await tcs.Task)는 TPL이 이러한 연속을 동기 또는 비동기로 만들지 여부를 결정하는 데 사용하는 기준 이라고 생각합니다 .

다음은 나를 위해 작동합니다.

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync 다음과 같이 구현됩니다.

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext 추가되는 오버 헤드 측면에서 매우 저렴 합니다. 사실, WPF 구현Dispatcher.BeginInvoke 에서도 매우 유사한 접근 방식을 취합니다 .

TPL이 시점에서 목표 동기 콘텍스트를 비교 await지점의 것과 tcs.SetResult. 동기화 컨텍스트가 동일하거나 두 위치에 동기화 컨텍스트가없는 경우 연속은 동 기적으로 직접 호출됩니다. 그렇지 않으면 SynchronizationContext.Post대상 동기화 컨텍스트, 즉 정상적인 await동작을 사용하여 대기열 에 추가 됩니다. 이 접근 방식은 항상 SynchronizationContext.Post동작 (또는 대상 동기화 컨텍스트가없는 경우 풀 스레드 연속)을 부과합니다 .

업데이트되었습니다 . 현재 동기화 컨텍스트를 고려하지 않기 task.ContinueWith때문에 ContinueWith에서는 작동 하지 않습니다. 그러나 await task( 바이올린 )에서 작동합니다 . 또한 await task.ConfigureAwait(false).

OTOH, 이 접근 방식은 ContinueWith.


답변

중단 시뮬레이션 접근 방식은 정말 좋은 듯했지만 TPL 하이재킹 스레드을 주도 일부 시나리오에서 .

나는 다음과 유사했다 구현했다 지속 개체를 확인 하지만, 단지 확인 어떤 아니라 실제로 작업에 주어진 코드에 대한 너무 많은 시나리오가 있기 때문에 지속하지만, 심지어 물건이 좋아하는 것을 의미하는 Task.Wait스레드 풀 조회 결과.

궁극적으로 많은 IL을 검사 한 후 안전하고 유용한 시나리오는 SetOnInvokeMres시나리오 (수동 재설정 이벤트 슬림 연속) 뿐입니다 . 다른 많은 시나리오가 있습니다.

  • 일부는 안전하지 않으며 스레드 하이재킹으로 이어집니다.
  • 나머지는 결국 쓰레드 풀로 이어 지므로 유용하지 않습니다.

그래서 결국에는 null이 아닌 연속 객체를 확인하기로 결정했습니다. null이면 괜찮습니다 (연속 없음). null이 아닌 경우 특수 사례 검사 SetOnInvokeMres-다음과 같은 경우 : 괜찮음 (호출하기에 안전함); 그렇지 않으면 TrySetComplete스푸핑 중단과 같은 특별한 작업을 수행하도록 작업에 지시하지 않고 스레드 풀이를 수행하도록합니다 . 접근 방식을 Task.Wait사용합니다 SetOnInvokeMres. 이것은 교착 상태를 방지 하기 위해 정말 열심히 노력하려는 특정 시나리오 입니다.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));


답변