良许Linux教程网 干货合集 Linux内核工作队列

Linux内核工作队列

一、Linux内核工作队列

Linux内核中,工作队列作为一种异步处理机制,用于延迟执行那些需要在进程上下文中完成的任务。通常由内核模块或驱动程序使用,以确保不会在中断上下文中执行长时间运行的操作。

当中断需要执行大量任务时,就需要将其分为上半部和下半部。上半部分是指中断服务函数,在这里需要尽快完成以避免干扰系统中其他中断的处理,而下半部分则在稍后执行,用于处理中断中尚未完成的耗时任务。

上半部分在中断上下文中执行,而下半部分则在进程上下文中执行。工作队列便是下半部分机制的一种实现方式,本质上是一种代码推迟执行的机制。

  • 将代码推迟到工作队列中,可以获得进程上下文的所有优点。
  • 工作队列可以进行调度,也可以进入睡眠状态。

换言之,如果需要推迟执行的代码需要进入睡眠状态,则选择工作队列

二、工作队列的优势

使用工作队列的主要优势之一是避免在中断上下文中执行长时间运行的操作。中断上下文应尽可能短暂,以免影响系统的响应速度和稳定性。使用工作队列可以将长时间运行的操作推迟到稍后执行,从而避免在中断上下文中执行。

另一个优势是工作队列可以在多个CPU上并行执行任务,这可以提高系统的吞吐量和响应速度。

总的来说,工作队列是Linux内核中非常实用的异步处理机制。它可以帮助内核模块和驱动程序避免在中断上下文中执行长时间运行的操作,并提高系统的吞吐量和响应速度。

三、 工作队列的使用(使用系统自带的工作队列)

初始化work_struct

静态方法:

DECLARE_WORK(work,work_fn);

动态方法:

INIT_WORK(work,work_fn)

所谓静态就是在编译时候就得到了work_struct的空间,跟随着ko,会增大ko的大小,最后被放到内存上。

所谓的动态就是在运行时候才得到了work_srtuct的空间,这时候是在堆上,本质上也是在内存上。

调度work_struct 到workqueue

schedule_work(struct work_struct *work)
schedule_work_on(int cpu, struct work_struct *work)

一个work可以被调度到系统的任意一个CPU,也可以被调度在指定的CPU,这就是以上两个API的区别。

取消一个work_struct

cancel_work(struct work_struct *work);
cancel_work_sync(struct work_struct *work);

取消一个work的时候,有可能他正在进行,如果需要等待它执行完,可以使用cancel_work_sync,等待其执行完毕再取消.

强制work的执行

可能有多种原因,当你想work执行的那一刻,工作队列正在执行别的work或者在睡眠,那么你的work不能如期执行,这时候可以强制让work执行。

一个工作队列上是可以有多个work的,当然也可以针对一个工作队列来强制执行上面的多个work,使用flush_scheduled_work和flush_workqueue。flush_scheduled_work不带参数,特指system_wq,这是一个系统自带的工作队列。

flush_work(struct work_struct *work);
flush_scheduled_work()
flush_workqueue(struct workqueue_struct *_wq)

四、工作队列demo(使用系统自带的工作队列)

以下驱动demo,特意使用了schedule_work_on的API,证明一个work是可以指定CPU运行的,通过写/dev/work_queue设备节点,触发调度work,在work回调里执行计算密集型任务,即可观察指定的CPU的使用率,最后卸载模块的时候,特意使用了带sync的API,等待work执行完毕,因此可以看到rmmod模块时会阻塞掉一段时间。

#include 
#include 
#include 
#include 
#include 
#include 

struct my_work_info{
 dev_t dev;
 struct cdev chrdev;
 struct class *work_class;
 struct device *device;
 struct kobject *sysfs_obj;
};

struct my_work_info *g_data;
volatile int etx_value = 0;

void mywork_fn(struct work_struct *work)
{
 int  i = 5;
 unsigned long long cont;
 while(i--)
 {
  int i, j;
  if(work_busy(work))
   printk(KERN_INFO "busy\n");
     for (i = 0; i for (j = 0; j "cont(%d)\n",cont);
         }
     }
  
 }
}

static DECLARE_WORK(mywork,mywork_fn);

ssize_t work_queue_dev_read(struct file *filep, char __user *ubuf, size_t len, loff_t *offset)
{
 printk(KERN_INFO "Read function\n");
 return 0;
}

ssize_t work_queue_dev_write (struct file *filep, const char __user *ubuf, size_t len, loff_t *offset)
{
  printk(KERN_INFO "Write Function\n");
  if(etx_value)
   schedule_work_on(1,&mywork);
  else
   cancel_work_sync(&mywork);
  return len;
 

}

int work_queue_dev_open (struct inode *inode, struct file *filep)
{
 printk(KERN_INFO "Device File Opened...!!!\n");
 return 0;

}

int work_queue_dev_close (struct inode *inode, struct file *filep)
{
 printk(KERN_INFO "Device File Closed...!!!\n");
 return 0;
}

static struct file_operations work_queue_fops = {
 .open = work_queue_dev_open,
 .release = work_queue_dev_close,
 .read = work_queue_dev_read,
 .write = work_queue_dev_write,
};

static ssize_t sysfs_show(struct kobject *kobj, 
                struct kobj_attribute *attr, char *buf)
{
        printk(KERN_INFO "Sysfs - Read!!!\n");
        return sprintf(buf, "%d", etx_value);
}
    
/*
** This function will be called when we write the sysfsfs file
*/
static ssize_t sysfs_store(struct kobject *kobj, 
                struct kobj_attribute *attr,const char *buf, size_t count)
{
        printk(KERN_INFO "Sysfs - Write!!!\n");
        sscanf(buf,"%d",&etx_value);
        return count;
}

 
struct kobj_attribute etx_attr = __ATTR(etx_value, 0660, sysfs_show, sysfs_store);

static int __init work_queue_init(void)
{
 struct my_work_info*wk_info;
 int ret = 0;
 wk_info = kmalloc(sizeof(struct my_work_info), GFP_KERNEL);
 
 ret = alloc_chrdev_region(&wk_info->dev, 0, 1, "myworkqueue");
 if(ret "Cannot allocate major number\n");
   return ret;
 }

 printk(KERN_INFO "Major = %d Minor = %d \n",MAJOR(wk_info->dev),
    MINOR(wk_info->dev));

 cdev_init(&wk_info->chrdev, &work_queue_fops);

 ret = cdev_add(&wk_info->chrdev, wk_info->dev, 1);
 if(ret){
  pr_err(KERN_INFO "Cannot add the device to the system\n");
  goto err_add;
 }
 
 wk_info->work_class = class_create(THIS_MODULE, "myworkqueue");
 if(IS_ERR(wk_info->work_class)){
  printk(KERN_INFO "Cannot create the struct class\n");
  goto err_add; 
 }

 wk_info->device = device_create(wk_info->work_class, NULL, wk_info->dev, 
     NULL, "workqueuedev");
 if(IS_ERR(wk_info->device)){
  printk(KERN_INFO "Cannot create the struct device\n");
  goto err_device; 
 }

 wk_info->sysfs_obj = kobject_create_and_add("myworkqueuesysfs", 
             kernel_kobj);
 
 if(!wk_info->sysfs_obj){
  printk(KERN_INFO "Cannot create the sysfs dir\n");
  goto err_device; 
 }

 if(sysfs_create_file(wk_info->sysfs_obj ,&etx_attr.attr)){
     printk(KERN_INFO"Cannot create sysfs file......\n");
        goto err_sysfs;
    }
 g_data = wk_info;
 
 return 0;

err_sysfs:
 kobject_put(wk_info->sysfs_obj);
 sysfs_remove_file(wk_info->sysfs_obj,&etx_attr.attr);
 
err_device:
 class_destroy(wk_info->work_class);
 
err_add:
 unregister_chrdev_region(wk_info->dev,1);
 cdev_del(&wk_info->chrdev);
 return ret;
}

static void __exit work_queue_exit(void)
{
 struct my_work_info*wk_info = g_data;
 cancel_work(&mywork);
 kobject_put(wk_info->sysfs_obj); 
    sysfs_remove_file(wk_info->sysfs_obj, &etx_attr.attr);
 device_destroy(wk_info->work_class, wk_info->dev);

 class_destroy(wk_info->work_class);

 cdev_del(&wk_info->chrdev);

 unregister_chrdev_region(wk_info->dev,1);
}

module_init(work_queue_init);
module_exit(work_queue_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("lzy");
MODULE_DESCRIPTION("Simple Linux device driver (work_queue)");
MODULE_VERSION("1.6");

加载驱动后触发:

 echo 1 > /sys/kernel/myworkqueuesysfs/etx_value
echo 1 > /dev/workqueuedev
image-20240408220046173
image-20240408220046173
image-20240408220049374
image-20240408220049374

可以观察到使用top的时候,看到整体CPU使用率时25%左右,因为我的是4核CPU,htop可以观察到CPU1已经达到了100%的使用率。

五、自定义的工作队列

一般来说,使用自带的工作队列就能满足使用要求。但是我们可以为自己的工作创建一个独有的工作队列。工作队列使用**workqueue_struct** 表示。

创建工作队列

struct workqueue_struct *create_workqueue(name)
struct workqueue_struct *create_singlethread_workqueue(name)

如果错误,返回NULL

销毁工作队列

void destroy_workqueue( struct workqueue_struct * );

调度work_struct 到自定义workqueue

初始化好我们自己的工作队列后,就可以将work调度到工作队列上面,这一点和使用自带工作队列是没什么区别。

int queue_work( struct workqueue_struct *wq, struct work_struct *work );
int queue_work_on( int cpu, struct workqueue_struct *wq, struct work_struct *work );

queue_work会先提交到当前CPU,如果这个CPU出现故障,则会转移给别的CPU

queue_work_on直接指定在哪个CPU上面执行work

返回值false=0表示work已经在工作队列上面。

六、自定义工作队列demo

#include 
#include 
#include 
#include 
#include 
#include 

#define CUSTOM_WORKQUEUE
struct my_work_info{
 dev_t dev;
 struct cdev chrdev;
 struct class *work_class;
 struct device *device;
 struct kobject *sysfs_obj;
 struct workqueue_struct *own_workqueue;
};
void mywork_fn(struct work_struct *work);

struct my_work_info *g_data;
volatile int etx_value = 0;
static DECLARE_WORK(mywork,mywork_fn);

void mywork_fn(struct work_struct *work)
{
 int  i = 5;
 unsigned long long cont;
 while(i--)
 {
  int i, j;
  if(work_busy(work))
   printk(KERN_INFO "busy\n");
     for (i = 0; i for (j = 0; j "cont(%d)\n",cont);
         }
     }
  
 }
 queue_work( g_data->own_workqueue, &mywork );
 
}

ssize_t work_queue_dev_read(struct file *filep, char __user *ubuf, size_t len, loff_t *offset)
{
 printk(KERN_INFO "Read function\n");
 return 0;
}

ssize_t work_queue_dev_write (struct file *filep, const char __user *ubuf, size_t len, loff_t *offset)
{
 int ret = -1;
  printk(KERN_INFO "Write Function\n");
  if(etx_value)
#if defined(CUSTOM_WORKQUEUE)
  printk(KERN_INFO "own workqueue\n");
  ret = queue_work( g_data->own_workqueue, &mywork );
  if(ret){
   printk("queue work err\n");
   return ret;
  }
#else
   schedule_work_on(1,&mywork);
#endif
  else
   cancel_work_sync(&mywork);
  return len;
 

}

int work_queue_dev_open (struct inode *inode, struct file *filep)
{
 printk(KERN_INFO "Device File Opened...!!!\n");
 return 0;

}

int work_queue_dev_close (struct inode *inode, struct file *filep)
{
 printk(KERN_INFO "Device File Closed...!!!\n");
 return 0;
}

static struct file_operations work_queue_fops = {
 .open = work_queue_dev_open,
 .release = work_queue_dev_close,
 .read = work_queue_dev_read,
 .write = work_queue_dev_write,
};

static ssize_t sysfs_show(struct kobject *kobj, 
                struct kobj_attribute *attr, char *buf)
{
        printk(KERN_INFO "Sysfs - Read!!!\n");
        return sprintf(buf, "%d", etx_value);
}
    
/*
** This function will be called when we write the sysfsfs file
*/
static ssize_t sysfs_store(struct kobject *kobj, 
                struct kobj_attribute *attr,const char *buf, size_t count)
{
        printk(KERN_INFO "Sysfs - Write!!!\n");
        sscanf(buf,"%d",&etx_value);
        return count;
}

 
struct kobj_attribute etx_attr = __ATTR(etx_value, 0660, sysfs_show, sysfs_store);

static int __init work_queue_init(void)
{
 struct my_work_info*wk_info;
 int ret = 0;
 wk_info = kmalloc(sizeof(struct my_work_info), GFP_KERNEL);
 
 ret = alloc_chrdev_region(&wk_info->dev, 0, 1, "myworkqueue");
 if(ret "Cannot allocate major number\n");
   return ret;
 }

 printk(KERN_INFO "Major = %d Minor = %d \n",MAJOR(wk_info->dev),
    MINOR(wk_info->dev));

 cdev_init(&wk_info->chrdev, &work_queue_fops);

 ret = cdev_add(&wk_info->chrdev, wk_info->dev, 1);
 if(ret){
  pr_err(KERN_INFO "Cannot add the device to the system\n");
  goto err_add;
 }
 
 wk_info->work_class = class_create(THIS_MODULE, "myworkqueue");
 if(IS_ERR(wk_info->work_class)){
  printk(KERN_INFO "Cannot create the struct class\n");
  goto err_add; 
 }

 wk_info->device = device_create(wk_info->work_class, NULL, wk_info->dev, 
     NULL, "workqueuedev");
 if(IS_ERR(wk_info->device)){
  printk(KERN_INFO "Cannot create the struct device\n");
  goto err_device; 
 }

 wk_info->sysfs_obj = kobject_create_and_add("myworkqueuesysfs", 
             kernel_kobj);
 
 if(!wk_info->sysfs_obj){
  printk(KERN_INFO "Cannot create the sysfs dir\n");
  goto err_device; 
 }

 if(sysfs_create_file(wk_info->sysfs_obj ,&etx_attr.attr)){
     printk(KERN_INFO"Cannot create sysfs file......\n");
        goto err_sysfs;
    }
#if defined(CUSTOM_WORKQUEUE)
 wk_info->own_workqueue = create_workqueue("own_lzy");
 if(!wk_info->own_workqueue){
  printk(KERN_INFO"Cannot create workqueue......\n");
  goto err_sysfs;
 }
#endif 

 g_data = wk_info;
 
 return 0;

err_sysfs:
 kobject_put(wk_info->sysfs_obj);
 sysfs_remove_file(wk_info->sysfs_obj,&etx_attr.attr);
 
err_device:
 class_destroy(wk_info->work_class);
 
err_add:
 unregister_chrdev_region(wk_info->dev,1);
 cdev_del(&wk_info->chrdev);
 return ret;
}

static void __exit work_queue_exit(void)
{
 struct my_work_info*wk_info = g_data;
 
 cancel_work(&mywork);
 destroy_workqueue(wk_info->own_workqueue);
 kobject_put(wk_info->sysfs_obj); 
    sysfs_remove_file(wk_info->sysfs_obj, &etx_attr.attr);
 device_destroy(wk_info->work_class, wk_info->dev);

 class_destroy(wk_info->work_class);

 cdev_del(&wk_info->chrdev);

 unregister_chrdev_region(wk_info->dev,1);
}

module_init(work_queue_init);
module_exit(work_queue_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("lzy");
MODULE_DESCRIPTION("Simple Linux device driver (work_queue)");
MODULE_VERSION("1.6");

触发同上

七、初始化代码过程

kernel在启动过程中,会初始化工作队列这个基础设施,通过两个函数:workqueue_init_early和workqueue_init,分别是第一和第二阶段的工作队列初始化。

/*init/main.c*/

start_kernel()
 ...
 workqueue_init_early();
 ...
 arch_call_rest_init();
  rest_init();
   kernel_init();
    kernel_init_freeable();
     workqueue_init();
     

workqueue_init_early

这是一个linux上电过程初始化整个workqueue的两个步骤中的第一个。

1.在最初,系统对每个CPU都定义了2个worker_pool,这个是per-cpu的,per-cpu的意思就是每个CPU都会有一份,假如芯片有4个CPU,那就初始化了总共8个worker_pool。可以看到下面定义了的类型是struct worker_pool []的,名字是cpu_worker_pools的变量,每个CPU都会有这样的数组,NR_STD_WORKER_POOLS是2,当前内核决定的。

static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
image-20240408220057196
image-20240408220057196

对每个CPU的两个work_pool初始化,赋予不同的nice值,分别是0和-20。

2.使用alloc_workqueue()创建系统自带的多个工作队列。system_wq 、system_highpri_wq 、system_long_wq 、system_unbound_wq 、system_freezable_wq 、system_power_efficient_wq 、system_freezable_power_efficient_wq 都是全局的工作队列,供各个驱动模块使用,各有不同的特点。

alloc_workqueue(const char *fmt, unsigned int flags, int max_active, ...);
system_wq = alloc_workqueue("events", 0, 0);
system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
         WQ_UNBOUND_MAX_ACTIVE);
system_freezable_wq = alloc_workqueue("events_freezable",
           WQ_FREEZABLE, 0);
system_power_efficient_wq = alloc_workqueue("events_power_efficient",
           WQ_POWER_EFFICIENT, 0);
system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
           WQ_FREEZABLE | WQ_POWER_EFFICIENT,
           0);

以上可以看到创建不同的系统工作队列的时候用到了不同的flag,如WQ_HIGHPRI。

下面举例子alloc_workqueue创建”events“工作队列。系统在全局维护了一个工作队列的链表,所有创建的工作队列都要加入到这里被组织管理

/*kernel/workqueue.c*/
static LIST_HEAD(workqueues);  /* PR: list of all workqueues */
image-20240408220101361
image-20240408220101361

以上是创建完workqueue后的样子,仅仅列出了“events”工作队列。可以看到workqueue被全局链表workqueues组织起来。

这里出现了一个新的对象pool_workqueue,是worker_pool和workqueue的中介。“events”没有标志WQ_UNBOUND,因此它的pool_workqueue中介都是per-cpu的,又因为没有WQ_HIGHPRI标志,所有中介给workqueue找的worker_pool都是normal的而非-20 nice的。这些结论很重要,这就已经决定了这三者的关系不会再改变。

最终这个阶段工作队列就已经完成了系统工作队列的初始化,但是我们还没看到帮我们执行work的worker线程。

workqueue_init

遍历workqueues链表,凡是带有WQ_MEM_RECLAIM标志的工作队列,都会为其分配一个rescuer(救援)线程,线程名字就是工作队列的名字,但是系统自带的工作队列全都没有rescuer线程。

对在线的CPU ,per-cpu的worker_pool都创建woker,这里出现新的对象worker。worker表示一个执行work的线程。创建的worker是worker_pool的一部分。这里只对在线的CPU里面worker_pool创建worker,根据worker_pool的nice值,决定worker的名称。

if (pool->cpu >= 0)
  snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
    pool->attrs->nice "H" : "");
 else
  snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);

就是在系统上看到的:

image-20240408220104919
image-20240408220104919
image-20240408220108824
image-20240408220108824

到这里为止,整个工作队列系统呈现这样子。创建的worker会进入idle,worker.nr_idle表示现在有多少个worker在idle。现在我们所有的kwoker都处于idle状态,还没有work提交,也没有work能执行。

通过线程池(worker_pool)绑定的cpu信息(struct worker_pool的cpu成员)可以知道该pool是per-CPU还是unbound,对于per-CPU线程池,pool->cpu是大于等于0的。对于对于per-CPU线程池,其worker线程的名字是kworker/cpuworker id,如果是high priority的,后面还跟着一个H字符。对于unbound线程池,其worker线程的名字是kworker/u pool idworker id。

上面还可以看到又的worker带有u开头的,那表示ubound worker,表示不会固定在某个CPU执行?

  • [ ] 分析unbound wq

八、提交work的过程

一个work带有一个执行函数,驱动们提交work是提交到workqueue,但是最后却是挂载到找到的work_pool的链表上面。

当驱动们使用schedule_work()提交一个work到 “events”工作队列,即系统自带的system_wq。

/*include/linux/workqueue.h*/
static inline bool schedule_work(struct work_struct *work)
{
 return queue_work(system_wq, work);
}

static inline bool queue_work(struct workqueue_struct *wq,
         struct work_struct *work)
{
 return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

bool queue_work_on(int cpu, struct workqueue_struct *wq,
     struct work_struct *work)

可以看到,所有面向驱动们的接口都是内联函数,最终使用的是queue_work_on()提交一个work,当驱动们没有指定CPU的时候,这个work就是WORK_CPU_UNBOUND的,定义是CPU的个数。

work最终需要找到的是worker_pool,因此要先找到pool_workqueue也就是中介。以下是执行逻辑

/*kernel/workqueue.c*/

if (wq->flags & WQ_UNBOUND) {
  if (req_cpu == WORK_CPU_UNBOUND)
   cpu = wq_select_unbound_cpu(raw_smp_processor_id());
  pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
 } else {
  if (req_cpu == WORK_CPU_UNBOUND)
   cpu = raw_smp_processor_id();
  pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
 }

判断你的工作队列是否是UNBOUND的,因为system_wq是在创建的时候是没有指定flag的,走第二个分支,再判断有没有给这个work指定CPU,没有指定的话,会选择当前提交work时候的CPU,也就是本地CPU。再根据这个CPU找到pool_workqueue,找到了中介就找到了worker_pool,就是因为是per-cpu的。整个逻辑就是用户知道提交到的工作队列,就找到了其下的所有pool_workqueue,再根据CPU确定是哪个pool_workqueue,进而就知道了work_pool。

image-20240408220113515
image-20240408220113515

当找到了worker_pool,就会使用这个线程池的线程worker来执行我们的work,我们的work可能不止提交一次,那么如果再次提交work就会倾向于使用先前的worker_pool和pool_workqueue。下面是处理的逻辑

/*kernel/workqueue.c*/
/*
  * If @work was previously on a different pool, it might still be
  * running there, in which case the work needs to be queued on that
  * pool to guarantee non-reentrancy.
  */
 last_pool = get_work_pool(work);
 if (last_pool && last_pool != pwq->pool) {
  struct worker *worker;

  raw_spin_lock(&last_pool->lock);

  worker = find_worker_executing_work(last_pool, work);

  if (worker && worker->current_pwq->wq == wq) {
   pwq = worker->current_pwq;
  }

最后work会加入到worker_pool的worklist链表,这时候work处于pengding状态,处于pending状态的work不会重复挂入workqueue,如果这时候所有worker都处于idle状态的话,那么就需要唤醒一个worker,线程池worker_pool.nr_running表示所有worker都在idle啦,没人干活啦。

/*kernel/workqueue.c*/

worklist = &pwq->pool->worklist;
static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
   struct list_head *head, unsigned int extra_flags)
{
 struct worker_pool *pool = pwq->pool;
...
 list_add_tail(&work->entry, head);
...

 if (__need_more_worker(pool))
  wake_up_worker(pool);
}
image-20240408220117923
image-20240408220117923

以上就是提交work的流程,最终就是把work挂接到线程池worker_pool的worklist链表,同时至少这个worker_pool保证有一个woker处于running状态。

九、执行work的过程

work已经存在于线程池的worklist链表了,现在就等待一个线程把他们取出来一个个执行。先前在初始化的时候已经保证了per-cpu的woker_pool至少有了一个idle woker。

/*kernel/workqueue.c*/

worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
           "kworker/%s", id_buf);

所有的worker都使用一个内核线程函数worker_thread(),主要执行逻辑

/*kernel/workqueue.c*/

do {
  struct work_struct *work =
   list_first_entry(&pool->worklist,
      struct work_struct, entry);

  pool->watchdog_ts = jiffies;

  if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
   /* optimization path, not strictly necessary */
   process_one_work(worker, work);
   if (unlikely(!list_empty(&worker->scheduled)))
    process_scheduled_works(worker);
  } else {
   move_linked_works(work, &worker->scheduled, NULL);
   process_scheduled_works(worker);
  }
 } while (keep_working(pool));

从worker_pool的worklist链表头中摘下一个work,执行process_one_work(worker,work),直到所有的work被处理完

/*kernel/workqueue.c*/

worker->current_work = work;
worker->current_func = work->func;
worker->current_pwq = pwq;
work_data = *work_data_bits(work);
worker->current_color = get_work_color(work_data);
strscpy(worker->desc, pwq->wq->name, WORKER_DESC_LEN);
list_del_init(&work->entry);
...
worker->current_func(work);
...

看到这里,应该就知道为什么工作队列work的函数可以睡眠了?因为worker(内核线程)本身就是可以睡眠的,就算这个worker睡眠了,第二个work也不会被阻塞,因为可以唤醒更多的worker来接手工作。就好比一个有个老板派了个工作需要你新买的电脑才能工作,这个电脑明天才能到,你不需要死等它,你可以去睡一觉等快递到了,至于老板其他的活,完全可以交给你同事做啊。

image-20240408220123825
image-20240408220123825

原来如此:工作队列的队列体现在worker_pool的worklist链表总是被拿走第一个work啊,这样就符合先进先出的原则了。

十、实践分析

准备工作,为了在work的回调里面知道是哪个work_pool处理的,需要将get_work_pool_id()符号导出,供我们的demo驱动模块使用。将它变成以下形式,重新编译内核,即可随时在模块中使用

/*kernel/workqueue.c*/
int get_work_pool_id(struct work_struct *work)
{
 unsigned long data = atomic_long_read(&work->data);

 if (data & WORK_STRUCT_PWQ)
  return ((struct pool_workqueue *)
   (data & WORK_STRUCT_WQ_DATA_MASK))->pool->id;

 return data >> WORK_OFFQ_POOL_SHIFT;
}
EXPORT_SYMBOL(get_work_pool_id);

下面演示的demo运行在4核soc上面

1.观察找到的work_pool,观察worker创建条件

#include 
#include 
#include 
#include 
#include 
#include 
#include "workqueue_internal.h"

MODULE_LICENSE("Dual BSD/GPL");
extern int get_work_pool_id(struct work_struct *work);
struct work_struct work0, work1, work2, work3, work4, work5, work6, work7, work8;
struct workqueue_struct *test_wq, *ub_wq1, *ub_wq2, *ub_wq3, *ub_wq4;

#define WORK_FUN(n)      \
static void work_fun##n(struct work_struct *work)  \
{        \
 int num = n;      \
       \
 pr_info("[ pool id (%d) ]I am in CPU%d in PID%d[%s] -- %d\n",  \
   get_work_pool_id(work),raw_smp_processor_id(),   \
   current->pid, current->comm, num); \
 msleep(1000);      \
}
// msleep(5000);      
/* note: msleep above only be used in case 3. in other cases, comment out */

WORK_FUN(0)
WORK_FUN(1)
WORK_FUN(2)
WORK_FUN(3)
WORK_FUN(4)
WORK_FUN(5)
WORK_FUN(6)
WORK_FUN(7)
WORK_FUN(8)

static int __init wq_init(void)
{

 INIT_WORK(&work0, work_fun0);
 INIT_WORK(&work1, work_fun1);
 INIT_WORK(&work2, work_fun2);
 INIT_WORK(&work3, work_fun3);
 INIT_WORK(&work4, work_fun4);
 INIT_WORK(&work5, work_fun5);
 INIT_WORK(&work6, work_fun6);
 INIT_WORK(&work7, work_fun7);
 

 pr_info("before queue_work: I am in CPU%d in PID%d[%s]\n",
  raw_smp_processor_id(), current->pid, current->comm);

 queue_work_on(0, system_wq, &work1);
 
 queue_work_on(0, system_highpri_wq, &work2);
 
 queue_work_on(1, system_wq, &work3);
 
 queue_work_on(1, system_highpri_wq, &work4);

 queue_work_on(2, system_wq, &work5);
 
 queue_work_on(2, system_highpri_wq, &work6);
 
 queue_work_on(3, system_wq, &work7);

 queue_work_on(3, system_highpri_wq, &work8);
 
    return 0;
}

static void __exit wq_exit(void)
{
 /* destory work queue */

 destroy_workqueue(test_wq);
 cancel_work_sync(&work1);
 cancel_work_sync(&work2);
 cancel_work_sync(&work3);
 cancel_work_sync(&work4);
 cancel_work_sync(&work5);
 cancel_work_sync(&work6);
 cancel_work_sync(&work7);
}

module_init(wq_init);
module_exit(wq_exit);
MODULE_AUTHOR("Sherlock");
MODULE_DESCRIPTION("The driver is for testing wq");

这个demo定义了9个work和对应的work function,work function里面打印work所属的work_pool id、当前执行cpu和worker的名字。使用queue_work_on函数精准控制在哪个cpu上面运行,和工作队列的属性,观察work_pool id的变化。

image-20240408220129107
image-20240408220129107

由驱动加载的log可以分析出:

1.提交到system_wq,运行在CPU0的work所属的work_pool id为0,提交到system_highpri_wq,运行在CPU0的work所属的work_pool id为1;后面一次类推。这就证明了没有WQ_UNBOUND的工作队列,找到的work_pool是per-cpu的。

2.带有WQ_HIGHPRI标志的工作队列,找到的work_pool是high nice的

3.如果work funciton被阻塞了,也就是kworker被阻塞了,那么系统会自动创建更多的kworker来执行剩下的work。对比方法是把上面的msleep变成mdelay死等函数,如果一个work死等了,系统感知不到需要创建更多的worker,就会将全部的work都使用一个wokrer来处理,严重降低系统的响应性。

image-20240408220132923
image-20240408220132923

2.观察create_singlethread_workqueue的行为

根据定义:create_singlethread_workqueue定义了这个工作队列是WQ_UNBOUND 、__WQ_ORDERED 、WQ_MEM_RECLAIM,最终使用alloc_ordered_workqueue来创建工作队列

**#define create_singlethread_workqueue(name)    \
 alloc_ordered_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, name)

#define alloc_ordered_workqueue(fmt, flags, args...)   \
 alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED |  \
   __WQ_ORDERED_EXPLICIT | (flags), 1, ##args)**

demo:

#include 
#include 
#include 
#include 
#include 
#include 
#include "workqueue_internal.h"

MODULE_LICENSE("Dual BSD/GPL");
extern int get_work_pool_id(struct work_struct *work);
struct work_struct work0, work1, work2, work3, work4, work5, work6, work7, work8;
struct workqueue_struct *test_wq, *ub_wq1, *ub_wq2, *ub_wq3, *ub_wq4;

#define WORK_FUN(n)      \
static void work_fun##n(struct work_struct *work)  \
{        \
 int num = n;      \
       \
 pr_info("[ pool id (%d) ]I am in CPU%d in PID%d[%s] -- %d\n",  \
   get_work_pool_id(work),raw_smp_processor_id(),   \
   current->pid, current->comm, num); \
 msleep(1000);      \
}
// msleep(5000);      
/* note: msleep above only be used in case 3. in other cases, comment out */

WORK_FUN(0)
WORK_FUN(1)
WORK_FUN(2)
WORK_FUN(3)
WORK_FUN(4)
WORK_FUN(5)
WORK_FUN(6)
WORK_FUN(7)
WORK_FUN(8)

static int __init wq_init(void)
{
 /*
  * 1. unbound wq, queue_work: how to define on which cpu to run?
  *    can we bind which cpu to run by queue_work_on? 
  *
  *    seems queue_work_on can not put a work on a cpu precisely!
  *    seems queue_work_on can put a work to a cpu in a numa node.
  *
  *    seems queue_work put a work to current numa node's cpu.
  *
  * 2. per cpu wq: how to queue work, which API to use? 
  *
  *    schedule_work can queue work to current cpu's normal per cpu:
         *
  *    taskset -c 7 insmod wq_test.ko
  *    [ 6690.409802] before queue_work: I am in CPU7 in PID1735[insmod]
  *    [ 6691.444386] I am in CPU7 in PID90[kworker/7:1]
  *    [ 6692.468339] I am in CPU7 in PID90[kworker/7:1]
  *    [ 6693.492337] I am in CPU7 in PID90[kworker/7:1]
  *
  *    how to put work to cpu's high pri per cpu??
  *    can we do this by: queue_work_on(cpu, system_highpri_wq, worker)?
  *    queue_work_on(cpumask_first(&hdev->affinity_mask), system_wq, worker);
  *
  *    it depends on if sleep in work function, if no sleep there, multiple
  *    queue_work_on system_wq will be in same worker, if sleep, multiple
  *    kworkers will be created in same CPU.
  *
  * 3. unbound wq, in which case new kthread will be added to system?
  *    e.g. in current mainline zip driver, one qp allocate one unbound
  *         queue, seems one unbound queue will add one kthread?
  *   
  *    one new unbound queue will not surely add on kthread, one new
  *    unbound queue will firstly find an already same work pool, if
  *    we have a same work pool(param: nice, cpumask, no numa), use it.
  *    if we do not have, create a new work pool, which will surely add
  *    a new kthread.
  *
  *   if using an old work pool, it may add new kthread, only all workers
  *   in this pool are blocked, it will add a new kthread.
  *
  * 4. can we alloc a bound work queue? what is difference with system
  *    work queue.
  *
  * 5. add WQ_SYSFS, it can register related wq to sysfs.
  */
 test_wq = create_singlethread_workqueue("test_wq");
 //test_wq = alloc_workqueue("test_wq", WQ_UNBOUND | WQ_SYSFS, 0);
 if (!test_wq)
  return -1;

 INIT_WORK(&work0, work_fun0);
 INIT_WORK(&work1, work_fun1);
 INIT_WORK(&work2, work_fun2);
 INIT_WORK(&work3, work_fun3);
 INIT_WORK(&work4, work_fun4);
 INIT_WORK(&work5, work_fun5);
 INIT_WORK(&work6, work_fun6);
 INIT_WORK(&work7, work_fun7);
 INIT_WORK(&work8, work_fun8);

 pr_info("before queue_work: I am in CPU%d in PID%d[%s]\n",
  raw_smp_processor_id(), current->pid, current->comm);

 /* case 1 */
 /* can not run in cpu 5 everytime, but always run in cpu of same numa node */
// queue_work_on(5, test_wq, &work1);

 /* case 2 */
// schedule_work(&work1);

 /* case 3: let's put many works to one wq to see if new worker created */
/*
 queue_work(test_wq, &work0);
 queue_work(test_wq, &work1);
 queue_work(test_wq, &work2);
 queue_work(test_wq, &work3);
 queue_work(test_wq, &work4);
 queue_work(test_wq, &work5);
 queue_work(test_wq, &work6);
 queue_work(test_wq, &work7);
 queue_work(test_wq, &work8);
*/
 /* case 4 */
/*
 schedule_work(&work0);
 schedule_work(&work1);
 schedule_work(&work2);
 schedule_work(&work3);
 schedule_work(&work4);
 schedule_work(&work5);
 schedule_work(&work6);
 schedule_work(&work7);
 schedule_work(&work8);
*/
 /* case 5 */
 /* this case shows that it can put works in cpu5 always */
/*
 schedule_work_on(5, &work0);
 schedule_work_on(5, &work1);
 schedule_work_on(5, &work2);
 schedule_work_on(5, &work3);
 schedule_work_on(5, &work4);
 schedule_work_on(5, &work5);
 schedule_work_on(5, &work6);
 schedule_work_on(5, &work7);
 schedule_work_on(5, &work8);
*/
 /* case 6 */
 /*
  * will put below different worker in same unbound pool, however, may
  * put works in different worker event without "sleep", I do not know
  * why??
  *
  * however, if we queue_work_on same work to same cpu and wq, it will
  * alway on one worker.(note: add sleep so that all work1 can be put
  * into wq)
  */
#if 0
 ub_wq1 = alloc_workqueue("ub_wq1", WQ_UNBOUND | WQ_HIGHPRI | WQ_SYSFS, 0);
 ub_wq2 = alloc_workqueue("ub_wq2", WQ_UNBOUND | WQ_HIGHPRI | WQ_SYSFS, 0);
 ub_wq3 = alloc_workqueue("ub_wq3", WQ_UNBOUND | WQ_HIGHPRI | WQ_SYSFS, 0);
 ub_wq4 = alloc_workqueue("ub_wq4", WQ_UNBOUND | WQ_HIGHPRI | WQ_SYSFS, 0);
 if (!ub_wq1 || !ub_wq2 || !ub_wq3 || !ub_wq4)
  return -2;

 /* how to put these work in one same kworker */
 queue_work_on(0, ub_wq1, &work1);
// msleep(10);
 queue_work_on(0, ub_wq2, &work2);
// msleep(10);
 queue_work_on(0, ub_wq2, &work3);
// msleep(10);
 queue_work_on(0, ub_wq3, &work4);
#endif
 /* case 7 */
 queue_work(test_wq, &work1);
 
 queue_work( test_wq, &work2);
 
 queue_work( test_wq, &work3);
 
 queue_work( test_wq, &work4);

 queue_work( test_wq, &work5);
 
 queue_work( test_wq, &work6);
 
 queue_work( test_wq, &work7);

 queue_work( test_wq, &work8);
 
 

    return 0;
}

static void __exit wq_exit(void)
{
 /* destory work queue */

 destroy_workqueue(test_wq);
 cancel_work_sync(&work1);
 cancel_work_sync(&work2);
 cancel_work_sync(&work3);
 cancel_work_sync(&work4);
 cancel_work_sync(&work5);
 cancel_work_sync(&work6);
 cancel_work_sync(&work7);
 cancel_work_sync(&work8);
/*
 destroy_workqueue(ub_wq1);
 destroy_workqueue(ub_wq2);
 destroy_workqueue(ub_wq3);
 destroy_workqueue(ub_wq4);
*/
}

module_init(wq_init);
module_exit(wq_exit);
MODULE_AUTHOR("Sherlock");
MODULE_DESCRIPTION("The driver is for testing wq");
image-20240408220138558
image-20240408220138558

从结果上看,有以下结论:

1.定义了WQ_UNBOUND 的工作队列找到的work_pool不再使用per-cpu的了,而是id为8的work_pool(核心为4的最大per-cpu work_pool 的id是7)。

2.定义了**__WQ_ORDERED 的工作队列,始终使用同一个worker来处理那么多的work,效果就是msleep编程了mdelay的效果,work是一个一个执行的。哪怕我在work function里面主动睡眠让出cpu,企图让系统给我创建更多的work来并行处理剩下的work,这一点和普通的工作队列不一样。**

2.定义了WQ_MEM_RECLAIM的工作队列,会产生一个救援线程

ps -e

419 root     [test_wq]

本笔记对于ubound的工作队列和work_pool是如何创建工作的暂未涉及,因为涉及哈希的数据结构。但是对于面向驱动的接口理解其背后原理已经不成问题了。

疑问:

经过上面分析发现,无论是自己创建的工作队列还是系统自带的工作队列,最后都是交给一个worker_pool来处理,那么他们之间到底有什么本质区别呢?希望读者能告诉我

以上就是良许教程网为各位朋友分享的Linu系统相关内容。想要了解更多Linux相关知识记得关注公众号“良许Linux”,或扫描下方二维码进行关注,更多干货等着你 !

137e00002230ad9f26e78-265x300
本文由 良许Linux教程网 发布,可自由转载、引用,但需署名作者且注明文章出处。如转载至微信公众号,请在文末添加作者公众号二维码。
良许

作者: 良许

良许,世界500强企业Linux开发工程师,公众号【良许Linux】的作者,全网拥有超30W粉丝。个人标签:创业者,CSDN学院讲师,副业达人,流量玩家,摄影爱好者。
上一篇
下一篇

发表评论

联系我们

联系我们

公众号:良许Linux

在线咨询: QQ交谈

邮箱: yychuyu@163.com

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

关注微博
返回顶部