Operating Systems

本文最后更新于:2023年5月30日 下午

Operating System

第一章 关于本书的对话

关于 Three Easy Pieces 指的是 **虚拟化(virtualization), 并发(concurrency), 持久性(persistence)**这是我们要学习的三个重要概念。

第二章 操作系统介绍

有一类软件负责让程序运行变得更加容易,允许你同时运行多个程序,允许程序共享内存,让程序能与设备进行交互,以及其他类似有趣的工作。这些软件称为 操作系统(Operating System,OS)

第三章 关于虚拟化的对话

虚拟化:给一群人一共一个桃子,但让每个人认为,他独占这个桃子。

第四章 抽象:进程

进程:运行中的程序。
时分共享CPU(time sharing):让一个进程只允许一个时间片,然后切换到其他进程。

进程 API

所有现代操作系统都以某种形式提供这些 API

  • 创建(create):操作系统包含一些创建新进程的方法。
  • 销毁(destroy):销毁进程的接口。
  • 等待(wait):等待进程停止运行。
  • 其他控制(miscellaneous control):例如暂停、恢复等操作。
  • 状态(statu):获取进制的状态信息。

进程创建

首先,将程序(代码、静态数据)加载(load)到内存中,加载到进程的地址空间中。

现代操作系统是懒加载(lazily load),即仅在程序执行期间需要的代码或数据片段,才会加载。

加载了代码与静态数据到内存后,操作系统需要为程序的运行时栈分配一些内存。
再为堆也分配一些内存。并执行一些与I/O相关的工作。

当OS终于为程序执行搭好了舞台,他还有最后一项任务:启动程序。在入口处运行,即 main()。通过跳转到 main()例程,OS 将 CPU 的控制权转移到新创建的进程中。

进程状态

进程可以处于以下三种状态之一:

  • 运行(running):运行状态下,进程在处理器上运行。
  • 就绪(ready):程序准备好运行。
  • 阻塞(blocked):阻塞状态下,一个进程执行了某种操作,直到其他事件发生才会准备运行。例如一个进程向磁盘发起 I/O 请求,它会被阻塞,其他进程可以使用处理器。

第五章 插叙:进程 API

fork()系统调用

系统调用 fork()用于创建新进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main(int argc, char *argv[])
{
printf("hello world (pid:%d)\n", (int) getpid());
int rc = fork();
if (rc < 0) { // fork failed; exit
fprintf(stderr, "fork failed\n");
exit(1);
} else if (rc == 0) { // child (new process)
printf("hello, I am child (pid:%d)\n", (int) getpid());
} else { // parent goes down this path (main)
printf("hello, I am parent of %d (pid:%d)\n",
rc, (int) getpid());
}
return 0;
}

输出如下:

1
2
3
4
5
prompt> ./p1
hello world (pid:29146)
hello, I am parent of 29147 (pid:29146)
hello, I am child (pid:29147)
prompt>

fork()出的子进程不会从 main()函数开始执行(因此hello world 信息只输出了一次),而是直接从 fork()系统调用返回。就好像它自己调用了 fork()。
子进程并不是完全拷贝了父进程。具体来说,虽然它拥有自己的地址空间、寄存器、程序计数器等,但它从 fork() 返回的值是不同的。父进程获得的 fork() 返回值是子进程的 pid , 而子进程获得的返回值是 0 。这个差别很重要。
且父、子进程输出是不确定的,即有可能父进程先运行,也有可能子进程先运行。
CPU 调度程序(scheduler) 决定了某个时刻哪个进程被执行。

wait() 系统调用

当父进程调用 wait() 于某处时,若父进程碰巧被先执行,也会延迟执行,等到子进程执行完毕,wait() 才返回父进程。

exec() 系统调用

exec() 给定可执行程序的名称及需要的参数后,exec() 会从可执行程序中加载代码和静态数据,并用它覆写自己的代码段,堆、栈及其他内存空间也会被重新初始化。子进程执行exec()之后,几乎就像 p3.c 未曾运行过一样,即对 exec() 的成功调用永远不会返回

如下例中,子进程调用 exec() 后,exec调用的程序执行完以后,子进程内未进行的代码段不会进行,而是直接去进行排队等待的父进程去了。

关于管道:pipe() 系统调用。这种情况下,一个进程的输出被链接到了一个内核管道(pipe)上(队列), 另一个进程的输入也被链接到同一个管道上。因此,前一个进程的输出可以无缝地作为后一个进程的输入,许多命令可以用这种方式串联起来,完成某项任务。(23.03.23更)

第五章课后习题

1.编写一个调用 fork()的程序。在调用 fork()之前,让主进程访问一个变量(例如 x) 并将其值设置为某个值(例如 100)。子进程中的变量有什么值?当子进程和父进程都改变 x 的值时,变量会发生什么?

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>

int main()
{
printf("hello world ( pid : %d )\n",(int) getpid());
int x = 100;
int rc = fork();
if(rc < 0 )
{
printf("fork failed\n");
}
else if(rc ==0 )
{
printf("hello, I am Child (pid : %d)\n",(int) getpid());
printf("The value x = %d \n",x);
x += 1;
printf("The value x has been changed to = %d \n",x);
printf("The value x = %d \n",x);
}
else
{
printf("hello, I am Parent of %d (pid : %d)\n",rc,(int) getpid());
printf("The value x = %d \n",x);
x += 10;
printf("The value x has been changed to = %d \n",x);
printf("The value x = %d \n",x);
}
return 0;
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
kawa@kawa-virtual-machine:~/Desktop$ ./h1
hello world ( pid : 3990 )
hello, I am Parent of 3991 (pid : 3990)
The value x = 100
The value x has been changed to = 110
The value x = 110
hello, I am Child (pid : 3991)
The value x = 100
The value x has been changed to = 101
The value x = 101

可见,父、子进程中,x 初始值均为 100,因为 fork() 时,是复制的当时的程序数据,给到子进程,所以 x 初值均为 100 。

3.使用 fork()编写另一个程序。子进程应打印“hello”,父进程应打印“goodbye”。你应该尝试确保子进程始终先打印。你能否不在父进程调用 wait()而做到这一点呢?

使用vfork() 实现这一点:

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>

int main()
{
printf("hello world ( pid : %d )\n",(int) getpid());
int rc = vfork();
if(rc < 0 )
{
printf("fork failed\n");
}
else if(rc ==0 )
{

printf("hello\n");
exit(1);
}
else
{
printf("goodbye!\n");
}
return 0;
}

输出结果如下:

1
2
3
4
kawa@kawa-virtual-machine:~/Desktop$ ./h3
hello world ( pid : 5321 )
hello
goodbye!

可见,vfork() 成功完成了其使命。

5.现在编写一个程序,在父进程中使用 wait(),等待子进程完成。wait()返回什么?如果你在子进程中使用 wait()会发生什么?

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>

int main()
{
printf("hello world ( pid : %d )\n",(int) getpid());
int rc = fork();
if(rc < 0 )
{
printf("fork failed\n");
}
else if(rc ==0 )
{

printf("This is Child pid = %d \n",(int) getpid());
exit(1);
}
else
{
printf("This is parent pid = %d \n" ,(int) getpid());
int w = wait();
printf("the return value of wait() is : %d \n",w);
}
return 0;
}

结果:

1
2
3
4
5
kawa@kawa-virtual-machine:~/Desktop$ ./h5
hello world ( pid : 5479 )
This is parent pid = 5479
This is Child pid = 5480
the return value of wait() is : 5480

显然,wait() 返回值为 子进程的pid。

当子进程wait时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>

int main()
{
printf("hello world ( pid : %d )\n",(int) getpid());
int rc = fork();
if(rc < 0 )
{
printf("fork failed\n");
}
else if(rc ==0 )
{

printf("This is Child pid = %d \n",(int) getpid());
int w = wait();
printf("the return value of wait() is : %d \n",w);
exit(1);
}
else
{
printf("This is parent pid = %d \n" ,(int) getpid());
}
return 0;
}

结果如下:

1
2
3
4
5
6
kawa@kawa-virtual-machine:~/Desktop$ ./h5
hello world ( pid : 5569 )
This is parent pid = 5569
This is Child pid = 5570
the return value of wait() is : -1

可见,子进程中调用 wait,返回值为 -1。

因为wait() 返回值为成功结束的子进程,而子进程中无法收集到更下一级的子进程pid(因为不存在),就会导致回收失败,返回值为-1。

7.编写一个创建子进程的程序,然后在子进程中关闭标准输出(STDOUT_FILENO)。 如果子进程在关闭描述符后调用 printf()打印输出,会发生什么?

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>

int main()
{
int rc = fork();
if (rc < 0) {
fprintf(stderr, "fork failed");
exit(1);
} else if (rc == 0) {
close(STDOUT_FILENO);
printf("output child\n");
}
wait(NULL);
return 0;
}

输出:

无。

没有输出。

习题到此为止。

第六章 机制:受限直接执行

三种切换内核态

  • 系统调用
  • 异常
  • 中断

以下是直接运行协议:

其过程大致是:在操作系统上进行一系列准备操作(进程列表上创建条目、分配内存、加载、设置栈等等),如何程序执行交还程序本身,进行执行,最后由操作系统收尾(清除内存等)。

解决方案

引入一种新的处理器模式:用户模式(user mode);

在用户模式下,进程会受到限制,例如不能发出I/O请求。

与用户模式不同的内核模式(kernel mode),操作系统(或内核)就以这种模式运行。 在此模式下,运行的代码可以做它喜欢的事,包括特权操作。

内核通过在启动时设置陷阱表(trap table)来实现。当机器启动时,它在特权(内核)模 式下执行,因此可以根据需要自由配置机器硬件。操作系统做的第一件事,就是告诉硬件在发生某些异常事件时要运行哪些代码。

进程间切换

时钟中断(timer interrupt)

时钟设备可以编程为每隔几毫秒产生一次中断。产生中断时,当前正在运行的进 程停止,操作系统中预先配置的中断处理程序(interrupt handler)会运行。此时,操作系统 重新获得 CPU 的控制权,因此可以做它想做的事:停止当前进程,并启动另一个进程。

是继续运行当前正在运行的进程,还是切换到另一个进程。这 个决定是由调度程序(scheduler)做出的,它是操作系统的一部分。

上下文切换

操作系统要做的就是为当前正在执行的进程保存一些寄存器 的值(例如,到它的内核栈),并为即将执行的进程恢复一些寄存器的值(从它的内核栈)。

为了保存当前正在运行的进程的上下文,操作系统会执行一些底层汇编代码,来保存 通用寄存器、程序计数器,以及当前正在运行的进程的内核栈指针,然后恢复寄存器、程 序计数器,并切换内核栈,供即将运行的进程使用。

第七章 进程调度:介绍

调度策略(sheculing policy)

T 周转时间= T 完成时间−T 到达时间

FIFO先进先出

A 先运行 100s,B 或 C 才有机会运行。因此,系统的平均周转时间是比 较高的:令人不快的 110s((100 + 110 + 120)/ 3 = 110)。护航效应(convoy effect)

最短任务优先(SJF)

这个新的调度准则被称为最短任 务优先(Shortest Job First,SJF),该名称应该很容易记住,因为它完全描述了这个策略: 先运行最短的任务,然后是次短的任务,如此下去。

最短完成时间优先(STCF)

当 B 和 C 到达时,调度程序当然可以做其他事情:它可以抢占(preempt)工作 A,并 决定运行另一个工作,或许稍后继续工作 A。根据我们的定义,SJF 是一种非抢占式 (non-preemptive)调度程序

向 SJF 添加抢占,称为最短完成时间优先(Shortest Time-to-Completion First,STCF)抢占式最短作业优先(Preemptive Shortest Job First ,PSJF)调度程序

新度量指标:响应时间

T 响应时间= T 首次运行−T 到达时间

例如,如果 3 个工作同时到 达,第三个工作必须等待前两个工作全部运行后才能运行。这种方法虽然有很好的周转时 间,但对于响应时间和交互性是相当糟糕的。假设你在终端前输入,不得不等待 10s 才能看 到系统的回应,只是因为其他一些工作已经在你之前被调度:你肯定不太开心。

轮转

RR 在一个时间片(time slice,有时称为调度量子,scheduling quantum)内运行一个工作,然后切换到运行队列中的下一个任务,而不是运行一个任务直 到结束。它反复执行,直到所有任务完成。

时间片长度必须是时钟中断周期的倍数

时间片太短是有问题的:突然上下文切换的成本将影响整体性能。因此,系统设计者 需要权衡时间片的长度,使其足够长,以便摊销(amortize)上下文切换成本,而又不会使 系统不及时响应。

如果周转时间是我们的指标,那么 RR 确实是最糟糕的策略之一

RR会趋向于每个进程都要耗费数倍的时间完成。

我们开发了两种调度程序。第一种类型(SJF、STCF)优化周转时间,但对响应时间不利。第二种类型(RR)优化响应时间,但对周转时间不利

不能 “既要。。。又要。。。”

结合I/O

一种常见的方法是将 A 的每个 10ms 的子工作视为一项独立的工作。因此,当系统启动 时,它的选择是调度 10ms 的 A,还是 50ms 的 B。对于 STCF,选择是明确的:选择较短的 一个,在这种情况下是 A。然后,A 的工作已完成,只剩下 B,并开始运行。然后提交 A 的一个新子工作,它抢占 B 并运行 10ms。这样做可以实现重叠(overlap),一个进程在等 待另一个进程的 I/O 完成时使用 CPU,系统因此得到更好的利用

第八章 调度:多级反馈队列

MLFQ:基本规则

例如,如果一个工作不断放弃 CPU 去等待键盘输入,这是交互型进程的可能行为,MLFQ 因此会让它保持高优先级。相 反,如果一个工作长时间地占用 CPU,MLFQ 会降低其优先级。通过这种方式,MLFQ 在 进程运行过程中学习其行为,从而利用工作的历史来预测它未来的行为。

至此,我们得到了 MLFQ 的两条基本规则。

规则 1:如果 A 的优先级 > B 的优先级,运行 A(不运行 B)。

规则 2:如果 A 的优先级 = B 的优先级,轮转运行 A 和 B。

改变优先级

我们必须决定,在一个工作的生命周期中,MLFQ 如何改变其优先级(在哪个队列中)。 要做到这一点,我们必须记得工作负载:既有运行时间很短、频繁放弃 CPU 的交互型工作, 也有需要很多 CPU 时间、响应时间却不重要的长时间计算密集型工作。下面是我们第一次 尝试优先级调整算法。

规则 3:工作进入系统时,放在最高优先级(最上层队列)。

规则 4a:工作用完整个时间片后,降低其优先级(移入下一个队列)。

规则 4b:如果工作在其时间片以内主动释放 CPU, 则优先级不变。

如果不知道工作是短工 作还是长工作,那么就在开始的时候假设其是短工作,并赋予最高优先级。如果确实是短 工作,则很快会执行完毕,否则将被慢慢移入低优先级队列,而这时该工作也被认为是长 工作了。通过这种方式,MLFQ 近似于 SJF。

有I/O

如果进程在时间片用完之前主动放弃 CPU, 则保持它的优先级不变。这条规则的意图很简单:假设交互型工作中有大量的 I/O 操作(比 如等待用户的键盘或鼠标输入),它会在时间片用完之前放弃 CPU。在这种情况下,我们不 想处罚它,只是保持它的优先级不变。

关于MLFQ的问题:

首先,会有饥饿(starvation)问题。如果系统有“太多”交互型工作,就会不断占用 CPU,导致长工作永远无法得到 CPU(它们饿死了)。即使在这种情况下,我们希望这些长 工作也能有所进展。

其次,聪明的用户会重写程序,愚弄调度程序(game the scheduler)。愚弄调度程序指 的是用一些卑鄙的手段欺骗调度程序,让它给你远超公平的资源。上述算法对如下的攻击 束手无策:进程在时间片用完之前,调用一个 I/O 操作(比如访问一个无关的文件),从而 主动释放 CPU。如此便可以保持在高优先级,占用更多的 CPU 时间。做得好时(比如,每 运行 99%的时间片时间就主动放弃一次 CPU),工作可以几乎独占 CPU。

尝试提高优先级

规则 5:经过一段时间 S,就将系统中所有工作重新加入最高优先级队列。

尝试更好的计时方法

这里的解决方案,是为 MLFQ 的每层队列提供更完善的 CPU 计时方式(accounting)。 调度程序应该记录一个进程在某一层中消耗的总时间,而不是在调度时重新计时。只要进 程用完了自己的配额,就将它降到低一优先级的队列中去。不论它是一次用完的,还是拆 成很多次用完。因此,我们重写规则 4a 和 4b。

规则 4:一旦工作用完了其在某一层中的时间配额(无论中间主动放弃了多少次 CPU),就降低其优先级(移入低一级队列)。

1
2
3
4
5
6
7
//小结:
规则 1:如果 A 的优先级 > B 的优先级,运行 A(不运行 B)。
规则 2:如果 A 的优先级 = B 的优先级,轮转运行 A 和 B。
规则 3:工作进入系统时,放在最高优先级(最上层队列)。
规则 4:一旦工作用完了其在某一层中的时间配额(无论中间主动放弃了多少次
CPU),就降低其优先级(移入低一级队列)。
规则 5:经过一段时间 S,就将系统中所有工作重新加入最高优先级队列。

第九章 调度:比例份额

在本章中,我们来看一个不同类型的调度程序——比例份额(proportional-share)调度 程序,有时也称为公平份额(fair-share)调度程序。比例份额算法基于一个简单的想法:调 度程序的最终目标,是确保每个工作获得一定比例的 CPU 时间,而不是优化周转时间和响 应时间。

基本概念:彩票数表示份额

下面来看一个例子。假设有两个进程 A 和 B,A 拥有 75 张彩票,B 拥有 25 张。因此 我们希望 A 占用 75%的 CPU 时间,而 B 占用 25%。

通过不断定时地(比如,每个时间片)抽取彩票,彩票调度从概率上(但不是确定的) 获得这种份额比例。抽取彩票的过程很简单:调度程序知道总共的彩票数(在我们的例子 中,有 100 张)。调度程序抽取中奖彩票,这是从 0 和 99①之间的一个数,拥有这个数对应 的彩票的进程中奖。假设进程 A 拥有 0 到 74 共 75 张彩票,进程 B 拥有 75 到 99 的 25 张, 中奖的彩票就决定了运行 A 或 B。调度程序然后加载中奖进程的状态,并运行它。

第11章 关于 CPU 虚拟化的总结

第12章 关于内存虚拟化的对话

第14章 插叙:内存操作 API

第 15 章 机制:地址转换

受限直接访问(Limited Direct Execution,LDE)::让程序运行的大部分指令直接访问硬件,只在 一些关键点(如进程发起系统调用或发生时钟中断)由操作系统介入来确保“在正确时间, 正确的地点,做正确的事”。

静态重定位

基本技术被称为静态重定位(static relocation),其中一个名为加载程序(loader)的软件接手将要运行的可执行程序,将它的地址重写到物理内存中期望的偏移位置。

例如,程序中有一条指令是从地址 1000 加载到寄存器(即 movl 1000,%eax),当整个程序的地址 空间被加载到从 3000(不是程序认为的 0)开始的物理地址中,加载程序会重写指令中的地址(即 movl 4000, %eax),从而完成简单的静态重定位

也就是直接改写代码中的地址,完成重定位

动态重定位

基址(base)寄存器界限(bound)寄存器,程序执行时,操作系统会决定其在物理内存中的实际加载地址,并将起始地址记录在基址寄存器中。

现在,该进程产生的所有内存引用,都会被处理器通过以下方式转换为物理地址:

1
physical address = virtual address + base

进程中使用的内存引用都是虚拟地址(virtual address),硬件接下来将虚拟地址加上基址寄存器中的内容,得到物理地址(physical address),再发给内存系统。

将虚拟地址转换为物理地址,这正是所谓的地址转换(address translation)技术。也就是说,硬件取得进程认为它要访问的地址,将它转换成数据实际位于的物理地址。

界限寄存器:硬件在将虚拟地址与基址寄存器内容求和前,就检查这个界限。如果超出界限,就产生错误。

第十六章 分段

显示引用段

例如使用14位虚拟地址的前两位来标识。

第13、12位(第0位开始)作为段标识,若这两位是00,则硬件知道,这是属于代码段的地址,便使用代码段的基址和界限来重定位到正确的物理地址。若前两位是 01,则是堆地址,使用堆的基址和界限。

如某一个虚拟地址:

01 0000 0110 1000

其前2位表示这是引用的堆段,后12位是段内偏移。偏移量与基址寄存器相加,硬件就得到了最终的物理地址。

请注意,偏移量也简化了对段边界的判断。我们只要检查偏移量是否小于界限,大于界限的为非法地址。

隐式引用段

利用地址是如何产生的,来区分不同的段

在隐式(implicit)方式中,硬件通过地址产生的方式来确定段。例如,如果地址由程序计数器产生(即它是指令获取),那么地址在代码段。如果基于栈或基址指针,它一定在栈段。其他地址则在堆段。

栈是反向生长的

首先,我们需要一点硬件支持。除了基址和界限外,硬件还需要知道段的增长方向,使用一位区分,比如 1 代表自小而大增长,0 反之。

支持共享

为了支持共享,需要一些额外的硬件支持,这就是保护位(protection bit)

为每个段增加了几个位,标识程序是否能够读写该段,或执行其中的代码。

粗粒度与细粒度

到目前为止,我们的例子大多针对只有很少的几个段的系统(即代码、栈、堆)。

我们可以认为这种分段是粗粒度的(coarse-grained);

但是,一些早期系统(如 Multics[CV65, DD68])更灵活,允许将地址空间划分为大量较小的段,这被称为细粒度(fine-grained)分段。

第 17 章 空闲空间管理

外部碎片问题

外部碎片(external fragmentation)的问题:空闲空间被分割成不同大小的小块,成为碎片,后续的请求可能失败,因为没有一块足够大的连续空闲空间,即使这时总的空闲空间超出了请求的大小。

空闲列表(free list):在堆上管理空闲空间的数据结构通常称为空闲列表(free list);

内部碎片

如果分配程序给出的内存块超出请求的大小,在这种块中超出请求的空间(因此而未使用)就被认为是内部碎片(因为浪费发生在已分配单元的内部),这是另一种形式的空间浪费。

底层机制

分割与合并

比如某 30 字节的堆,其 10 ~ 20 字节被占用,则空闲列表会有两个元素,一个描述第一个 10 字节的空闲区域(0 ~ 9),另一个描述另一个空闲区域(字节 20~29)

通过上面的介绍可以看出,任何大于 10 字节的分配请求都会失败(返回 NULL),因为没有足够的连续可用空间。而恰好 10 字节的需求可以由两个空闲块中的任何一个满足。但是,如果申请小于 10 字节空间,会发生什么?

假设我们只申请一个字节的内存。这种情况下,分配程序会执行所谓的分割(splitting)动作:它找到一块可以满足请求的空闲空间,将其分割,第一块返回给用户,第二块留在空闲列表中。

而当 free 掉中间的拦路虎,这个 10byte 的空间时,空闲列表会将空闲空间合并(coalescing),而不是在原位置增加一个【addr :10,len :10】

通过合并,我们会获得一个更大的连续的可分配空间。

追踪已分配空间的大小

对于 free(void *ptr)函数调用,这个接口没有块大小的参数。

要完成这个任务,大多数分配程序都会在头块(header)中保存一点额外的信息,它在内存中,通常就在返回的内存块之前。

我们检查一个 20 字节的已分配块,由 ptr 指着,设想用户调用了 malloc(),并将结果保存在 ptr 中:ptr = malloc(20)。

该头块中至少包含所分配空间的大小(这个例子中是 20)。

它也可能包含一些额外的指针来加速空间释放,包含一个幻数来提供完整性检查,以及其他信息。

上面的例子看起来会像图 17.2 的样子。用户调用 free(ptr) 时,库会通过简单的指针运算得到头块的位置:

1
2
3
void free(void *ptr) {
header_t *hptr = (void *)ptr - sizeof(header_t);
}

也就是说,指针指向完整的一块空间,头块是分配地址时,于这块空间之前,分配的一个结构体,其内部存储了一些信息,比如接下来的分配了的空间的大小。

实际释放的是头块大小加上分配给用户的空间的大小。因此,如果用户请求 N 字节的内存,库不是寻找大小为 N 的空闲块,而是寻找 N 加上头块大小的空闲块。

嵌入空闲列表

类似分配的头块一样,空闲空间的头部,也会有“空闲头块”,来指明其后续空间有多少可用,下一块空闲空间在哪。

当内存中,前三块分配了100+8(head)的空间后,又将中间那块内存给释放掉,就会出现下图的变化:

让堆增长

当堆的空间用完后,有以下两种应对方式:

  • 直接返回错误/NULL;
  • 使用某些系统调用(如UNIX中的 sbrk),让堆增长。最开始分配一个小的堆空间,随着使用时的不够用,再增大。

基本策略

最优匹配

最优匹配(best fit)策略非常简单:首先遍历整个空闲列表,找到和请求大小一样或更大的空闲块,然后返回这组候选者中最小的一块。这就是所谓的最优匹配(也可以称为最小匹配)。只需要遍历一次空闲列表,就足以找到正确的块并返回。

其可尽量返回最接近用户需求大小的块,尽量避免空间浪费。

最差匹配

尽量找最大的块,从中间分一部分给用户的请求。最差匹配尝试在空闲列表中保留较大的块,而不是向最优匹配那样可能剩下很多难以利用的小块。但是,最差匹配同样需要遍历整个空闲列表。更糟糕的是,大多数研究表明它的表现非常差,导致过量的碎片,同时还有很高的开销。

首次匹配

首次匹配(first fit)策略就是找到第一个足够大的块,将请求的空间返回给用户。同样, 剩余的空闲空间留给后续请求。

首次匹配有速度优势,但有时会让空闲列表开头的部分有很多小块。

下次匹配

不同于首次匹配每次都从列表的开始查找,下次匹配(next fit)算法多维护一个指针,指向上一次查找结束的位置。其想法是将对空闲空间的查找操作扩散到整个列表中去,避免对列表开头频繁的分割。

其它方式

分离空闲列表

一直以来有一种很有趣的方式叫作分离空闲列表(segregated list)。基本想法很简单: 如果某个应用程序经常申请一种(或几种)大小的内存空间,那就用一个独立的列表,只管理这样大小的对象。其他大小的请求都一给更通用的内存分配程序。

这种方法的好处显而易见。通过拿出一部分内存专门满足某种大小的请求,碎片就不再是问题了。而且,由于没有复杂的列表查找过程,这种特定大小的内存分配和释放都很快。

伙伴系统

因为合并对分配程序很关键,所以人们设计了一些方法,让合并变得简单,一个好例子就是二分伙伴分配程序(binary buddy allocator)

在这种系统中,空闲空间首先从概念上被看成大小为 2N 的大空间。当有一个内存分配请求时,空闲空间被递归地一分为二,直到刚好可以满足请求的大小(再一分为二就无法满足)。这时,请求的块被返回给用户。在下面的例子中,一个 64KB 大小的空闲空间被切分,以便提供 7KB 的块:

在这个例子中,最左边的 8KB 块被分配给用户(如上图中深灰色部分所示)。请注意, 这种分配策略只允许分配 2 的整数次幂大小的空闲块,因此会有内部碎片(internal fragment) 的麻烦。

伙伴系统的漂亮之处在于块被释放时。如果将这个 8KB 的块归还给空闲列表,分配程序会检查“伙伴”8KB 是否空闲。如果是,就合二为一,变成 16KB 的块。然后会检查这个 16KB 块的伙伴是否空闲,如果是,就合并这两块。这个递归合并过程继续上溯,直到合并整个内存区域,或者某一个块的伙伴还没有被释放。

伙伴系统运转良好的原因,在于很容易确定某个块的伙伴。怎么找?仔细想想上面例子中的各个块的地址。如果你想得够仔细,就会发现每对互为伙伴的块只有一位不同,正是这一位决定了它们在整个伙伴树中的层次。现在你应该已经大致了解了二分伙伴分配程序的工作方式。

第 18 章 分页:介绍

将空间分割成固定长度的分片。在虚拟内存中,我们称这种思想为分页,可以追溯到一个早期的重要系统,Atlas[KE+62, L78]。分页不是将一个进程的地址空间分割成几个不同长度的逻辑段(即代码、堆、段),而是分割成固定大小的单元,每个单元称为一页。相应地,我们把物理内存看成是定长槽块的阵列,叫作页帧(page frame)。每个这样的页帧包含一个虚拟内存页。

为了记录地址空间的每个虚拟页放在物理内存中的位置,操作系统通常为每个进程保存一个数据结构,称为页表(page table)。页表的主要作用是为地址空间的每个虚拟页面保存地址转换(address translation),从而让我们知道每个页在物理内存中的位置。

页表因此具有以下 4 个条目:(虚拟页 0→物理帧 3)、(VP 1→PF 7)、 (VP 2→PF 5)和(VP 3→PF 2)。

重要的是要记住,这个页表是一个每进程的数据结构(我们讨论的大多数页表结构都是每进程的数据结构,我们将接触的一个例外是倒排页表,inverted page table)

现在,我们了解了足够的信息,可以完成一个地址转换的例子。设想拥有这个小地址 空间(64 字节)的进程正在访问内存:

1
movl <virtual Address> , %eax

具体来说,注意从地址到寄存器 eax 的数据显式加载(因此忽略之前肯定会发生的指令获取)。

为了转换(translate)该过程生成的虚拟地址,我们必须首先将它分成两个组件:虚拟页面号(virtual page number,VPN)和页内的偏移量(offset)。对于这个例子,因为进程的虚拟地址空间是 64 字节,我们的虚拟地址总共需要 6 位(26 = 64)。因此,虚拟地址可以表示如下:

Va5 Va4 Va3 Va2 Va1 Va0

在该图中,Va5 是虚拟地址的最高位,Va0 是最低位。因为我们知道页的大小(16 字节), 所以可以进一步划分虚拟地址,如下所示:

页面大小为 16 字节,位于 64 字节的地址空间。因此我们需要能够选择 4 个页,地址 的前 2 位就是做这件事的。因此,我们有一个 2 位的虚拟页号(VPN)。其余的位告诉我们, 感兴趣该页的哪个字节,在这个例子中是 4 位,我们称之为偏移量。 当进程生成虚拟地址时,操作系统和硬件必须协作,将它转换为有意义的物理地址。 例如,让我们假设上面的加载是虚拟地址 21:

movl 21 , %eax

将“21”变成二进制形式,是“010101”,因此我们可以检查这个虚拟地址,看看它是 如何分解成虚拟页号(VPN)和偏移量的:

因此,虚拟地址“21”在虚拟页“01”(或 1)的第 5 个(“0101”)字节处。通过虚拟 页号,我们现在可以检索页表,找到虚拟页 1 所在的物理页面。在上面的页表中,物理帧号(PFN)(有时也称为物理页号,physical page number 或 PPN)是 7(二进制 111)。因此, 我们可以通过用 PFN 替换 VPN 来转换此虚拟地址, 然后将载入发送给物理内存(见图 18.3)。

请注意,偏移量保持不变(即未翻译),因为偏 移量只是告诉我们页面中的哪个字节是我们想要的。 我们的最终物理地址是 1110101(十进制 117),正是 我们希望加载指令(见图 18.2)获取数据的地方。

页表存在哪里

稍后我们会看到,很多 操作系统内存本身都可以虚拟化,因此页表可以存储在操 作系统的虚拟内存中(甚至可以交换到磁盘上)

列表中有什么

页表就是一种数据结构,用于将虚拟地址(或者实际上,是虚拟页号)映射到物理地址(物理帧号)。

最简单的形式称为线性页表(linear page table),就是一个数组。操作系统通过虚拟页号(VPN)检索该数组,并在该索引处查找页表项(PTE),以便找到期望的物理帧号(PFN)。

至于每个 PTE 的内容,我们在其中有许多不同的位,值得有所了解。有效位(valid bit) 通常用于指示特定地址转换是否有效。例如,当一个程序开始运行时,它的代码和堆在其地址空间的一端,栈在另一端。所有未使用的中间空间都将被标记为无效(invalid),如果进程尝试访问这种内存,就会陷入操作系统,可能会导致该进程终止。

我们还可能有保护位(protection bit),表明页是否可以读取、写入或执行。同样,以这 些位不允许的方式访问页,会陷入操作系统

还有其他一些重要的部分,但现在我们不会过多讨论。存在位(present bit)表示该页 是在物理存储器还是在磁盘上(即它已被换出,swapped out)。当我们研究如何将部分地址 空间交换(swap)到磁盘,从而支持大于物理内存的地址空间时,我们将进一步理解这一 机制。交换允许操作系统将很少使用的页面移到磁盘,从而释放物理内存。脏位(dirty bit) 也很常见,表明页面被带入内存后是否被修改过。

参考位(reference bit,也被称为访问位,accessed bit)有时用于追踪页是否被访问,也 用于确定哪些页很受欢迎,因此应该保留在内存中。这些知识在页面替换(page replacement) 时非常重要,我们将在随后的章节中详细研究这一主题。

分页:也很慢

页表基址寄存器(page-table base register)包含页表的起始位置的物理地址。

为了找到想要的 PTE 的位置,硬件将执行以下功能:

1
2
VPN = (VirtualAddress & VPN_MASK) >> SHIFT
PTEAddr = PageTableBaseRegister + (VPN * sizeof(PTE))

在我们的例子中,VPN MASK 将被设置为 0x30(十六进制 30,或二进制 110000),它 从完整的虚拟地址中挑选出 VPN 位;SHIFT 设置为 4(偏移量的位数),这样我们就可以将 VPN 位向右移动以形成正确的整数虚拟页码。例如,使用虚拟地址 21(010101),掩码将 此值转换为 010000,移位将它变成 01,或虚拟页 1,正是我们期望的值。然后,我们使用 该值作为页表基址寄存器指向的 PTE 数组的索引。

一旦知道了这个物理地址,硬件就可以从内存中获取 PTE,提取 PFN,并将它与来自 虚拟地址的偏移量连接起来,形成所需的物理地址。具体来说,你可以想象 PFN 被 SHIFT 左移,然后与偏移量进行逻辑或运算,以形成最终地址

1
2
offset = VirtualAddress & OFFSET_MASK
PhysAddr = (PFN << SHIFT) | offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1 // Extract the VPN from the virtual address
2 VPN = (VirtualAddress & VPN_MASK) >> SHIFT
3
4 // Form the address of the page-table entry (PTE)
5 PTEAddr = PTBR + (VPN * sizeof(PTE))
6
7 // Fetch the PTE
8 PTE = AccessMemory(PTEAddr)
9
10 // Check if process can access the page
11 if (PTE.Valid == False)
12 RaiseException(SEGMENTATION_FAULT)
13 else if (CanAccess(PTE.ProtectBits) == False)
14 RaiseException(PROTECTION_FAULT)
15 else
16 // Access is OK: form physical address and fetch it
17 offset = VirtualAddress & OFFSET_MASK
18 PhysAddr = (PTE.PFN << PFN_SHIFT) | offset
19 Register = AccessMemory(PhysAddr)

对于每个内存引用(无论是取指令还是显式加载或存储),分页都需要我们执行一个 额外的内存引用,以便首先从页表中获取地址转换。工作量很大!额外的内存引用开销很 大,在这种情况下,可能会使进程减慢两倍或更多。

内存追踪

略(就是小结)

第 19 章 分页:快速地址转换(TLB)

虚拟地址转换的映射信息,往往储存在物理内存中,故转换虚拟地址时,分页逻辑上需要额外一次的内存访问。这个会带来较高的性能开销。

于是我们需要思考,如何加速地址转换。

TLB(translation-lookaside buffer, TLB),即地址转换旁路缓冲储存器

它就是频繁发生的虚拟到物理地址转换的硬件缓存(cache)

对每次内存访问,硬件先检查 TLB,看看其中是否有期望的转换映射,如果有,就完成转换(很快),不用访问页表(其中有全部的转换映射)。

TLB 的基本算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1  VPN = (VirtualAddress & VPN_MASK) >> SHIFT
2 (Success, TlbEntry) = TLB_Lookup(VPN)
3 if (Success == True) // TLB Hit
4 if (CanAccess(TlbEntry.ProtectBits) == True)
5 Offset = VirtualAddress & OFFSET_MASK
6 PhysAddr = (TlbEntry.PFN << SHIFT) | Offset
7 AccessMemory(PhysAddr)
8 else
9 RaiseException(PROTECTION_FAULT)
10 else // TLB Miss
11 PTEAddr = PTBR + (VPN * sizeof(PTE))
12 PTE = AccessMemory(PTEAddr)
13 if (PTE.Valid == False)
14 RaiseException(SEGMENTATION_FAULT)
15 else if (CanAccess(PTE.ProtectBits) == False)
16 RaiseException(PROTECTION_FAULT)
17 else
18 TLB_Insert(VPN, PTE.PFN, PTE.ProtectBits)
19 RetryInstruction()

硬件算法的大体流程如下:

  • 首先从虚拟地址中提取页号(VPN)(见图 19.1 第 1 行), 然后检查 TLB 是否有该 VPN 的转换映射(第 2 行)。如果有,我们有了 TLB 命中(TLB hit), 这意味着 TLB 有该页的转换映射。成功!
  • 接下来我们就可以从相关的 TLB 项中取出页帧号(PFN),与原来虚拟地址中的偏移量组合形成期望的物理地址(PA),并访问内存(第 5~7 行),假定保护检查没有失败(第 4 行)。
  • 如果 CPU 没有在 TLB 中找到转换映射(TLB 未命中),我们有一些工作要做。在本例中,硬件访问页表来寻找转换映射(第 11~12 行),并用该转换映射更新 TLB(第 18 行), 假设该虚拟地址有效,而且我们有相关的访问权限(第 13、15 行)。上述系列操作开销较大,主要是因为访问页表需要额外的内存引用(第 12 行)。最后,当 TLB 更新成功后,系统会重新尝试该指令,这时 TLB 中有了这个转换映射,内存引用得到很快处理。

尽可能利用缓存:缓存是计算机系统在最基本的性能改进技术之一,一次又一次地用于让常见的情况更快

其背后的思想是利用指令和数据引用的局部性(如数组的密集存放,往往不命中的只有开头的那一个,随后几个便会命中)

时间局部性:最近访问过的指令或数据项可能很快会再次访问。想想循环中的循环变量或指令,它们被多次反复访问。

空间局部性:当程序访问内存地址 x 时,可能很快会访问邻近 x 的内存。想想遍历某种数组,访问一个接一个的元素。当然,这些性质取决于程序的特点,并不是绝对的定律,而更像是一种经验法则。

谁来处理 TLB 未命中

是硬件

为了做到这一点,硬件必须知道页表在内存中的确切位置,以及页表的确切格式。发生未命中时,硬件会“遍历”页表,找到正确的页表项,取出想要的转换映射,用它更新 TLB,并重试该指令。这种“旧”体系结构有硬件管理的 TLB,一个例子是 x86 架构,它采用固定的多级页表(multi-level page table)

更现代的体系结构(例如,MIPS;Sun 公司的 SPARC,都是RISC),有所谓的软件管理 TLB(software-managed TLB),发生 TLB 未命中时,硬件系统会抛出一个异常,这会暂停当前的指令流,将特权级提升至内核模式,跳转至陷阱处理程序(trap handler)。接下来,这个陷阱处理程序是操作系统的一段代码,用于处理 TLB 未命中。

这段代码在运行时,会查找页表中的转换映射,然后用特别的“特权”指令更新 TLB,并从陷阱返回。此时,硬件会重试该指令(导致 TLB 命中)。

几个重要的细节

  • 首先,这里的从陷阱返回指令稍稍不同于之前提到的服务于系统调用的从陷阱返回。在后一种情况下,从陷阱返回应该继续执行陷入操作系统之后那条指令,就像从函数调用返回后,会继续执行此次调用之后的语句。

  • 在前一种情况下,在从 TLB 未命中的陷阱返回后,硬件必须从导致陷阱的指令继续执行 。这次重试因此导致该指令再次执行,但这次会命中 TLB。因此,根据陷阱或异常的原因,系统在陷入内核时,必须保存不同的程序计数器,以便将来能够正确地继续执行。

  • 第二,在运行 TLB 未命中处理代码时,操作系统需要格外小心避免引起 TLB 未命中的无限递归。有很多解决方案,例如,可以把 TLB 未命中陷阱处理程序直接放到物理内存中[它们没有映射过(unmapped),不用经过地址转换]

  • 或者在 TLB 中保留一些项,记录永久有效的地址转换,并将其中一些永久地址转换槽块留给处理代码本身,这些被监听的(wired) 地址转换总是会命中 TLB。

TLB 的内容

我们来详细看一下硬件 TLB 中的内容。典型的 TLB 有 32 项、64 项或 128 项,并且是 全相联的(fully associative)。基本上,这就意味着一条地址映射可能存在 TLB 中的任意位置,硬件会并行地查找 TLB,找到期望的转换映射。

一条 TLB 项内容可能像下面这样:

VPN | PFN | 其他位

注意,VPN 和 PFN 同时存在于 TLB 中,因为一条地址映射可能出现在任意位置(用硬件的术语,TLB 被称为全相联的(fully-associative)缓存)。

注意:

  • TLB 的有效位,只是指出 TLB 项是不是有效的地址映射。
  • 页表的有效位,表示该页有没有被进程申请使用。

更有趣的是“其他位”。例如,TLB 通常有一个有效(valid)位,用来标识该项是不是有效地转换映射。通常还有一些保护(protection)位,用来标识该页是否有访问权限。例如,代码页被标识为可读和可执行,而堆的页被标识为可读和可写。还有其他一些位,包括地址空间标识符(address-space identifier)脏位(dirty bit)等。下面会介绍更多信息。

上下文切换时对 TLB 的处理

有了 TLB,在进程间切换时(因此有地址空间切换),会面临一些新问题。具体来说, TLB 中包含的虚拟到物理的地址映射只对当前进程有效,对其他进程是没有意义的。所以在发生进程切换时,硬件或操作系统(或二者)必须注意确保即将运行的进程不要误读了之前进程的地址映射。

  • 一种方法是在上下文切换时,简单地清空(flush)TLB,这样在新进程运行前 TLB 就变成了空的。如果是软件管理 TLB 的系统,可以在发生上下文切换时,通过一条显式(特权)指令来完成。如果是硬件管理 TLB,则可以在页表基址寄存器内容发生变化时清空 TLB(注意,在上下文切换时,操作系统必须改变页表基址寄存器(PTBR) 的值)。不论哪种情况,清空操作都是把全部有效位(valid)置为 0,本质上清空了 TLB。
  • 为了减少每次清空以后,之后多次不命中的开销,一些系统增加了硬件支持,实现跨上下文切换的 TLB 共享。比如 有的系统在 TLB 中添加了一个地址空间标识符(Address Space Identifier,ASID)
  • 可以把 ASID 看作是进程标识符(Process Identifier,PID),但通常比 PID 位数少(PID 一般 32 位, ASID 一般是 8 位)。

TLB 替换策略

TLB 和其他缓存一样,还有一个问题要考虑,即缓存替换(cache replacement)。具体来说,向 TLB 中插入新项时,会替换(replace)一个旧项,这样问题就来了:应该替换那一个?

  • 替换最近最少使用(least-recently-used,LRU)的项;
  • 随机(random)策略,即随机选择一项换出去。

实际系统的 TLB 表项

  • VPN:虚拟页号;
  • G:全局位(Global,G),用来指示这个页是不是所有进程全局共享的;
  • ASID:地址空间标识符;
  • PFN:物理帧号;
  • C:一致性位(Coherence,C),决定硬件如何缓存该页;
  • D:脏位,表示该页是否被写入新数据;
  • V:有效位,告诉硬件该项的地址映射是否有效。

第 20 章 分页:较小的表

多级页表

多级页表(multi-level pagetable)

多级页表的基本思想很简单。首先,将页表分成页大小的单元。然后,如果整页的页表项(PTE)无效,就完全不分配该页的页表。为了追踪页表的页是否有效(以及如果有效,它在内存中的位置),使用了名为页目录(page directory)的新结构。页目录因此可以告诉你页表的页在哪里,或者页表的整个页不包含有效页。

例子如下:

可见,左边的线性页表,多分配了两个空白的页,而右边的多级页表,其中两个完全无数据的空白页并未被分配。

应该指出,多级页表是有成本的。在 TLB 未命中时,需要从内存加载两次,才能从页表中获取正确的地址转换信息(一次用于页目录,另一次用于 PTE 本身)。

因此,多级表是一个时间—空间折中(time-space trade-off)的小例子。

另一个明显的缺点是复杂性。无论是硬件还是操作系统来处理页表查找(在 TLB 未命中时),这样做无疑都比简单的线性页表查找更复杂。

多级页表例子

如图,这是一个页目录(左边大竖列)以及页表(右边两大竖列)

最后,让我们用这些信息来进行地址转换。这里是一个地址,指向 VPN 254 的第 0 个字节:0x3F80,或二进制的 11 1111 1000 0000。

回想一下,我们将使用 VPN 的前 4 位来索引页目录。因此,1111 会从上面的页目录中选择最后一个(第 15 个,如果你从第 0 个开始)。

  • 这就指向了位于地址 101 的页表的有效页。
  • 然后,我们使用 VPN 的下 4 位(1110)来索引页表的那一页并找到所需的 PTE。1110 是页面中的倒数第二(第 14 个)条,并告诉我们虚拟地址空间的页 254 映射到物理页 55。
  • 通过连接 PFN = 55(或十六进制 0x37)和 offset = 000000,可以形成我们想要的物理地址,并向内存系统发出请求:PhysAddr =(PTE.PFN << SHIFT)+ offset = 00 1101 1100 0000 = 0x0DC0

第 26 章 并发:介绍

单个线程的状态与进程状态非常类似。线程有一个程序计数器(PC),记录程序从哪里获取指令。每个线程有自己的一组用于计算的寄存器。所以,如果有两个线程运行在一个处理器上,从运行一个线程(T1)切换到另一个线程(T2)时,必定发生上下文切换(context switch)。线程之间的上下文切换类似于进程间的上下文切换。对于进程,我们将状态保存到进程控制块(Process Control Block,PCB)。现在,我们需要一个或多个线程控制块(Thread Control Block,TCB),保存每个线程的状态。但是,与进程相比,线程之间的上下文切换有一点主要区别:地址空间保持不变(即不需要切换当前使用的页表)。

如图,线程也会让内存空间中出现多个栈。

线程创建

这是一个简单线程创建代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1 #include <stdio.h>
2 #include <assert.h>
3 #include <pthread.h>
4
5 void *mythread(void *arg) {
6 printf("%s\n", (char *) arg);
7 return NULL;
8 }
9
10 int
11 main(int argc, char *argv[]) {
12 pthread_t p1, p2;
13 int rc;
14 printf("main: begin\n");
15 rc = pthread_create(&p1, NULL, mythread, "A"); assert(rc == 0);
16 rc = pthread_create(&p2, NULL, mythread, "B"); assert(rc == 0);
17 // join waits for the threads to finish
18 rc = pthread_join(p1, NULL); assert(rc == 0);
19 rc = pthread_join(p2, NULL); assert(rc == 0);
20 printf("main: end\n");
21 return 0;
22 }

关于其中的 p1 p2 线程,其打印的顺序是不确定的。

竞态条件

竞态条件(race condition)

例如:两个线程都想让一个全局变量 count + 1;但两者执行到写入寄存器后,没来得及写入内存,就调用了另一个线程。

会导致最终程序达不到想要的效果。

第 27 章 插叙:线程API

关于如何使用线程的API

线程创建

1
2
3
4
pthread_create( pthread_t * thread,
const pthread_attr_t * attr,
void * (*start_routine)(void*),
void * arg);
  • 第一个参数 thread 是指向 pthread_t 结构体的指针;
  • 第二个参数 attr 用于指定该线程可能具有的任何属性。包括设置栈大小,或关于该线程调度优先级的信息。
  • 第三个参数,指这个线程应该在哪个函数中运行。这是一个函数指针(function pointer),这个指针告诉我们需要以下内容,一个函数名称(start_routine),它被传入一个类型为 void *的参数(start_routine 后面的括号表明了这一点),并且它返回一个 void *类型的值(即一个 void 指针)。
  • 第四个参数 arg 是要传递给线程开始执行的函数的参数。

为什么我们需要这些 void 指针?好吧,答案很简单:将 void 指针作为函数的参数 start_routine,允许我们传入任何类型的参数,将它作为返回值,允许线程返回任何类型的结果。

线程完成

具体来说,你必须调用函数 pthread_join();

该函数有两个参数。第一个是 pthread_t 类型,用于指定要等待的线程。这个变量是由线程创建函数初始化的(当你将一个指针作为参数传递给 pthread_create()时)。如果你保留了它,就可以用它来等待该线程终止。

第二个参数是一个指针,指向你希望得到的返回值。因为函数可以返回任何东西,所以它被定义为返回一个指向 void 的指针。因为 pthread_join()函数改变了传入参数的值,所以你需要传入一个指向该值的指针,而不只是该值本身。

除了线程创建和 join 之外,POSIX 线程库提供的最有用的函数集,可能是通过锁(lock)来提供互斥进入临界区的那些函数。这方面最基本的一对函数是:

1
2
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

锁的初始化

一种方法是使用 PTHREAD_MUTEX_ INITIALIZER

1
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

这样做会将锁设置为默认值,从而使锁可用。

初始化的动态方法(即在运行时)是调用 pthread_mutex_init(),如下所示:

1
2
int rc = pthread_mutex_init(&lock, NULL);
assert(rc == 0); // always check success!

此函数的第一个参数是锁本身的地址,而第二个参数是一组可选属性。

传入 NULL 就是使用默认值。

请注注,当你用完锁时,还应该相应地调用 pthread_mutex_destroy()

上述代码的第二个问题是在调用获取锁和释放锁时没有检查错误代码。

1
2
3
4
void Pthread_mutex_lock(pthread_mutex_t *mutex) {
int rc = pthread_mutex_lock(mutex);
assert(rc == 0);
}

条件变量

所有线程库还有一个主要组件,就是存在一个条件变量(condition variable)

当线程之间必须发生某种信号时,如果一个线程在等待另一个线程继续执行某些操作,条件变量就很有用。

希望以这种方式进行交互的程序使用两个主要函数:

1
2
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);

要使用条件变量,必须另外有一个与此条件相关的锁。在调用上述任何一个函数时,应该持有这个锁。

第一个函数 pthread_cond_wait()使调用线程进入休眠状态,因此等待其他线程发出信号,通常当程序中的某些内容发生变化时,现在正在休眠的线程可能会关心它。典型的用法如下所示:

1
2
3
4
5
6
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
Pthread_mutex_lock(&lock);
while (ready == 0)
Pthread_cond_wait(&cond, &lock);
Pthread_mutex_unlock(&lock);

在这段代码中,在初始化相关的锁和条件之后,一个线程检查变量 ready 是否已经被设置 为零以外的值。如果没有,那么线程只是简单地调用等待函数以便休眠,直到其他线程唤醒它。

唤醒线程的代码运行在另外某个线程中,像下面这样:

1
2
3
4
Pthread_mutex_lock(&lock);
ready = 1;
Pthread_cond_signal(&cond);
Pthread_mutex_unlock(&lock);

最后一点需要注注:等待线程在 while 循环中重新检查条件,而不是简单的 if 语句

第 28 章 锁

锁的基本思想

举个例子,假设临界区像这样,典型的更新共享变量:

1
balance = balance + 1;

当然,其他临界区也是可能的,比如为链表增加一个元素,或对共享结构的复杂更新 操作。为了使用锁,我们给临界区增加了这样一些代码:

1
2
3
4
5
lock_t mutex; // some globally-allocated lock 'mutex'
...
lock(&mutex);
balance = balance + 1;
unlock(&mutex);

锁就是一个变量,因此我们需要声明一个某种类型的锁变量(lock variable,如上面的 mutex),才能使用。这个锁变量(简称锁)保存了锁在某一时刻的状态。它要么是可用的 (available,或 unlocked,或 free),表示没有线程持有锁,要么是被占用的(acquired,或 locked, 或 held),表示有一个线程持有锁,正处于临界区。

  • 使用lock()尝试获取锁,进入临界区。

  • 如果另外一个线程对相同的锁变量(本例中的 mutex)调用 lock(),因为锁被另一线程持有, 该调用不会返回。这样,当持有锁的线程在临界区时,其他线程就无法进入临界区。

  • 锁的持有者一旦调用**unlock()**,锁就变成可用了。如果没有其他等待线程(即没有其他 线程调用过 lock()并卡在那里),锁的状态就变成可用了。

Pthread 锁

POSIX 库将锁称为互斥量(mutex),因为它被用来提供线程之间的互斥。即当一个线程在临界区,它能够阻止其他线程进入直到本线程离开临界区。因此,如果你看到下面的 POSIX 线程代码,应该理解它和上面的代码段执行相同的任务(我们再次使用了包装函数来检查获取锁和释放锁时的错误)。

1
2
3
4
5
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

Pthread_mutex_lock(&lock); // wrapper for pthread_mutex_lock()
balance = balance + 1;
Pthread_mutex_unlock(&lock);

你可能还会注意到,POSIX 的 lock 和 unlock 函数会传入一个变量,因为我们可能用不同的锁来保护不同的变量。这样可以增加并发:不同于任何临界区都使用同一个大锁(粗粒度的锁策略),通常大家会用不同的锁保护不同的数据和结构,从而允许更多的线程进入临界区(细粒度的方案)。

评价锁

  • 完成基本任务:提供互斥(mutual exclusion),能够阻止多个线程 进入临界区;
  • 公平性(fairness):每一个竞争线程有公平的机会抢到锁,防止饿死(starve)即:一直无法获得锁;
  • 性能(performance):使用锁之后增加的时间开销。

控制中断

最早提供的互斥解决方案之一,就是在临界区关闭中断。

1
2
3
4
5
6
void lock() {
DisableInterrupts();
}
void unlock() {
EnableInterrupts();
}

缺点如下:

  • 这种方法要求我们允许所有调用线程执行特权操作(打开关闭中断),即信任这种机制不会被滥用。

  • 这种方案不支持多处理器。如果多个线程运行在不同的 CPU 上,每个线程都试 图进入同一个临界区,关闭中断也没有作用。线程可以运行在其他处理器上,因此能够进 入临界区。多处理器已经很普遍了,我们的通用解决方案需要更好一些。

  • 关闭中断导致中断丢失,可能会导致严重的系统问题。

测试并设置指令(原子交换)

最简单的硬件支持是测试并设置指令(test-and-set instruction),也叫作原子交换(atomic exchange)

实现可用的自旋锁

一些系统提供了这一指令,支持基于这种概念创建简单的锁。这个更强大的指令有不同的名字:在 SPARC 上,这个指令叫 ldstub(load/store unsigned byte,加载/保存无符号字节);在 x86 上,是 xchg (atomic exchange,原子交换)指令。但它们基本上在不同的平台上做同样的事,通常称为测试并设置指令(test-and-set)。我们用如下的 C 代码片段来定义测试并设置指令做了什么:

1
2
3
4
5
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // fetch old value at old_ptr
*old_ptr = new; // store 'new' into old_ptr
return old; // return the old value
}

测试并设置指令做了下述事情。它返回 old_ptr 指向的旧值,同时更新为 new 的新值。 当然,关键是这些代码是原子地(atomically)执行。因为既可以测试旧值,又可以设置新值,所以我们把这条指令叫作“测试并设置”。这一条指令完全可以实现一个简单的自旋锁 (spin lock)

很重要的是,上面的C代码做到的事,硬件提供了其原子性实现的功能!

以下是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1 typedef struct lock_t {
2 int flag;
3 } lock_t;
4
5 void init(lock_t *lock) {
6 // 0 indicates that lock is available, 1 that it is held
7 lock->flag = 0;
8 }
9
10 void lock(lock_t *lock) {
11 while (TestAndSet(&lock->flag, 1) == 1)
12 ; // spin-wait (do nothing)
13 }
14
15 void unlock(lock_t *lock) {
16 lock->flag = 0;
17 }

我们来确保理解为什么这个锁能工作。首先假设一个线程在运行,调用 lock(),没有其他线程持有锁,所以 flag 是 0。当调用 TestAndSet(flag, 1)方法,返回 0,线程会跳出 while 循环,获取锁。同时也会原子的设置 flag 为 1,标志锁已经被持有。当线程离开临界区,调用 unlock()将 flag 清理为 0。

评价自旋锁

  • 正确性:√
  • 公平性:–(无法保证)
  • 性能:单CPU:不行;多CPU:还行。

比较并交换

比较并交换指令(SPARC 系统中是 compare-and-swap, x86 系统是 compare-and-exchange)。

1
2
3
4
5
6
int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected)
*ptr = new;
return actual;
}

比较并交换的基本思路是检测 ptr 指向的值是否和 expected 相等;如果是,更新 ptr 所 指的值为新值。否则,什么也不做。不论哪种情况,都返回该内存地址的实际值,让调用者能够知道执行是否成功。

最后,你可能会发现,比较并交换指令比测试并设置更强大。当我们在将来简单探讨无等待同步(wait-free synchronization)时,会用到这条指令的强大之处。然而,如果 只用它实现一个简单的自旋锁,它的行为等价于上面分析的自旋锁。

链接的加载和条件式存储指令

链接的加载 (load-linked)条件式存储(store-conditional)可以用来配合使用,实现其他并发结构。

以下是这些指令的 C 语言伪代码。

1
2
3
4
5
6
7
8
9
10
11
12
1 int LoadLinked(int *ptr) {
2 return *ptr;
3 }
4
5 int StoreConditional(int *ptr, int value) {
6 if (no one has updated *ptr since the LoadLinked to this address) {
7 *ptr = value;
8 return 1; // success!
9 } else {
10 return 0; // failed to update
11 }
12 }

链接的加载指令和典型加载指令类似,都是从内存中取出值存入一个寄存器。关键区别来自条件式存储(store-conditional)指令,只有上一次加载的地址在期间都没有更新时, 才会成功,(同时更新刚才链接的加载的地址的值)。成功时,条件存储返回 1,并将 ptr 指 的值更新为 value。失败时,返回 0,并且不会更新值。

请注意条件式存储失败是如何发生的。一个线程调用 lock(),执行了链接的加载指令, 返回 0。在执行条件式存储之前,中断产生了,另一个线程进入 lock 的代码,也执行链接式加载指令,同样返回 0。现在,两个线程都执行了链接式加载指令,将要执行条件存储。重点是只有一个线程能够成功更新标志为 1,从而获得锁;第二个执行条件存储的线程会失败(因为另一个线程已经成功执行了条件更新),必须重新尝试获取锁。

获取并增加

最后一个硬件原语是获取并增加(fetch-and-add)指令,它能原子地返回特定地址的旧 值,并且让该值自增一。获取并增加的 C 语言伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1 int FetchAndAdd(int *ptr) {
2 int old = *ptr;
3 *ptr = old + 1;
4 return old;
5 }
1 typedef struct lock_t {
2 int ticket;
3 int turn;
4 } lock_t;
5
6 void lock_init(lock_t *lock) {
7 lock->ticket = 0;
8 lock->turn = 0;
9 }
10
11 void lock(lock_t *lock) {
12 int myturn = FetchAndAdd(&lock->ticket);
13 while (lock->turn != myturn)
14 ; // spin
15 }
16
17 void unlock(lock_t *lock) {
18 FetchAndAdd(&lock->turn);
19 }

不是用一个值,这个解决方案使用了 ticket 和 turn 变量来构建锁。基本操作也很简单: 如果线程希望获取锁,首先对一个 ticket 值执行一个原子的获取并相加指令。这个值作为该线程的“turn”(顺位,即 myturn)。根据全局共享的 lock->turn 变量,当某一个线程的(myturn == turn)时,则轮到这个线程进入临界区。unlock 则是增加 turn,从而下一个等待线程可以 进入临界区。

不同于之前的方法:本方法能够保证所有线程都能抢到锁。只要一个线程获得了 ticket 值,它最终会被调度。

自选过多带来的性能低下

即除了当前持有锁的那个线程外,其余线程的时间片,均在自旋;

考虑如果有 N 个线程去竞争一个锁,情况很糟糕。同样的场景下,会浪费 N−1 个时间片,只是自旋并等待一个线程释放该锁。

简单方法

第一种简单友好的方法就是,在要自旋的时候,放弃 CPU。

1
2
3
4
5
6
7
8
9
10
11
12
1 void init() {
2 flag = 0;
3 }
4
5 void lock() {
6 while (TestAndSet(&flag, 1) == 1)
7 yield(); // give up the CPU
8 }
9
10 void unlock() {
11 flag = 0;
12 }

考虑在单 CPU 上运行两个线程。在这个例子中,基于 yield 的方法十分有效。一个线程调用 lock(),发现锁被占用时,让出 CPU,另外一个线程运行,完成临界区。在这个简单的例子中,让出方法工作得非常好。

现在来考虑许多线程(例如 100 个)反复竞争一把锁的情况。在这种情况下,一个线程持有锁,在释放锁之前被抢占,其他 99 个线程分别调用 lock(),发现锁被抢占,然后让出 CPU。假定采用某种轮转调度程序,这 99 个线程会一直处于运行—让出这种模式,直到持有锁的线程再次运行。虽然比原来的浪费 99 个时间片的自旋方案要好,但这种方法仍然成本很高,上下文切换的成本是实实在在的,因此浪费很大。

使用队列:休眠代替自选

前面一些方法的真正问题是存在太多的偶然性。调度程序决定如何调度。如果调度不 合理,线程或者一直自旋(第一种方法),或者立刻让出 CPU(第二种方法)。无论哪种方 法,都可能造成浪费,也能防止饿死。

因此,我们必须显式地施加某种控制,决定锁释放时,谁能抢到锁。为了做到这一点, 我们需要操作系统的更多支持,并需要一个队列来保存等待锁的线程。

简单起见,我们利用 Solaris 提供的支持,它提供了两个调用:park()能够让调用线程休 眠,unpark(threadID)则会唤醒 threadID 标识的线程。可以用这两个调用来实现锁,让调用 者在获取不到锁时睡眠,在锁可用时被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1 typedef struct lock_t {
2 int flag;
3 int guard;
4 queue_t *q;
5 } lock_t;
6
7 void lock_init(lock_t *m) {
8 m->flag = 0;
9 m->guard = 0;
10 queue_init(m->q);
11 }
12
13 void lock(lock_t *m) {
14 while (TestAndSet(&m->guard, 1) == 1)
15 ; //acquire guard lock by spinning
16 if (m->flag == 0) {
17 m->flag = 1; // lock is acquired
18 m->guard = 0;
19 } else {
20 queue_add(m->q, gettid());
21 m->guard = 0;
22 park();
23 }
24 }
25
26 void unlock(lock_t *m) {
27 while (TestAndSet(&m->guard, 1) == 1)
28 ; //acquire guard lock by spinning
29 if (queue_empty(m->q))
30 m->flag = 0; // let go of lock; no one wants it
31 else
32 unpark(queue_remove(m->q)); // hold lock (for next thread!)
33 m->guard = 0;
34 }

在这个例子中,我们做了两件有趣的事。首先,我们将之前的测试并设置和等待队列 结合,实现了一个更高性能的锁。其次,我们通过队列来控制谁会获得锁,避免饿死。

你可能注意到,guard 基本上起到了自旋锁的作用,围绕着 flag 和队列操作。因此,这 个方法并没有完全避免自旋等待。线程在获取锁或者释放锁时可能被中断,从而导致其他 线程自旋等待。但是,这个自旋等待时间是很有限的(不是用户定义的临界区,只是在 lock 和 unlock 代码中的几个指令),因此,这种方法也许是合理的。

第二点,你可能注意到在 lock()函数中,如果线程不能获取锁(它已被持有),线程会 把自己加入队列(通过调用 gettid()获得当前的线程 ID),将 guard 设置为 0,然后让出 CPU。

你还可能注意到了很有趣一点,当要唤醒另一个线程时,flag 并没有设置为 0。为什么 呢?其实这不是错,而是必须的!线程被唤醒时,就像是从 park()调用返回。但是,此时它 没有持有 guard,所以也不能将 flag 设置为 1。因此,我们就直接把锁从释放的线程传递给 下一个获得锁的线程,期间 flag 不必设置为 0。

最后,你可能注意到解决方案中的竞争条件,就在 park()调用之前。如果不凑巧,一个 线程将要 park,假定它应该睡到锁可用时。这时切换到另一个线程(比如持有锁的线程), 这可能会导致麻烦。比如,如果该线程随后释放了锁。接下来第一个线程的 park 会永远睡 下去(可能)。这种问题有时称为唤醒/等待竞争(wakeup/waiting race)。为了避免这种情况, 我们需要额外的工作。

Solaris 通过增加了第三个系统调用 separk()来解决这一问题。通过 setpark(),一个线程 表明自己马上要 park。如果刚好另一个线程被调度,并且调用了 unpark,那么后续的 park 调用就会直接返回,而不是一直睡眠。lock()调用可以做一点小修改:

1
2
3
1 queue_add(m->q, gettid());
2 setpark(); // new code
3 m->guard = 0;

第 29 章 基于锁的并发数据结构

并发计数器

懒惰计数器

  • 有一个全局计数器;
  • 有多个局部计数器;
  • 有一个S阈值,每当局部计数器达到 S ,向全局计数器加,然后该局部计时器置零。
  • 每个局部计数器有一个锁,全局计数器有一个。

并发链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1 void List_Init(list_t *L) {
2 L->head = NULL;
3 pthread_mutex_init(&L->lock, NULL);
4 }
5
6 void List_Insert(list_t *L, int key) {
7 // synchronization not needed
8 node_t *new = malloc(sizeof(node_t));
9 if (new == NULL) {
10 perror("malloc");
11 return;
12 }
13 new->key = key;
14
15 // just lock critical section
16 pthread_mutex_lock(&L->lock);
17 new->next = L->head;
18 L->head = new;
19 pthread_mutex_unlock(&L->lock);
20 }
21
22 int List_Lookup(list_t *L, int key) {
23 int rv = -1;
24 pthread_mutex_lock(&L->lock);
25 node_t *curr = L->head;
26 while (curr) {
27 if (curr->key == key) {
28 rv = 0;
29 break;
30 }
31 curr = curr->next;
32 }
33 pthread_mutex_unlock(&L->lock);
34 return rv; // now both success and failure
35 }

过手锁(hand-over-hand locking,也叫作锁耦合, lock coupling)

每个节点都有一个锁,替代之前整个链表一个锁。遍历链表的时候,首先抢占下一个节点的锁,然后释放当前节点的锁。

从概念上说,过手锁链表有点道理,它增加了链表操作的并发程度。但是实际上,在遍历的时候,每个节点获取锁、释放锁的开销巨大,很难比单锁的方法快。即使有大量的线程和很大的链表,这种并发的方案也不一定会比单锁的方案快。

并发队列

并发散列表

第 30 章 条件变量

到目前为止,我们已经形成了锁的概念,看到了如何通过硬件和操作系统支持的正确组合来实现锁。然而,锁并不是并发程序设计所需的唯一原语。

具体来说,在很多情况下,线程需要检查某一条件(condition)满足之后,才会继续运行。

定义

线程可以使用条件变量(condition variable),来等待一个条件变成真。条件变量是一个显式队列,当某些执行状态(即条件,condition)不满足时,线程可以把自己加入队列,等待(waiting)该条件。另外某个线程,当它改变了上述状态时,就可以唤醒一个或者多个等待线程(通过在该条件上发信号),让它们继续执行。

要声明这样的条件变量,只要像这样写:**pthread_cond_t c;,这里声明 c 是一个条件变量(注意:还需要适当的初始化)。条件变量有两种相关操作:wait()和 signal()**。线程要睡眠的时候,调用 wait()。当线程想唤醒等待在某个条件变量上的睡眠线程时,调用 **signal()**。

如父进程等待子进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1 int done = 0;
2 pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
3 pthread_cond_t c = PTHREAD_COND_INITIALIZER;
4
5 void thr_exit() {
6 Pthread_mutex_lock(&m);
7 done = 1;
8 Pthread_cond_signal(&c);
9 Pthread_mutex_unlock(&m);
10 }
11
12 void *child(void *arg) {
13 printf("child\n");
14 thr_exit();
15 return NULL;
16 }
17
18 void thr_join() {
19 Pthread_mutex_lock(&m);
20 while (done == 0)
21 Pthread_cond_wait(&c, &m);
22 Pthread_mutex_unlock(&m);
23 }
24
25 int main(int argc, char *argv[]) {
26 printf("parent: begin\n");
27 pthread_t p;
28 Pthread_create(&p, NULL, child, NULL);
29 thr_join();
30 printf("parent: end\n");
31 return 0;
32 }

有两种情况需要考虑。第一种情况是父线程创建出子线程,但自己继续运行(假设只有一个处理器),然后马上调用 thr_join()等待子线程。在这种情况下,它会先获取锁,检查子进程是否完成(还没有完成),然后调用 wait(),让自己休眠。子线程最终得以运行,打印出“child”,并调用 thr_exit()函数唤醒父进程,这段代码会在获得锁后设置状态变量 done,然后向父线程发信号唤醒它。最后,父线程会运行(从 wait()调用返回并持有锁),释放锁,打印出“parent:end”。

第二种情况是,子线程在创建后,立刻运行,设置变量 done 为 1,调用 signal 函数唤醒其他线程(这里没有其他线程),然后结束。父线程运行后,调用 thr_join()时,发现 done 已经是 1 了,就直接返回。

最后一点说明:你可能看到父线程使用了一个 while 循环,而不是 if 语句来判断是否需要等待。虽然从逻辑上来说没有必要使用循环语句,但这样做总是好的

如果没有变量 done:

1
2
3
4
5
6
7
8
9
10
11
1 void thr_exit() {
2 Pthread_mutex_lock(&m);
3 Pthread_cond_signal(&c);
4 Pthread_mutex_unlock(&m);
5 }
6
7 void thr_join() {
8 Pthread_mutex_lock(&m);
9 Pthread_cond_wait(&c, &m);
10 Pthread_mutex_unlock(&m);
11 }

这段代码是有问题的。假设子线程立刻运行,并且调用 thr_exit()。在这种情况下,子 线程发送信号,但此时却没有在条件变量上睡眠等待的线程。父线程运行时,就会调用 wait 并卡在那里,没有其他线程会唤醒它。通过这个例子,你应该认识到变量 done 的重要性, 它记录了线程有兴趣知道的值。睡眠、唤醒和锁都离不开它。

如果没有锁:

1
2
3
4
5
6
7
8
9
1 void thr_exit() {
2 done = 1;
3 Pthread_cond_signal(&c);
4 }
5
6 void thr_join() {
7 if (done == 0)
8 Pthread_cond_wait(&c);
9 }

这里的问题是一个微妙的竞态条件。具体来说,如果父进程调用 thr_join(),然后检查完 done 的值为 0,然后试图睡眠。但在调用 wait 进入睡眠之前,父进程被中断。子线程修改变量 done 为 1,发出信号,同样没有等待线程。父线程再次运行时,就会长眠不醒,这就惨了。

消费者/生产者(有界缓冲区)问题

第一版:唤醒后,运行前,缓冲区状态改变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1 cond_t cond;
2 mutex_t mutex;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 Pthread_mutex_lock(&mutex); // p1
8 if (count == 1) // p2
9 Pthread_cond_wait(&cond, &mutex); // p3
10 put(i); // p4
11 Pthread_cond_signal(&cond); // p5
12 Pthread_mutex_unlock(&mutex); // p6
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 Pthread_mutex_lock(&mutex); // c1
20 if (count == 0) // c2
21 Pthread_cond_wait(&cond, &mutex); // c3
22 int tmp = get(); // c4
23 Pthread_cond_signal(&cond); // c5
24 Pthread_mutex_unlock(&mutex); // c6
25 printf("%d\n", tmp);
26 }
27 }

我们来理解第一个问题,它与等待之前的 if 语句有关。假设有两个消费者(Tc1 和 Tc2), 一个生产者(Tp)。首先,一个消费者(Tc1)先开始执行,它获得锁(c1),检查缓冲区是 否可以消费(c2),然后等待(c3)(这会释放锁)。

接着生产者(Tp)运行。它获取锁(p1),检查缓冲区是否满(p2),发现没满就给缓冲 区加入一个数字(p4)。然后生产者发出信号,说缓冲区已满(p5)。关键的是,这让第一 个消费者(Tc1)不再睡在条件变量上,进入就绪队列。Tc1 现在可以运行(但还未运行)。 生产者继续执行,直到发现缓冲区满后睡眠(p6,p1-p3)。

这时问题发生了:另一个消费者(Tc2)抢先执行,消费了缓冲区中的值(c1,c2,c4,c5,c6, 跳过了 c3 的等待,因为缓冲区是满的)。现在假设 Tc1 运行,在从 wait 返回之前,它获取了锁,然后返回。然后它调用了 get() (p4),但缓冲区已无法消费!断言触发,代码不能像预 期那样工作。

Mesa语义

发信号给线程只是唤醒它们,暗示状态发生了变化(在这个例子中,就是值 已被放入缓冲区),但并不会保证在它运行之前状态一直是期望的情况。

第二版:使用 while 代替 if

把 if 语句改为 while。当消费者 Tc1 被唤 醒后,立刻再次检查共享变量(c2)。如果缓冲区此时为空,消费者就会回去继续睡眠(c3)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1 cond_t cond;
2 mutex_t mutex;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 Pthread_mutex_lock(&mutex); // p1
8 while (count == 1) // p2
9 Pthread_cond_wait(&cond, &mutex); // p3
10 put(i); // p4
11 Pthread_cond_signal(&cond); // p5
12 Pthread_mutex_unlock(&mutex); // p6
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 Pthread_mutex_lock(&mutex); // c1
20 while (count == 0) // c2
21 Pthread_cond_wait(&cond, &mutex); // c3
22 int tmp = get(); // c4
23 Pthread_cond_signal(&cond); // c5
24 Pthread_mutex_unlock(&mutex); // c6
25 printf("%d\n", tmp);
26 }
27 }

由于 Mesa 语义,我们要记住一条关于条件变量的简单规则:总是使用 while 循环(always use while loop)。虽然有时候不需要重新检查条件,但这样做总是安全的,做了就开心了。

还有的问题是:

使用两个条件变量,而不是一个,以便正确地发出信号,在系统 状态改变时,哪类线程应该唤醒。

假设两个消费者(Tc1 和 Tc2)先运行,都睡眠了(c3)。 生产者开始运行,在缓冲区放入一个值,唤醒了一个消费者(假定是 Tc1),并开始睡眠。现 在是一个消费者马上要运行(Tc1),两个线程(Tc2 和 Tp)都等待在同一个条件变量上。问题马上就要出现了:让人感到兴奋! 消费者 Tc1 醒过来并从 wait()调用返回(c3),重新检查条件(c2),发现缓冲区是满的, 消费了这个值(c4)。这个消费者然后在该条件上发信号(c5),唤醒一个在睡眠的线程。但 是,应该唤醒哪个线程呢? 因为消费者已经清空了缓冲区,很显然,应该唤醒生产者。但是,如果它唤醒了 Tc2(这 绝对是可能的,取决于等待队列是如何管理的),问题就出现了。具体来说,消费者 Tc2 会醒过来,发现队列为空(c2),又继续回去睡眠(c3)。生产者 Tp 刚才在缓冲区中放了一个值,现在在睡眠。另一个消费者线程 Tc1 也回去睡眠了。3 个线程都在睡眠,显然是一个缺陷。由表 30.2 可以看到这个可怕灾难的步骤。

第三版:两个条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1 cond_t empty, fill;
2 mutex_t mutex;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 Pthread_mutex_lock(&mutex);
8 while (count == 1)
9 Pthread_cond_wait(&empty, &mutex);
10 put(i);
11 Pthread_cond_signal(&fill);
12 Pthread_mutex_unlock(&mutex);
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 Pthread_mutex_lock(&mutex);
20 while (count == 0)
21 Pthread_cond_wait(&fill, &mutex);
22 int tmp = get();
23 Pthread_cond_signal(&empty);
24 Pthread_mutex_unlock(&mutex);
25 printf("%d\n", tmp);
26 }
27 }

在上述代码中,生产者线程等待条件变量 empty,发信号给变量 fill。相应地,消费者 线程等待 fill,发信号给 empty。这样做,从设计上避免了上述第二个问题:消费者再也不 会唤醒消费者,生产者也不会唤醒生产者。

最终方案

我们现在有了可用的生产者/消费者方案,但不太通用。我们最后的修改是提高并发和 效率。具体来说,增加更多缓冲区槽位,这样在睡眠之前,可以生产多个值。同样,睡眠 之前可以消费多个值。单个生产者和消费者时,这种方案因为上下文切换少,提高了效率。 多个生产者和消费者时,它甚至支持并发生产和消费,从而提高了并发。幸运的是,和现 有方案相比,改动也很小。

第一处修改是缓冲区结构本身,以及对应的 put()和 get()方法(见图 30.9)。我们还稍稍 修改了生产者和消费者的检查条件,以便决定是否要睡眠。图 30.10 展示了最终的等待和信 号逻辑。生产者只有在缓冲区满了的时候才会睡眠(p2),消费者也只有在队列为空的时候 睡眠(c2)。至此,我们解决了生产者/消费者问题。

最终的get和put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1 int buffer[MAX];
2 int fill = 0;
3 int use = 0;
4 int count = 0;
5
6 void put(int value) {
7 buffer[fill] = value;
8 fill = (fill + 1) % MAX;
9 count++;
10 }
11
12 int get() {
13 int tmp = buffer[use];
14 use = (use + 1) % MAX;
15 count--;
16 return tmp;
17 }

最终有效方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1 cond_t empty, fill;
2 mutex_t mutex;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 Pthread_mutex_lock(&mutex); // p1
8 while (count == MAX) // p2
9 Pthread_cond_wait(&empty, &mutex); // p3
10 put(i); // p4
11 Pthread_cond_signal(&fill); // p5
12 Pthread_mutex_unlock(&mutex); // p6
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 Pthread_mutex_lock(&mutex); // c1
20 while (count == 0) // c2
21 Pthread_cond_wait(&fill, &mutex); // c3
22 int tmp = get(); // c4
23 Pthread_cond_signal(&empty); // c5
24 Pthread_mutex_unlock(&mutex); // c6
25 printf("%d\n", tmp);
26 }
27 }

覆盖条件

考虑以下场景。假设目前没有空闲内存,线程 Ta 调用 allocate(100),接着线程 Tb 请求 较少的内存,调用 allocate(10)。Ta和 Tb 都等待在条件上并睡眠,没有足够的空闲内存来满 足它们的请求。 这时,假定第三个线程 Tc调用了 free(50)。遗憾的是,当它发信号唤醒等待线程时,可 能不会唤醒申请 10 字节的 Tb 线程。而 Ta 线程由于内存不够,仍然等待。因为不知道唤醒 哪个(或哪些)线程,所以图中代码无法正常工作。

Lampson 和 Redell 的解决方案也很直接:用 pthread_cond_broadcast()代替上述代码中的 pthread_cond_signal(),唤醒所有的等待线程。这样做,确保了所有应该唤醒的线程都被唤 醒。当然,不利的一面是可能会影响性能,因为不必要地唤醒了其他许多等待的线程,它 们本来(还)不应该被唤醒。这些线程被唤醒后,重新检查条件,马上再次睡眠。

Lampson 和 Redell 把这种条件变量叫作覆盖条件(covering condition),因为它能覆盖所有需要唤醒线程的场景(保守策略)。

第 31 章 信号量

定义

信号量是有一个整数值的对象,可以用两个函数来操作它。在 POSIX 标准中,是 sem_wait()和 sem_post()①。因为信号量的初始值能够决定其行为,所以首先要初始化信号量, 才能调用其他函数与之交互

1
2
3
1 #include <semaphore.h>
2 sem_t s;
3 sem_init(&s, 0, 1);

其中申明了一个信号量 s,通过第三个参数,将它的值初始化为 1。sem_init()的第二个 参数,在我们看到的所有例子中都设置为 0,表示信号量是在同一进程的多个线程共享的。 读者可以参考手册,了解信号量的其他用法(即如何用于跨不同进程的同步访问),这要求 第二个参数用不同的值。

若信号量小于0,则等待

wait会让信号量加一;

post会减一。

二值信号量(锁)

我们直接把临界区用一对 sem_wait()/sem_post()环绕。但是,为 了使这段代码正常工作,信号量 m 的初始值(图中初始化为 X)是至关重要的。X 应该是多少呢?

1
2
3
4
5
6
1 sem_t m;
2 sem_init(&m, 0, X); // initialize semaphore to X; what should X be?
3
4 sem_wait(&m);
5 // critical section here
6 sem_post(&m);

回顾 sem_wait()和 sem_post()函数的定义,我们发现初值应该是 1。

  • 为了说明清楚,我们假设有两个线程的场景。第一个线程(线程 0)调用了 sem_wait(), 它把信号量的值减为 0。

  • 然后,它只会在值小于 0 时等待。因为值是 0,调用线程从函数返 回并继续,线程 0 现在可以自由进入临界区。

  • 线程 0 在临界区中,如果没有其他线程尝试 获取锁,当它调用 sem_post()时,会将信号量重置为 1(因为没有等待线程,不会唤醒其他 线程)。

如果线程 0 持有锁(即调用了 sem_wait()之后,调用 sem_post()之前),另一个线程(线 程 1)调用 sem_wait()尝试进入临界区,那么更有趣的情况就发生了。这种情况下,线程 1 把信号量减为−1,然后等待(自己睡眠,放弃处理器)。线程 0 再次运行,它最终调用 sem_post(),将信号量的值增加到 0唤醒等待的线程(线程 1),然后线程 1 就可以获取锁。 线程 1 执行结束时,再次增加信号量的值,将它恢复为 1。

用信号量做条件变量

初始值为0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1 sem_t s;
2
3 void *
4 child(void *arg) {
5 printf("child\n");
6 sem_post(&s); // signal here: child is done
7 return NULL;
8 }
9
10 int
11 main(int argc, char *argv[]) {
12 sem_init(&s, 0, X); // what should X be?
13 printf("parent: begin\n");
14 pthread_t c;
15 Pthread_create(c, NULL, child, NULL);
16 sem_wait(&s); // wait here for child
17 printf("parent: end\n");
18 return 0;
19 }

有两种情况需要考虑。第一种,父线程创建了子线程,但是子线程并没有运行。

  • 父线程调用 sem_wait()会先于子线程调用 sem_post()。
  • 我们希望父线程等待子线程运行。
  • 为此,唯一的办法是让信号量的值 不大于 0。因此,0 为初值。父线程运行,将信号量减为−1,然后睡眠等待
  • 子线程运行的 时候,调用 sem_post(),信号量增加为 0,唤醒父线程,父线程然后从 sem_wait()返回,完 成该程序。

第二种,子线程直接运行

  • 在这种情况下,子线程会先调用 sem_post(),将信号量从 0 增加到 1。
  • 然后当父线程有机会运行时,会调用 sem_wait(),发现信号量的值为 1。
  • 于是父线程将信号量从 1 减为 0,没有等待,直接从 sem_wait()返回,也达到了预期效果。

生产者/消费者(有界缓冲区)问题

第一次尝试

第一次尝试解决该问题时,我们用两个信号量 empty 和 full 分别表示缓冲区空或者满。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1 int buffer[MAX];
2 int fill = 0;
3 int use = 0;
4
5 void put(int value) {
6 buffer[fill] = value; // line f1
7 fill = (fill + 1) % MAX; // line f2
8 }
9
10 int get() {
11 int tmp = buffer[use]; // line g1
12 use = (use + 1) % MAX; // line g2
13 return tmp;
14 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1 sem_t empty;
2 sem_t full;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 sem_wait(&empty); // line P1
8 put(i); // line P2
9 sem_post(&full); // line P3
10 }
11 }
12
13 void *consumer(void *arg) {
14 int i, tmp = 0;
15 while (tmp != -1) {
16 sem_wait(&full); // line C1
17 tmp = get(); // line C2
18 sem_post(&empty); // line C3
19 printf("%d\n", tmp);
20 }
21 }
22
23 int main(int argc, char *argv[]) {
24 // ...
25 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
26 sem_init(&full, 0, 0); // ... and 0 are full
27 // ...
28 }

本例中,生产者等待缓冲区为空,然后加入数据。类似地,消费者等待缓冲区变成有数据的状态,然后取走数据。我们先假设 MAX=1(数组中只有一个缓冲区),验证程序是否有效。

假设有两个线程,一个生产者和一个消费者。我们来看在一个 CPU 上的具体场景。消费者先运行,执行到 C1 行,调用 sem_wait(&full)。因为 full 初始值为 0,wait 调用会将 full 减为−1,导致消费者睡眠,等待另一个线程调用 sem_post(&full),符合预期。

假设生产者然后运行。执行到 P1 行,调用 sem_wait(&empty)。不像消费者,生产者将继续执行,因为 empty 被初始化为 MAX(在这里是 1)。因此,empty 被减为 0,生产者向缓冲区中加入数据,然后执行 P3 行,调用 sem_post(&full),把 full 从−1 变成 0,唤醒消费者(即将它从阻塞变成就续)。

在这种情况下,可能会有两种情况。如果生产者继续执行,再次循环到 P1 行,由于 empty 值为 0,它会阻塞。如果生产者被中断,而消费者开始执行,调用 sem_wait(&full)(c1 行), 发现缓冲区确实满了,消费它。这两种情况都是符合预期的。

你可以用更多的线程来尝试这个例子(即多个生产者和多个消费者)。它应该仍然正常运行

我们现在假设 MAX 大于 1(比如 MAX=10)。对于这个例子,假定有多个生产者,多个消费者。现在就有问题了:竞态条件

假设两个生产者(Pa 和 Pb)几乎同时调用 put()。当 Pa 先运 行,在 f1 行先加入第一条数据(fill=0),假设 Pa 在将 fill 计数器更新为 1 之前被中断,Pb 开始运行,也在 f1 行给缓冲区的 0 位置加入一条数据,这意味着那里的老数据被覆盖!这 可不行,我们不能让生产者的数据丢失。

就是说,锁的约束性不够了!

哪怕都能写,但是写的位置是相同的,这个是不行的。

解决方案:增加互斥

写入这个操作,再加一个锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
5 void *producer(void *arg) {
6 int i;
7 for (i = 0; i < loops; i++) {
8 sem_wait(&mutex); // line p0 (NEW LINE)
9 sem_wait(&empty); // line p1
10 put(i); // line p2
11 sem_post(&full); // line p3
12 sem_post(&mutex); // line p4 (NEW LINE)
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 sem_wait(&mutex); // line c0 (NEW LINE)
20 sem_wait(&full); // line c1
21 int tmp = get(); // line c2
22 sem_post(&empty); // line c3
23 sem_post(&mutex); // line c4 (NEW LINE)
24 printf("%d\n", tmp);
25 }
26 }

同时只能有一个生产者进行写入。

避免死锁

消 费者首先运行,获得锁(c0 行),然后对 full 信号量执行 sem_wait() (c1 行)。因为还没有 数据,所以消费者阻塞,让出 CPU。但是,重要的是,此时消费者仍然持有锁。

然后生产者运行。假如生产者能够运行,它就能生产数据并唤醒消费者线程。遗憾的是, 它首先对二值互斥信号量调用 sem_wait()(p0 行)。锁已经被持有,因此生产者也被卡住。

解决方案:紧邻临界区

把获取和释放互斥量的操作调整为紧挨着临界区,把 full、empty 的唤醒和等待操作调整到锁 外面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1 sem_t empty;
2 sem_t full;
3 sem_t mutex;
4
5 void *producer(void *arg) {
6 int i;
7 for (i = 0; i < loops; i++) {
8 sem_wait(&empty); // line p1
9 sem_wait(&mutex); // line p1.5 (MOVED MUTEX HERE...)
10 put(i); // line p2
11 sem_post(&mutex); // line p2.5 (... AND HERE)
12 sem_post(&full); // line p3
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 sem_wait(&full); // line c1
20 sem_wait(&mutex); // line c1.5 (MOVED MUTEX HERE...)
21 int tmp = get(); // line c2
22 sem_post(&mutex); // line c2.5 (... AND HERE)
23 sem_post(&empty); // line c3
24 printf("%d\n", tmp);
25 }
26 }
27
28 int main(int argc, char *argv[]) {
29 // ...
30 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
31 sem_init(&full, 0, 0); // ... and 0 are full
32 sem_init(&mutex, 0, 1); // mutex=1 because it is a lock
33 // ...
34 }

读者-写者锁

读者—写者锁(reader-writer lock)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1 typedef struct _rwlock_t {
2 sem_t lock; // binary semaphore (basic lock)
3 sem_t writelock; // used to allow ONE writer or MANY readers
4 int readers; // count of readers reading in critical section
5 } rwlock_t;
6
7 void rwlock_init(rwlock_t *rw) {
8 rw->readers = 0;
9 sem_init(&rw->lock, 0, 1);
10 sem_init(&rw->writelock, 0, 1);
11 }
12
13 void rwlock_acquire_readlock(rwlock_t *rw) {
14 sem_wait(&rw->lock);
15 rw->readers++;
16 if (rw->readers == 1)
17 sem_wait(&rw->writelock); // first reader acquires writelock
18 sem_post(&rw->lock);
19 }
20
21 void rwlock_release_readlock(rwlock_t *rw) {
22 sem_wait(&rw->lock);
23 rw->readers--;
24 if (rw->readers == 0)
25 sem_post(&rw->writelock); // last reader releases writelock
26 sem_post(&rw->lock);
27 }
28
29 void rwlock_acquire_writelock(rwlock_t *rw) {
30 sem_wait(&rw->writelock);
31 }
32
33 void rwlock_release_writelock(rwlock_t *rw) {
34 sem_post(&rw->writelock);
35 }

哲学家就餐问题

  • Dijkstra 提出的有名问题;
  • 假定 有 5 位“哲学家”围着一个圆桌。每两位哲学家之间有一把餐叉(一共 5 把)。哲学家有时要思考一会,不需要餐叉;有时又要就餐。而一位哲学家只有同时拿到了左手边和右手边的两把餐叉,才能吃到东西。

哲学家基本循环:

1
2
3
4
5
6
7
while(1)
{
think();
getforks();
eat();
putforks();
}

关键的挑战就是如何实现 getforks()和 putforks()函数,保证没有死锁,没有哲学家饿死,并且并发度更高(尽可能让更多哲学家同时吃东西)。

辅助函数:

1
2
int left(int p) { return p; }
int right(int p) { return (p + 1) % 5; }

用于获取左手、右手边的叉子;

可能的死锁:所有人都占用了左边的叉子。

解决方案:令某一位的取叉顺序改变一下即可。(先右后左或相反)

用锁和条件变量实现信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1 typedef struct _Zem_t {
2 int value;
3 pthread_cond_t cond;
4 pthread_mutex_t lock;
5 } Zem_t;
6
7 // only one thread can call this
8 void Zem_init(Zem_t *s, int value) {
9 s->value = value;
10 Cond_init(&s->cond);
11 Mutex_init(&s->lock);
12 }
13
14 void Zem_wait(Zem_t *s) {
15 Mutex_lock(&s->lock);
16 while (s->value <= 0)
17 Cond_wait(&s->cond, &s->lock);
18 s->value--;
19 Mutex_unlock(&s->lock);
20 }
21
22 void Zem_post(Zem_t *s) {
23 Mutex_lock(&s->lock);
24 s->value++;
25 Cond_signal(&s->cond);
26 Mutex_unlock(&s->lock);
27 }

第 32 章 常见并发问题

非死锁

违反原子性缺陷

1
2
3
4
5
6
7
8
9
1 Thread 1::
2 if (thd->proc_info) {
3 ...
4 fputs(thd->proc_info, ...);
5 ...
6 }
7
8 Thread 2::
9 thd->proc_info = NULL;

如上例

两个线程都要访问 thd 结构中的成员 proc_info。第一个线程检查 proc_info 非空,然后打印出值;第二个线程设置其为空。显然,当第一个线程检查之后,在 fputs() 调用之前被中断,第二个线程把指针置为空;当第一个线程恢复执行时,由于引用空指针, 导致程序奔溃。

解决方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1 pthread_mutex_t proc_info_lock = PTHREAD_MUTEX_INITIALIZER;
2
3 Thread 1::
4 pthread_mutex_lock(&proc_info_lock);
5 if (thd->proc_info) {
6 ...
7 fputs(thd->proc_info, ...);
8 ...
9 }
10 pthread_mutex_unlock(&proc_info_lock);
11
12 Thread 2::
13 pthread_mutex_lock(&proc_info_lock);
14 thd->proc_info = NULL;
15 pthread_mutex_unlock(&proc_info_lock);

proc_info的访问加锁

违反顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
1 Thread 1::
2 void init() {
3 ...
4 mThread = PR_CreateThread(mMain, ...);
5 ...
6 }
7
8 Thread 2::
9 void mMain(...) {
10 ...
11 mState = mThread->State;
12 ...
13 }

线程 2 的代码中似乎假定变量 mThread 已经被初始化了(不为空)。 然而,如果线程 1 并没有首先执行,线程 2 就可能因为引用空指针奔溃

修复:使用条件变量

强制顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1 pthread_mutex_t mtLock = PTHREAD_MUTEX_INITIALIZER;
2 pthread_cond_t mtCond = PTHREAD_COND_INITIALIZER;
3 int mtInit = 0;
4
5 Thread 1::
6 void init() {
7 ...
8 mThread = PR_CreateThread(mMain, ...);
9
10 // signal that the thread has been created...
11 pthread_mutex_lock(&mtLock);
12 mtInit = 1;
13 pthread_cond_signal(&mtCond);
14 pthread_mutex_unlock(&mtLock);
15 ...
16 }
17
18 Thread 2::
19 void mMain(...) {
20 ...
21 // wait for the thread to be initialized...
22 pthread_mutex_lock(&mtLock);
23 while (mtInit == 0)
24 pthread_cond_wait(&mtCond, &mtLock);
25 pthread_mutex_unlock(&mtLock);
26
27 mState = mThread->State;
28 ...
29 }

只要myInit变量为0时,main就不继续进行,而是等待其初始化完成。

死锁

产生死锁的四个条件:

  • 互斥:线程对于需要的资源进行互斥的访问(例如一个线程抢到锁)。
  • 持有并等待:线程持有了资源(例如已将持有的锁),同时又在等待其他资源(例如,需要获得的锁)。
  • 非抢占:线程获得的资源(例如锁),不能被抢占。
  • 循环等待:线程之间存在一个环路,环路上每个线程都额外持有一个资源,而这个资源又是下一个线程要申请的。

四个条件全部满足,才会产生死锁。

预防

循环等待

也许最实用的预防技术(当然也是经常采用的),就是让代码不会产生循环等待。最直接的方法就是获取锁时提供一个全序(total ordering)。假如系统共有两个锁(L1 和 L2), 那么我们每次都申请 L1 然申请 L2,就可以避免死锁。这样严格的顺序避免了循环等待, 也就不会产生死锁。

当然,更复杂的系统中不会只有两个锁,锁的全序可能很难做到。因此,偏序(partial ordering)可能是一种有用的方法,安排锁的获取并避免死锁。Linux 中的内存映射代码就 是一个偏序锁的好例子[T+94]。代码开头的注释表明了 10 组不同的加锁顺序,包括简单的 关系,比如 i_mutex 早于 i_mmap_mutex,也包括复杂的关系,比如 i_mmap_mutex 早于 private_lock,早于 swap_lock,早于 mapping->tree_lock。

持有并等待

原子地抢锁

先抢一个全局锁

1
2
3
4
5
1 lock(prevention);
2 lock(L1);
3 lock(L2);
4 ...
5 unlock(prevention);

先抢到 prevention 这个锁之后,代码保证了在抢锁的过程中,不会有不合时宜的线程切换,从而避免了死锁。

非抢占

在调用 unlock 之前,都认为锁是被占有的,多个抢锁操作通常会带来麻烦,因为我们 等待一个锁时,同时持有另一个锁。很多线程库提供更为灵活的接口来避免这种情况。具 体来说,**trylock()**函数会尝试获得锁,或者返回−1,表示锁已经被占有。你可以稍后重试 一下。

1
2
3
4
5
6
1 top:
2 lock(L1);
3 if (trylock(L2) == -1) {
4 unlock(L1);
5 goto top;
6 }

注意,另一个线程可以使用相同的加锁方式,但是不同的加锁顺序(L2 然后 L1),程 序仍然不会产生死锁。但是会引来一个新的问题:活锁(livelock)

两个线程有可能一直重复这一序列,又同时都抢锁失败。这种情况下,系统一直在运行这段代码(因此不是死 锁),但是又不会有进展,因此名为活锁。也有活锁的解决方法:例如,可以在循环结束的时候,先随机等待一个时间,然后再重复整个动作,这样可以降低线程之间的重复互相干扰。

互斥

最后的预防方法是完全避免互斥。通常来说,代码都会存在临界区,因此很难避免互斥。那么我们应该怎么做呢?

Herlihy 提出了设计各种无等待(wait-free)数据结构的思想。

想法很简单:通过强大的硬件指令,我们可以构造出不需要锁的数据结构。

通过调度避免死锁

除了死锁预防,某些场景更适合死锁避免(avoidance)

例如,假设我们需要在两个处理器上调度 4 个线程。更进一步,假设我们知道线程 1 (T1)需要用锁 L1 和 L2,T2 也需要抢 L1 和 L2,T3 只需要 L2,T4 不需要锁。我们用表 32.2 来表示线程对锁的需求。

一种比较聪明的调度方式是,只要 T1 和 T2 不同时运行,就不会产生死锁。

检查和恢复

最后一种常用的策略就是允许死锁偶尔发生,检查到死锁时再采取行动。举个例子, 如果一个操作系统一年死机一次,你会重启系统,然后愉快地(或者生气地)继续工作。

如果死锁很少见,这种不是办法的办法也是很实用的。

Not everything worth doing is worth doing well


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!