走进Task(1):什么是Task
admin
撰写于 2022年 01月 22 日

目录
  • 前言
  • 从表象讲起
    • Task 从何而来
    • Task 常见用法
    • Task 的分类
      • 按是否包含 Result 分,也就是是否是泛型 Task
      • 按得到 Task 的方式,可以分为
    • 对 Task 进行分解
  • Task 在哪执行?
    • 线程池
    • 一个独立的后台线程中
    • 自定义的TaskScheduler里
    • Task 可以封装任何类型的别的任务
  • 小结

前言

本系列会拆分为以下几篇分次进行叙述:

  1. 什么是 Task(本文)
  2. Task 的回调执行与 await(TODO)
  3. async 到底干了什么(TODO)
  4. 总结与常见误区(TODO)

2 中,会和大家分享死锁相关的问题。23 中会穿插自定义 Awaitable 的话题。

本系列会直接引用前一篇博客概述 .NET 6 ThreadPool 实现 里的结论,所有请没看过的同学先麻烦看下。

文中所有例子均出于解释目的,并非具有实际意义的代码。有返回值的 Task 和无返回值的 Task 实际区别不是很大,下文大多数举例不做特别区分。不纠结 api 的使用细节,只讲 Task 的整体设计思路。

代码运行截图是在 .NET 6 中的,其他版本的设计没有大的改动,不影响学习。

笔者解读并非权威解读,只是希望能给大家一个理解 Task 的方法。

从表象讲起

Task 从何而来

以下仅做典型举例,并非全部

  • new Task
new Task(_ =>
{
Console.WriteLine("Hello World!");
}, null).Start();
  • TaskFactory.StartNew
new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
});
  • Task.Run
Task.Run(() =>
{
Console.WriteLine("Hello World!");
});
  • Task.FromResult 等直接创建一个已完成的 Task
Task.FromResult("Hello World!");
var task = Task.CompletedTask;
  • 某个不知道其内部实现的 async 方法
async Task FooAsync();

Task 常见用法

  • 注册一个回调,等待 Task 执行完成时获取结果并执行回调
var task = Task.Run(() => "Hello World!");
task.ContinueWith(t => Console.WriteLine(t.Result));
  • await 一个 Task 并得到结果
var task = Task.Run(() => "Hello World!");
var result = await task;
Console.WriteLine(result);
  • 直接 GetResult
var task = Task.Run(() => "Hello World!");
// 等效于 task.Result
var result = task.GetAwaiter().GetResult();
Console.WriteLine(result);

Task 的分类

按是否包含 Result 分,也就是是否是泛型 Task

  • Task
  • Task

按得到 Task 的方式,可以分为

  • 我知道这个 Task 是怎么来的,这种情况下,我们自己参与了 Task 的创建过程,知道这个 Task 是在干啥。比如:
Task task = Task.Run(() => 1 + 2);

计算 1 + 2,并将结果作为 Task 的结果。

  • 不知道这个 Task 是怎么来的。比如:
Task task = new HttpClient().GetStringAsync("http://localhost:5000/api/values");

而这两种获取方式的不同对应的是两种完全不同的侧重点:

  1. Task 是一个白盒,关注 Task 里干了什么,在哪执行里面这些代码。
  2. Task 是一个黑盒,关注 Task 能给到我什么,Task 完成执行之后,我该干什么。

对 Task 进行分解

按功能点可以将 Task 分为三个部分

  • 任务执行:通过 Task.Run 等方式执行一段我们自定义的逻辑。
  • 回调通知及回调执行:注册一个回调,等待 Task 完成时执行。
  • await 语法支持:脱离了 await,task 的上述两个功能依旧可以完整执行。但却会丧失代码的简洁性。

Task 在哪执行?

线程池

Task 可以作为 ThreadPool 队列系统的基本单元被 ThreadPool 调度执行。

下面这些常见的创建 Task 的方式,默认情况都是在 ThreadPool 中被调度执行的,这几个本质上是一样的,只是使用方式上和可支持传入的自定义选项上的区别。

  • new Task
new Task(_ =>
{
Console.WriteLine("Hello World!");
}, null).Start();
  • TaskFactory.StartNew
new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
});
  • Task.Run
// 可以看做简化版的 TaskFactory.StartNew
Task.Run(() =>
{
Console.WriteLine("Hello World!");
});

Task.Run 为例来看下里面到底做了些什么。

PortableThreadPool.TryCreateWorkerThread 和实际要要执行的 lambda 表达式中打上断点,我们便可以清晰的看到整个执行过程。

整理一下的话,主要就是这个样子,为简化理解,ThreadPool 中的调用细节已省略。

Task 关键代码摘录:

class Task
{
// 任务的主体,我们要执行的实际逻辑
// 可能有返回值,可能没有
internal Delegate m_action; // 任务的状态
internal volatile int m_stateFlags; // ThreadPool 调用入口,由于 JIT 的内联优化,调用栈里只能看到 ExecuteEntryUnsafe,看不到这个方法
internal virtual void ExecuteFromThreadPool(Thread threadPoolThread) => ExecuteEntryUnsafe(threadPoolThread); internal void ExecuteEntryUnsafe(Thread? threadPoolThread)
{
// 设置 Task 状态为已经执行
m_stateFlags |= (int)TaskStateFlags.DelegateInvoked; if (!IsCancellationRequested & !IsCanceled)
{
ExecuteWithThreadLocal(ref t_currentTask, threadPoolThread);
}
else
{
ExecuteEntryCancellationRequestedOrCanceled();
}
} // 创建 Task 的时候可传入的数据,用于执行时使用
// new Task(state => Console.WriteLine(state), "Hello World").Start();
internal object? m_stateObject; private void ExecuteWithThreadLocal(ref Task currentTaskSlot, Thread threadPoolThread = null)
{
// 执行上下文维护着代码执行逻辑上下文的一些数据,如 AsyncLocal
// 具体请看我的 AsyncLocal 博客 https://www.cnblogs.com/eventhorizon/p/12240767.html
ExecutionContext? ec = CapturedContext;
if (ec == null)
{
// 没有执行上下文,直接执行
InnerInvoke();
}
else
{
// 是否是在 ThreadPool 线程上执行
if (threadPoolThread is null)
{
ExecutionContext.RunInternal(ec, s_ecCallback, this);
}
else
{
ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, ec, s_ecCallback, this);
}
}
} // 不管 ExecuteWithThreadLocal 分支如何,最后会走到 InnerInvoke
internal virtual void InnerInvoke()
{
if (m_action is Action action)
{
action();
return;
} if (m_action is Action actionWithState)
{
actionWithState(m_stateObject);
}
}
}

可以看到 Task 以 ThreadPoolTaskScheduler 为媒介,进入了 ThreadPool。ThreadPool 调用 Task.ExecuteFromThreadPool 方法最终触发 Task 所封装的 action 的执行。

与 ThreadPool 中另一种基本单元 IThreadPoolWorkItem 一样,Task 在进入 ThreadPoolWorkQueue 时会有两种可能,进入全局队列或者本地队列。

理解这个问题,我们需要看一下 ThreadPoolTaskScheduler.QueueTask 里做了些什么。

internal sealed class ThreadPoolTaskScheduler : TaskScheduler
{
protected internal override void QueueTask(Task task)
{
TaskCreationOptions options = task.Options;
if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
{
// 创建独立线程,和线程池无关
new Thread(s_longRunningThreadWork)
{
IsBackground = true,
Name = ".NET Long Running Task"
}.UnsafeStart(task);
}
else
{
// 第二个参数是 preferLocal
// options & TaskCreationOptions.PreferFairness 这个位标志的枚举用法可查看官方资料
// https://docs.microsoft.com/zh-cn/dotnet/csharp/language-reference/builtin-types/enum#enumeration-types-as-bit-flags
ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
}
}
}

上面代码里的 TaskCreationOptions 是我们在创建 Task 的时候可以指定的一个选项,默认是 None。

Task.Run 不支持传入该选项,可使用 TaskFactory.StartNew 的重载进行指定:

new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
}, TaskCreationOptions.PreferFairness);

根据 TaskCreationOptions 的不同,出现了三个分支

  • LongRunning:独立线程,和线程池无关
  • 包含 PreferFairness时:preferLocal=false,进入全局队列
  • 不包含 PreferFairness时:preferLocal=ture,进入本地队列

进入全局队列的任务能够公平地被各个线程池中的线程领取执行,也是就是 prefer fairness 这个词组的字面意思了。

下图中 Task666 先进入全局队列,随后被 Thread1 领走。Thread3 通过 WorkStealing 机制窃取了 Thread2 中的 Task2。

一个独立的后台线程中

也就是上文提到的创建 Task 时使用 TaskCreationOptions.LongRunning,如果你需要一个执行一个长时间的任务,比如一段耗时很久的同步代码,就可以使用这个。执行异步代码(指 await xxx)时不推荐使用,后面会讲原因。

new TaskFactory().StartNew(() =>
{
// 耗时较长的同步代码
}, TaskCreationOptions.LongRunning);

ThreadPool 管理的线程是出于可复用的目的设计的,不停地从队列系统中领取任务执行。如果一个 WorkThread 阻塞在一个耗时较长的任务上,它就没办法处理其他任务,ThreadPool 的吞吐率会受影响。

当然并不意味着 ThreadPool 不能处理这样的任务。举个极端的例子,如果线程池目前的 WorkThread 全在处理 LongRunning Task。在 Starvation Avoidance 机制(每隔500ms)创建新的 WorkThread 之前,ThreadPool 没法执行新的任务。

LongRunning 的 Task 生命周期与 ThreadPool 设计目的不符合,因此需独立开来。

自定义的TaskScheduler里

除了 ThreadPoolTaskScheduler 外,我们还可以定义自己的 TaskScheduler

首先需要继承 TaskScheduler 这个抽象类,有三个抽象方法需要我们实现。

public abstract class TaskScheduler
{
// 入口,待调度执行的 Task 会通过该方法传入
protected internal abstract void QueueTask(Task task); // 这个是在执行 Task 回调的时候才会被执行到的方法,放到后面再讲
protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued); // 获取所有调度到该 TaskScheduler 的 Task
protected abstract IEnumerable? GetScheduledTasks();
}

在我们自定义的 TaskScheduler 里,在 QueueTask 被执行时会拿到 Task,但是 Task 要怎么去触发里面的 action 呢。

Task 针对 ThreadPool 的调用场景暴露了一个 ExecuteFromThreadPool 的 internal 方法,同时也提供了一个 ExecuteEntry 方法供其他场景调用,但是这个方法也是 internal 的。只能通过 TaskScheduler 的 protect 方法进行间接调用。

public abstract class TaskScheduler
{
protected bool TryExecuteTask(Task task)
{
if (task.ExecutingTaskScheduler != this)
{
throw new InvalidOperationException(SR.TaskScheduler_ExecuteTask_WrongTaskScheduler);
} return task.ExecuteEntry();
}
} 下面是一个自定义的 TaskScheduler,在一个固定的线程上顺序执行 Task。
```C#
class CustomTaskScheduler : TaskScheduler
{
private readonly BlockingCollection _queue = new(); public CustomTaskScheduler()
{
new Thread(() =>
{
while (true)
{
var task = _queue.Take();
Console.WriteLine($"task {task.Id} is going to be executed");
TryExecuteTask(task);
Console.WriteLine($"task {task.Id} has been executed");
}
})
{
IsBackground = true
}.Start();
} protected override IEnumerable GetScheduledTasks()
{
return _queue.ToArray();
} protected override void QueueTask(Task task)
{
_queue.Add(task);
} protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
}

在 TaskFactory 的构造函数中可以传入我们自定义的 TaskScheduler

var taskFactory = new TaskFactory(new CustomTaskScheduler());
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
Console.ReadLine();

输出结果如下:

var taskFactory = new TaskFactory(new CustomTaskScheduler());
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
Console.ReadLine();
task 1 is going to be executed
task 1 threadId: 10
task 1 has been executed
task 2 is going to be executed
task 2 threadId: 10
task 2 has been executed

所有的 Task 都会在一个线程里被调度执行。

Task 可以封装任何类型的别的任务

上面两种情况,Task 都存在明确的执行实体,但有时候,可能是没有的。看下面这样的例子。

var task = FooAsync();
var action = typeof(Task).GetField("m_action", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(task);
Console.WriteLine($"Task action is null: {action == null}");
task.ContinueWith(t => Console.WriteLine(t.Result));
// 回调可以注册多个
task.ContinueWith(t => Console.WriteLine(t.Result)); Task FooAsync()
{
var tsc = new TaskCompletionSource();
new Thread(() =>
{
Thread.Sleep(1000);
tsc.SetResult("Hello World");
})
{
IsBackground = true
}.Start();
return tsc.Task;
}

输出:

Task action is null: True
Hello World
Hello World

从 FooAsync 外部和内部两个角度来看这个问题

  • FooAsync 外:拿到了一个 Task 并注册了回调
  • FooAsync 内:相当于间接的持有了这个回调,并通过 tsc.SetResult 间接地调用了这个回调。

下面是关键代码的摘录

class Task
{
// 保存一个或一组回调
private volatile object? m_continuationObject; internal bool TrySetResult(TResult result)
{
// ...
this.m_result = result;
FinishContinuations();
// ...
} internal void FinishContinuations()
{
// 处理回调的执行
}
} public class TaskCompletionSource
{
public TaskCompletionSource() => _task = new Task(); public Task Task => _task; public void SetResult(TResult result)
{
TrySetResult(result);
} public bool TrySetResult(TResult result)
{
_task.TrySetResult(result);
// ...
}
}

有时候 Task.TrySetResult() 的触发源可能是一个异步IO完成事件导致的,也就是我们常说的异步IO,硬件有自己的处理芯片,在异步IO完成通知CPU(硬件中断 hardware interrupt)之前,CPU并不需要参与,这也是异步IO的价值所在。

小结

Task 是个已经完成或者将在未来某个时间点完成的任务,可以向其注册一个回调等待任务完成时被执行。

走进Task(1):什么是Task的

走进Task(1):什么是Task

目录
  • 前言
  • 从表象讲起
    • Task 从何而来
    • Task 常见用法
    • Task 的分类
      • 按是否包含 Result 分,也就是是否是泛型 Task
      • 按得到 Task 的方式,可以分为
    • 对 Task 进行分解
  • Task 在哪执行?
    • 线程池
    • 一个独立的后台线程中
    • 自定义的TaskScheduler里
    • Task 可以封装任何类型的别的任务
  • 小结

前言

本系列会拆分为以下几篇分次进行叙述:

  1. 什么是 Task(本文)
  2. Task 的回调执行与 await(TODO)
  3. async 到底干了什么(TODO)
  4. 总结与常见误区(TODO)

2 中,会和大家分享死锁相关的问题。23 中会穿插自定义 Awaitable 的话题。

本系列会直接引用前一篇博客概述 .NET 6 ThreadPool 实现 里的结论,所有请没看过的同学先麻烦看下。

文中所有例子均出于解释目的,并非具有实际意义的代码。有返回值的 Task 和无返回值的 Task 实际区别不是很大,下文大多数举例不做特别区分。不纠结 api 的使用细节,只讲 Task 的整体设计思路。

代码运行截图是在 .NET 6 中的,其他版本的设计没有大的改动,不影响学习。

笔者解读并非权威解读,只是希望能给大家一个理解 Task 的方法。

从表象讲起

Task 从何而来

以下仅做典型举例,并非全部

  • new Task
new Task(_ =>
{
Console.WriteLine("Hello World!");
}, null).Start();
  • TaskFactory.StartNew
new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
});
  • Task.Run
Task.Run(() =>
{
Console.WriteLine("Hello World!");
});
  • Task.FromResult 等直接创建一个已完成的 Task
Task.FromResult("Hello World!");
var task = Task.CompletedTask;
  • 某个不知道其内部实现的 async 方法
async Task FooAsync();

Task 常见用法

  • 注册一个回调,等待 Task 执行完成时获取结果并执行回调
var task = Task.Run(() => "Hello World!");
task.ContinueWith(t => Console.WriteLine(t.Result));
  • await 一个 Task 并得到结果
var task = Task.Run(() => "Hello World!");
var result = await task;
Console.WriteLine(result);
  • 直接 GetResult
var task = Task.Run(() => "Hello World!");
// 等效于 task.Result
var result = task.GetAwaiter().GetResult();
Console.WriteLine(result);

Task 的分类

按是否包含 Result 分,也就是是否是泛型 Task

  • Task
  • Task

按得到 Task 的方式,可以分为

  • 我知道这个 Task 是怎么来的,这种情况下,我们自己参与了 Task 的创建过程,知道这个 Task 是在干啥。比如:
Task task = Task.Run(() => 1 + 2);

计算 1 + 2,并将结果作为 Task 的结果。

  • 不知道这个 Task 是怎么来的。比如:
Task task = new HttpClient().GetStringAsync("http://localhost:5000/api/values");

而这两种获取方式的不同对应的是两种完全不同的侧重点:

  1. Task 是一个白盒,关注 Task 里干了什么,在哪执行里面这些代码。
  2. Task 是一个黑盒,关注 Task 能给到我什么,Task 完成执行之后,我该干什么。

对 Task 进行分解

按功能点可以将 Task 分为三个部分

  • 任务执行:通过 Task.Run 等方式执行一段我们自定义的逻辑。
  • 回调通知及回调执行:注册一个回调,等待 Task 完成时执行。
  • await 语法支持:脱离了 await,task 的上述两个功能依旧可以完整执行。但却会丧失代码的简洁性。

Task 在哪执行?

线程池

Task 可以作为 ThreadPool 队列系统的基本单元被 ThreadPool 调度执行。

下面这些常见的创建 Task 的方式,默认情况都是在 ThreadPool 中被调度执行的,这几个本质上是一样的,只是使用方式上和可支持传入的自定义选项上的区别。

  • new Task
new Task(_ =>
{
Console.WriteLine("Hello World!");
}, null).Start();
  • TaskFactory.StartNew
new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
});
  • Task.Run
// 可以看做简化版的 TaskFactory.StartNew
Task.Run(() =>
{
Console.WriteLine("Hello World!");
});

Task.Run 为例来看下里面到底做了些什么。

PortableThreadPool.TryCreateWorkerThread 和实际要要执行的 lambda 表达式中打上断点,我们便可以清晰的看到整个执行过程。

整理一下的话,主要就是这个样子,为简化理解,ThreadPool 中的调用细节已省略。

Task 关键代码摘录:

class Task
{
// 任务的主体,我们要执行的实际逻辑
// 可能有返回值,可能没有
internal Delegate m_action; // 任务的状态
internal volatile int m_stateFlags; // ThreadPool 调用入口,由于 JIT 的内联优化,调用栈里只能看到 ExecuteEntryUnsafe,看不到这个方法
internal virtual void ExecuteFromThreadPool(Thread threadPoolThread) => ExecuteEntryUnsafe(threadPoolThread); internal void ExecuteEntryUnsafe(Thread? threadPoolThread)
{
// 设置 Task 状态为已经执行
m_stateFlags |= (int)TaskStateFlags.DelegateInvoked; if (!IsCancellationRequested & !IsCanceled)
{
ExecuteWithThreadLocal(ref t_currentTask, threadPoolThread);
}
else
{
ExecuteEntryCancellationRequestedOrCanceled();
}
} // 创建 Task 的时候可传入的数据,用于执行时使用
// new Task(state => Console.WriteLine(state), "Hello World").Start();
internal object? m_stateObject; private void ExecuteWithThreadLocal(ref Task currentTaskSlot, Thread threadPoolThread = null)
{
// 执行上下文维护着代码执行逻辑上下文的一些数据,如 AsyncLocal
// 具体请看我的 AsyncLocal 博客 https://www.cnblogs.com/eventhorizon/p/12240767.html
ExecutionContext? ec = CapturedContext;
if (ec == null)
{
// 没有执行上下文,直接执行
InnerInvoke();
}
else
{
// 是否是在 ThreadPool 线程上执行
if (threadPoolThread is null)
{
ExecutionContext.RunInternal(ec, s_ecCallback, this);
}
else
{
ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, ec, s_ecCallback, this);
}
}
} // 不管 ExecuteWithThreadLocal 分支如何,最后会走到 InnerInvoke
internal virtual void InnerInvoke()
{
if (m_action is Action action)
{
action();
return;
} if (m_action is Action actionWithState)
{
actionWithState(m_stateObject);
}
}
}

可以看到 Task 以 ThreadPoolTaskScheduler 为媒介,进入了 ThreadPool。ThreadPool 调用 Task.ExecuteFromThreadPool 方法最终触发 Task 所封装的 action 的执行。

与 ThreadPool 中另一种基本单元 IThreadPoolWorkItem 一样,Task 在进入 ThreadPoolWorkQueue 时会有两种可能,进入全局队列或者本地队列。

理解这个问题,我们需要看一下 ThreadPoolTaskScheduler.QueueTask 里做了些什么。

internal sealed class ThreadPoolTaskScheduler : TaskScheduler
{
protected internal override void QueueTask(Task task)
{
TaskCreationOptions options = task.Options;
if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
{
// 创建独立线程,和线程池无关
new Thread(s_longRunningThreadWork)
{
IsBackground = true,
Name = ".NET Long Running Task"
}.UnsafeStart(task);
}
else
{
// 第二个参数是 preferLocal
// options & TaskCreationOptions.PreferFairness 这个位标志的枚举用法可查看官方资料
// https://docs.microsoft.com/zh-cn/dotnet/csharp/language-reference/builtin-types/enum#enumeration-types-as-bit-flags
ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
}
}
}

上面代码里的 TaskCreationOptions 是我们在创建 Task 的时候可以指定的一个选项,默认是 None。

Task.Run 不支持传入该选项,可使用 TaskFactory.StartNew 的重载进行指定:

new TaskFactory().StartNew(() =>
{
Console.WriteLine("Hello World!");
}, TaskCreationOptions.PreferFairness);

根据 TaskCreationOptions 的不同,出现了三个分支

  • LongRunning:独立线程,和线程池无关
  • 包含 PreferFairness时:preferLocal=false,进入全局队列
  • 不包含 PreferFairness时:preferLocal=ture,进入本地队列

进入全局队列的任务能够公平地被各个线程池中的线程领取执行,也是就是 prefer fairness 这个词组的字面意思了。

下图中 Task666 先进入全局队列,随后被 Thread1 领走。Thread3 通过 WorkStealing 机制窃取了 Thread2 中的 Task2。

一个独立的后台线程中

也就是上文提到的创建 Task 时使用 TaskCreationOptions.LongRunning,如果你需要一个执行一个长时间的任务,比如一段耗时很久的同步代码,就可以使用这个。执行异步代码(指 await xxx)时不推荐使用,后面会讲原因。

new TaskFactory().StartNew(() =>
{
// 耗时较长的同步代码
}, TaskCreationOptions.LongRunning);

ThreadPool 管理的线程是出于可复用的目的设计的,不停地从队列系统中领取任务执行。如果一个 WorkThread 阻塞在一个耗时较长的任务上,它就没办法处理其他任务,ThreadPool 的吞吐率会受影响。

当然并不意味着 ThreadPool 不能处理这样的任务。举个极端的例子,如果线程池目前的 WorkThread 全在处理 LongRunning Task。在 Starvation Avoidance 机制(每隔500ms)创建新的 WorkThread 之前,ThreadPool 没法执行新的任务。

LongRunning 的 Task 生命周期与 ThreadPool 设计目的不符合,因此需独立开来。

自定义的TaskScheduler里

除了 ThreadPoolTaskScheduler 外,我们还可以定义自己的 TaskScheduler

首先需要继承 TaskScheduler 这个抽象类,有三个抽象方法需要我们实现。

public abstract class TaskScheduler
{
// 入口,待调度执行的 Task 会通过该方法传入
protected internal abstract void QueueTask(Task task); // 这个是在执行 Task 回调的时候才会被执行到的方法,放到后面再讲
protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued); // 获取所有调度到该 TaskScheduler 的 Task
protected abstract IEnumerable? GetScheduledTasks();
}

在我们自定义的 TaskScheduler 里,在 QueueTask 被执行时会拿到 Task,但是 Task 要怎么去触发里面的 action 呢。

Task 针对 ThreadPool 的调用场景暴露了一个 ExecuteFromThreadPool 的 internal 方法,同时也提供了一个 ExecuteEntry 方法供其他场景调用,但是这个方法也是 internal 的。只能通过 TaskScheduler 的 protect 方法进行间接调用。

public abstract class TaskScheduler
{
protected bool TryExecuteTask(Task task)
{
if (task.ExecutingTaskScheduler != this)
{
throw new InvalidOperationException(SR.TaskScheduler_ExecuteTask_WrongTaskScheduler);
} return task.ExecuteEntry();
}
} 下面是一个自定义的 TaskScheduler,在一个固定的线程上顺序执行 Task。
```C#
class CustomTaskScheduler : TaskScheduler
{
private readonly BlockingCollection _queue = new(); public CustomTaskScheduler()
{
new Thread(() =>
{
while (true)
{
var task = _queue.Take();
Console.WriteLine($"task {task.Id} is going to be executed");
TryExecuteTask(task);
Console.WriteLine($"task {task.Id} has been executed");
}
})
{
IsBackground = true
}.Start();
} protected override IEnumerable GetScheduledTasks()
{
return _queue.ToArray();
} protected override void QueueTask(Task task)
{
_queue.Add(task);
} protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
}

在 TaskFactory 的构造函数中可以传入我们自定义的 TaskScheduler

var taskFactory = new TaskFactory(new CustomTaskScheduler());
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
Console.ReadLine();

输出结果如下:

var taskFactory = new TaskFactory(new CustomTaskScheduler());
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
taskFactory.StartNew(() =>
Console.WriteLine($"task {Task.CurrentId}" +
$" threadId: {Thread.CurrentThread.ManagedThreadId}"));
Console.ReadLine();
task 1 is going to be executed
task 1 threadId: 10
task 1 has been executed
task 2 is going to be executed
task 2 threadId: 10
task 2 has been executed

所有的 Task 都会在一个线程里被调度执行。

Task 可以封装任何类型的别的任务

上面两种情况,Task 都存在明确的执行实体,但有时候,可能是没有的。看下面这样的例子。

var task = FooAsync();
var action = typeof(Task).GetField("m_action", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(task);
Console.WriteLine($"Task action is null: {action == null}");
task.ContinueWith(t => Console.WriteLine(t.Result));
// 回调可以注册多个
task.ContinueWith(t => Console.WriteLine(t.Result)); Task FooAsync()
{
var tsc = new TaskCompletionSource();
new Thread(() =>
{
Thread.Sleep(1000);
tsc.SetResult("Hello World");
})
{
IsBackground = true
}.Start();
return tsc.Task;
}

输出:

Task action is null: True
Hello World
Hello World

从 FooAsync 外部和内部两个角度来看这个问题

  • FooAsync 外:拿到了一个 Task 并注册了回调
  • FooAsync 内:相当于间接的持有了这个回调,并通过 tsc.SetResult 间接地调用了这个回调。

下面是关键代码的摘录

class Task
{
// 保存一个或一组回调
private volatile object? m_continuationObject; internal bool TrySetResult(TResult result)
{
// ...
this.m_result = result;
FinishContinuations();
// ...
} internal void FinishContinuations()
{
// 处理回调的执行
}
} public class TaskCompletionSource
{
public TaskCompletionSource() => _task = new Task(); public Task Task => _task; public void SetResult(TResult result)
{
TrySetResult(result);
} public bool TrySetResult(TResult result)
{
_task.TrySetResult(result);
// ...
}
}

有时候 Task.TrySetResult() 的触发源可能是一个异步IO完成事件导致的,也就是我们常说的异步IO,硬件有自己的处理芯片,在异步IO完成通知CPU(硬件中断 hardware interrupt)之前,CPU并不需要参与,这也是异步IO的价值所在。

小结

Task 是个已经完成或者将在未来某个时间点完成的任务,可以向其注册一个回调等待任务完成时被执行。

走进Task(1):什么是Task的

赞 (0)

猜您想看

评论区(暂无评论)

这里空空如也,快来评论吧~

我要评论