并发笔记三 JUC(一)
JUC (java.util.concurrent) 包主要包含三部分内容:
- 同步控制的工具
- 线程池
- 并发容器
# 同步控制
# 重入锁 ReentrantLock:synchronized 的功能扩展
重入锁可以完全替代关键字 synchronized。JDK5 之前重入锁性能远好于 synchronized,但是 JDK6 之后,两者性能差距不大。
java.util.concurrent.locks.ReentrantLock 使用示例:
public class ReenterLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
@Override
public void run() {
for (int j = 0; j < 10000000; j++) {
lock.lock();
try {
i++;
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock t = new ReenterLock();
Thread t1 = new Thread(t);
Thread t2 = new Thread(t);
t1.start();t2.start();
t1.join();t2.join();
// 输出 20000000
System.out.println(i);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 与 synchronized 对比
相同点:
- 都是加锁方式同步,而且都是阻塞式的同步
不同点:
synchronized 是 Java 关键字,需要 JVM 实现。
ReentrantLock 则是 API 层面的互斥锁,需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成。
中断响应
对于 synchronized 来说,如果一个线程在等待锁,结果只有两种情况:要么获得锁继续执行,要么保持等待。
对于 重入锁 来说,则还有另外一种可能:线程可以被中断。
lockInterruptibly()方法表示在等待锁的过程中,遇到中断会抛出 InterruptedException 异常。由此可以取消对锁的请求。锁申请等待限时
除了等待外部通知,限时等待也可以是避免死锁的一种方法。
public boolean tryLock() public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException1
2当前线程会尝试获得锁,如果锁没有被其他线程占用,则申请锁成功。
由于线程不会傻傻地等待,而是不停地尝试,只要执行足够久,总会得到所有需要的资源。
公平锁
synchronized 产生的锁是非公平的
重入锁则允许对公平性进行设置:
public ReentrantLock(boolean fair)1
# 重入锁主要要素
原子状态。
使用CAS操作来存储当前锁的状态,判断锁是否已被其他线程持有。
等待队列。
所有没有请求到锁的线程,都会进入等待队列进行等待。
阻塞源语
park()和unpark()。用来挂起和恢复线程。没有得到所得线程将会被挂起。
# 重入锁与 Condition
Condition 接口提供的基本方法如下:
// 使当前线程等待,并释放当前锁。其他线程使用 signal() / signalAll() 方法时,线程会重新获得锁并继续执行。
// 遇到中断会抛出异常
void await() throws InterruptedException;
// 遇到中断不会抛出异常
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个在等待中的线程
void signal();
// 唤醒所有在等待中的线程
void signalAll();
2
3
4
5
6
7
8
9
10
11
12
在 ArrayBlockingQueue 中使用了 Condition
# 信号量 Semaphore:允许多个线程同时访问
// 构造函数
public Semaphore(int permits);
public Semaphore(int permits, boolean fair);
// 尝试获得一个(或指定个数)准入许可
public void acquire();
public void acquire(int permits);
public void acquireUninterruptibly();
// 尝试获得,成功返回true,失败返回false,不会等待
public boolean tryAcquire();
public boolean tryAcquire(long timeout, TimeUnit unit);
// 释放许可
public void release()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 读写锁 ReadWriteLock
读写分离锁可以减少锁竞争,提升性能。
| 读 | 写 | |
|---|---|---|
| 读 | 非阻塞 | 阻塞 |
| 写 | 阻塞 | 阻塞 |
只有 读-读 是非阻塞的。
# 倒计数器 CountDownLatch
Latch 意思为门闩。可以让某一线程等待直到倒计数结束。
类似火箭发射之前需要检查所有设备,检查全部通过之后才能发射。
# 循环栅栏 CyclicBarrier
// 构造函数,指定线程数量,和需要执行的操作
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
2
3
可以实现让一组线程等待至某个状态之后再全部同时执行。
叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。 叫做栅栏,大概是描述所有线程被栅栏挡住了,当都达到时,一起跳过栅栏执行。
# LockSupport
可以在线程内任意位置阻塞,且不会出现死锁。
和 Object.wait() 和 notify() 对比
无需先获得某个对象的锁。
和 Thread.suspend() 相比
如果 resume() 先于 suspend() 执行,会导致线程无法执行。
而 park() 和 unpark() 会对每个线程维持一个许可(boolean值):
unpark调用时,如果当前线程还未进入park,则许可为true。
park调用时,判断许可是否为true: 如果是true,则继续往下执行; 如果是false,则等待,直到许可为true。
线程Dump中,suspend() 挂起状态显示
Runnable;park() 则会显示WAITING(parking)
此外:中断的时候 park 不会抛出 InterruptedException 异常,但是可以从 Thread.interrupted() 等方法中获得中断标记。
# 代码示例
# ReadWriteLock
使用读写锁,耗时2s多,只有2个写操作是串行。
使用重入锁,耗时超过20s,所有读写线程之间必须互相等待。
public class ReadWriteLockDemo {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
private int value;
// 模拟读操作
public Object handleRead(Lock lock) throws InterruptedException {
lock.lock();
try {
Thread.sleep(1000);
return value;
} finally {
lock.unlock();
}
}
// 模拟写操作
public void handleWrite(Lock lock, int index) throws InterruptedException {
lock.lock();
try {
Thread.sleep(1000);
value = index;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReadWriteLockDemo demo = new ReadWriteLockDemo();
Runnable readRunnable = new Runnable() {
@Override
public void run() {
try {
demo.handleRead(readLock);
// demo.handleRead(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable writeRunnable = new Runnable() {
@Override
public void run() {
try {
demo.handleWrite(writeLock, new Random().nextInt());
// demo.handleWrite(lock, new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 18; i++) {
new Thread(readRunnable).start();
}
for (int i = 18; i < 20; i++) {
new Thread(writeRunnable).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# CountDownLatch
public class CountDownLatchDemo implements Runnable {
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(10);
private static final CountDownLatchDemo DEMO = new CountDownLatchDemo();
@Override
public void run() {
try {
// 模拟任务
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println(Thread.currentThread().getId() + " check complete.");
COUNT_DOWN_LATCH.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.submit(DEMO);
}
// 等待所有任务完毕
COUNT_DOWN_LATCH.await();
System.out.println("Fire!");
executorService.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# CyclicBarrier
public class CyclicBarrierDemo {
public static class Soldier implements Runnable {
private String soldier;
private final CyclicBarrier cyclicBarrier;
public Soldier(String soldier, CyclicBarrier cyclicBarrier) {
this.soldier = soldier;
this.cyclicBarrier = cyclicBarrier;
}
private void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt() % 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + " : 任务完成!");
}
@Override
public void run() {
try {
// 等待所有士兵到齐
cyclicBarrier.await();
doWork();
// 等待士兵完成工作
cyclicBarrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
}
}
public static class BarrierRun implements Runnable {
private boolean flag;
private int N;
public BarrierRun(boolean flag, int n) {
this.flag = flag;
N = n;
}
@Override
public void run() {
if (flag) {
System.out.println("司令:士兵【" + N + "】个,任务完成!");
} else {
System.out.println("司令:士兵【" + N + "】个,集合完毕!");
flag = true;
}
}
}
public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclicBarrier = new CyclicBarrier(N, new BarrierRun(flag, N));
System.out.println("集合队伍!");
for (int i = 0; i < N; i++) {
System.out.println("士兵【" + i + "】报道!");
allSoldier[i] = new Thread(new Soldier("士兵" + i, cyclicBarrier));
allSoldier[i].start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66