Linux 同步机制之completion

mac2024-10-28  11

在Linux内核中,completion是一种简单的同步机制,标志"things may proceed"。

要使用completion,必须在文件中包含<linux/completion.h>,同时创建一个类型为struct completion的变量。

1 结构与初始化

Completion在内核中的实现基于等待队列,completion结构很简单:

/*
 * A completion: a counter of signalled events plus the queue of tasks
 * sleeping until the counter becomes non-zero.
 */
struct completion {
	/*
	 * Synchronization counter. Declared as a plain unsigned int (not an
	 * atomic type).
	 */
	unsigned int done;
	/* Queue of tasks waiting for the event. */
	wait_queue_head_t wait;
};

初始化分为静态初始化和动态初始化两种情况:

静态初始化:

/*
 * Static initializer for a struct completion named `work`: the done counter
 * starts at 0 and the embedded wait queue head is initialized in place.
 */
#define COMPLETION_INITIALIZER(work) \
	{ 0, __WAIT_QUEUE_HEAD_INITIALIZER((work).wait) }

/* Define a struct completion variable `work`, initialized at compile time. */
#define DECLARE_COMPLETION(work) \
	struct completion work = COMPLETION_INITIALIZER(work)

动态初始化:

/*
 * Run-time counterpart of the static initializer: prepare the embedded
 * wait queue head and reset the done counter of @x to 0.
 */
static inline void init_completion(struct completion *x)
{
	init_waitqueue_head(&x->wait);
	x->done = 0;
}

可见,两种初始化都将用于同步的done计数置为0。后面我们会看到,该变量在wait相关函数中减一,在complete系列函数中加一。需要注意的是,done只是普通的unsigned int而不是原子类型,对它的读写都是在持有x->wait.lock自旋锁的情况下进行的。

2 实现

2.1 等待该Completion完成

/*
 * Wait for completion @x with no timeout (MAX_SCHEDULE_TIMEOUT) in
 * TASK_UNINTERRUPTIBLE state. The result of the wait is discarded.
 */
void __sched wait_for_completion(struct completion *x)
{
	wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE);
}

/*
 * Wait on @x in task state @state for at most @timeout, using
 * schedule_timeout() as the sleeping primitive.
 * Returns the value produced by __wait_for_common().
 */
static long __sched wait_for_common(struct completion *x, long timeout, int state)
{
	return __wait_for_common(x, schedule_timeout, timeout, state);
}

/*
 * Run the wait loop for @x with the completion's wait-queue lock held.
 *
 * @action:  function called to sleep; receives and returns the timeout
 * @timeout: timeout handed to @action
 * @state:   task state to sleep in
 *
 * Returns the value produced by do_wait_for_common().
 */
static inline long __sched __wait_for_common(struct completion *x,
					     long (*action)(long), long timeout, int state)
{
	/* Original note: if a reschedule is needed at this point, do it first. */
	might_sleep();

	/* do_wait_for_common() reads and updates x->done under this lock. */
	spin_lock_irq(&x->wait.lock);
	timeout = do_wait_for_common(x, action, timeout, state);
	spin_unlock_irq(&x->wait.lock);

	return timeout;
}

可以看到wait_for_completion的核心函数是do_wait_for_common,action是schedule_timeout,timeout为MAX_SCHEDULE_TIMEOUT,state是TASK_UNINTERRUPTIBLE

#define DECLARE_WAITQUEUE(name, tsk) \ wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk) static inline long __sched do_wait_for_common(struct completion *x, long (*action)(long), long timeout, int state) { if (!x->done) { DECLARE_WAITQUEUE(wait, current);//声明一个wait_queue_t的实例,并且把current赋值给其private成员 __add_wait_queue_tail_exclusive(&x->wait, &wait);//把该实例添加到completion的wait队列上 do { if (signal_pending_state(state, current)) { timeout = -ERESTARTSYS; break; } __set_current_state(state); spin_unlock_irq(&x->wait.lock); timeout = action(timeout); spin_lock_irq(&x->wait.lock); } while (!x->done && timeout);//如果事件没有完成,或者时间还没有到,则继续循环 __remove_wait_queue(&x->wait, &wait);//把wait实例添加到completion的wait if (!x->done) return timeout; } x->done--; return timeout ?: 1; }

进入循环后先把当前进程设置为TASK_UNINTERRUPTIBLE状态,然后释放自旋锁并调用action,即schedule_timeout函数。由于我们这边传入的timeout是MAX_SCHEDULE_TIMEOUT,所以schedule_timeout的作用就是直接调用schedule进行调度,下次被唤醒时仍然把MAX_SCHEDULE_TIMEOUT返回。因此timeout始终不为0,do_wait_for_common函数会一直在while中循环,直到done变为非0(即有人调用了complete)为止;退出循环后再把done减一,表示消费掉这一次完成事件。

2.2 通知Completion完成

void complete(struct completion *x) { unsigned long flags; spin_lock_irqsave(&x->wait.lock, flags); x->done++; //设置completion完成 __wake_up_common(&x->wait, TASK_NORMAL, 1, 0, NULL); spin_unlock_irqrestore(&x->wait.lock, flags); } static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key) { wait_queue_t *curr, *next; list_for_each_entry_safe(curr, next, &q->task_list, task_list) { //遍历completion 的task_list链表 unsigned flags = curr->flags; if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break; } }

curr->func对应哪个函数呢,上面DECLARE_WAITQUEUE(wait, current);的时候会设置该函数:

/*
 * Initializer for a wait_queue_t entry: stores the task in .private, uses
 * default_wake_function as the wake callback and leaves the list links NULL.
 */
#define __WAITQUEUE_INITIALIZER(name, tsk) {				\
	.private	= tsk,						\
	.func		= default_wake_function,			\
	.task_list	= { NULL, NULL } }

/* Define a wait_queue_t entry `name` bound to task `tsk`. */
#define DECLARE_WAITQUEUE(name, tsk) \
	wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)

所以调用default_wake_function

int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags, void *key) { //private数据结构存储的是要唤醒的那个任务的任务描述符task_struct return try_to_wake_up(curr->private, mode, wake_flags); } static int try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags) { unsigned long flags; int cpu, success = 0; smp_wmb(); raw_spin_lock_irqsave(&p->pi_lock, flags); if (!(p->state & state)) goto out; success = 1; /* we're going to change ->state */ cpu = task_cpu(p); if (p->on_rq && ttwu_remote(p, wake_flags)) goto stat; #ifdef CONFIG_SMP /* * If the owning (remote) cpu is still in the middle of schedule() with * this task as prev, wait until its done referencing the task. */ while (p->on_cpu) cpu_relax(); /* * Pairs with the smp_wmb() in finish_lock_switch(). */ smp_rmb(); p->sched_contributes_to_load = !!task_contributes_to_load(p); p->state = TASK_WAKING; if (p->sched_class->task_waking) p->sched_class->task_waking(p); cpu = select_task_rq(p, SD_BALANCE_WAKE, wake_flags); if (task_cpu(p) != cpu) { wake_flags |= WF_MIGRATED; set_task_cpu(p, cpu); } #endif /* CONFIG_SMP */ ttwu_queue(p, cpu); stat: ttwu_stat(p, cpu, wake_flags); out: raw_spin_unlock_irqrestore(&p->pi_lock, flags); return success; }

try_to_wake_up函数先检查进程当前的状态是否与要唤醒的state匹配,不匹配则直接返回0;匹配时(在SMP配置下)先把进程状态置为TASK_WAKING并为其选择目标CPU,然后通过ttwu_queue把进程移入运行队列,其状态随后会被设置为TASK_RUNNING,等待调度器调度,从而唤醒之前等待的线程。

3 运用

#include <linux/module.h> #include <linux/init.h> #include <linux/sched.h> #include <linux/kernel.h> #include <linux/fs.h> #include <linux/types.h> #include <linux/completion.h> MODULE_LICENSE("GPL"); static int complete_major=250; DECLARE_COMPLETION(comp); ssize_t complete_read(struct file *filp,char __user *buf,size_t count,loff_t *pos) { printk(KERN_ERR "process %i (%s) going to sleep\n",current->pid,current->comm); wait_for_completion(&comp); printk(KERN_ERR "awoken %i (%s)\n",current->pid,current->comm); return 0; } ssize_t complete_write(struct file *filp,const char __user *buf,size_t count,loff_t *pos) { printk(KERN_ERR "process %i (%s) awakening the readers...\n",current->pid,current->comm); complete(&comp); return count; } struct file_operations complete_fops={ .owner=THIS_MODULE, .read=complete_read, .write=complete_write, }; int complete_init(void) { int result; result=register_chrdev(complete_major,"complete",&complete_fops); if(result<0) return result; if(complete_major==0) complete_major=result; return 0; } void complete_cleanup(void) { unregister_chrdev(complete_major,"complete"); } module_init(complete_init); module_exit(complete_cleanup);

 

最新回复(0)