logo头像
书院的十三先生

Java并发编程入门(七)轻松理解wait和notify以及使用场景

一、使用场景

wait经常被用于生产者和消费者模式,如图:

1.Producer负责生成任务并添加到TaskQueue中
2.Consumer从TaskQueue中获取任务并处理,如果获取步到则进入wait队列
3.TaskQueue中添加新的任务后,唤醒wait队列中的Consumer
4.被唤醒后的Consumer进入执行任务队列,重新获取任务并执行

代码参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import java.util.Random;
import java.util.Vector;

/**
* @ClassName WaitDemo2
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/3 11:43
* @Version 1.0
* javashizhan.com
**/
public class WaitDemo2 {

public static void main(String[] args) {
//初始化任务队列
TaskQueue taskQueue = new TaskQueue();

//启动任务consumer
for (int i = 0; i < 4; i++) {
new Thread(new Consumer(taskQueue)).start();
}

//休眠一段时间等到consumer都启动好
sleep(2000);

//启动任务生产者Producer
new Thread(new Producer(taskQueue)).start();
}

private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

//任务生产者
class Producer implements Runnable {

private TaskQueue taskQueue;

public Producer(TaskQueue taskQueue) {
this.taskQueue = taskQueue;
}

public void run() {
while(true) {
generateTask();
sleep(2000);
}
}

//生成任务
private void generateTask() {
int taskNum = (int)(Math.random()*5+1);
long timestamp = System.currentTimeMillis();
for (int i = 0; i < taskNum; i++) {
String task = "Task_" + timestamp + "_" + i;
taskQueue.addTask(task);
}
}

private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

//任务消费者
class Consumer implements Runnable {

private TaskQueue taskQueue;

public Consumer(TaskQueue taskQueue) {
this.taskQueue = taskQueue;
}

public void run() {
execTask();
}

private void execTask() {
while (true) {
//获取任务,如果获取不到,会进入wait队列
String task = taskQueue.removeTask();
//任务不为null则模拟执行
if (null != task) {
System.out.println(task + " be done. Caller is " + Thread.currentThread().getName());
}
}
}
}

//任务队列
class TaskQueue {

private Vector<String> taskVector = new Vector<String>();

//添加任务
public synchronized void addTask(String task) {
System.out.println(task + " has generated.");
taskVector.add(task);
//唤醒Consumer
notify();
}

//移除任务
public synchronized String removeTask() {
if (!taskVector.isEmpty()) {
return taskVector.remove(0);
} else {
try {
System.out.println(Thread.currentThread().getName() + " waiting...");
//没有任务则进入等待队列
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

return null;
}
}

运行日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Thread-0 waiting...
Thread-3 waiting...
Thread-2 waiting...
Thread-1 waiting...
Task_1570104227120_0 has generated.
Task_1570104227120_1 has generated.
Task_1570104227120_0 be done. Caller is Thread-0
Task_1570104227120_2 has generated.
Task_1570104227120_3 has generated.
Task_1570104227120_1 be done. Caller is Thread-3
Task_1570104227120_3 be done. Caller is Thread-1
Thread-2 waiting...
Task_1570104227120_2 be done. Caller is Thread-0
Thread-1 waiting...
Thread-3 waiting...
Thread-0 waiting...
Task_1570104229120_0 has generated.
Task_1570104229120_1 has generated.
Task_1570104229120_2 has generated.
Task_1570104229120_3 has generated.
Task_1570104229120_4 has generated.
Task_1570104229120_0 be done. Caller is Thread-2
Task_1570104229120_2 be done. Caller is Thread-2
Task_1570104229120_4 be done. Caller is Thread-2
Thread-2 waiting...
Task_1570104229120_1 be done. Caller is Thread-0
Thread-1 waiting...
Thread-0 waiting...
Task_1570104229120_3 be done. Caller is Thread-3
Thread-3 waiting...
Task_1570104231121_0 has generated.
Task_1570104231121_1 has generated.
Task_1570104231121_2 has generated.
Task_1570104231121_0 be done. Caller is Thread-2
Thread-2 waiting...
Task_1570104231121_1 be done. Caller is Thread-0
Thread-0 waiting...
Task_1570104231121_2 be done. Caller is Thread-1
Thread-1 waiting...

从日志可以看出,生成的任务数和线程被调用次数是相等的。

二、TaskQueue中的this.wait发生了什么

我们通过一副图来理解this.wait:

1.wait操作将调用线程放入wait队列中,等待唤醒。这里的调用线程不是TaskQueue,而是调用了removeTask()方法的Consumer。
2.wait队列归属一个对象,这里是this,而this是TaskQueue的一个实例对象,因此这个wait队列归属一个TaskQueue实例。

注:很多人容易犯的错误是谁调用了wait,那么谁就进入wait队列,而实际上进入wait队列的应该是调用线程:

通过jstack命令查看堆栈信息可以验证这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
2019-10-03 13:04:52
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.40-b25 mixed mode):

"DestroyJavaVM" #16 prio=5 os_prio=0 tid=0x00000000035b2800 nid=0x8fe0 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Thread-3" #15 prio=5 os_prio=0 tid=0x000000001f1cd800 nid=0x324c in Object.wait() [0x00000000200ae000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076bedff18> (a com.javashizhan.concurrent.demo.base.TaskQueue)
at java.lang.Object.wait(Object.java:502)
at com.javashizhan.concurrent.demo.base.TaskQueue.removeTask(WaitDemo2.java:108)
- locked <0x000000076bedff18> (a com.javashizhan.concurrent.demo.base.TaskQueue)
at com.javashizhan.concurrent.demo.base.Consumer.execTask(WaitDemo2.java:84)
at com.javashizhan.concurrent.demo.base.Consumer.run(WaitDemo2.java:79)
at java.lang.Thread.run(Thread.java:745)

"Thread-2" #14 prio=5 os_prio=0 tid=0x000000001f1cd000 nid=0x84d0 in Object.wait() [0x000000001ffaf000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076bedff18> (a com.javashizhan.concurrent.demo.base.TaskQueue)
at java.lang.Object.wait(Object.java:502)
at com.javashizhan.concurrent.demo.base.TaskQueue.removeTask(WaitDemo2.java:108)
- locked <0x000000076bedff18> (a com.javashizhan.concurrent.demo.base.TaskQueue)
at com.javashizhan.concurrent.demo.base.Consumer.execTask(WaitDemo2.java:84)
at com.javashizhan.concurrent.demo.base.Consumer.run(WaitDemo2.java:79)
at java.lang.Thread.run(Thread.java:745)

"Thread-1" #13 prio=5 os_prio=0 tid=0x000000001f1cb000 nid=0x4404 in Object.wait() [0x000000001feae000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076bedff18> (a com.javashizhan.concurrent.demo.base.TaskQueue)
at java.lang.Object.wait(Object.java:502)
at com.javashizhan.concurrent.demo.base.TaskQueue.removeTask(WaitDemo2.java:108)
- locked <0x000000076bedff18> (a com.javashizhan.concurrent.demo.base.TaskQueue)
at com.javashizhan.concurrent.demo.base.Consumer.execTask(WaitDemo2.java:84)
at com.javashizhan.concurrent.demo.base.Consumer.run(WaitDemo2.java:79)
at java.lang.Thread.run(Thread.java:745)

"Thread-0" #12 prio=5 os_prio=0 tid=0x000000001f0d8800 nid=0x6750 in Object.wait() [0x000000001fdae000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076bedff18> (a com.javashizhan.concurrent.demo.base.TaskQueue)
at java.lang.Object.wait(Object.java:502)
at com.javashizhan.concurrent.demo.base.TaskQueue.removeTask(WaitDemo2.java:108)
- locked <0x000000076bedff18> (a com.javashizhan.concurrent.demo.base.TaskQueue)
at com.javashizhan.concurrent.demo.base.Consumer.execTask(WaitDemo2.java:84)
at com.javashizhan.concurrent.demo.base.Consumer.run(WaitDemo2.java:79)
at java.lang.Thread.run(Thread.java:745)

"Service Thread" #11 daemon prio=9 os_prio=0 tid=0x000000001f123800 nid=0x60ac runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #10 daemon prio=9 os_prio=2 tid=0x000000001f099000 nid=0x8094 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #9 daemon prio=9 os_prio=2 tid=0x000000001f084800 nid=0x91d0 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #8 daemon prio=9 os_prio=2 tid=0x000000001f07f000 nid=0x8f44 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #7 daemon prio=9 os_prio=2 tid=0x000000001f07b000 nid=0x9034 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Monitor Ctrl-Break" #6 daemon prio=5 os_prio=0 tid=0x000000001f056800 nid=0x47dc runnable [0x000000001f6ae000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x000000076c010b70> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x000000076c010b70> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:64)

"Attach Listener" #5 daemon prio=5 os_prio=2 tid=0x000000001efe9000 nid=0x8514 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=2 tid=0x000000001f038000 nid=0x861c runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=1 tid=0x00000000036aa000 nid=0x6f48 in Object.wait() [0x000000001efaf000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076bc06f58> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x000000076bc06f58> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=2 tid=0x00000000036a3000 nid=0x6e7c in Object.wait() [0x000000001eeaf000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076bc06998> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
- locked <0x000000076bc06998> (a java.lang.ref.Reference$Lock)

"VM Thread" os_prio=2 tid=0x000000001cfea000 nid=0x1780 runnable

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00000000035c8000 nid=0x8a28 runnable

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00000000035c9800 nid=0x8e94 runnable

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00000000035cb000 nid=0x9128 runnable

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00000000035cd800 nid=0x8f60 runnable

"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00000000035d0000 nid=0xec0 runnable

"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00000000035d1000 nid=0x9100 runnable

"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00000000035d4000 nid=0x4104 runnable

"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00000000035d5800 nid=0x6f44 runnable

"VM Periodic Task Thread" os_prio=2 tid=0x000000001f0d7000 nid=0x7978 waiting on condition

JNI global references: 22

类似如下代码证明是线程Consumer进入了wait队列:

1
2
"Thread-3" #15 prio=5 os_prio=0 tid=0x000000001f1cd800 nid=0x324c in Object.wait() [0x00000000200ae000]
java.lang.Thread.State: WAITING (on object monitor)

三、notify和notifyAll

1.notify唤醒队列中的一个等待对象
2.notifyAll唤醒队列中的所有等待对象

在这个例子中任务是一个个添加的,因此调用notify没有问题;如果是批量添加任务,只调用一次notify,那么就可能出现只有一个consumer被唤醒,只有一个consumer在处理任务,其他consumer被饿死;而如果添加一个任务就调用notifyAll,那么会无谓的唤醒多余的Consumer,没有任务可执行的Consumer被唤醒后,又立即进入wait队列。

很多时候了避免consumer被意外饿死,保险起见都统一调用notifyAll而不是notify,实际也不至于都如此,只要理解了原理,合理分析就可以知道应该调用哪个。

判断使用notify的依据有:
1.所有等待线程拥有相同的等待条件;
2.所有等待线程被唤醒后,执行相同的操作;
3.只需要唤醒一个线程。

四、wait和sleep

1.wait会释放锁而sleep不会
2.wait只能在synchronized代码块中执行,而sleep没有限制
3.wait的使用更像事件监听机制,工作线程监听某个事件(如任务队列),事件到达后通知工作线程,而sleep的使用更像轮询机制,不断的轮询任务队列中是否又任务。在处理任务队列这个场景上使用wait更优一些。

五、总结

1.wait和notify,notifyAll只能出现在synchronized代码块中
2.obj.wait()方法基于obj对象生成了一个它的wait队列
3.调用obj.wait的同步代码块的线程进入了等待队列,而不是obj自己进入等待队列
4.使用notify和notifyAll要根据实际场景分析

end.


站点: http://javashizhan.com/


微信公众号: