How workqueue processes work items
workqueue executes tasks asynchronously, decoupling the publisher of a task (work) from its executor. When a task needs to run, you construct a work item and queue it on the appropriate workqueue, and a worker (thread) bound to that workqueue executes it. If the tasks are not important, many kinds of tasks can share one worker instead of each kind spawning its own thread (n -> 1); conversely, if there are many important tasks, multiple workers can be spawned to speed things up (1 -> n), much like the producer-consumer model.
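For the n -> 1 case the kernel already ships shared system workqueues, so a publisher often needs no workqueue of its own. A minimal sketch (the function and work names here are made up for illustration): queueing through schedule_work() places the work on the shared system_wq, and no dedicated thread is created.
#include <linux/kernel.h>
#include <linux/workqueue.h>

static void my_fn(struct work_struct *work) // runs later in a shared worker thread
{
	pr_info("executed by a shared worker\n");
}
static DECLARE_WORK(my_shared_work, my_fn); // statically define a work item

static void publish_example(void) // call from process context
{
	schedule_work(&my_shared_work); // queue on the shared system_wq
}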
Compared with the traditional implementation, CMWQ (Concurrency Managed Workqueue) makes one important improvement: it decouples workqueues from workers by introducing the worker pool. Workers are no longer created per workqueue; worker pools manage them centrally. Different workqueues share the global worker pools, but a workqueue can select particular worker pools according to its needs (flags). This gives the following concepts, sketched in code right after the list:
· work: a task; it gets added to a workqueue
· workqueue: a work queue; logically, a queue of work items
· worker: a worker; essentially a kernel thread (kthread) that actually executes work
· worker pool: a collection of workers; it manages the workers.
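To make these relationships concrete, here is a heavily simplified sketch of how the objects reference one another. The field names follow the kernel sources, but almost all members are omitted; the pool_workqueue (pwq), which appears throughout the code below, is the glue object linking one workqueue to one worker pool.
struct work_struct {             // a task
	struct list_head entry;      // links it into a pool's worklist
	work_func_t func;            // the function to execute
};
struct worker_pool {             // manages workers, holds pending work
	struct list_head worklist;   // works waiting to run
	struct list_head workers;    // the workers attached to this pool
};
struct pool_workqueue {          // glue between a workqueue and a pool
	struct worker_pool *pool;
	struct workqueue_struct *wq;
};
struct worker {                  // a kthread that executes works
	struct task_struct *task;
	struct worker_pool *pool;
};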
Processing flow of a work through the workqueue
__printf(1, 4)
struct workqueue_struct *alloc_workqueue(const char *fmt,
					 unsigned int flags,
					 int max_active, ...) // allocate a workqueue
{
	struct workqueue_struct *wq;
	struct pool_workqueue *pwq;
	if (flags & WQ_UNBOUND) // the workqueue is not bound to a CPU
		tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
	max_active = max_active ?: WQ_DFL_ACTIVE;
	max_active = wq_clamp_max_active(max_active, flags, wq->name); // clamp the maximum number of non-delayed works on the workqueue
	wq->flags = flags;
	wq->saved_max_active = max_active;
	if (alloc_and_link_pwqs(wq) < 0) // link the workqueue to worker pools
		goto err_unreg_lockdep;
	if (wq_online && init_rescuer(wq) < 0) // initialize the rescuer worker
		goto err_destroy;
	if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq)) // with WQ_SYSFS, files are created under /sys/bus/workqueue/devices/
		goto err_destroy;
	for_each_pwq(pwq, wq)
		pwq_adjust_max_active(pwq); // adjust each pwq's max_active based on wq->saved_max_active
}
static int alloc_and_link_pwqs(struct workqueue_struct *wq) // create pwqs, linking the workqueue to worker pools
{
	if (!(wq->flags & WQ_UNBOUND)) { // bound to CPUs
		wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
		for_each_possible_cpu(cpu) { // for each CPU
			struct pool_workqueue *pwq =
				per_cpu_ptr(wq->cpu_pwqs, cpu); // get this CPU's pwq
			struct worker_pool *cpu_pools =
				per_cpu(cpu_worker_pools, cpu); // get this CPU's worker pools
			init_pwq(pwq, wq, &cpu_pools[highpri]); // link the workqueue to the worker pool; the pwq is the result
		}
		return 0;
	}
	if (wq->flags & __WQ_ORDERED) { // unbound, ordered (max_active = 1)
		ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
		/* there should only be single pwq for ordering guarantee */
		WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
			      wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
		     "ordering guarantee broken for workqueue %s\n", wq->name);
	} else {
		ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]); // unbound
	}
}
A. apply_workqueue_attrs -> apply_workqueue_attrs_locked -> apply_wqattrs_prepare
static struct apply_wqattrs_ctx *
apply_wqattrs_prepare(struct workqueue_struct *wq, const struct workqueue_attrs *attrs)
{
	struct apply_wqattrs_ctx *ctx;
	ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs); // allocate the default pwq
	for_each_node(node) { // for each NUMA node
		if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) { // multiple NUMA nodes and the cpumask satisfies certain conditions
			ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs); // allocate and link a dedicated pwq for this NUMA node
		} else {
			ctx->dfl_pwq->refcnt++;
			ctx->pwq_tbl[node] = ctx->dfl_pwq; // this NUMA node uses the default pwq
		}
	}
}
static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
						const struct workqueue_attrs *attrs) // allocate and initialize a pwq
{
	struct worker_pool *pool;
	struct pool_workqueue *pwq;
	pool = get_unbound_pool(attrs); // get a worker pool
	pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node); // allocate the pwq
	init_pwq(pwq, wq, pool); // link the workqueue and the worker pool through the pwq
}
static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs) // get a worker pool
{
	struct worker_pool *pool;
	pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node); // allocate the worker pool
	if (!pool || init_worker_pool(pool) < 0) // initialize the worker pool
		goto fail;
}
static int init_worker_pool(struct worker_pool *pool) // initialize a worker pool
{
	pool->cpu = -1; // an UNBOUND worker pool is not bound to a CPU
	pool->node = NUMA_NO_NODE; // nor to a NUMA node
	timer_setup(&pool->mayday_timer, pool_mayday_timeout, 0); // arm-able mayday timer: it detects whether workers can be created; if not, the pool's pwqs are put on the mayday list and the rescuer thread processes the works on pwq->pool->worklist for every pwq on that list
}
B. apply_workqueue_attrs -> apply_workqueue_attrs_locked -> apply_wqattrs_commit
static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
{
	for_each_node(node)
		ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
							  ctx->pwq_tbl[node]); // install each NUMA node's pwq into wq->numa_pwq_tbl
	swap(ctx->wq->dfl_pwq, ctx->dfl_pwq); // set the workqueue's default pwq
}
static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq, int node,
						   struct pool_workqueue *pwq)
{
	rcu_assign_pointer(wq->numa_pwq_tbl[node], pwq); // initialize wq->numa_pwq_tbl
}
static int init_rescuer(struct workqueue_struct *wq) // initialize the rescuer worker
{
	if (!(wq->flags & WQ_MEM_RECLAIM)) // a rescuer is only needed when ordinary worker creation fails, usually due to memory pressure, so WQ_MEM_RECLAIM decides whether one is created
		return 0;
	rescuer = alloc_worker(NUMA_NO_NODE); // allocate the rescuer worker
	rescuer->rescue_wq = wq; // the workqueue this rescuer serves/rescues
	rescuer->task = kthread_create(rescuer_thread, rescuer, "%s", wq->name); // the corresponding rescuer thread
	wq->rescuer = rescuer; // the workqueue must know who its rescuer is
	wake_up_process(rescuer->task); // wake up the rescuer thread
}
int workqueue_sysfs_register(struct workqueue_struct *wq)
{
	wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL); // allocate the device
	ret = device_register(&wq_dev->dev); // register the device
}
static void __queue_work(int cpu, struct workqueue_struct *wq,
			 struct work_struct *work) // queue the work on the workqueue
{
	struct pool_workqueue *pwq;
	struct list_head *worklist;
	if (!(wq->flags & WQ_UNBOUND)) // the workqueue was created without WQ_UNBOUND
		pwq = per_cpu_ptr(wq->cpu_pwqs, cpu); // pick the pwq of the given CPU
	else
		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu)); // pick the pwq of this NUMA node, or the default pwq if there is none
	if (likely(pwq->nr_active < pwq->max_active)) { // each pwq caps its active works; under the cap the work goes onto the worker pool's worklist
		pwq->nr_active++; // one more active work on this pwq
		worklist = &pwq->pool->worklist;
	} else { // over the cap the work goes onto the delayed list; when process_one_work finishes, delayed works are flushed to the worklist
		work_flags |= WORK_STRUCT_DELAYED;
		worklist = &pwq->delayed_works;
	}
	insert_work(pwq, work, worklist, work_flags); // insert the work into the chosen worker pool list
}
static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
			struct list_head *head, unsigned int extra_flags) // insert the work into the pool's worklist or the pwq's delayed list
{
	struct worker_pool *pool = pwq->pool;
	set_work_pwq(work, pwq, extra_flags); // record the work's pwq
	list_add_tail(&work->entry, head); // add the work to the list
	get_pwq(pwq); // the pwq's refcount rises because a work joined
}
static bool manage_workers(struct worker *worker) // called from the worker thread
{
	struct worker_pool *pool = worker->pool;
	pool->manager = worker; // this worker becomes the pool's manager
	maybe_create_worker(pool); // manage the pool: decide whether to create workers
	pool->manager = NULL;
}
static void maybe_create_worker(struct worker_pool *pool) // decide whether a new worker is needed; the pool's manager worker calls this and creates workers dynamically based on the pending work in the pool
{
restart:
	mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT); // arm the mayday timer
	while (true) { // if create_worker keeps failing, this loops forever and never reaches del_timer_sync, so the mayday timer fires and pool_mayday_timeout hands all works on the pool's worklist to the rescuer thread
		if (create_worker(pool) || !need_to_create_worker(pool)) // create a worker, which spawns the corresponding worker thread
			break;
	}
	del_timer_sync(&pool->mayday_timer); // disarm the mayday timer
	if (need_to_create_worker(pool)) // still need more workers
		goto restart;
}
static void pool_mayday_timeout(struct timer_list *t) // mayday timer callback; calls send_mayday
{
	struct worker_pool *pool = from_timer(pool, t, mayday_timer); // find the worker pool owning this mayday timer
	if (need_to_create_worker(pool)) { // more workers really are needed
		list_for_each_entry(work, &pool->worklist, entry) // for each work on the pool's worklist
			send_mayday(work); // put the work's pwq on the mayday list
	}
	mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL); // re-arm the mayday timer
}
static void send_mayday(struct work_struct *work) // put the work's pwq on the mayday list, meaning the works on that pwq should be processed one by one by the workqueue's rescuer thread
{
	struct pool_workqueue *pwq = get_work_pwq(work);
	struct workqueue_struct *wq = pwq->wq;
	if (!wq->rescuer)
		return; // the work's workqueue must have a rescuer
	if (list_empty(&pwq->mayday_node)) { // never add a pwq to the mayday list twice
		get_pwq(pwq); // joining the mayday list takes a pwq reference
		list_add_tail(&pwq->mayday_node, &wq->maydays); // put the pwq on the workqueue's mayday list
		wake_up_process(wq->rescuer->task); // wake up the rescuer thread
	}
}
static int worker_thread(void *__worker) // the thread of an ordinary worker
{
	if (unlikely(!may_start_working(pool)) && manage_workers(worker)) // one worker in each pool manages the pool
		goto recheck;
	do {
		struct work_struct *work = list_first_entry(&pool->worklist,
					struct work_struct, entry); // take one work from the pool's worklist
		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
			/* optimization path, not strictly necessary */
			process_one_work(worker, work); // execute the work
			if (unlikely(!list_empty(&worker->scheduled)))
				process_scheduled_works(worker);
		} else {
			move_linked_works(work, &worker->scheduled, NULL);
			process_scheduled_works(worker); // execute the works
		}
	} while (keep_working(pool));
}
// process_scheduled_works -> process_one_work
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{ // have this worker execute the work
	/* prepare to run the work: set the worker's current state */
	worker->current_work = work;
	worker->current_func = work->func;
	worker->current_pwq = pwq;
	list_del_init(&work->entry); // the work can leave the worklist now
	worker->current_func(work); // run the work
	/* the work finished: reset the worker's current state */
	worker->current_work = NULL;
	worker->current_func = NULL;
	worker->current_pwq = NULL;
	pwq_dec_nr_in_flight(pwq, work_color); // release pwq accounting after completion
}
static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color) // release accounting after a work completes
{
	pwq->nr_active--; // one work done, so nr_active drops by one
	if (!list_empty(&pwq->delayed_works)) { // there are works on the delayed list
		/* one down, submit a delayed one */
		if (pwq->nr_active < pwq->max_active)
			pwq_activate_first_delayed(pwq); // move the first delayed work from the pwq's delayed list to the pool's worklist
	}
	put_pwq(pwq); // each completed work drops the pwq's refcount by one
}
static int rescuer_thread(void *__rescuer) // the thread of the rescuer worker
{
	while (!list_empty(&wq->maydays)) { // the workqueue has pwqs that need rescuing
		struct pool_workqueue *pwq = list_first_entry(&wq->maydays,
					struct pool_workqueue, mayday_node); // take the first pwq on the mayday list
		struct worker_pool *pool = pwq->pool; // find the pwq's worker pool
		list_del_init(&pwq->mayday_node); // remove the pwq from the mayday list
		list_for_each_entry_safe(work, n, &pool->worklist, entry) { // move this pwq's works from the pool's worklist to the rescuer's scheduled list
			if (get_work_pwq(work) == pwq) {
				if (first)
					pool->watchdog_ts = jiffies;
				move_linked_works(work, scheduled, &n);
			}
			first = false;
		}
		if (!list_empty(scheduled)) { // the rescuer's scheduled list has works
			process_scheduled_works(rescuer); // execute them
			if (need_to_create_worker(pool)) { // workers are still needed, so the pwq may go back on the mayday list
				if (wq->rescuer && list_empty(&pwq->mayday_node)) {
					get_pwq(pwq);
					list_add_tail(&pwq->mayday_node, &wq->maydays); // re-add to the mayday list
				}
			}
		}
		put_pwq(pwq); // drops the reference taken when the pwq joined the mayday list
	}
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/err.h>
#include <linux/delay.h>
#include <linux/workqueue.h>

#define N 100
#define WORKQUEUE_NAME "my-test-wq"

struct workqueue_struct *my_workqueue;
struct work_struct my_work[N];

void do_work(struct work_struct *work) // the task to run asynchronously
{
	printk("DO WORK!\n");
	msleep(20);
}

int test_init(void)
{
	int i;
	my_workqueue = alloc_workqueue(WORKQUEUE_NAME, WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_SYSFS, 0); // create an unbound workqueue with a rescuer and sysfs files
	if (!my_workqueue) {
		printk("[WARN] alloc workqueue failed.\n");
		return -ENOMEM; // fail the module load instead of pretending success
	}
	for (i = 0; i < N; ++i) {
		INIT_WORK(&my_work[i], do_work); // initialize my_work[i]
		queue_work_on(i % 2, my_workqueue, &my_work[i]); // queue my_work[i], hinting CPU i % 2; since the wq is unbound, the pwq of that CPU's NUMA node is used
		// queue_work(my_workqueue, &my_work[i]);
	}
	return 0;
}

void test_exit(void)
{
	destroy_workqueue(my_workqueue); // destroy the workqueue
}

module_init(test_init);
module_exit(test_exit);
MODULE_LICENSE("GPL"); // required for the module to load cleanly
Usage
$ insmod test_workqueue.ko
DO WORK!
DO WORK!
DO WORK!
...
$ ls /sys/bus/workqueue/devices/my-test-wq
cpumask max_active nice numa per_cpu pool_ids power subsystem uevent
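To build and load the module above, a minimal out-of-tree Makefile along these lines should work, assuming the source file is named test_workqueue.c to match the insmod command:
obj-m := test_workqueue.o

all:
	make -C /lib/modules/$(shell uname -r)/build M=$(PWD) modules

clean:
	make -C /lib/modules/$(shell uname -r)/build M=$(PWD) clean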