C与多线程

比赛好多地方都讲了这个概念,而且加上最近开发的运用,看起来是时候是学习一波了

C与多线程

首先这里补充一下多进程的相关知识:

为什么有线程

当我们fork一个进程的时候,操作系统会为我们分配一个独立的.text, .data,.stack和.heap(好像是这么叫的?),并且由于进程之间的地址空间独立,进程与进程之间的通信也得是用IPC(inner-process communication),于是相对应的共享部分资源的线程就诞生了。

线程的基本结构

虽然线程并不会拷贝完整的地址空间,但是线程本身也有线程上下文(thread context),包括

  • 唯一的整数线程ID
  • 栈、栈指针
  • 程序计数器
  • 通用目的寄存器
  • 条件码

线程的调度

鲜花曾由内核自动调度,并且通过内核的唯一整数ID来识别线程。同时线程运行在单一进程的上下文中,共享这个进程虚拟地址空间的整个内容 – 代码,数据,堆栈,共享库和打开的文件
线程存在主线程这种说法,也就是最初存在进程中的线程。线程不存在父子层次,而是按照对等线程的概念创建的,也就是说,主线程会维护一个线程池,对等线程之间可以互相杀死对方,并且可以互相共享数据。

线程相关代码

创建线程:

1
int pthread_create((pthread_t *tid, pthread_attr_t *attr, void *func(void*), void *arg)

创建线程

  • pthread_t:记录了线程的基本标识符。
  • attr:修改当前线程的属性。
  • func: 目标函数的地址。
  • arg: 需要传递的参数的指针,如果有多个参数的话,我们就将参数写在一个结构体内进行传输。

终止线程:
由于主线程的存在,当主线程中之后,所有的线程会隐式的终止。
或者,我们使用下面的函数显示终止一条线程。如果终止的是主线程,那么会等待其他线程执行完成再终止。

1
void pthread_exit(void *thread_return)

注意,如果我们在某条线程中调用exit,则整条进程都会关闭。

回收线程:

1
pthread_join(pthread_t tid, void** thread_return);

  • thread_return: 函数的返回值
    分离线程:
    任何时候,线程都是可分离或者结合的。可结合的线程可以被其他线程杀死回收资源,但是分离的线程不可以,它的资源只能在它终止后回收存储资源。
    1
    int pthread_detach(pthread_t tid);

使用场景:当我们想让我们的线程在完成整个执行过程再被销毁的时候,会使用分离线程

线程与竞争

线程由于共享的内容比较多,所以及其容易发生竞争的问题,竞争其实是一个比较隐晦的问题,我们这里主要讨论一下:
下列是一个例子(从CSAPP中抄来的。经测试,似乎和书上写的不一样,但是发现20000次执行的话, 也只是发生50次左右的错误)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#define N 2
volatile int cont = 0;
void* thread(void* arg){
int niters = *((int*)arg);
int i = 0;
for(i = 0; i < niters; i++){
cont++;
}
// return NULL;
}
int main(int argc, char* argv[]){
pthread_t id1, id2;
if(argc!=2){
printf("usage: thread <count>\n");
exit(0);
}
int niters = atoi(argv[1]);
int i,ret;
pthread_create(&id1, NULL, thread, (void*)&niters);
pthread_create(&id2, NULL, thread, (void*)&niters);
pthread_join(id1, NULL);
pthread_join(id2, NULL);
printf("finally cont = %d\n", cont);
return 0;
}

代码的大致逻辑就是让cnt自增1,由于我们在两条线程中执行了这段代码,答案应该就是niters * 2,但是实际上(书上的例子)是:

1
2
3
4
linux> ./badcnt 200000
finally cont = 144508
linux> ./badcnt 200000
finally cont = 144803

本机上测试了200000次之后,出现了大致50次错误:

这个出错的原因要追溯到汇编代码上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
movl (%rdi), %ecx |
movl $0, %edx |
cmpl %ecx, %edx | Hi
jge .L13 |
-------------------------
.L11:
movl cnt(%rip), %eax| Li:Load cnt
incl %eax | Ui:Update cnt
movl %eax, cnt(%rip)| Si:Store cnt
-------------------------
incl %edx |
cmpl %ecx, %edx | Ti
jl .L11 |
.L13

Hi和Ti分别是for语句的开头和结尾,不是重点,重点在于L,U,S三个过程。当程序并不是按照顺序去执行这句话的,于是当整个程序并发执行的时候,有时候会发生如下的过程:
H2, L2, H1, L1, U1, S1, T1, U2, S2, T2
重点就在L2和L1以及U1发生的时机,L2发生load cnt的时候,L1也开始进行了,此时进入了U1,更新了当前的eax,然而我们原来的认为中,L1和L2所在线程都应该会导致一次eax的增加,然而由于L2在L1之前将cnt读取出来,于是两者中其实相当于只有一次发生了数值的增加。
上述情况就是典型的竞争

竞争的抽象判断方法

对于竞争的判断,我们可以使用一种叫做进度图的方式去图形化这个进程的过程:

进度图通过将线程抽象成笛卡尔坐标系中的轴进行模拟。图中点(L1, S2)表示此时完成了L1和S2两个状态。合法的状态转换是只能往坐标系的正坐标方向前进。
我们从之前的状态观察可以知道,某一些状态的发生其实是无所谓的,比如Hi和Ti。而其他三个是事件必须要同时发生,否则的话就会发生错误。拥有共享变量的访问我们称为互斥的访问。Li, Ui, Si称为临界区。临界区的交集称为不安全区,在图片中表现如下:

当我们走到(T1, T2)的时候,程序就会结束。同时我们如果进过不安全区的边界,也不会触发竞争。所以相当于找一条从远点到(T1, T2)的路线就是一个安全的撞他。

竞争的解决办法–信号

为了保证临界区的状态能够在同一个时间段内进行,也就是解决同步不同执行线程问题,也就是信号量。信号量是具有非负整数值的全局变量,只能由两种操作处理

  • P(s): s非零的时候,会将s-1,并且立刻返回,将控制返回给调用者。如果s为零,则一直等待,直到s不等于0
  • V(s): s将s加1, 如果有P正在等待s变成非零, 那么V就会重启其中这些线程中的其中一个。(注意,这个V是不能预测V操作要重启的线程)
    这两个操作是元操作,也就是说这两个操作的中间是不能中断的。

信号量提供了一种思想来处理这种情况,基本思想就是将每个共享变量和一个信号量s(初始量为1)联系起来,然后用P(s)和V(s)操作将相应的临界区包围起来。以提供互斥为目的的二次元信号量叫做互斥量(mutex)。此时P叫做加锁,V叫做解锁。互斥锁加了锁但是还没有解锁的县城称为占用这个互斥锁。哟组可用资源的计数器的信号量叫做计数信号量

对于之前的例子,我们能够进行如下改正:

1
2
3
4
5
6
7
8
9
10
11
12
volatile int cnt = 0;
sem_t mutex;
...
// 然后在主历程中将mutex初始化为1
sem_init(&mutex, 0, 1);
...
// 最后将线程用例中将这个操作用P和V包括
for(i = 0; i < niters; i++){
P(&mutex);// C语言叫做sem_wait
cont++;
V(&mutex);// C语言叫做sem_post
}

线程里面的经典问题

1. 生产者 – 消费者问题


其实是一个模拟的过程

  • 生产者反复产生项目(item)塞入缓冲区
  • 消费者不断从缓冲去取出项目并且使用
    由于两个过程都涉及更新共享变量,所以我们必须保证这两个过程是互斥的。同时,我们还需要调度对缓冲区的访问,比如缓冲满了的时候,生产者必须一直等待直到一个槽位变成可用。缓冲空的时候,消费者必须一直等待缓冲变满。

实际中比如多媒体系统的编码视频与解码,生产者是视频编码, 消费者是解码并且显示。缓冲区则是为了减小视频的抖动时数据的差异而创建的。又或者图形用户接口,生产者是检测鼠标和键盘事件,消费者是处理事件并且显示,而缓冲就是事件优先队列

为了处理上述的问题,首先要考虑两个地方的阻塞:

  • 关于生产者/消费者在操作缓存的时候,另一方不能进行操作
  • 当缓存为满/空的时候,生产者/消费者不能操作

于是可以写出如下结构体:

1
2
3
4
5
6
7
8
9
typedef struct{
int *buf; //缓存区
int n;// 缓存区的大小
int front;// 第一个item,消费者使用
int rear;// 最后一个item,生产者使用
sem_t mutex;//用于限制缓存读写的信号量
sem_t slots;//用于限制生产者可写入大小的信号量
sem_t items;//用于限制消费者可取出大小的信号量
}sbuf_t;

每次开始这个模型处理之前,我们进行初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* 初始化各种信号量
* sp表示传入的信号量控制变量
* n是缓存区的大小,同时也是此时可以先写入的最大信号量
*/
void sbuf_init(sbuf_t *sp, int n){
sp->n = n;
sp ->rear = sp->front = 0;
sp->buf = (int*)malloc(sizeof(int)*n);
sem_init(&sp->mutex, 1); // 读写每次只允许一次
sem_init(&sp->slots, n); // 允许发生n次写入
sem_init(&sp->items, 0); // 未发生过写入,于是此时不允许取出
}

通过初始化,我们能够确定下列的表格:
信号 |初始化值 | 变化时机
————|——————-|——————————
mutex |1 |写入/取出发生前置-1,发生后变为+1
slots |n |写入一次-1,取出一次+1
items |0 |取出一次-1,写入一次+1

于是我们可以将消费者参与生产者参与两个过程抽象成如下函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 生产者插入数据
void sbuf_insert(sbuf_t *sp, int item){
// 发生写入,减少slots
P(&sp->slots);
// 锁上缓存
P(&sp->mutex);
// 开始写入
sp->buf[(++sp->rear)%(sp->n)] = item;
// 完成写入,打开锁
V(&sp->mutex);
// 增加一次读出的机会
V(&sp->items);
}
// 消费者取出数据
int sbuf_remove(sbuf_t *sp){
// 发生取出,减少items
P(&sp->items);
// 锁上缓存
P(&sp->mutex);
// 开始读取
int item = sp->buf[(++sp->front)%(sp->n)];
// 完成读取,打开锁
V(&sp->mutex);
// 增加一次写入的机会
V(&sp->slots);
}

讨论:这个锁mutex是不是真的需要?

如果n = 1的时候,这些锁是不是真的需要呢?
我们来看,当n = 1的时候,一旦写入,就相当于缓冲区满了,此时生产者由于sp->slots限制无法再次往缓冲区里面写入数据,因此不需要mutex来限制这个过程。

读者 - 写者问题

这其实是一个互斥问题的概括。比如说同时对一个数据库进行读写。显然,读取的时候可以多个角色一起读,但是写的时候只能一个个写,并且每一个写者都要拥有独占的访问

对于这类问题,其实可以分成两个问题来考虑

  • 读者优先,也就是说当读者前来访问的时候,除非此时正在写数据,否则就让读者开始读。也就是说,如果此时有写者和读者同时等待这个资源的使用,那么资源释放后第一个交给读者
  • 写者优先,也就是说,当可写资源准备好后,立刻交给写者去写。也即是有写者和读者同时等待这个资源的使用,那么资源释放后第一个交给写者
    以下给出第一种问题的解决办法(注意这种写法也有一点问题,可能会导致饥饿(starvation),也就是线程无限期地阻塞,无法进展。)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    int readcnt;
    sem_t mutex,w;// both initialize 1
    void read(void){
    while(1){
    // 首先例牌禁止其他读写
    P(&mutex);
    // 模拟读者到来
    readcnt++;
    // 如果是第一位读者,那么此时锁上写入的过程
    if (readcnt == 1)
    P(&w);
    // 读取完成后解锁
    V(&mutex);
    // **** 进入临界区 ****
    // 离开临界区
    P(&mutex);
    // 模拟读者离开
    readcnt --;
    // 如果此时所有读者都离开了,那么激活写入
    if (readcnt == 0)
    V(&w);
    V(&mutex);
    }
    }
    void write(void){
    while(1){
    // 等待资源,有资源后才能写
    P(&w);
    // **** 进入临界区 ****
    // 离开临界区,完成写入后解除写入限制
    V(&w);
    }
    }

6.19 更新

写作业在设计多线程的时候有点迷糊,这里记录一下思考过程:

我们知道,多线程设计存在两种方式 – 条件变量和信号量两种,现在我们根据一个例题来思考两者的区别;

题目

  • 系统中有3个线程:生产者、计算者、消费者
  • 系统中有2个容量为4的缓冲区:buffer1、buffer2
  • 生产者生产’a’、’b’、’c’、‘d’、’e’、’f’、’g’、’h’八个字符,放入到buffer1
  • 计算者从buffer1取出字符,将小写字符转换为大写字符,放入到buffer2
  • 消费者从buffer2取出字符,将其打印到屏幕上

基本思路

存在缓冲,所以我们首先要保证下列目标:

  • 生产者不能在缓冲区满了的时候继续生产。
  • 计算者不能计算还未生产的数据,在缓冲区为空的时候也不能计算。
  • 消费者不能输出还未计算的数据,也不能在空缓冲区的时候进行消费。

同时,我们能够得下列的关系图:

这里注意到,有一些条件本身是有包含关系的:

  • 消费者在满足【不输出未计算的数据】时,必然满足【不输出空缓存区】
  • 计算者在满足【不能计算未生产的数据】的时候,必然满足【在缓冲区为空的时候不计算】

因此我们可以确定我们需要满足的互相依赖关系为:

为了完成我们的目的,显然是要通过循环来实现的。但是我们不能保证这三个循环哪个顺序执行,所以要有一个限制条件来限制执行的过程:

(正确流程)

(错误流程)
如何来限制循环呢?我们这里采取先前【读者 – 写者】的模型来实现。我们使用一个循环来劫持程序流,同时保证

  • 在三者交互中,前者未完成任务时,这个程序流在循环中陷入信号等待。
  • 当前者完成任务时,激活其中一个线程,并且让其访问循环中的条件时不能成立,从而离开当前循环,而其他线程继续等待
  • 完成一个任务后,能够向下一个任务发送相应的信号,从而让下一个任务的某一条线程能够激活。

为了模拟这个缓冲区,我们使用两个数组来描述:

1
char buffer1[CAPACITY], buffer2[CAPACITY];

然后,我们要模拟【生产者生产】,【计算者计算】和【消费者消费】三个过程,于是这里的使用in指向生产者的当前数据cal指向计算者需要操作的数据out指向消费者需要输出的数据.
三个过程抽象成逻辑分别如下:

保证生产者在缓冲区写满的时候不再写入数据,也就是说,in指针+1要永远小于cal

保证计算者完成计算前,消费者不会往前继续取出数值,也就是cal+1要永远小于out
结合一下两者的要求,我们可以转换成下列的条件:

  • 当out和cal相等时候,可以认为此时所有数据被取出,为空
  • 当(in + 1)%4 == out的时候,可以认为此时的缓冲区已经写满了
  • (推论)当(cal + 1)%4 == in的时候,认为此时生产的速度已经追上了计算的速度,此时正在发生计算,不能够进行读取和写出的操作.
    于是我们就能够得到相应的限制条件.

使用信号量的解决思路

初始化的时候,将所有的信号量的初值定为0:

1
2
3
res = sem_init(&sem_producer, 1, 0); // sem_init(sem_t *信号量, int 信号本身是否在进程间共享, int 信号量初值)
res = sem_init(&sem_calculator, 1, 0);
res = sem_init(&sem_consumer, 1, 0);

然后我们在循环处,利用PV操作将计算者和消费者卡在循环处:

1
2
while(buffer_is_calculate())
sem_wait(&sem_calculator);

关键:在这之后,我们使用mutex将当前操作上锁

1
pthread_mutex_lock(&mutex);

之后我们就能够正常的完成我们的作业操.完成后,发送信号量到下一个操作对象处并且解锁:

1
2
sem_post(&sem_next_one);
pthread_mutex_unlock(&mutex);

这样就能够实现利用信号量阻塞部分操作,并且让指定操作在某些作业后之后执行

使用条件变量的解决思路

条件变量和信号量不太一样,条件变量是多线程中实现[等待->唤醒]功能的关键参数.举个例子来说:

1
2
3
4
5
6
7
8
9
10
// thread 1
pthread_mutex_lock(&mutex);
pthread_cond_t cond_mutex;
pthread_cond_wait(&cond_mutex, &mutex);
pthread_mutex_unlock(&mutex);
// thread 2
pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond_mutex);
pthread_mutex_unlock(&mutex);

这里的cond_mutex就相当于是一个[带有条件的mutex],并且此时卡在当前指定的mutex上,当线程2开始执行之后,发出的信号将会重新激活thread 1,将其重新运行.这里的程序运行顺序为:

1
thread1 lock --> thread1 cond wait -- > thread1 unlock(but wait) -- > thread2 lock --> thread2 cond wait --> thread2 unlock --> thread1 lock(wait) --> thread1 unlock

这里注意到,和信号量相比最主要的区别还是在这个lock和unlock的位置.也就是说当我们使用条件变量的时候,我们的操作逻辑应该是先上锁,再设条件

1
2
3
pthread_mutex_lock(&mutex);
while(buffer_is_full())
pthread_cond_wait(&wait_full_buffer, &mutex);

之后的逻辑和信号量差不多,只是这里使用的是激活操作;

1
2
pthread_cond_signal(&wait_for_calc);
pthread_mutex_unlock(&mutex);

http://blog.csdn.net/erickhuang1989/article/details/8754357