multi_threaed

概述:本文记录了并发编程的一些知识,参考极客时间相关文章

并发编程Bug的源头

并发程序的背后

计算机发展(速度的提升):CPU、内存、I/O设备不断变得更快

核心矛盾:三者之间的速度差异 CPU > 内存 >> I/O设备

程序里大部分语句都要访问内存,有些还要访问I/O,而程序整体的性能取决于最慢的操作——读写I/O设备,也就是说单方面提高CPU性能是无效的。

为了合理利用CPU的高性能,平衡这三者的速度差异,计算机体系结构、操作系统、编译程序都做出了贡献,主要体现为:

  1. CPU增加了缓存,以均衡与内存的速度差异;
  2. 操作系统增加了进程、线程,以分时复用CPU,进而均衡CPU与I/O设备的速度差异;
  3. 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。

现在我们几乎所有的程序都默默地享受着这些成果,但并发程序很多诡异问题的根源也在这里。

源头之一:缓存导致的可见性问题

在单核时代,所有的线程都是在一颗CPU上执行,CPU缓存与内存的数据一致性容易解决。因为所有线程都是操作同一个CPU的缓存,一个线程对缓存的写,对另外一个线程来说一定是可见的。例如在下面的图中,线程A和线程B都是操作同一个CPU里面的缓存,所以线程A更新了变量V的值,那么线程B之后再访问变量V,得到的一定是V的最新值(线程A写过的值)。

CPU缓存与内存的关系图

一个线程对共享变量的修改,另外一个线程能够立刻看到,我们称为可见性

多核时代,每颗CPU都有自己的缓存,这时CPU缓存与内存的数据一致性就没那么容易解决了,当多个线程在不同的CPU上执行时,这些线程操作的是不同的CPU缓存。比如下图中,线程A操作的是CPU-1上的缓存,而线程B操作的是CPU-2上的缓存,很明显,这个时候线程A对变量V的操作对于线程B而言就不具备可见性了。这个就属于硬件程序员给软件程序员挖的“坑”。

多核CPU的缓存与内存关系图

下面我们再用一段代码来验证一下多核场景下的可见性问题。下面的代码,每执行一次add10K()方法,都会循环10000次count+=1操作。在calc()方法中我们创建了两个线程,每个线程调用一次add10K()方法,我们来想一想执行calc()方法得到的结果应该是多少呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Test {
private long count = 0;
private void add10K() {
int idx = 0;
while(idx++ < 10000) {
count += 1;
}
}
public static long calc() {
final Test test = new Test();
// 创建两个线程,执行add()操作
Thread th1 = new Thread(()->{
test.add10K();
});
Thread th2 = new Thread(()->{
test.add10K();
});
// 启动两个线程
th1.start();
th2.start();
// 等待两个线程执行结束
th1.join();
th2.join();
return count;
}
}

直觉告诉我们应该是20000,因为在单线程里调用两次add10K()方法,count的值就是20000,但实际上calc()的执行结果是个10000到20000之间的随机数。为什么呢?

我们假设线程A和线程B同时开始执行,那么第一次都会将 count=0 读到各自的CPU缓存里,执行完 count+=1 之后,各自CPU缓存里的值都是1,同时写入内存后,我们会发现内存中是1,而不是我们期望的2。之后由于各自的CPU缓存里都有了count的值,两个线程都是基于CPU缓存里的 count 值来计算,所以导致最终count的值都是小于20000的。这就是缓存的可见性问题。

循环10000次count+=1操作如果改为循环1亿次,你会发现效果更明显,最终count的值接近1亿,而不是2亿。如果循环10000次,count的值接近20000,原因是两个线程不是同时启动的,有一个时差。

变量count在CPU缓存和内存的分布图

源头之二:线程切换带来的原子性问题

由于IO太慢,早期的操作系统就发明了多进程,即便在单核的CPU上我们也可以一边听着歌,一边写Bug,这个就是多进程的功劳。

操作系统允许某个进程执行一小段时间,例如50毫秒,过了50毫秒操作系统就会重新选择一个进程来执行(我们称为“任务切换”),这个50毫秒称为“时间片”。

线程切换示意图

在一个时间片内,如果一个进程进行一个IO操作,例如读个文件,这个时候该进程可以把自己标记为“休眠状态”并出让CPU的使用权,待文件读进内存,操作系统会把这个休眠的进程唤醒,唤醒后的进程就有机会重新获得CPU的使用权了。

这里的进程在等待IO时之所以会释放CPU使用权,是为了让CPU在这段等待时间里可以做别的事情,这样一来CPU的使用率就上来了;此外,如果这时有另外一个进程也读文件,读文件的操作就会排队,磁盘驱动在完成一个进程的读操作后,发现有排队的任务,就会立即启动下一个读操作,这样IO的使用率也上来了。

是不是很简单的逻辑?但是,虽然看似简单,支持多进程分时复用在操作系统的发展史上却具有里程碑意义,Unix就是因为解决了这个问题而名噪天下的。

早期的操作系统基于进程来调度CPU,不同进程间是不共享内存空间的,所以进程要做任务切换就要切换内存映射地址,而一个进程创建的所有线程,都是共享一个内存空间的,所以线程做任务切换成本就很低了。现代的操作系统都基于更轻量的线程来调度,现在我们提到的“任务切换”都是指“线程切换”。

Java并发程序都是基于多线程的,自然也会涉及到任务切换,也许你想不到,任务切换竟然也是并发编程里诡异Bug的源头之一。任务切换的时机大多数是在时间片结束的时候,我们现在基本都使用高级语言编程,高级语言里一条语句往往需要多条CPU指令完成,例如上面代码中的count += 1,至少需要三条CPU指令。

  • 指令1:首先,需要把变量count从内存加载到CPU的寄存器;
  • 指令2:之后,在寄存器中执行+1操作;
  • 指令3:最后,将结果写入内存(缓存机制导致可能写入的是CPU缓存而不是内存)。

操作系统做任务切换,可以发生在任何一条CPU指令执行完,是的,是CPU指令,而不是高级语言里的一条语句。对于上面的三条指令来说,我们假设count=0,如果线程A在指令1执行完后做线程切换,线程A和线程B按照下图的序列执行,那么我们会发现两个线程都执行了count+=1的操作,但是得到的结果不是我们期望的2,而是1。

非原子操作的执行路径示意图

我们潜意识里面觉得count+=1这个操作是一个不可分割的整体,就像一个原子一样,线程的切换可以发生在count+=1之前,也可以发生在count+=1之后,但就是不会发生在中间。我们把一个或者多个操作在CPU执行的过程中不被中断的特性称为原子性CPU能保证的原子操作是CPU指令级别的,而不是高级语言的操作符,这是违背我们直觉的地方。因此,很多时候我们需要在高级语言层面保证操作的原子性。

源头之三:编译优化带来的有序性问题

有序性指的是程序按照代码的先后顺序执行。编译器为了优化性能,有时候会改变程序中语句的先后顺序,例如程序中:“a=6;b=7;”编译器优化后可能变成“b=7;a=6;”,在这个例子中,编译器调整了语句的顺序,但是不影响程序的最终结果。不过有时候编译器及解释器的优化可能导致意想不到的Bug。

在Java领域一个经典的案例就是利用双重检查创建单例对象,例如下面的代码:在获取实例getInstance()的方法中,我们首先判断instance是否为空,如果为空,则锁定Singleton.class并再次检查instance是否为空,如果还为空则创建Singleton的一个实例。

1
2
3
4
5
6
7
8
9
10
11
12
public class Singleton {
static Singleton instance;
static Singleton getInstance(){
if (instance == null) {
synchronized(Singleton.class) {
if (instance == null)
instance = new Singleton();
}
}
return instance;
}
}

假设有两个线程A、B同时调用getInstance()方法,他们会同时发现 instance == null ,于是同时对Singleton.class加锁,此时JVM保证只有一个线程能够加锁成功(假设是线程A),另外一个线程则会处于等待状态(假设是线程B);线程A会创建一个Singleton实例,之后释放锁,锁释放后,线程B被唤醒,线程B再次尝试加锁,此时是可以加锁成功的,加锁成功后,线程B检查 instance == null 时会发现,已经创建过Singleton实例了,所以线程B不会再创建一个Singleton实例。

这看上去一切都很完美,无懈可击,但实际上这个getInstance()方法并不完美。问题出在哪里呢?出在new操作上,我们以为的new操作应该是:

  1. 分配一块内存M;
  2. 在内存M上初始化Singleton对象;
  3. 然后M的地址赋值给instance变量。

但是实际上优化后的执行路径却是这样的:

  1. 分配一块内存M;
  2. 将M的地址赋值给instance变量;
  3. 最后在内存M上初始化Singleton对象。

优化后会导致什么问题呢?我们假设线程A先执行getInstance()方法,当执行完指令2时恰好发生了线程切换,切换到了线程B上;如果此时线程B也执行getInstance()方法,那么线程B在执行第一个判断时会发现 instance != null ,所以直接返回instance,而此时的instance是没有初始化过的,如果我们这个时候访问 instance 的成员变量就可能触发空指针异常。

双重检查创建单例的异常执行路径

正确做法:

添加volatile禁止指令重排

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Singleton {
private volatile static Singleton instance = null;
public static Singleton getInstance() {
if(null == instance) {
synchronized (Singleton.class) {
if(null == instance) {
instance = new Singleton();
}
}
}
return instance;
}
}

Java内存模型

在并发场景中,因可见性、原子性、有序性导致的问题常常会违背我们的直觉,从而成为并发编程的Bug之源。这三者在编程领域属于共性问题,所有的编程语言都会遇到,Java在诞生之初就支持多线程,自然也有针对这三者的技术方案,而且在编程语言领域处于领先地位。

Java内存模型可以解决其中的可见性和有序性导致的问题

什么是Java内存模型

导致可见性问题的原因:缓存

导致有序性问题的原因:编译优化

因此最直接的办法:禁用缓存和编译优化,但这样直接导致了程序性能下降

合理的方案:按需禁用缓存以及编译优化

Java内存模型是个很复杂的规范,可以从不同的视角来解读,站在程序员的视角,本质上可以理解为,Java内存模型规范了JVM如何提供按需禁用缓存和编译优化的方法。具体来说,这些方法包括 volatilesynchronizedfinal 三个关键字,以及六项 Happens-Before 规则

使用volatile的困惑

volatile关键字并不是Java语言的特产,古老的C语言里也有,它最原始的意义就是禁用CPU缓存。

例如,我们声明一个volatile变量 volatile int x = 0,它表达的是:告诉编译器,对这个变量的读写,不能使用CPU缓存,必须从内存中读取或者写入。这个语义看上去相当明确,但是在实际使用的时候却会带来困惑。

例如下面的示例代码,假设线程A执行writer()方法,按照 volatile 语义,会把变量 “v=true” 写入内存;假设线程B执行reader()方法,同样按照 volatile 语义,线程B会从内存中读取变量v,如果线程B看到 “v == true” 时,那么线程B看到的变量x是多少呢?

直觉上看,应该是42,那实际应该是多少呢?这个要看Java的版本,如果在低于1.5版本上运行,x可能是42,也有可能是0;如果在1.5以上的版本上运行,x就是等于42。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 以下代码来源于【http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html】
class VolatileExample {
int x = 0;
volatile boolean v = false;
public void writer() {
x = 42;
v = true;
}
public void reader() {
if (v == true) {
// 这里x会是多少呢?
}
}
}

分析一下,为什么1.5以前的版本会出现x = 0的情况呢?我相信你一定想到了,变量x可能被CPU缓存而导致可见性问题。这个问题在1.5版本已经被圆满解决了。Java内存模型在1.5版本对volatile语义进行了增强。怎么增强的呢?答案是一项 Happens-Before 规则。

Happens-Before 规则

Happens-Before 并不是说前面一个操作发生在后续操作的前面,它真正要表达的是:前面一个操作的结果对后续操作是可见的。所以比较正式的说法是:Happens-Before 约束了编译器的优化行为,虽允许编译器优化,但是要求编译器优化后一定遵守 Happens-Before 规则

Happens-Before 规则应该是Java内存模型里面最晦涩的内容了,和程序员相关的规则一共有如下六项,都是关于可见性的。

前面示例代码涉及到这六项规则中的前三项。

1. 程序的顺序性规则

这条规则是指在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作。这还是比较容易理解的,比如刚才那段示例代码,按照程序的顺序,第6行代码 “x = 42;” Happens-Before 于第7行代码 “v = true;”,这就是规则1的内容,也比较符合单线程里面的思维:程序前面对某个变量的修改一定是对后续操作可见的。

1
2
3
4
5
6
7
8
9
10
11
12
13
class VolatileExample {
int x = 0;
volatile boolean v = false;
public void writer() {
x = 42;
v = true;
}
public void reader() {
if (v == true) {
// 这里x会是多少呢?
}
}
}

2. volatile变量规则

这条规则是指对一个volatile变量的写操作, Happens-Before 于后续对这个volatile变量的读操作

这个就有点费解了,对一个volatile变量的写操作相对于后续对这个volatile变量的读操作可见,这怎么看都是禁用缓存的意思啊,貌似和1.5版本以前的语义没有变化啊?如果单看这个规则,的确是这样,但是如果我们关联一下规则3,就有点不一样的感觉了。

3. 传递性

这条规则是指如果A Happens-Before B,且B Happens-Before C,那么A Happens-Before C。

我们将规则3的传递性应用到我们的例子中,会发生什么呢?可以看下面这幅图:

img

示例代码中的传递性规则

从图中,我们可以看到:

  1. “x=42” Happens-Before 写变量 “v=true” ,这是规则1的内容;
  2. 写变量“v=true” Happens-Before 读变量 “v=true”,这是规则2的内容 。

再根据这个传递性规则,我们得到结果:“x=42” Happens-Before 读变量“v=true”。这意味着什么呢?

如果线程B读到了“v=true”,那么线程A设置的“x=42”对线程B是可见的。也就是说,线程B能看到 “x == 42” ,有没有一种恍然大悟的感觉?这就是1.5版本对volatile语义的增强,这个增强意义重大,1.5版本的并发工具包(java.util.concurrent)就是靠volatile语义来搞定可见性的,这个在后面的内容中会详细介绍。

4. 管程中锁的规则

这条规则是指对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。

要理解这个规则,就首先要了解“管程指的是什么”。管程是一种通用的同步原语,在Java中指的就是synchronized,synchronized是Java里对管程的实现。

管程中的锁在Java里是隐式实现的,例如下面的代码,在进入同步块之前,会自动加锁,而在代码块执行完会自动释放锁,加锁以及释放锁都是编译器帮我们实现的。

1
2
3
4
5
6
synchronized (this) { //此处自动加锁
// x是共享变量,初始值=10
if (this.x < 12) {
this.x = 12;
}
} //此处自动解锁

所以结合规则4——管程中锁的规则,可以这样理解:假设x的初始值是10,线程A执行完代码块后x的值会变成12(执行完自动释放锁),线程B进入代码块时,能够看到线程A对x的写操作,也就是线程B能够看到x==12。这个也是符合我们直觉的,应该不难理解。

5. 线程 start() 规则

这条是关于线程启动的。它是指主线程A启动子线程B后,子线程B能够看到主线程在启动子线程B前的操作。

换句话说就是,如果线程A调用线程B的 start() 方法(即在线程A中启动线程B),那么该start()操作 Happens-Before 于线程B中的任意操作。具体可参考下面示例代码。

1
2
3
4
5
6
7
8
9
Thread B = new Thread(()->{
// 主线程调用B.start()之前
// 所有对共享变量的修改,此处皆可见
// 此例中,var==77
});
// 此处对共享变量var修改
var = 77;
// 主线程启动子线程
B.start();

6. 线程 join() 规则

这条是关于线程等待的。它是指主线程A等待子线程B完成(主线程A通过调用子线程B的join()方法实现),当子线程B完成后(主线程A中join()方法返回),主线程能够看到子线程的操作。当然所谓的“看到”,指的是对共享变量的操作。

换句话说就是,如果在线程A中,调用线程B的 join() 并成功返回,那么线程B中的任意操作Happens-Before 于该 join() 操作的返回。具体可参考下面示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
Thread B = new Thread(()->{
// 此处对共享变量var修改
var = 66;
});
// 例如此处对共享变量修改,
// 则这个修改结果对线程B可见
// 主线程启动子线程
B.start();
B.join()
// 子线程所有对共享变量的修改
// 在主线程调用B.join()之后皆可见
// 此例中,var==66

被忽视的final

前面我们讲volatile为的是禁用缓存以及编译优化,我们再从另外一个方面来看,有没有办法告诉编译器优化得更好一点呢?这个可以有,就是final关键字

final修饰变量时,初衷是告诉编译器:这个变量生而不变,可以优化。Java编译器在1.5以前的版本的确优化得很努力,以至于都优化错了。

问题类似于上一期提到的利用双重检查方法创建单例,构造函数的错误重排导致线程可能看到final变量的值会变化。详细的案例可以参考这个文档

当然了,在1.5以后Java内存模型对final类型变量的重排进行了约束。现在只要我们提供正确构造函数没有“逸出”,就不会出问题了。

“逸出”有点抽象,我们还是举个例子吧,在下面例子中,在构造函数里面将this赋值给了全局变量global.obj,这就是“逸出”,线程通过global.obj读取x是有可能读到0的。因此我们一定要避免“逸出”。

1
2
3
4
5
6
7
8
9
// 以下代码来源于【参考1】
final int x;
// 错误的构造函数
public FinalFieldExample() {
x = 3;
y = 4;
// 此处就是讲this逸出,
global.obj = this;
}

互斥锁:解决原子性问题

原子性问题的源头是线程切换,如果能够禁用线程切换那不就能解决这个问题了吗?而操作系统做线程切换是依赖CPU中断的,所以禁止CPU发生中断就能够禁止线程切换。

在早期单核CPU时代,这个方案的确是可行的,而且也有很多应用案例,但是并不适合多核场景。这里我们以32位CPU上执行long型变量的写操作为例来说明这个问题,long型变量是64位,在32位CPU上执行写操作会被拆分成两次写操作(写高32位和写低32位,如下图所示)。

img

在单核CPU场景下,同一时刻只有一个线程执行,禁止CPU中断,意味着操作系统不会重新调度线程,也就是禁止了线程切换,获得CPU使用权的线程就可以不间断地执行,所以两次写操作一定是:要么都被执行,要么都没有被执行,具有原子性。

但是在多核场景下,同一时刻,有可能有两个线程同时在执行,一个线程执行在CPU-1上,一个线程执行在CPU-2上,此时禁止CPU中断,只能保证CPU上的线程连续执行,并不能保证同一时刻只有一个线程执行,如果这两个线程同时写long型变量高32位的话,那就有可能出现我们开头提及的诡异Bug了。

同一时刻只有一个线程执行”这个条件非常重要,我们称之为互斥。如果我们能够保证对共享变量的修改是互斥的,那么,无论是单核CPU还是多核CPU,就都能保证原子性了。

简易锁模型

当谈到互斥,相信聪明的你一定想到了那个杀手级解决方案:锁。同时大脑中还会出现以下模型:

简易锁模型

我们把一段需要互斥执行的代码称为临界区。线程在进入临界区之前,首先尝试加锁lock(),如果成功,则进入临界区,此时我们称这个线程持有锁;否则呢就等待,直到持有锁的线程解锁;持有锁的线程执行完临界区的代码后,执行解锁unlock()。

这个过程非常像办公室里高峰期抢占坑位,每个人都是进坑锁门(加锁),出坑开门(解锁),如厕这个事就是临界区。很长时间里,我也是这么理解的。这样理解本身没有问题,但却很容易让我们忽视两个非常非常重要的点:我们锁的是什么?我们保护的又是什么?

改进后的锁模型

我们知道在现实世界里,锁和锁要保护的资源是有对应关系的,比如你用你家的锁保护你家的东西,我用我家的锁保护我家的东西。在并发编程世界里,锁和资源也应该有这个关系,但这个关系在我们上面的模型中是没有体现的,所以我们需要完善一下我们的模型。

改进后的锁模型

首先,我们要把临界区要保护的资源标注出来,如图中临界区里增加了一个元素:受保护的资源R;其次,我们要保护资源R就得为它创建一把锁LR;最后,针对这把锁LR,我们还需在进出临界区时添上加锁操作和解锁操作。另外,在锁LR和受保护资源之间,我特地用一条线做了关联,这个关联关系非常重要。很多并发Bug的出现都是因为把它忽略了,然后就出现了类似锁自家门来保护他家资产的事情,这样的Bug非常不好诊断,因为潜意识里我们认为已经正确加锁了。

Java语言提供的锁技术:synchronized

锁是一种通用的技术方案,Java语言提供的synchronized关键字,就是锁的一种实现。synchronized关键字可以用来修饰方法,也可以用来修饰代码块,它的使用示例基本上都是下面这个样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class X {
// 修饰非静态方法
synchronized void foo() {
// 临界区
}
// 修饰静态方法
synchronized static void bar() {
// 临界区
}
// 修饰代码块
Object obj = new Object();
void baz() {
synchronized(obj) {
// 临界区
}
}
}

看完之后你可能会觉得有点奇怪,这个和我们上面提到的模型有点对不上号啊,加锁lock()和解锁unlock()在哪里呢?其实这两个操作都是有的,只是这两个操作是被Java默默加上的,Java编译器会在synchronized修饰的方法或代码块前后自动加上加锁lock()和解锁unlock(),这样做的好处就是加锁lock()和解锁unlock()一定是成对出现的,毕竟忘记解锁unlock()可是个致命的Bug(意味着其他线程只能死等下去了)。

那synchronized里的加锁lock()和解锁unlock()锁定的对象在哪里呢?上面的代码我们看到只有修饰代码块的时候,锁定了一个obj对象,那修饰方法的时候锁定的是什么呢?这个也是Java的一条隐式规则:

当修饰静态方法的时候,锁定的是当前类的Class对象,在上面的例子中就是Class X;
当修饰非静态方法的时候,锁定的是当前实例对象this。

对于上面的例子,synchronized修饰静态方法相当于:

1
2
3
4
5
6
class X {
// 修饰静态方法
synchronized(X.class) static void bar() {
// 临界区
}
}

修饰非静态方法,相当于:

1
2
3
4
5
6
class X {
// 修饰非静态方法
synchronized(this) void foo() {
// 临界区
}
}

用synchronized解决count+=1问题

前面提到过的count+=1存在的并发问题,现在我们可以尝试用synchronized来小试牛刀一把,代码如下所示。SafeCalc这个类有两个方法:一个是get()方法,用来获得value的值;另一个是addOne()方法,用来给value加1,并且addOne()方法我们用synchronized修饰。那么我们使用的这两个方法有没有并发问题呢?

1
2
3
4
5
6
7
8
9
class SafeCalc {
long value = 0L;
long get() {
return value;
}
synchronized void addOne() {
value += 1;
}
}

我们先来看看addOne()方法,首先可以肯定,被synchronized修饰后,无论是单核CPU还是多核CPU,只有一个线程能够执行addOne()方法,所以一定能保证原子操作,那是否有可见性问题呢?要回答这问题,就要重温一下之前提到的管程中锁的规则

管程中锁的规则:对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。

管程,就是我们这里的synchronized(至于为什么叫管程,我们后面介绍),我们知道synchronized修饰的临界区是互斥的,也就是说同一时刻只有一个线程执行临界区的代码;而所谓“对一个锁解锁 Happens-Before 后续对这个锁的加锁”,指的是前一个线程的解锁操作对后一个线程的加锁操作可见,综合Happens-Before的传递性原则,我们就能得出前一个线程在临界区修改的共享变量(该操作在解锁之前),对后续进入临界区(该操作在加锁之后)的线程是可见的。

按照这个规则,如果多个线程同时执行addOne()方法,可见性是可以保证的,也就说如果有1000个线程执行addOne()方法,最终结果一定是value的值增加了1000。看到这个结果,我们长出一口气,问题终于解决了。

但也许,你一不小心就忽视了get()方法。执行addOne()方法后,value的值对get()方法是可见的吗?这个可见性是没法保证的。管程中锁的规则,是只保证后续对这个锁的加锁的可见性,而get()方法并没有加锁操作,所以可见性没法保证。那如何解决呢?很简单,就是get()方法也synchronized一下,完整的代码如下所示。

1
2
3
4
5
6
7
8
9
class SafeCalc {
long value = 0L;
synchronized long get() {
return value;
}
synchronized void addOne() {
value += 1;
}
}

上面的代码转换为我们提到的锁模型,就是下面图示这个样子。get()方法和addOne()方法都需要访问value这个受保护的资源,这个资源用this这把锁来保护。线程要进入临界区get()和addOne(),必须先获得this这把锁,这样get()和addOne()也是互斥的。

保护临界区get()和addOne()的示意图

这个模型更像现实世界里面球赛门票的管理,一个座位只允许一个人使用,这个座位就是“受保护资源”,球场的入口就是Java类里的方法,而门票就是用来保护资源的“锁”,Java里的检票工作是由synchronized解决的。

锁和受保护资源的关系

我们前面提到,受保护资源和锁之间的关联关系非常重要,他们的关系是怎样的呢?一个合理的关系是:受保护资源和锁之间的关联关系是N:1的关系。还拿前面球赛门票的管理来类比,就是一个座位,我们只能用一张票来保护,如果多发了重复的票,那就要打架了。现实世界里,我们可以用多把锁来保护同一个资源,但在并发领域是不行的,并发领域的锁和现实世界的锁不是完全匹配的。不过倒是可以用同一把锁来保护多个资源,这个对应到现实世界就是我们所谓的“包场”了。

上面那个例子我稍作改动,把value改成静态变量,把addOne()方法改成静态方法,此时get()方法和addOne()方法是否存在并发问题呢?

1
2
3
4
5
6
7
8
9
class SafeCalc {
static long value = 0L;
synchronized long get() {
return value;
}
synchronized static void addOne() {
value += 1;
}
}

如果你仔细观察,就会发现改动后的代码是用两个锁保护一个资源。这个受保护的资源就是静态变量value,两个锁分别是this和SafeCalc.class。我们可以用下面这幅图来形象描述这个关系。由于临界区get()和addOne()是用两个锁保护的,因此这两个临界区没有互斥关系,临界区addOne()对value的修改对临界区get()也没有可见性保证,这就导致并发问题了。

两把锁保护一个资源的示意图

保护没有关联关系的多个资源

在现实世界里,球场的座位和电影院的座位就是没有关联关系的,这种场景非常容易解决,那就是球赛有球赛的门票,电影院有电影院的门票,各自管理各自的。

同样这对应到编程领域,也很容易解决。例如,银行业务中有针对账户余额(余额是一种资源)的取款操作,也有针对账户密码(密码也是一种资源)的更改操作,我们可以为账户余额和账户密码分配不同的锁来解决并发问题,这个还是很简单的。

相关的示例代码如下,账户类Account有两个成员变量,分别是账户余额balance和账户密码password。取款withdraw()和查看余额getBalance()操作会访问账户余额balance,我们创建一个final对象balLock作为锁(类比球赛门票);而更改密码updatePassword()和查看密码getPassword()操作会修改账户密码password,我们创建一个final对象pwLock作为锁(类比电影票)。不同的资源用不同的锁保护,各自管各自的,很简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Account {
// 锁:保护账户余额
private final Object balLock
= new Object();
// 账户余额
private Integer balance;
// 锁:保护账户密码
private final Object pwLock
= new Object();
// 账户密码
private String password;

// 取款
void withdraw(Integer amt) {
synchronized(balLock) {
if (this.balance > amt){
this.balance -= amt;
}
}
}
// 查看余额
Integer getBalance() {
synchronized(balLock) {
return balance;
}
}

// 更改密码
void updatePassword(String pw){
synchronized(pwLock) {
this.password = pw;
}
}
// 查看密码
String getPassword() {
synchronized(pwLock) {
return password;
}
}
}

当然,我们也可以用一把互斥锁来保护多个资源,例如我们可以用this这一把锁来管理账户类里所有的资源:账户余额和用户密码。具体实现很简单,示例程序中所有的方法都增加同步关键字synchronized就可以了,这里我就不一一展示了。

但是用一把锁有个问题,就是性能太差,会导致取款、查看余额、修改密码、查看密码这四个操作都是串行的。而我们用两把锁,取款和修改密码是可以并行的。用不同的锁对受保护资源进行精细化管理,能够提升性能。这种锁还有个名字,叫细粒度锁

保护有关联关系的多个资源

如果多个资源是有关联关系的,那这个问题就有点复杂了。例如银行业务里面的转账操作,账户A减少100元,账户B增加100元。这两个账户就是有关联关系的。那对于像转账这种有关联关系的操作,我们应该怎么去解决呢?先把这个问题代码化。我们声明了个账户类:Account,该类有一个成员变量余额:balance,还有一个用于转账的方法:transfer(),然后怎么保证转账操作transfer()没有并发问题呢?

1
2
3
4
5
6
7
8
9
10
11
class Account {
private int balance;
// 转账
void transfer(
Account target, int amt){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}

相信你的直觉会告诉你这样的解决方案:用户synchronized关键字修饰一下transfer()方法就可以了,于是你很快就完成了相关的代码,如下所示。

1
2
3
4
5
6
7
8
9
10
11
class Account {
private int balance;
// 转账
synchronized void transfer(
Account target, int amt){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}

在这段代码中,临界区内有两个资源,分别是转出账户的余额this.balance和转入账户的余额target.balance,并且用的是一把锁this,符合我们前面提到的,多个资源可以用一把锁来保护,这看上去完全正确呀。真的是这样吗?可惜,这个方案仅仅是看似正确,为什么呢?

问题就出在this这把锁上,this这把锁可以保护自己的余额this.balance,却保护不了别人的余额target.balance,就像你不能用自家的锁来保护别人家的资产,也不能用自己的票来保护别人的座位一样。

img

用锁this保护this.balance和target.balance的示意图

下面我们具体分析一下,假设有A、B、C三个账户,余额都是200元,我们用两个线程分别执行两个转账操作:账户A转给账户B 100 元,账户B转给账户C 100 元,最后我们期望的结果应该是账户A的余额是100元,账户B的余额是200元, 账户C的余额是300元。

我们假设线程1执行账户A转账户B的操作,线程2执行账户B转账户C的操作。这两个线程分别在两颗CPU上同时执行,那它们是互斥的吗?我们期望是,但实际上并不是。因为线程1锁定的是账户A的实例(A.this),而线程2锁定的是账户B的实例(B.this),所以这两个线程可以同时进入临界区transfer()。同时进入临界区的结果是什么呢?线程1和线程2都会读到账户B的余额为200,导致最终账户B的余额可能是300(线程1后于线程2写B.balance,线程2写的B.balance值被线程1覆盖),可能是100(线程1先于线程2写B.balance,线程1写的B.balance值被线程2覆盖),就是不可能是200。

并发转账示意图

使用锁的正确姿势

在上一篇文章中,我们提到用同一把锁来保护多个资源,也就是现实世界的“包场”,那在编程领域应该怎么“包场”呢?很简单,只要我们的锁能覆盖所有受保护资源就可以了。在上面的例子中,this是对象级别的锁,所以A对象和B对象都有自己的锁,如何让A对象和B对象共享一把锁呢?

稍微开动脑筋,你会发现其实方案还挺多的,比如可以让所有对象都持有一个唯一性的对象,这个对象在创建Account时传入。方案有了,完成代码就简单了。示例代码如下,我们把Account默认构造函数变为private,同时增加一个带Object lock参数的构造函数,创建Account对象时,传入相同的lock,这样所有的Account对象都会共享这个lock了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Account {
private Object lock;
private int balance;
private Account();
// 创建Account时传入同一个lock对象
public Account(Object lock) {
this.lock = lock;
}
// 转账
void transfer(Account target, int amt){
// 此处检查所有对象共享的锁
synchronized(lock) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}

这个办法确实能解决问题,但是有点小瑕疵,它要求在创建Account对象的时候必须传入同一个对象,如果创建Account对象时,传入的lock不是同一个对象,那可就惨了,会出现锁自家门来保护他家资产的荒唐事。在真实的项目场景中,创建Account对象的代码很可能分散在多个工程中,传入共享的lock真的很难。

所以,上面的方案缺乏实践的可行性,我们需要更好的方案。还真有,就是用Account.class作为共享的锁。Account.class是所有Account对象共享的,而且这个对象是Java虚拟机在加载Account类的时候创建的,所以我们不用担心它的唯一性。使用Account.class作为共享的锁,我们就无需在创建Account对象时传入了,代码更简单。

1
2
3
4
5
6
7
8
9
10
11
12
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
synchronized(Account.class) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}

下面这幅图很直观地展示了我们是如何使用共享的锁Account.class来保护不同对象的临界区的。

img

死锁

我们用Account.class作为互斥锁,来解决银行业务里面的转账问题,虽然这个方案不存在并发问题,但是所有账户的转账操作都是串行的,例如账户A 转账户B、账户C 转账户D这两个转账操作现实世界里是可以并行的,但是在这个方案里却被串行化了,这样的话,性能太差。

试想互联网支付盛行的当下,8亿网民每人每天一笔交易,每天就是8亿笔交易;每笔交易都对应着一次转账操作,8亿笔交易就是8亿次转账操作,也就是说平均到每秒就是近1万次转账操作,若所有的转账操作都串行,性能完全不能接受。

那下面我们就尝试着把性能提升一下。

向现实世界要答案

现实世界里,账户转账操作是支持并发的,而且绝对是真正的并行,银行所有的窗口都可以做转账操作。只要我们能仿照现实世界做转账操作,串行的问题就解决了。

我们试想在古代,没有信息化,账户的存在形式真的就是一个账本,而且每个账户都有一个账本,这些账本都统一存放在文件架上。银行柜员在给我们做转账时,要去文件架上把转出账本和转入账本都拿到手,然后做转账。这个柜员在拿账本的时候可能遇到以下三种情况:

  1. 文件架上恰好有转出账本和转入账本,那就同时拿走;
  2. 如果文件架上只有转出账本和转入账本之一,那这个柜员就先把文件架上有的账本拿到手,同时等着其他柜员把另外一个账本送回来;
  3. 转出账本和转入账本都没有,那这个柜员就等着两个账本都被送回来。

上面这个过程在编程的世界里怎么实现呢?其实用两把锁就实现了,转出账本一把,转入账本另一把。在transfer()方法内部,我们首先尝试锁定转出账户this(先把转出账本拿到手),然后尝试锁定转入账户target(再把转入账本拿到手),只有当两者都成功时,才执行转账操作。这个逻辑可以图形化为下图这个样子。

两个转账操作并行示意图

而至于详细的代码实现,如下所示。经过这样的优化后,账户A 转账户B和账户C 转账户D这两个转账操作就可以并行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
// 锁定转出账户
synchronized(this) {
// 锁定转入账户
synchronized(target) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
}

没有免费的午餐

上面的实现看上去很完美,并且也算是将锁用得出神入化了。相对于用Account.class作为互斥锁,锁定的范围太大,而我们锁定两个账户范围就小多了,这样的锁,上一章我们介绍过,叫细粒度锁使用细粒度锁可以提高并行度,是性能优化的一个重要手段

这个时候可能你已经开始警觉了,使用细粒度锁这么简单,有这样的好事,是不是也要付出点什么代价啊?编写并发程序就需要这样时时刻刻保持谨慎。

的确,使用细粒度锁是有代价的,这个代价就是可能会导致死锁。

在详细介绍死锁之前,我们先看看现实世界里的一种特殊场景。如果有客户找柜员张三做个转账业务:账户A 转账户B 100元,此时另一个客户找柜员李四也做个转账业务:账户B 转账户A 100 元,于是张三和李四同时都去文件架上拿账本,这时候有可能凑巧张三拿到了账本A,李四拿到了账本B。张三拿到账本A后就等着账本B(账本B已经被李四拿走),而李四拿到账本B后就等着账本A(账本A已经被张三拿走),他们要等多久呢?他们会永远等待下去…因为张三不会把账本A送回去,李四也不会把账本B送回去。我们姑且称为死等吧。

转账业务中的“死等”

现实世界里的死等,就是编程领域的死锁了。死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象

上面转账的代码是怎么发生死锁的呢?我们假设线程T1执行账户A转账户B的操作,账户A.transfer(账户B);同时线程T2执行账户B转账户A的操作,账户B.transfer(账户A)。当T1和T2同时执行完①处的代码时,T1获得了账户A的锁(对于T1,this是账户A),而T2获得了账户B的锁(对于T2,this是账户B)。之后T1和T2在执行②处的代码时,T1试图获取账户B的锁时,发现账户B已经被锁定(被T2锁定),所以T1开始等待;T2则试图获取账户A的锁时,发现账户A已经被锁定(被T1锁定),所以T2也开始等待。于是T1和T2会无期限地等待下去,也就是我们所说的死锁了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
// 锁定转出账户
synchronized(this){ ①
// 锁定转入账户
synchronized(target){ ②
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
}

关于这种现象,我们还可以借助资源分配图来可视化锁的占用情况(资源分配图是个有向图,它可以描述资源和线程的状态)。其中,资源用方形节点表示,线程用圆形节点表示;资源中的点指向线程的边表示线程已经获得该资源,线程指向资源的边则表示线程请求资源,但尚未得到。转账发生死锁时的资源分配图就如下图所示,一个“各据山头死等”的尴尬局面。

转账发生死锁时的资源分配图

如何预防死锁

并发程序一旦死锁,一般没有特别好的方法,很多时候我们只能重启应用。因此,解决死锁问题最好的办法还是规避死锁。

那如何避免死锁呢?要避免死锁就需要分析死锁发生的条件,有个叫Coffman的牛人早就总结过了,只有以下这四个条件都发生时才会出现死锁:

  1. 互斥,共享资源X和Y只能被一个线程占用;
  2. 占有且等待,线程T1已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X;
  3. 不可抢占,其他线程不能强行抢占线程T1占有的资源;
  4. 循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T1占有的资源,就是循环等待。

反过来分析,也就是说只要我们破坏其中一个,就可以成功避免死锁的发生

其中,互斥这个条件我们没有办法破坏,因为我们用锁为的就是互斥。不过其他三个条件都是有办法破坏掉的,到底如何做呢?

  1. 对于“占用且等待”这个条件,我们可以一次性申请所有的资源,这样就不存在等待了。
  2. 对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
  3. 对于“循环等待”这个条件,可以靠按序申请资源来预防。所谓按序申请,是指资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请资源序号大的,这样线性化后自然就不存在循环了。

我们已经从理论上解决了如何预防死锁,那具体如何体现在代码上呢?下面我们就来尝试用代码实践一下这些理论。

1. 破坏占用且等待条件

从理论上讲,要破坏这个条件,可以一次性申请所有资源。在现实世界里,就拿前面我们提到的转账操作来讲,它需要的资源有两个,一个是转出账户,另一个是转入账户,当这两个账户同时被申请时,我们该怎么解决这个问题呢?

可以增加一个账本管理员,然后只允许账本管理员从文件架上拿账本,也就是说柜员不能直接在文件架上拿账本,必须通过账本管理员才能拿到想要的账本。例如,张三同时申请账本A和B,账本管理员如果发现文件架上只有账本A,这个时候账本管理员是不会把账本A拿下来给张三的,只有账本A和B都在的时候才会给张三。这样就保证了“一次性申请所有资源”。

通过账本管理员拿账本

对应到编程领域,“同时申请”这个操作是一个临界区,我们也需要一个角色(Java里面的类)来管理这个临界区,我们就把这个角色定为Allocator。它有两个重要功能,分别是:同时申请资源apply()和同时释放资源free()。账户Account 类里面持有一个Allocator的单例(必须是单例,只能由一个人来分配资源)。当账户Account在执行转账操作的时候,首先向Allocator同时申请转出账户和转入账户这两个资源,成功后再锁定这两个资源;当转账操作执行完,释放锁之后,我们需通知Allocator同时释放转出账户和转入账户这两个资源。具体的代码实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class Allocator {
private List als =
new ArrayList<>();
// 一次性申请所有资源
synchronized boolean apply(
Object from, Object to){
if(als.contains(from) ||
als.contains(to)){
return false;
} else {
als.add(from);
als.add(to);
}
return true;
}
// 归还资源
synchronized void free(
Object from, Object to){
als.remove(from);
als.remove(to);
}
}

class Account {
// actr应该为单例
private Allocator actr;
private int balance;
// 转账
void transfer(Account target, int amt){
// 一次性申请转出账户和转入账户,直到成功
while(!actr.apply(this, target))

try{
// 锁定转出账户
synchronized(this){
// 锁定转入账户
synchronized(target){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
} finally {
actr.free(this, target)
}
}
}

2. 破坏不可抢占条件

破坏不可抢占条件看上去很简单,核心是要能够主动释放它占有的资源,这一点synchronized是做不到的。原因是synchronized申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

你可能会质疑,“Java作为排行榜第一的语言,这都解决不了?”你的怀疑很有道理,Java在语言层次确实没有解决这个问题,不过在SDK层面还是解决了的,java.util.concurrent这个包下面提供的Lock是可以轻松解决这个问题的

3. 破坏循环等待条件

破坏这个条件,需要对资源进行排序,然后按序申请资源。这个实现非常简单,我们假设每个账户都有不同的属性 id,这个 id 可以作为排序字段,申请的时候,我们可以按照从小到大的顺序来申请。比如下面代码中,①~⑥处的代码对转出账户(this)和转入账户(target)排序,然后按照序号从小到大的顺序锁定账户。这样就不存在“循环”等待了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Account {
private int id;
private int balance;
// 转账
void transfer(Account target, int amt){
Account left = this
Account right = target; ②
if (this.id > target.id) { ③
left = target; ④
right = this; ⑤
} ⑥
// 锁定序号小的账户
synchronized(left){
// 锁定序号大的账户
synchronized(right){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
}
}

用“等待-通知”机制优化循环等待

破坏占用且等待条件的时候,如果转出账本和转入账本不满足同时在文件架上这个条件,就用死循环的方式来循环等待,核心代码如下:

1
2
3
// 一次性申请转出账户和转入账户,直到成功
while(!actr.apply(this, target))

如果apply()操作耗时非常短,而且并发冲突量也不大时,这个方案还挺不错的,因为这种场景下,循环上几次或者几十次就能一次性获取转出账户和转入账户了。但是如果apply()操作耗时长,或者并发冲突量大的时候,循环等待这种方案就不适用了,因为在这种场景下,可能要循环上万次才能获取到锁,太消耗CPU了。

其实在这种场景下,最好的方案应该是:如果线程要求的条件(转出账本和转入账本同在文件架上)不满足,则线程阻塞自己,进入等待状态;当线程要求的条件(转出账本和转入账本同在文件架上)满足后,通知等待的线程重新执行。其中,使用线程阻塞的方式就能避免循环等待消耗CPU的问题。

那Java语言是否支持这种等待-通知机制呢?答案是:一定支持(毕竟占据排行榜第一那么久)。下面我们就来看看Java语言是如何支持等待-通知机制的。

完美的就医流程

在介绍Java语言如何支持等待-通知机制之前,我们先看一个现实世界里面的就医流程,因为它有着完善的等待-通知机制,所以对比就医流程,我们就能更好地理解和应用并发编程中的等待-通知机制。

就医流程基本上是这样:

  1. 患者先去挂号,然后到就诊门口分诊,等待叫号;
  2. 当叫到自己的号时,患者就可以找大夫就诊了;
  3. 就诊过程中,大夫可能会让患者去做检查,同时叫下一位患者;
  4. 当患者做完检查后,拿检测报告重新分诊,等待叫号;
  5. 当大夫再次叫到自己的号时,患者再去找大夫就诊。

或许你已经发现了,这个有着完美等待-通知机制的就医流程,不仅能够保证同一时刻大夫只为一个患者服务,而且还能够保证大夫和患者的效率。与此同时你可能也会有疑问,“这个就医流程很复杂呀,我们前面描述的等待-通知机制相较而言是不是太简单了?”那这个复杂度是否是必须的呢?这个是必须的,我们不能忽视等待-通知机制中的一些细节。

下面我们来对比看一下前面都忽视了哪些细节。

  1. 患者到就诊门口分诊,类似于线程要去获取互斥锁;当患者被叫到时,类似线程已经获取到锁了。
  2. 大夫让患者去做检查(缺乏检测报告不能诊断病因),类似于线程要求的条件没有满足。
  3. 患者去做检查,类似于线程进入等待状态;然后大夫叫下一个患者,这个步骤我们在前面的等待-通知机制中忽视了,这个步骤对应到程序里,本质是线程释放持有的互斥锁
  4. 患者做完检查,类似于线程要求的条件已经满足;患者拿检测报告重新分诊,类似于线程需要重新获取互斥锁,这个步骤我们在前面的等待-通知机制中也忽视了

所以加上这些至关重要的细节,综合一下,就可以得出一个完整的等待-通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁

用synchronized实现等待-通知机制

在Java语言里,等待-通知机制可以有多种实现方式,比如Java语言内置的synchronized配合wait()、notify()、notifyAll()这三个方法就能轻松实现

如何用synchronized实现互斥锁,你应该已经很熟悉了。在下面这个图里,左边有一个等待队列,同一时刻,只允许一个线程进入synchronized保护的临界区(这个临界区可以看作大夫的诊室),当有一个线程进入临界区后,其他线程就只能进入图中左边的等待队列里等待(相当于患者分诊等待)。这个等待队列和互斥锁是一对一的关系,每个互斥锁都有自己独立的等待队列。

wait()操作工作原理图

在并发程序中,当一个线程进入临界区后,由于某些条件不满足,需要进入等待状态,Java对象的wait()方法就能够满足这种需求。如上图所示,当调用wait()方法后,当前线程就会被阻塞,并且进入到右边的等待队列中,这个等待队列也是互斥锁的等待队列。 线程在进入等待队列的同时,会释放持有的互斥锁,线程释放锁后,其他线程就有机会获得锁,并进入临界区了。

那线程要求的条件满足时,该怎么通知这个等待的线程呢?很简单,就是Java对象的notify()和notifyAll()方法。我在下面这个图里为你大致描述了这个过程,当条件满足时调用notify(),会通知等待队列(互斥锁的等待队列)中的线程,告诉它条件曾经满足过

img

notify()操作工作原理图

为什么说是曾经满足过呢?因为notify()只能保证在通知时间点,条件是满足的。而被通知线程的执行时间点和通知的时间点基本上不会重合,所以当线程执行的时候,很可能条件已经不满足了(保不齐有其他线程插队)。这一点你需要格外注意。

除此之外,还有一个需要注意的点,被通知的线程要想重新执行,仍然需要获取到互斥锁(因为曾经获取的锁在调用wait()时已经释放了)。

上面我们一直强调wait()、notify()、notifyAll()方法操作的等待队列是互斥锁的等待队列,所以如果synchronized锁定的是this,那么对应的一定是this.wait()、this.notify()、this.notifyAll();如果synchronized锁定的是target,那么对应的一定是target.wait()、target.notify()、target.notifyAll() 。而且wait()、notify()、notifyAll()这三个方法能够被调用的前提是已经获取了相应的互斥锁,所以我们会发现wait()、notify()、notifyAll()都是在synchronized{}内部被调用的。如果在synchronized{}外部调用,或者锁定的this,而用target.wait()调用的话,JVM会抛出一个运行时异常:java.lang.IllegalMonitorStateException

小试牛刀:一个更好地资源分配器

等待-通知机制的基本原理搞清楚后,我们就来看看它如何解决一次性申请转出账户和转入账户的问题吧。在这个等待-通知机制中,我们需要考虑以下四个要素。

  1. 互斥锁:上一篇文章我们提到Allocator需要是单例的,所以我们可以用this作为互斥锁。
  2. 线程要求的条件:转出账户和转入账户都没有被分配过。
  3. 何时等待:线程要求的条件不满足就等待。
  4. 何时通知:当有线程释放账户时就通知。

将上面几个问题考虑清楚,可以快速完成下面的代码。需要注意的是我们使用了:

1
2
3
while(条件不满足) {
wait();
}

利用这种范式可以解决上面提到的条件曾经满足过这个问题。因为当wait()返回时,有可能条件已经发生变化了,曾经条件满足,但是现在已经不满足了,所以要重新检验条件是否满足。范式,意味着是经典做法,所以没有特殊理由不要尝试换个写法。后面在介绍“管程”的时候,我会详细介绍这个经典做法的前世今生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Allocator {
private List als;
// 一次性申请所有资源
synchronized void apply(Object from, Object to){
// 经典写法
while(als.contains(from) || als.contains(to)){
try{
//锁被占有,则等待
wait();
}catch(Exception e){
}
}
als.add(from);
als.add(to);
}
// 归还资源
synchronized void free(Object from, Object to){
als.remove(from);
als.remove(to);
notifyAll();
}
}

尽量使用notifyAll()

在上面的代码中,我用的是notifyAll()来实现通知机制,为什么不使用notify()呢?这二者是有区别的,notify()是会随机地通知等待队列中的一个线程,而notifyAll()会通知等待队列中的所有线程。从感觉上来讲,应该是notify()更好一些,因为即便通知所有线程,也只有一个线程能够进入临界区。但那所谓的感觉往往都蕴藏着风险,实际上使用notify()也很有风险,它的风险在于可能导致某些线程永远不会被通知到。

假设我们有资源A、B、C、D,线程1申请到了AB,线程2申请到了CD,此时线程3申请AB,会进入等待队列(AB分配给线程1,线程3要求的条件不满足),线程4申请CD也会进入等待队列。我们再假设之后线程1归还了资源AB,如果使用notify()来通知等待队列中的线程,有可能被通知的是线程4,但线程4申请的是CD,所以此时线程4还是会继续等待,而真正该唤醒的线程3就再也没有机会被唤醒了。所以除非经过深思熟虑,否则尽量使用notifyAll()。

管程:并发编程的万能钥匙

什么是管程

不知道你是否曾思考过这个问题:为什么Java在1.5之前仅仅提供了synchronized关键字及wait()、notify()、notifyAll()这三个看似从天而降的方法?在刚接触Java的时候,我以为它会提供信号量这种编程原语,因为操作系统原理课程告诉我,用信号量能解决所有并发问题,结果我发现不是。后来我找到了原因:Java采用的是管程技术,synchronized关键字及wait()、notify()、notifyAll()这三个方法都是管程的组成部分。而管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程。但是管程更容易使用,所以Java选择了管程。

管程,对应的英文是Monitor,很多Java领域的同学都喜欢将其翻译成“监视器”,这是直译。操作系统领域一般都翻译成“管程”,这个是意译,而我自己也更倾向于使用“管程”。

所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。翻译为Java领域的语言,就是管理类的成员变量和成员方法,让这个类是线程安全的。那管程是怎么管的呢?

MESA模型

在管程的发展史上,先后出现过三种不同的管程模型,分别是:Hasen模型、Hoare模型和MESA模型。其中,现在广泛应用的是MESA模型,并且Java管程的实现参考的也是MESA模型。所以今天我们重点介绍一下MESA模型。

在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。

我们先来看看管程是如何解决互斥问题的。

管程解决互斥问题的思路很简单,就是将共享变量及其对共享变量的操作统一封装起来。假如我们要实现一个线程安全的阻塞队列,一个最直观的想法就是:将线程不安全的队列封装起来,对外提供线程安全的操作方法,例如入队操作和出队操作。

利用管程,可以快速实现这个直观的想法。在下图中,管程X将共享变量queue这个线程不安全的队列和相关的操作入队操作enq()、出队操作deq()都封装起来了;线程A和线程B如果想访问共享变量queue,只能通过调用管程提供的enq()、deq()方法来实现;enq()、deq()保证互斥性,只允许一个线程进入管程。

不知你有没有发现,管程模型和面向对象高度契合的。估计这也是Java选择管程的原因吧。而我在前面章节介绍的互斥锁用法,其背后的模型其实就是它。

管程模型的代码化语义

那管程如何解决线程间的同步问题呢?

这个就比较复杂了,不过你可以借鉴一下我们曾经提到过的就医流程,它可以帮助你快速地理解这个问题。为进一步便于你理解,在下面,我展示了一幅MESA管程模型示意图,它详细描述了MESA模型的主要组成部分。

在管程模型里,共享变量和对共享变量的操作是被封装起来的,图中最外层的框就代表封装的意思。框的上面只有一个入口,并且在入口旁边还有一个入口等待队列。当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。这个过程类似就医流程的分诊,只允许一个患者就诊,其他患者都在门口等待。

管程里还引入了条件变量的概念,而且每个条件变量都对应有一个等待队列,如下图,条件变量A和条件变量B分别都有自己的等待队列。

MESA管程模型

条件变量条件变量等待队列的作用是什么呢?其实就是解决线程同步问题。你可以结合上面提到的阻塞队列的例子加深一下理解(阻塞队列的例子,是用管程来实现线程安全的阻塞队列,这个阻塞队列和管程内部的等待队列没有关系,本文中一定要注意阻塞队列和等待队列是不同的)。

假设有个线程T1执行阻塞队列的出队操作,执行出队操作,需要注意有个前提条件,就是阻塞队列不能是空的(空队列只能出Null值,是不允许的),阻塞队列不空这个前提条件对应的就是管程里的条件变量。 如果线程T1进入管程后恰好发现阻塞队列是空的,那怎么办呢?等待啊,去哪里等呢?就去条件变量对应的等待队列里面等。此时线程T1就去“队列不空”这个条件变量的等待队列中等待。这个过程类似于大夫发现你要去验个血,于是给你开了个验血的单子,你呢就去验血的队伍里排队。线程T1进入条件变量的等待队列后,是允许其他线程进入管程的。这和你去验血的时候,医生可以给其他患者诊治,道理都是一样的。

再假设之后另外一个线程T2执行阻塞队列的入队操作,入队操作执行成功之后,“阻塞队列不空”这个条件对于线程T1来说已经满足了,此时线程T2要通知T1,告诉它需要的条件已经满足了。当线程T1得到通知后,会从等待队列里面出来,但是出来之后不是马上执行,而是重新进入到入口等待队列里面。这个过程类似你验血完,回来找大夫,需要重新分诊。

条件变量及其等待队列我们讲清楚了,下面再说说wait()、notify()、notifyAll()这三个操作。前面提到线程T1发现“阻塞队列不空”这个条件不满足,需要进到对应的等待队列里等待。这个过程就是通过调用wait()来实现的。如果我们用对象A代表“阻塞队列不空”这个条件,那么线程T1需要调用A.wait()。同理当“则塞队列不空”这个条件满足时,线程T2需要调用A.notify()来通知A等待队列中的一个线程,此时这个等待队列里面只有线程T1。至于notifyAll()这个方法,它可以通知等待队列中的所有线程。

这里我还是来一段代码再次说明一下吧。下面的代码用管程实现了一个线程安全的阻塞队列(再次强调:这个阻塞队列和管程内部的等待队列没关系,示例代码只是用管程来实现阻塞队列,而不是解释管程内部等待队列的实现原理)。阻塞队列有两个操作分别是入队和出队,这两个方法都是先获取互斥锁,类比管程模型中的入口。

  1. 对于阻塞队列的入队操作,如果阻塞队列已满,就需要等待直到阻塞队列不满,所以这里用了notFull.await();
  2. 对于阻塞出队操作,如果阻塞队列为空,就需要等待直到阻塞队列不空,所以就用了notEmpty.await();
  3. 如果入队成功,那么阻塞队列就不空了,就需要通知条件变量:阻塞队列不空notEmpty对应的等待队列。
  4. 如果出队成功,那就阻塞队列就不满了,就需要通知条件变量:阻塞队列不满notFull对应的等待队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class BlockedQueue{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();

// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
//入队后,通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
//出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}

在这段示例代码中,我们用了Java并发包里面的Lock和Condition,如果你看着吃力,也没关系,后面我们还会详细介绍,这个例子只是先让你明白条件变量及其等待队列是怎么回事。需要注意的是:await()和前面我们提到的wait()语义是一样的;signal()和前面我们提到的notify()语义是一样的

wait()的正确姿势

但是有一点,需要再次提醒,对于MESA管程来说,有一个编程范式,就是需要在一个while循环里面调用wait()。这个是MESA管程特有的

1
2
3
while(条件不满足) {
wait();
}

Hasen模型、Hoare模型和MESA模型的一个核心区别就是当条件满足后,如何通知相关线程。管程要求同一时刻只允许一个线程执行,那当线程T2的操作使线程T1等待的条件满足时,T1和T2究竟谁可以执行呢?

  1. Hasen模型里面,要求notify()放在代码的最后,这样T2通知完T1后,T2就结束了,然后T1再执行,这样就能保证同一时刻只有一个线程执行。
  2. Hoare模型里面,T2通知完T1后,T2阻塞,T1马上执行;等T1执行完,再唤醒T2,也能保证同一时刻只有一个线程执行。但是相比Hasen模型,T2多了一次阻塞唤醒操作。
  3. MESA管程里面,T2通知完T1后,T2还是会接着执行,T1并不立即执行,仅仅是从条件变量的等待队列进到入口等待队列里面。这样做的好处是notify()不用放到代码的最后,T2也没有多余的阻塞唤醒操作。但是也有个副作用,就是当T1再次执行的时候,可能曾经满足的条件,现在已经不满足了,所以需要以循环方式检验条件变量。

notify()何时可以使用

还有一个需要注意的地方,就是notify()和notifyAll()的使用,前面章节,我曾经介绍过,**除非经过深思熟虑,否则尽量使用notifyAll()**。那什么时候可以使用notify()呢?需要满足以下三个条件:

  1. 所有等待线程拥有相同的等待条件;
  2. 所有等待线程被唤醒后,执行相同的操作;
  3. 只需要唤醒一个线程。

比如上面阻塞队列的例子中,对于“阻塞队列不满”这个条件变量,其等待线程都是在等待“阻塞队列不满”这个条件,反映在代码里就是下面这3行代码。对所有等待线程来说,都是执行这3行代码,重点是 while 里面的等待条件是完全相同的

1
2
3
4
while (阻塞队列已满){
// 等待队列不满
notFull.await();
}

所有等待线程被唤醒后执行的操作也是相同的,都是下面这几行:

1
2
3
// 省略入队操作...
// 入队后,通知可出队
notEmpty.signal();

同时也满足第3条,只需要唤醒一个线程。所以上面阻塞队列的代码,使用signal()是可以的。

Java线程:线程的生命周期

在操作系统层面,线程也有“生老病死”,专业的说法叫有生命周期。对于有生命周期的事物,要学好它,思路非常简单,只要能搞懂生命周期中各个节点的状态转换机制就可以了。

虽然不同的开发语言对于操作系统线程进行了不同的封装,但是对于线程的生命周期这部分,基本上是雷同的。所以,我们可以先来了解一下通用的线程生命周期模型,这部分内容也适用于很多其他编程语言;然后再详细有针对性地学习一下Java中线程的生命周期。

通用的线程生命周期

通用的线程生命周期基本上可以用下图这个“五态模型”来描述。这五态分别是:初始状态、可运行状态、运行状态、休眠状态终止状态

通用线程状态转换图——五态模型

这“五态模型”的详细情况如下所示。

  1. 初始状态,指的是线程已经被创建,但是还不允许分配CPU执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
  2. 可运行状态,指的是线程可以分配CPU执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配CPU执行。
  3. 当有空闲的CPU时,操作系统会将其分配给一个处于可运行状态的线程,被分配到CPU的线程的状态就转换成了运行状态
  4. 运行状态的线程如果调用一个阻塞的API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放CPU使用权,休眠状态的线程永远没有机会获得CPU使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
  5. 线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。

这五种状态在不同编程语言里会有简化合并。例如,C语言的POSIX Threads规范,就把初始状态和可运行状态合并了;Java语言里则把可运行状态和运行状态合并了,这两个状态在操作系统调度层面有用,而JVM层面不关心这两个状态,因为JVM把线程调度交给操作系统处理了。

除了简化合并,这五种状态也有可能被细化,比如,Java语言里就细化了休眠状态(这个下面我们会详细讲解)。

Java中线程的生命周期

介绍完通用的线程生命周期模型,想必你已经对线程的“生老病死”有了一个大致的了解。那接下来我们就来详细看看Java语言里的线程生命周期是什么样的。

Java语言中线程共有六种状态,分别是:

  1. NEW(初始化状态)
  2. RUNNABLE(可运行/运行状态)
  3. BLOCKED(阻塞状态)
  4. WAITING(无时限等待)
  5. TIMED_WAITING(有时限等待)
  6. TERMINATED(终止状态)

这看上去挺复杂的,状态类型也比较多。但其实在操作系统层面,Java线程中的BLOCKED、WAITING、TIMED_WAITING是一种状态,即前面我们提到的休眠状态。也就是说只要Java线程处于这三种状态之一,那么这个线程就永远没有CPU的使用权

所以Java线程的生命周期可以简化为下图:

Java中的线程状态转换图

其中,BLOCKED、WAITING、TIMED_WAITING可以理解为线程导致休眠状态的三种原因。那具体是哪些情形会导致线程从RUNNABLE状态转换到这三种状态呢?而这三种状态又是何时转换回RUNNABLE的呢?以及NEW、TERMINATED和RUNNABLE状态是如何转换的?

1. RUNNABLE与BLOCKED的状态转换

只有一种场景会触发这种转换,就是线程等待synchronized的隐式锁。synchronized修饰的方法、代码块同一时刻只允许一个线程执行,其他线程只能等待,这种情况下,等待的线程就会从RUNNABLE转换到BLOCKED状态。而当等待的线程获得synchronized隐式锁时,就又会从BLOCKED转换到RUNNABLE状态。

如果你熟悉操作系统线程的生命周期的话,可能会有个疑问:线程调用阻塞式API时,是否会转换到BLOCKED状态呢?在操作系统层面,线程是会转换到休眠状态的,但是在JVM层面,Java线程的状态不会发生变化,也就是说Java线程的状态会依然保持RUNNABLE状态。JVM层面并不关心操作系统调度相关的状态,因为在JVM看来,等待CPU使用权(操作系统层面此时处于可执行状态)与等待I/O(操作系统层面此时处于休眠状态)没有区别,都是在等待某个资源,所以都归入了RUNNABLE状态。

而我们平时所谓的Java在调用阻塞式API时,线程会阻塞,指的是操作系统线程的状态,并不是Java线程的状态。

2. RUNNABLE与WAITING的状态转换

总体来说,有三种场景会触发这种转换。

第一种场景,获得synchronized隐式锁的线程,调用无参数的Object.wait()方法。其中,wait()方法我们在上一篇讲解管程的时候已经深入介绍过了,这里就不再赘述。

第二种场景,调用无参数的Thread.join()方法。其中的join()是一种线程同步方法,例如有一个线程对象thread A,当调用A.join()的时候,执行这条语句的线程会等待thread A执行完,而等待中的这个线程,其状态会从RUNNABLE转换到WAITING。当线程thread A执行完,原来等待它的线程又会从WAITING状态转换到RUNNABLE。

第三种场景,调用LockSupport.park()方法。其中的LockSupport对象,也许你有点陌生,其实Java并发包中的锁,都是基于它实现的。调用LockSupport.park()方法,当前线程会阻塞,线程的状态会从RUNNABLE转换到WAITING。调用LockSupport.unpark(Thread thread)可唤醒目标线程,目标线程的状态又会从WAITING状态转换到RUNNABLE。

3. RUNNABLE与TIMED_WAITING的状态转换

有五种场景会触发这种转换:

  1. 调用带超时参数的Thread.sleep(long millis)方法;
  2. 获得synchronized隐式锁的线程,调用带超时参数的Object.wait(long timeout)方法;
  3. 调用带超时参数的Thread.join(long millis)方法;
  4. 调用带超时参数的LockSupport.parkNanos(Object blocker, long deadline)方法;
  5. 调用带超时参数的LockSupport.parkUntil(long deadline)方法。

这里你会发现TIMED_WAITING和WAITING状态的区别,仅仅是触发条件多了超时参数

4. 从NEW到RUNNABLE状态

Java刚创建出来的Thread对象就是NEW状态,而创建Thread对象主要有两种方法。一种是继承Thread对象,重写run()方法。示例代码如下:

1
2
3
4
5
6
7
8
9
// 自定义线程对象
class MyThread extends Thread {
public void run() {
// 线程需要执行的代码
......
}
}
// 创建线程对象
MyThread myThread = new MyThread();

另一种是实现Runnable接口,重写run()方法,并将该实现类作为创建Thread对象的参数。示例代码如下:

1
2
3
4
5
6
7
8
9
10
// 实现Runnable接口
class Runner implements Runnable {
@Override
public void run() {
// 线程需要执行的代码
......
}
}
// 创建线程对象
Thread thread = new Thread(new Runner());

NEW状态的线程,不会被操作系统调度,因此不会执行。Java线程要执行,就必须转换到RUNNABLE状态。从NEW状态转换到RUNNABLE状态很简单,只要调用线程对象的start()方法就可以了,示例代码如下:

1
2
3
MyThread myThread = new MyThread();
// 从NEW状态转换到RUNNABLE状态
myThread.start();

5. 从RUNNABLE到TERMINATED状态

线程执行完 run() 方法后,会自动转换到TERMINATED状态,当然如果执行run()方法的时候异常抛出,也会导致线程终止。有时候我们需要强制中断run()方法的执行,例如 run()方法访问一个很慢的网络,我们等不下去了,想终止怎么办呢?Java的Thread类里面倒是有个stop()方法,不过已经标记为@Deprecated,所以不建议使用了。正确的姿势其实是调用interrupt()方法。

那stop()和interrupt()方法的主要区别是什么呢?

stop()方法会真的杀死线程,不给线程喘息的机会,如果线程持有ReentrantLock锁,被stop()的线程并不会自动调用ReentrantLock的unlock()去释放锁,那其他线程就再也没机会获得ReentrantLock锁,这实在是太危险了。所以该方法就不建议使用了,类似的方法还有suspend() 和 resume()方法,这两个方法同样也都不建议使用了,所以这里也就不多介绍了。

而interrupt()方法就温柔多了,interrupt()方法仅仅是通知线程,线程有机会执行一些后续操作,同时也可以无视这个通知。被interrupt的线程,是怎么收到通知的呢?一种是异常,另一种是主动检测。

当线程A处于WAITING、TIMED_WAITING状态时,如果其他线程调用线程A的interrupt()方法,会使线程A返回到RUNNABLE状态,同时线程A的代码会触发InterruptedException异常。上面我们提到转换到WAITING、TIMED_WAITING状态的触发条件,都是调用了类似wait()、join()、sleep()这样的方法,我们看这些方法的签名,发现都会throws InterruptedException这个异常。这个异常的触发条件就是:其他线程调用了该线程的interrupt()方法。

当线程A处于RUNNABLE状态时,并且阻塞在java.nio.channels.InterruptibleChannel上时,如果其他线程调用线程A的interrupt()方法,线程A会触发java.nio.channels.ClosedByInterruptException这个异常;而阻塞在java.nio.channels.Selector上时,如果其他线程调用线程A的interrupt()方法,线程A的java.nio.channels.Selector会立即返回。

上面这两种情况属于被中断的线程通过异常的方式获得了通知。还有一种是主动检测,如果线程处于RUNNABLE状态,并且没有阻塞在某个I/O操作上,例如中断计算圆周率的线程A,这时就得依赖线程A主动检测中断状态了。如果其他线程调用线程A的interrupt()方法,那么线程A可以通过isInterrupted()方法,检测是不是自己被中断了。

Java线程:创建多少线程

在Java领域,实现并发程序的主要手段就是多线程,使用多线程还是比较简单的,但是使用多少个线程却是个困难的问题。工作中,经常有人问,“各种线程池的线程数量调整成多少是合适的?”或者“Tomcat的线程数、Jdbc连接池的连接数是多少?”等等。那我们应该如何设置合适的线程数呢?

要解决这个问题,首先要分析以下两个问题:

  1. 为什么要使用多线程?
  2. 多线程的应用场景有哪些?

为什么要使用多线程?

使用多线程,本质上就是提升程序性能。不过此刻谈到的性能,可能在你脑海里还是比较笼统的,基本上就是快、快、快,这种无法度量的感性认识很不科学,所以在提升性能之前,首要问题是:如何度量性能。

度量性能的指标有很多,但是有两个指标是最核心的,它们就是延迟和吞吐量。延迟指的是发出请求到收到响应这个过程的时间;延迟越短,意味着程序执行得越快,性能也就越好。 吞吐量指的是在单位时间内能处理请求的数量;吞吐量越大,意味着程序能处理的请求越多,性能也就越好。这两个指标内部有一定的联系(同等条件下,延迟越短,吞吐量越大),但是由于它们隶属不同的维度(一个是时间维度,一个是空间维度),并不能互相转换。

我们所谓提升性能,从度量的角度,主要是降低延迟,提高吞吐量。这也是我们使用多线程的主要目的。那我们该怎么降低延迟,提高吞吐量呢?这个就要从多线程的应用场景说起了。

多线程的应用场景

要想“降低延迟,提高吞吐量”,对应的方法呢,基本上有两个方向,一个方向是优化算法,另一个方向是将硬件的性能发挥到极致。前者属于算法范畴,后者则是和并发编程息息相关了。那计算机主要有哪些硬件呢?主要是两类:一个是I/O,一个是CPU。简言之,在并发编程领域,提升性能本质上就是提升硬件的利用率,再具体点来说,就是提升I/O的利用率和CPU的利用率

估计这个时候你会有个疑问,操作系统不是已经解决了硬件的利用率问题了吗?的确是这样,例如操作系统已经解决了磁盘和网卡的利用率问题,利用中断机制还能避免CPU轮询I/O状态,也提升了CPU的利用率。但是操作系统解决硬件利用率问题的对象往往是单一的硬件设备,而我们的并发程序,往往需要CPU和I/O设备相互配合工作,也就是说,我们需要解决CPU和I/O设备综合利用率的问题。关于这个综合利用率的问题,操作系统虽然没有办法完美解决,但是却给我们提供了方案,那就是:多线程。

下面我们用一个简单的示例来说明:如何利用多线程来提升CPU和I/O设备的利用率?假设程序按照CPU计算和I/O操作交叉执行的方式运行,而且CPU计算和I/O操作的耗时是1:1。

如下图所示,如果只有一个线程,执行CPU计算的时候,I/O设备空闲;执行I/O操作的时候,CPU空闲,所以CPU的利用率和I/O设备的利用率都是50%。

单线程执行示意图

如果有两个线程,如下图所示,当线程A执行CPU计算的时候,线程B执行I/O操作;当线程A执行I/O操作的时候,线程B执行CPU计算,这样CPU的利用率和I/O设备的利用率就都达到了100%。

img

二线程执行示意图

我们将CPU的利用率和I/O设备的利用率都提升到了100%,会对性能产生了哪些影响呢?通过上面的图示,很容易看出:单位时间处理的请求数量翻了一番,也就是说吞吐量提高了1倍。此时可以逆向思维一下,如果CPU和I/O设备的利用率都很低,那么可以尝试通过增加线程来提高吞吐量

在单核时代,多线程主要就是用来平衡CPU和I/O设备的。如果程序只有CPU计算,而没有I/O操作的话,多线程不但不会提升性能,还会使性能变得更差,原因是增加了线程切换的成本。但是在多核时代,这种纯计算型的程序也可以利用多线程来提升性能。为什么呢?因为利用多核可以降低响应时间。

为便于你理解,这里我举个简单的例子说明一下:计算1+2+… … +100亿的值,如果在4核的CPU上利用4个线程执行,线程A计算[1,25亿),线程B计算[25亿,50亿),线程C计算[50,75亿),线程D计算[75亿,100亿],之后汇总,那么理论上应该比一个线程计算[1,100亿]快将近4倍,响应时间能够降到25%。一个线程,对于4核的CPU,CPU的利用率只有25%,而4个线程,则能够将CPU的利用率提高到100%。

多核执行多线程示意图

创建多少线程合适?

创建多少线程合适,要看多线程具体的应用场景。我们的程序一般都是CPU计算和I/O操作交叉执行的,由于I/O设备的速度相对于CPU来说都很慢,所以大部分情况下,I/O操作执行的时间相对于CPU计算来说都非常长,这种场景我们一般都称为I/O密集型计算;和I/O密集型计算相对的就是CPU密集型计算了,CPU密集型计算大部分场景下都是纯CPU计算。I/O密集型程序和CPU密集型程序,计算最佳线程数的方法是不同的。

下面我们对这两个场景分别说明。

对于CPU密集型计算,多线程本质上是提升多核CPU的利用率,所以对于一个4核的CPU,每个核一个线程,理论上创建4个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于CPU密集型的计算场景,理论上“线程的数量=CPU核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU核数+1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证CPU的利用率。

对于I/O密集型的计算场景,比如前面我们的例子中,如果CPU计算和I/O操作的耗时是1:1,那么2个线程是最合适的。如果CPU计算和I/O操作的耗时是1:2,那多少个线程合适呢?是3个线程,如下图所示:CPU在A、B、C三个线程之间切换,对于线程A,当CPU从B、C切换回来时,线程A正好执行完I/O操作。这样CPU和I/O设备的利用率都达到了100%。

img

三线程执行示意图

通过上面这个例子,我们会发现,对于I/O密集型计算场景,最佳的线程数是与程序中CPU计算和I/O操作的耗时比相关的,我们可以总结出这样一个公式:

最佳线程数=1 +(I/O耗时 / CPU耗时)

我们令R=I/O耗时 / CPU耗时,综合上图,可以这样理解:当线程A执行IO操作时,另外R个线程正好执行完各自的CPU计算。这样CPU的利用率就达到了100%。

不过上面这个公式是针对单核CPU的,至于多核CPU,也很简单,只需要等比扩大就可以了,计算公式如下:

最佳线程数=CPU核数 * [ 1 +(I/O耗时 / CPU耗时)]

Java线程:为什么局部变量是线程安全的

我们一遍一遍重复再重复地讲到,多个线程同时访问共享变量的时候,会导致并发问题。那在Java语言里,是不是所有变量都是共享变量呢?工作中我发现不少同学会给方法里面的局部变量设置同步,显然这些同学并没有把共享变量搞清楚。那Java方法里面的局部变量是否存在并发问题呢?下面我们就先结合一个例子剖析下这个问题。

比如,下面代码里的 fibonacci() 这个方法,会根据传入的参数 n ,返回 1 到 n 的斐波那契数列,斐波那契数列类似这样: 1、1、2、3、5、8、13、21、34……第1项和第2项是1,从第3项开始,每一项都等于前两项之和。在这个方法里面,有个局部变量:数组 r 用来保存数列的结果,每次计算完一项,都会更新数组 r 对应位置中的值。你可以思考这样一个问题,当多个线程调用 fibonacci() 这个方法的时候,数组 r 是否存在数据竞争(Data Race)呢?

1
2
3
4
5
6
7
8
9
10
11
12
// 返回斐波那契数列
int[] fibonacci(int n) {
// 创建结果数组
int[] r = new int[n];
// 初始化第一、第二个数
r[0] = r[1] = 1; // ①
// 计算2..n
for(int i = 2; i < n; i++) {
r[i] = r[i-2] + r[i-1];
}
return r;
}

你自己可以在大脑里模拟一下多个线程调用 fibonacci() 方法的情景,假设多个线程执行到 ① 处,多个线程都要对数组r的第1项和第2项赋值,这里看上去感觉是存在数据竞争的,不过感觉再次欺骗了你。

其实很多人也是知道局部变量不存在数据竞争的,但是至于原因嘛,就说不清楚了。

那它背后的原因到底是怎样的呢?要弄清楚这个,你需要一点编译原理的知识。你知道在CPU层面,是没有方法概念的,CPU的眼里,只有一条条的指令。编译程序,负责把高级语言里的方法转换成一条条的指令。所以你可以站在编译器实现者的角度来思考“怎么完成方法到指令的转换”。

方法是如何被执行的

高级语言里的普通语句,例如上面的r[i] = r[i-2] + r[i-1];翻译成CPU的指令相对简单,可方法的调用就比较复杂了。例如下面这三行代码:第1行,声明一个int变量a;第2行,调用方法 fibonacci(a);第3行,将b赋值给c。

1
2
3
int a = 7;
int[] b = fibonacci(a);
int[] c = b;

当你调用fibonacci(a)的时候,CPU要先找到方法 fibonacci() 的地址,然后跳转到这个地址去执行代码,最后CPU执行完方法 fibonacci() 之后,要能够返回。首先找到调用方法的下一条语句的地址:也就是int[] c=b;的地址,再跳转到这个地址去执行。 你可以参考下面这个图再加深一下理解。

方法的调用过程

到这里,方法调用的过程想必你已经清楚了,但是还有一个很重要的问题,“CPU去哪里找到调用方法的参数和返回地址?”如果你熟悉CPU的工作原理,你应该会立刻想到:通过CPU的堆栈寄存器。CPU支持一种栈结构,栈你一定很熟悉了,就像手枪的弹夹,先入后出。因为这个栈是和方法调用相关的,因此经常被称为调用栈

例如,有三个方法A、B、C,他们的调用关系是A->B->C(A调用B,B调用C),在运行时,会构建出下面这样的调用栈。每个方法在调用栈里都有自己的独立空间,称为栈帧,每个栈帧里都有对应方法需要的参数和返回地址。当调用方法时,会创建新的栈帧,并压入调用栈;当方法返回时,对应的栈帧就会被自动弹出。也就是说,栈帧和方法是同生共死的

调用栈结构

利用栈结构来支持方法调用这个方案非常普遍,以至于CPU里内置了栈寄存器。虽然各家编程语言定义的方法千奇百怪,但是方法的内部执行原理却是出奇的一致:都是靠栈结构解决的。Java语言虽然是靠虚拟机解释执行的,但是方法的调用也是利用栈结构解决的。

局部变量存哪里?

我们已经知道了方法间的调用在CPU眼里是怎么执行的,但还有一个关键问题:方法内的局部变量存哪里?

局部变量的作用域是方法内部,也就是说当方法执行完,局部变量就没用了,局部变量应该和方法同生共死。此时你应该会想到调用栈的栈帧,调用栈的栈帧就是和方法同生共死的,所以局部变量放到调用栈里那儿是相当的合理。事实上,的确是这样的,局部变量就是放到了调用栈里。于是调用栈的结构就变成了下图这样。

保护局部变量的调用栈结构

这个结论相信很多人都知道,因为学Java语言的时候,基本所有的教材都会告诉你 new 出来的对象是在堆里,局部变量是在栈里,只不过很多人并不清楚堆和栈的区别,以及为什么要区分堆和栈。现在你应该很清楚了,局部变量是和方法同生共死的,一个变量如果想跨越方法的边界,就必须创建在堆里。

调用栈与线程

两个线程可以同时用不同的参数调用相同的方法,那调用栈和线程之间是什么关系呢?答案是:每个线程都有自己独立的调用栈。因为如果不是这样,那两个线程就互相干扰了。如下面这幅图所示,线程A、B、C每个线程都有自己独立的调用栈。

线程与调用栈的关系图

现在,让我们回过头来再看篇首的问题:Java方法里面的局部变量是否存在并发问题?现在你应该很清楚了,一点问题都没有。因为每个线程都有自己的调用栈,局部变量保存在线程各自的调用栈里面,不会共享,所以自然也就没有并发问题。再次重申一遍:没有共享,就没有伤害。

线程封闭

方法里的局部变量,因为不会和其他线程共享,所以没有并发问题,这个思路很好,已经成为解决并发问题的一个重要技术,同时还有个响当当的名字叫做线程封闭,比较官方的解释是:仅在单线程内访问数据。由于不存在共享,所以即便不同步也不会有并发问题,性能杠杠的。

采用线程封闭技术的案例非常多,例如从数据库连接池里获取的连接Connection,在JDBC规范里并没有要求这个Connection必须是线程安全的。数据库连接池通过线程封闭技术,保证一个Connection一旦被一个线程获取之后,在这个线程关闭Connection之前的这段时间里,不会再分配给其他线程,从而保证了Connection不会有并发问题。

用面向对象思想写好并发程序

在Java语言里,面向对象思想能够让并发编程变得更简单

那如何才能用面向对象思想写好并发程序呢?结合我自己的工作经验来看,我觉得你可以从封装共享变量、识别共享变量间的约束条件和制定并发访问策略这三个方面下手。

一、封装共享变量

并发程序,我们关注的一个核心问题,不过是解决多线程同时访问共享变量的问题。在之前我们类比过球场门票的管理,现实世界里门票管理的一个核心问题是:所有观众只能通过规定的入口进入,否则检票就形同虚设。在编程世界这个问题也很重要,编程领域里面对于共享变量的访问路径就类似于球场的入口,必须严格控制。好在有了面向对象思想,对共享变量的访问路径可以轻松把控。

面向对象思想里面有一个很重要的特性是封装,封装的通俗解释就是将属性和实现细节封装在对象内部,外界对象只能通过目标对象提供的公共方法来间接访问这些内部属性,这和门票管理模型匹配度相当的高,球场里的座位就是对象属性,球场入口就是对象的公共方法。我们把共享变量作为对象的属性,那对于共享变量的访问路径就是对象的公共方法,所有入口都要安排检票程序就相当于我们前面提到的并发访问策略。

利用面向对象思想写并发程序的思路,其实就这么简单:将共享变量作为对象属性封装在内部,对所有公共方法制定并发访问策略。就拿很多统计程序都要用到计数器来说,下面的计数器程序共享变量只有一个,就是value,我们把它作为Counter类的属性,并且将两个公共方法get()和addOne()声明为同步方法,这样Counter类就成为一个线程安全的类了。

1
2
3
4
5
6
7
8
9
public class Counter {
private long value;
synchronized long get(){
return value;
}
synchronized long addOne(){
return ++value;
}
}

当然,实际工作中,很多的场景都不会像计数器这么简单,经常要面临的情况往往是有很多的共享变量,例如,信用卡账户有卡号、姓名、身份证、信用额度、已出账单、未出账单等很多共享变量。这么多的共享变量,如果每一个都考虑它的并发安全问题,那我们就累死了。但其实仔细观察,你会发现,很多共享变量的值是不会变的,例如信用卡账户的卡号、姓名、身份证。对于这些不会发生变化的共享变量,建议你用final关键字来修饰。这样既能避免并发问题,也能很明了地表明你的设计意图,让后面接手你程序的兄弟知道,你已经考虑过这些共享变量的并发安全问题了。

二、识别共享变量间的约束条件

识别共享变量间的约束条件非常重要。因为这些约束条件,决定了并发访问策略。例如,库存管理里面有个合理库存的概念,库存量不能太高,也不能太低,它有一个上限和一个下限。关于这些约束条件,我们可以用下面的程序来模拟一下。在类SafeWM中,声明了两个成员变量upper和lower,分别代表库存上限和库存下限,这两个变量用了AtomicLong这个原子类,原子类是线程安全的,所以这两个成员变量的set方法就不需要同步了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SafeWM {
// 库存上限
private final AtomicLong upper =
new AtomicLong(0);
// 库存下限
private final AtomicLong lower =
new AtomicLong(0);
// 设置库存上限
void setUpper(long v){
upper.set(v);
}
// 设置库存下限
void setLower(long v){
lower.set(v);
}
// 省略其他业务代码
}

虽说上面的代码是没有问题的,但是忽视了一个约束条件,就是库存下限要小于库存上限,这个约束条件能够直接加到上面的set方法上吗?我们先直接加一下看看效果(如下面代码所示)。我们在setUpper()和setLower()中增加了参数校验,这乍看上去好像是对的,但其实存在并发问题,问题在于存在竞态条件。这里我顺便插一句,其实当你看到代码里出现if语句的时候,就应该立刻意识到可能存在竞态条件。

我们假设库存的下限和上限分别是(2,10),线程A调用setUpper(5)将上限设置为5,线程B调用setLower(7)将下限设置为7,如果线程A和线程B完全同时执行,你会发现线程A能够通过参数校验,因为这个时候,下限还没有被线程B设置,还是2,而5>2;线程B也能够通过参数校验,因为这个时候,上限还没有被线程A设置,还是10,而7<10。当线程A和线程B都通过参数校验后,就把库存的下限和上限设置成(7, 5)了,显然此时的结果是不符合库存下限要小于库存上限这个约束条件的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SafeWM {
// 库存上限
private final AtomicLong upper =
new AtomicLong(0);
// 库存下限
private final AtomicLong lower =
new AtomicLong(0);
// 设置库存上限
void setUpper(long v){
// 检查参数合法性
if (v < lower.get()) {
throw new IllegalArgumentException();
}
upper.set(v);
}
// 设置库存下限
void setLower(long v){
// 检查参数合法性
if (v > upper.get()) {
throw new IllegalArgumentException();
}
lower.set(v);
}
// 省略其他业务代码
}

在没有识别出库存下限要小于库存上限这个约束条件之前,我们制定的并发访问策略是利用原子类,但是这个策略,完全不能保证库存下限要小于库存上限这个约束条件。所以说,在设计阶段,我们一定要识别出所有共享变量之间的约束条件,如果约束条件识别不足,很可能导致制定的并发访问策略南辕北辙

共享变量之间的约束条件,反映在代码里,基本上都会有if语句,所以,一定要特别注意竞态条件。

三、制定并发访问策略

制定并发访问策略,是一个非常复杂的事情。应该说整个专栏都是在尝试搞定它。不过从方案上来看,无外乎就是以下“三件事”。

  1. 避免共享:避免共享的技术主要是利于线程本地存储以及为每个任务分配独立的线程。
  2. 不变模式:这个在Java领域应用的很少,但在其他领域却有着广泛的应用,例如Actor模式、CSP模式以及函数式编程的基础都是不变模式。
  3. 管程及其他同步工具:Java领域万能的解决方案是管程,但是对于很多特定场景,使用Java并发包提供的读写锁、并发容器等同步工具会更好。

接下来在咱们专栏的第二模块我会仔细讲解Java并发工具类以及他们的应用场景,在第三模块我还会讲解并发编程的设计模式,这些都是和制定并发访问策略有关的。

除了这些方案之外,还有一些宏观的原则需要你了解。这些宏观原则,有助于你写出“健壮”的并发程序。这些原则主要有以下三条。

  1. 优先使用成熟的工具类:Java SDK并发包里提供了丰富的工具类,基本上能满足你日常的需要,建议你熟悉它们,用好它们,而不是自己再“发明轮子”,毕竟并发工具类不是随随便便就能发明成功的。
  2. 迫不得已时才使用低级的同步原语:低级的同步原语主要指的是synchronized、Lock、Semaphore等,这些虽然感觉简单,但实际上并没那么简单,一定要小心使用。
  3. 避免过早优化:安全第一,并发程序首先要保证安全,出现性能瓶颈后再优化。在设计期和开发期,很多人经常会情不自禁地预估性能的瓶颈,并对此实施优化,但残酷的现实却是:性能瓶颈不是你想预估就能预估的。

Lock和Condition:隐藏在并发包中的管程

Java SDK并发包内容很丰富,包罗万象,但是我觉得最核心的还是其对管程的实现。因为理论上利用管程,你几乎可以实现并发包里所有的工具类。在前面提到过在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK并发包通过Lock和Condition两个接口来实现管程,其中Lock用于解决互斥问题,Condition用于解决同步问题

先介绍Lock的使用,在介绍Lock的使用之前,有个问题需要你首先思考一下:Java语言本身提供的synchronized也是管程的一种实现,既然Java从语言层面已经实现了管程了,那为什么还要在SDK里提供另外一种实现呢?难道Java标准委员会还能同意“重复造轮子”的方案?很显然它们之间是有巨大区别的。那区别在哪里呢?如果能深入理解这个问题,对你用好Lock帮助很大。

再造管程的理由

你也许曾经听到过很多这方面的传说,例如在Java的1.5版本中,synchronized性能不如SDK里面的Lock,但1.6版本之后,synchronized做了很多优化,将性能追了上来,所以1.6之后的版本又有人推荐使用synchronized了。那性能是否可以成为“重复造轮子”的理由呢?显然不能。因为性能问题优化一下就可以了,完全没必要“重复造轮子”。

到这里,关于这个问题,你是否能够想出一条理由来呢?如果你细心的话,也许能想到一点。那就是我们前面在介绍死锁问题的时候,提出了一个破坏不可抢占条件方案,但是这个方案synchronized没有办法解决。原因是synchronized申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。但我们希望的是:

对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。

如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?我觉得有三种方案。

  1. 能够响应中断。synchronized的问题是,持有锁A后,如果尝试获取锁B失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁A。这样就破坏了不可抢占条件了。
  2. 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

这三种方案可以全面弥补synchronized的问题。到这里相信你应该也能理解了,这三个方案就是“重复造轮子”的主要原因,体现在API上,就是Lock接口的三个方法。详情如下:

1
2
3
4
5
6
7
8
// 支持中断的API
void lockInterruptibly()
throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();

如何保证可见性

Java SDK里面Lock的使用,有一个经典的范例,就是try{}finally{},需要重点关注的是在finally里面释放锁。这个范例无需多解释,你看一下下面的代码就明白了。但是有一点需要解释一下,那就是可见性是怎么保证的。你已经知道Java里多线程的可见性是通过Happens-Before规则保证的,而synchronized之所以能够保证可见性,也是因为有一条synchronized相关的规则:synchronized的解锁 Happens-Before 于后续对这个锁的加锁。那Java SDK里面Lock靠什么保证可见性呢?例如在下面的代码中,线程T1对value进行了+=1操作,那后续的线程T2能够看到value的正确结果吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

答案必须是肯定的。Java SDK里面锁的实现非常复杂,这里我就不展开细说了,但是原理还是需要简单介绍一下:它是利用了volatile相关的Happens-Before规则。Java SDK里面的ReentrantLock,内部持有一个volatile 的成员变量state,获取锁的时候,会读写state的值;解锁的时候,也会读写state的值(简化后的代码如下面所示)。也就是说,在执行value+=1之前,程序先读写了一次volatile变量state,在执行value+=1之后,又读写了一次volatile变量state。根据相关的Happens-Before规则:

  1. 顺序性规则:对于线程T1,value+=1 Happens-Before 释放锁的操作unlock();
  2. volatile变量规则:由于state = 1会先读取state,所以线程T1的unlock()操作Happens-Before线程T2的lock()操作;
  3. 传递性规则:线程 T1的value+=1 Happens-Before 线程 T2 的 lock() 操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
class SampleLock {
volatile int state;
// 加锁
lock() {
// 省略代码无数
state = 1;
}
// 解锁
unlock() {
// 省略代码无数
state = 0;
}
}

所以说,后续线程T2能够看到value的正确结果。

什么是可重入锁

如果你细心观察,会发现我们创建的锁的具体类名是ReentrantLock,这个翻译过来叫可重入锁,这个概念前面我们一直没有介绍过。所谓可重入锁,顾名思义,指的是线程可以重复获取同一把锁。例如下面代码中,当线程T1执行到 ① 处时,已经获取到了锁 rtl ,当在 ① 处调用 get()方法时,会在 ② 再次对锁 rtl 执行加锁操作。此时,如果锁 rtl 是可重入的,那么线程T1可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程T1此时会被阻塞。

除了可重入锁,可能你还听说过可重入函数,可重入函数怎么理解呢?指的是线程可以重复调用?显然不是,所谓可重入函数,指的是多个线程可以同时调用该函数,每个线程都能得到正确结果;同时在一个线程内支持线程切换,无论被切换多少次,结果都是正确的。多线程可以同时执行,还支持线程切换,这意味着什么呢?线程安全啊。所以,可重入函数是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

公平锁与非公平锁

在使用ReentrantLock的时候,你会发现ReentrantLock这个类有两个构造函数,一个是无参构造函数,一个是传入fair参数的构造函数。fair参数代表的是锁的公平策略,如果传入true就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。

1
2
3
4
5
6
7
8
9
//无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync()
: new NonfairSync();
}

在前面介绍过入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

用锁的最佳实践

你已经知道,用锁虽然能解决很多并发问题,但是风险也是挺高的。可能会导致死锁,也可能影响性能。这方面有是否有相关的最佳实践呢?有,还很多。但是我觉得最值得推荐的是并发大师Doug Lea《Java并发编程:设计原则与模式》一书中,推荐的三个用锁的最佳实践,它们分别是:

  1. 永远只在更新对象的成员变量时加锁
  2. 永远只在访问可变的成员变量时加锁
  3. 永远不在调用其他对象的方法时加锁

这三条规则,前两条估计你一定会认同,最后一条你可能会觉得过于严苛。但是我还是倾向于你去遵守,因为调用其他对象的方法,实在是太不安全了,也许“其他”方法里面有线程sleep()的调用,也可能会有奇慢无比的I/O操作,这些都会严重影响性能。更可怕的是,“其他”类的方法可能也会加锁,然后双重加锁就可能导致死锁。

并发问题,本来就难以诊断,所以你一定要让你的代码尽量安全,尽量简单,哪怕有一点可能会出问题,都要努力避免。

总结

Java SDK 并发包里的Lock接口里面的每个方法,你可以感受到,都是经过深思熟虑的。除了支持类似synchronized隐式加锁的lock()方法外,还支持超时、非阻塞、可中断的方式获取锁,这三种方式为我们编写更加安全、健壮的并发程序提供了很大的便利。希望你以后在使用锁的时候,一定要仔细斟酌。

除了并发大师Doug Lea推荐的三个最佳实践外,你也可以参考一些诸如:减少锁的持有时间、减小锁的粒度等业界广为人知的规则,其实本质上它们都是相通的,不过是在该加锁的地方加锁而已。你可以自己体会,自己总结,最终总结出自己的一套最佳实践来。

接着再来详细聊聊Java SDK并发包里的Condition,Condition实现了管程模型里面的条件变量

我们提到过Java 语言内置的管程里只有一个条件变量,而Lock&Condition实现的管程是支持多个条件变量的,这是二者的一个重要区别。

在很多并发场景下,支持多个条件变量能够让我们的并发程序可读性更好,实现起来也更容易。例如,实现一个阻塞队列,就需要两个条件变量。

那如何利用两个条件变量快速实现阻塞队列呢?

一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队),这个例子我们前面在介绍管程的时候详细说过,这里就不再赘述。相关的代码,我这里重新列了出来,你可以温故知新一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class BlockedQueue{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();

// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
//入队后,通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
//出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}

不过,这里你需要注意,Lock和Condition实现的管程,**线程等待和通知需要调用await()、signal()、signalAll()**,它们的语义和wait()、notify()、notifyAll()是相同的。但是不一样的是,Lock&Condition实现的管程里只能使用前面的await()、signal()、signalAll(),而后面的wait()、notify()、notifyAll()只有在synchronized实现的管程里才能使用。如果一不小心在Lock&Condition实现的管程里调用了wait()、notify()、notifyAll(),那程序可就彻底玩儿完了。

Java SDK并发包里的Lock和Condition不过就是管程的一种实现而已,管程你已经很熟悉了,那Lock和Condition的使用自然是小菜一碟。下面我们就来看看在知名项目Dubbo中,Lock和Condition是怎么用的。不过在开始介绍源码之前,我还先要介绍两个概念:同步和异步。

同步与异步

我们平时写的代码,基本都是同步的。但最近几年,异步编程大火。那同步和异步的区别到底是什么呢?通俗点来讲就是调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步

比如在下面的代码里,有一个计算圆周率小数点后100万位的方法pai1M(),这个方法可能需要执行俩礼拜,如果调用pai1M()之后,线程一直等着计算结果,等俩礼拜之后结果返回,就可以执行 printf("hello world")了,这个属于同步;如果调用pai1M()之后,线程不用等待计算结果,立刻就可以执行 printf("hello world"),这个就属于异步。

1
2
3
4
5
6
7
// 计算圆周率小说点后100万位 
String pai1M() {
//省略代码无数
}

pai1M()
printf("hello world")

同步,是Java代码默认的处理方式。如果你想让你的程序支持异步,可以通过下面两种方式来实现:

  1. 调用方创建一个子线程,在子线程中执行方法调用,这种调用我们称为异步调用;
  2. 方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接return,这种方法我们一般称为异步方法。

Dubbo源码分析

其实在编程领域,异步的场景还是挺多的,比如TCP协议本身就是异步的,我们工作中经常用到的RPC调用,在TCP协议层面,发送完RPC请求后,线程是不会等待RPC的响应结果的。可能你会觉得奇怪,平时工作中的RPC调用大多数都是同步的啊?这是怎么回事呢?

其实很简单,一定是有人帮你做了异步转同步的事情。例如目前知名的RPC框架Dubbo就给我们做了异步转同步的事情,那它是怎么做的呢?下面我们就来分析一下Dubbo的相关源码。

对于下面一个简单的RPC调用,默认情况下sayHello()方法,是个同步方法,也就是说,执行service.sayHello(“dubbo”)的时候,线程会停下来等结果。

1
2
3
4
DemoService service = 初始化部分省略
String message =
service.sayHello("dubbo");
System.out.println(message);

如果此时你将调用线程dump出来的话,会是下图这个样子,你会发现调用线程阻塞了,线程状态是TIMED_WAITING。本来发送请求是异步的,但是调用线程却阻塞了,说明Dubbo帮我们做了异步转同步的事情。通过调用栈,你能看到线程是阻塞在DefaultFuture.get()方法上,所以可以推断:Dubbo异步转同步的功能应该是通过DefaultFuture这个类实现的。

调用栈信息

不过为了理清前后关系,还是有必要分析一下调用DefaultFuture.get()之前发生了什么。DubboInvoker的108行调用了DefaultFuture.get(),这一行很关键,我稍微修改了一下列在了下面。这一行先调用了request(inv, timeout)方法,这个方法其实就是发送RPC请求,之后通过调用get()方法等待RPC返回结果。

1
2
3
4
5
6
7
8
9
public class DubboInvoker{
Result doInvoke(Invocation inv){
// 下面这行就是源码中108行
// 为了便于展示,做了修改
return currentClient
.request(inv, timeout)
.get();
}
}

DefaultFuture这个类是很关键,我把相关的代码精简之后,列到了下面。不过在看代码之前,你还是有必要重复一下我们的需求:当RPC返回结果之前,阻塞调用线程,让调用线程等待;当RPC返回结果后,唤醒调用线程,让调用线程重新执行。不知道你有没有似曾相识的感觉,这不就是经典的等待-通知机制吗?这个时候想必你的脑海里应该能够浮现出管程的解决方案了。有了自己的方案之后,我们再来看看Dubbo是怎么实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 创建锁与条件变量
private final Lock lock
= new ReentrantLock();
private final Condition done
= lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}

调用线程通过调用get()方法等待RPC返回结果,这个方法里面,你看到的都是熟悉的“面孔”:调用lock()获取锁,在finally里面调用unlock()释放锁;获取锁后,通过经典的在循环中调用await()方法来实现等待。

当RPC结果返回时,会调用doReceived()方法,这个方法里面,调用lock()获取锁,在finally里面调用unlock()释放锁,获取锁后通过调用signal()来通知调用线程,结果已经返回,不用继续等待了。

至此,Dubbo里面的异步转同步的源码就分析完了,有没有觉得还挺简单的?最近这几年,工作中需要异步处理的越来越多了,其中有一个主要原因就是有些API本身就是异步API。例如websocket也是一个异步的通信协议,如果基于这个协议实现一个简单的RPC,你也会遇到异步转同步的问题。现在很多公有云的API本身也是异步的,例如创建云主机,就是一个异步的API,调用虽然成功了,但是云主机并没有创建成功,你需要调用另外一个API去轮询云主机的状态。如果你需要在项目内部封装创建云主机的API,你也会面临异步转同步的问题,因为同步的API更易用。

总结

Lock&Condition是管程的一种实现,所以能否用好Lock和Condition要看你对管程模型理解得怎么样。管程的技术前面我们已经专门用了一篇文章做了介绍,你可以结合着来学,理论联系实践,有助于加深理解。

Lock&Condition实现的管程相对于synchronized实现的管程来说更加灵活、功能也更丰富。

结合我自己的经验,我认为了解原理比了解实现更能让你快速学好并发编程,所以没有介绍太多Java SDK并发包里锁和条件变量是如何实现的。但如果你对实现感兴趣,可以参考《Java并发编程的艺术》一书的第5章《Java中的锁》,里面详细介绍了实现原理,我觉得写得非常好。

另外,专栏里对DefaultFuture的代码缩减了很多,如果你感兴趣,也可以去看看完整版。
Dubbo的源代码在Github上,DefaultFuture的路径是:incubator-dubbo/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java。

Semaphore如何快速实现一个限流器

Semaphore,现在普遍翻译为“信号量”,以前也曾被翻译成“信号灯”,因为类似现实生活里的红绿灯,车辆能不能通行,要看是不是绿灯。同样,在编程世界里,线程能不能执行,也要看信号量是不是允许。

信号量是由大名鼎鼎的计算机科学家迪杰斯特拉(Dijkstra)于1965年提出,在这之后的15年,信号量一直都是并发编程领域的终结者,直到1980年管程被提出来,我们才有了第二选择。目前几乎所有支持并发编程的语言都支持信号量机制,所以学好信号量还是很有必要的。

下面我们首先介绍信号量模型,之后介绍如何使用信号量,最后我们再用信号量来实现一个限流器。

信号量模型

信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down()和up()。你可以结合下图来形象化地理解。

信号量模型图

这三个方法详细的语义具体如下所示。

  • init():设置计数器的初始值。
  • down():计数器的值减1;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行。
  • up():计数器的值加1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

这里提到的init()、down()和up()三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在Java SDK里面,信号量模型是由java.util.concurrent.Semaphore实现的,Semaphore这个类能够保证这三个方法都是原子操作。

如果你觉得上面的描述有点绕的话,可以参考下面这个代码化的信号量模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
//将当前线程插入等待队列
//阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
//移除等待队列中的某个线程T
//唤醒线程T
}
}
}

这里再插一句,信号量模型里面,down()、up()这两个操作历史上最早称为P操作和V操作,所以信号量模型也被称为PV原语。另外,还有些人喜欢用semWait()和semSignal()来称呼它们,虽然叫法不同,但是语义都是相同的。在Java SDK并发包里,down()和up()对应的则是acquire()和release()。

如何使用信号量

通过上文,你应该会发现信号量的模型还是很简单的,那具体该如何使用呢?其实你想想红绿灯就可以了。十字路口的红绿灯可以控制交通,得益于它的一个关键规则:车辆在通过路口前必须先检查是否是绿灯,只有绿灯才能通行。这个规则和我们前面提到的锁规则是不是很类似?

其实,信号量的使用也是类似的。这里我们还是用累加器的例子来说明信号量的使用吧。在累加器的例子里面,count+=1操作是个临界区,只允许一个线程执行,也就是说要保证互斥。那这种情况用信号量怎么控制呢?

其实很简单,就像我们用互斥锁一样,只需要在进入临界区之前执行一下down()操作,退出临界区之前执行一下up()操作就可以了。下面是Java代码的示例,acquire()就是信号量里的down()操作,release()就是信号量里的up()操作

1
2
3
4
5
6
7
8
9
10
11
12
13
static int count;
//初始化信号量
static final Semaphore s
= new Semaphore(1);
//用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}

下面我们再来分析一下,信号量是如何保证互斥的。假设两个线程T1和T2同时访问addOne()方法,当它们同时调用acquire()的时候,由于acquire()是一个原子操作,所以只能有一个线程(假设T1)把信号量里的计数器减为0,另外一个线程(T2)则是将计数器减为-1。对于线程T1,信号量里面的计数器的值是0,大于等于0,所以线程T1会继续执行;对于线程T2,信号量里面的计数器的值是-1,小于0,按照信号量模型里对down()操作的描述,线程T2将被阻塞。所以此时只有线程T1会进入临界区执行count+=1;

当线程T1执行release()操作,也就是up()操作的时候,信号量里计数器的值是-1,加1之后的值是0,小于等于0,按照信号量模型里对up()操作的描述,此时等待队列中的T2将会被唤醒。于是T2在T1执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。

快速实现一个限流器

上面的例子,我们用信号量实现了一个最简单的互斥锁功能。估计你会觉得奇怪,既然有Java SDK里面提供了Lock,为啥还要提供一个Semaphore ?其实实现一个互斥锁,仅仅是 Semaphore的部分功能,Semaphore还有一个功能是Lock不容易实现的,那就是:Semaphore可以允许多个线程访问一个临界区

现实中还有这种需求?有的。比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。其中,你可能最熟悉数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。

其实前不久,我在工作中也遇到了一个对象池的需求。所谓对象池呢,指的是一次性创建出N个对象,之后所有的线程重复利用这N个对象,当然对象在被释放前,也是不允许其他线程使用的。对象池,可以用List保存实例对象,这个很简单。但关键是限流器的设计,这里的限流,指的是不允许多于N个线程同时进入临界区。那如何快速实现一个这样的限流器呢?这种场景,我立刻就想到了信号量的解决方案。

信号量的计数器,在上面的例子中,我们设置成了1,这个1表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数N,就能完美解决对象池的限流问题了。下面就是对象池的示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}

// 创建对象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});

我们用一个List来保存对象实例,用Semaphore实现限流器。关键的代码是ObjPool里面的exec()方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用acquire()方法(与之匹配的是在finally里面调用release()方法),假设对象池的大小是10,信号量的计数器初始化为10,那么前10个线程调用acquire()方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于0,那么说明有线程在等待,此时会自动唤醒等待的线程。

简言之,使用信号量,我们可以轻松地实现一个限流器,使用起来还是非常简单的。

总结

信号量在Java语言里面名气并不算大,但是在其他语言里却是很有知名度的。Java在并发编程领域走的很快,重点支持的还是管程模型。 管程模型理论上解决了信号量模型的一些不足,主要体现在易用性和工程化方面,例如用信号量解决我们曾经提到过的阻塞队列问题,就比管程模型麻烦很多,你如果感兴趣,可以课下了解和尝试一下。

ReadWriteLock:如何快速实现一个完备的缓存?

前面我们介绍了管程和信号量这两个同步原语在Java语言中的实现,理论上用这两个同步原语中任何一个都可以解决所有的并发问题。那Java SDK并发包里为什么还有很多其他的工具类呢?原因很简单:分场景优化性能,提升易用性

今天我们就介绍一种非常普遍的并发场景:读多写少场景。实际工作中,为了优化性能,我们经常会使用缓存,例如缓存元数据、缓存基础数据等,这就是一种典型的读多写少应用场景。缓存之所以能提升性能,一个重要的条件就是缓存的数据一定是读多写少的,例如元数据和基础数据基本上不会发生变化(写少),但是使用它们的地方却很多(读多)。

针对读多写少这种并发场景,Java SDK并发包提供了读写锁——ReadWriteLock,非常容易使用,并且性能很好。

那什么是读写锁呢?

读写锁,并不是Java语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:

  1. 允许多个线程同时读共享变量;
  2. 只允许一个线程写共享变量;
  3. 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。

快速实现一个缓存

下面我们就实践起来,用ReadWriteLock快速实现一个通用的缓存工具类。

在下面的代码中,我们声明了一个Cache类,其中类型参数K代表缓存里key的类型,V代表缓存里value的类型。缓存的数据保存在Cache类内部的HashMap里面,HashMap不是线程安全的,这里我们使用读写锁ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是ReentrantReadWriteLock,通过名字你应该就能判断出来,它是支持可重入的。下面我们通过rwl创建了一把读锁和一把写锁。

Cache这个工具类,我们提供了两个方法,一个是读缓存方法get(),另一个是写缓存方法put()。读缓存需要用到读锁,读锁的使用和前面我们介绍的Lock的使用是相同的,都是try{}finally{}这个编程范式。写缓存则需要用到写锁,写锁的使用和读锁是类似的。这样看来,读写锁的使用还是非常简单的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Cache {
final Map m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 写缓存
V put(K key, V value) {
w.lock();
try { return m.put(key, v); }
finally { w.unlock(); }
}
}

如果你曾经使用过缓存的话,你应该知道使用缓存首先要解决缓存数据的初始化问题。缓存数据的初始化,可以采用一次性加载的方式,也可以使用按需加载的方式。

如果源头数据的数据量不大,就可以采用一次性加载的方式,这种方式最简单(可参考下图),只需在应用启动的时候把源头数据查询出来,依次调用类似上面示例代码中的put()方法就可以了。

缓存一次性加载示意图

如果源头数据量非常大,那么就需要按需加载了,按需加载也叫懒加载,指的是只有当应用查询缓存,并且数据不在缓存里的时候,才触发加载源头相关数据进缓存的操作。下面你可以结合文中示意图看看如何利用ReadWriteLock 来实现缓存的按需加载。

缓存按需加载示意图

实现缓存的按需加载

文中下面的这段代码实现了按需加载的功能,这里我们假设缓存的源头是数据库。需要注意的是,如果缓存中没有缓存目标对象,那么就需要从数据库中加载,然后写入缓存,写缓存需要用到写锁,所以在代码中的⑤处,我们调用了 w.lock() 来获取写锁。

另外,还需要注意的是,在获取写锁之后,我们并没有直接去查询数据库,而是在代码⑥⑦处,重新验证了一次缓存中是否存在,再次验证如果还是不存在,我们才去查询数据库并更新本地缓存。为什么我们要再次验证呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Cache {
final Map m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();

V get(K key) {
V v = null;
//读缓存
r.lock(); ①
try {
v = m.get(key); ②
} finally{
r.unlock(); ③
}
//缓存中存在,返回
if(v != null) { ④
return v;
}
//缓存中不存在,查询数据库
w.lock(); ⑤
try {
//再次验证
//其他线程可能已经查询过数据库
v = m.get(key); ⑥
if(v == null){ ⑦
//查询数据库
v=省略代码无数
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}

原因是在高并发的场景下,有可能会有多线程竞争写锁。假设缓存是空的,没有缓存任何东西,如果此时有三个线程T1、T2和T3同时调用get()方法,并且参数key也是相同的。那么它们会同时执行到代码⑤处,但此时只有一个线程能够获得写锁,假设是线程T1,线程T1获取写锁之后查询数据库并更新缓存,最终释放写锁。此时线程T2和T3会再有一个线程能够获取写锁,假设是T2,如果不采用再次验证的方式,此时T2会再次查询数据库。T2释放写锁之后,T3也会再次查询一次数据库。而实际上线程T1已经把缓存的值设置好了,T2、T3完全没有必要再次查询数据库。所以,再次验证的方式,能够避免高并发场景下重复查询数据的问题。

读写锁的升级与降级

上面按需加载的示例代码中,在①处获取读锁,在③处释放读锁,那是否可以在②处的下面增加验证缓存并更新缓存的逻辑呢?详细的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//读缓存
r.lock(); ①
try {
v = m.get(key); ②
if (v == null) {
w.lock();
try {
//再次验证并更新缓存
//省略详细代码
} finally{
w.unlock();
}
}
} finally{
r.unlock(); ③
}

这样看上去好像是没有问题的,先是获取读锁,然后再升级为写锁,对此还有个专业的名字,叫锁的升级。可惜ReadWriteLock并不支持这种升级。在上面的代码示例中,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的,这个你一定要注意。

不过,虽然锁的升级是不允许的,但是锁的降级却是允许的。以下代码来源自ReentrantReadWriteLock的官方示例,略做了改动。你会发现在代码①处,获取读锁的时候线程还是持有写锁的,这种锁的降级是支持的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
//写锁
final Lock w = rwl.writeLock();

void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock(); ①
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁
try {use(data);}
finally {r.unlock();}
}
}

总结

读写锁类似于ReentrantLock,也支持公平模式和非公平模式。读锁和写锁都实现了 java.util.concurrent.locks.Lock接口,所以除了支持lock()方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用newCondition()会抛出UnsupportedOperationException异常。

今天我们用ReadWriteLock实现了一个简单的缓存,这个缓存虽然解决了缓存的初始化问题,但是没有解决缓存数据与源头数据的同步问题,这里的数据同步指的是保证缓存数据和源头数据的一致性。解决数据同步问题的一个最简单的方案就是超时机制。所谓超时机制指的是加载进缓存的数据不是长久有效的,而是有时效的,当缓存的数据超过时效,也就是超时之后,这条数据在缓存中就失效了。而访问缓存中失效的数据,会触发缓存重新从源头把数据加载进缓存。

当然也可以在源头数据发生变化时,快速反馈给缓存,但这个就要依赖具体的场景了。例如MySQL作为数据源头,可以通过近实时地解析binlog来识别数据是否发生了变化,如果发生了变化就将最新的数据推送给缓存。另外,还有一些方案采取的是数据库和缓存的双写方案。

总之,具体采用哪种方案,还是要看应用的场景。

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2022 ZHU
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信