并发笔记三 JUC(一)

2021/11/12

JUC (java.util.concurrent) 包主要包含三部分内容:

  1. 同步控制的工具
  2. 线程池
  3. 并发容器

# 同步控制

# 重入锁 ReentrantLock:synchronized 的功能扩展

重入锁可以完全替代关键字 synchronized。JDK5 之前重入锁性能远好于 synchronized,但是 JDK6 之后,两者性能差距不大。

java.util.concurrent.locks.ReentrantLock 使用示例:

public class ReenterLock implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;

    @Override
    public void run() {
        for (int j = 0; j < 10000000; j++) {
            lock.lock();
            try {
                i++;
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReenterLock t = new ReenterLock();
        Thread t1 = new Thread(t);
        Thread t2 = new Thread(t);
        t1.start();t2.start();
        t1.join();t2.join();
//      输出 20000000
        System.out.println(i);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 与 synchronized 对比

  • 相同点:

    • 都是加锁方式同步,而且都是阻塞式的同步
  • 不同点:

    1. synchronized 是 Java 关键字,需要 JVM 实现。

      ReentrantLock 则是 API 层面的互斥锁,需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成。

    2. 中断响应

      对于 synchronized 来说,如果一个线程在等待锁,结果只有两种情况:要么获得锁继续执行,要么保持等待。

      对于 重入锁 来说,则还有另外一种可能:线程可以被中断。lockInterruptibly() 方法表示在等待锁的过程中,遇到中断会抛出 InterruptedException 异常。由此可以取消对锁的请求。

    3. 锁申请等待限时

      除了等待外部通知,限时等待也可以是避免死锁的一种方法。

      public boolean tryLock()
      public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException
      
      1
      2

      当前线程会尝试获得锁,如果锁没有被其他线程占用,则申请锁成功。

      由于线程不会傻傻地等待,而是不停地尝试,只要执行足够久,总会得到所有需要的资源。

    4. 公平锁

      synchronized 产生的锁是非公平的

      重入锁则允许对公平性进行设置:

      public ReentrantLock(boolean fair)
      
      1

# 重入锁主要要素

  1. 原子状态。

    使用CAS操作来存储当前锁的状态,判断锁是否已被其他线程持有。

  2. 等待队列。

    所有没有请求到锁的线程,都会进入等待队列进行等待。

  3. 阻塞源语 park()unpark()

    用来挂起和恢复线程。没有得到所得线程将会被挂起。

# 重入锁与 Condition

Condition 接口提供的基本方法如下:

// 使当前线程等待,并释放当前锁。其他线程使用 signal() / signalAll() 方法时,线程会重新获得锁并继续执行。
// 遇到中断会抛出异常
void await() throws InterruptedException;
// 遇到中断不会抛出异常
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个在等待中的线程
void signal();
// 唤醒所有在等待中的线程
void signalAll();
1
2
3
4
5
6
7
8
9
10
11
12

在 ArrayBlockingQueue 中使用了 Condition

# 信号量 Semaphore:允许多个线程同时访问

// 构造函数
public Semaphore(int permits);
public Semaphore(int permits, boolean fair);

// 尝试获得一个(或指定个数)准入许可
public void acquire();
public void acquire(int permits);
public void acquireUninterruptibly();

// 尝试获得,成功返回true,失败返回false,不会等待
public boolean tryAcquire();
public boolean tryAcquire(long timeout, TimeUnit unit);

// 释放许可
public void release()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 读写锁 ReadWriteLock

读写分离锁可以减少锁竞争,提升性能。

非阻塞 阻塞
阻塞 阻塞

只有 读-读 是非阻塞的。

示例代码

# 倒计数器 CountDownLatch

Latch 意思为门闩。可以让某一线程等待直到倒计数结束。

类似火箭发射之前需要检查所有设备,检查全部通过之后才能发射。

示例代码

# 循环栅栏 CyclicBarrier

// 构造函数,指定线程数量,和需要执行的操作
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
1
2
3

可以实现让一组线程等待至某个状态之后再全部同时执行。

叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。 叫做栅栏,大概是描述所有线程被栅栏挡住了,当都达到时,一起跳过栅栏执行。

示例代码

# LockSupport

可以在线程内任意位置阻塞,且不会出现死锁

  • 和 Object.wait() 和 notify() 对比

    无需先获得某个对象的锁。

  • 和 Thread.suspend() 相比

    1. 如果 resume() 先于 suspend() 执行,会导致线程无法执行。

      而 park() 和 unpark() 会对每个线程维持一个许可(boolean值)

      • unpark调用时,如果当前线程还未进入park,则许可为true。

      • park调用时,判断许可是否为true: 如果是true,则继续往下执行; 如果是false,则等待,直到许可为true。

    2. 线程Dump中,suspend() 挂起状态显示 Runnable;park() 则会显示 WAITING(parking)

此外:中断的时候 park 不会抛出 InterruptedException 异常,但是可以从 Thread.interrupted() 等方法中获得中断标记。

# 代码示例

# ReadWriteLock

使用读写锁,耗时2s多,只有2个写操作是串行。

使用重入锁,耗时超过20s,所有读写线程之间必须互相等待。

public class ReadWriteLockDemo {
    private static Lock lock = new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    private int value;

    // 模拟读操作
    public Object handleRead(Lock lock) throws InterruptedException {
        lock.lock();
        try {
            Thread.sleep(1000);
            return value;
        } finally {
            lock.unlock();
        }
    }

    // 模拟写操作
    public void handleWrite(Lock lock, int index) throws InterruptedException {
        lock.lock();
        try {
            Thread.sleep(1000);
            value = index;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReadWriteLockDemo demo = new ReadWriteLockDemo();
        Runnable readRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleRead(readLock);
//                    demo.handleRead(lock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable writeRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    demo.handleWrite(writeLock, new Random().nextInt());
//                    demo.handleWrite(lock, new Random().nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < 18; i++) {
            new Thread(readRunnable).start();
        }
        for (int i = 18; i < 20; i++) {
            new Thread(writeRunnable).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

# CountDownLatch

public class CountDownLatchDemo implements Runnable {
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(10);
    private static final CountDownLatchDemo DEMO = new CountDownLatchDemo();

    @Override
    public void run() {
        try {
            // 模拟任务
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println(Thread.currentThread().getId() + " check complete.");
            COUNT_DOWN_LATCH.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            executorService.submit(DEMO);
        }
        // 等待所有任务完毕
        COUNT_DOWN_LATCH.await();

        System.out.println("Fire!");
        executorService.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# CyclicBarrier

public class CyclicBarrierDemo {
    public static class Soldier implements Runnable {
        private String soldier;
        private final CyclicBarrier cyclicBarrier;

        public Soldier(String soldier, CyclicBarrier cyclicBarrier) {
            this.soldier = soldier;
            this.cyclicBarrier = cyclicBarrier;
        }

        private void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt() % 10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + " : 任务完成!");
        }

        @Override
        public void run() {
            try {
                // 等待所有士兵到齐
                cyclicBarrier.await();
                doWork();
                // 等待士兵完成工作
                cyclicBarrier.await();
            } catch (BrokenBarrierException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class BarrierRun implements Runnable {
        private boolean flag;
        private int N;

        public BarrierRun(boolean flag, int n) {
            this.flag = flag;
            N = n;
        }

        @Override
        public void run() {
            if (flag) {
                System.out.println("司令:士兵【" + N + "】个,任务完成!");
            } else {
                System.out.println("司令:士兵【" + N + "】个,集合完毕!");
                flag = true;
            }
        }
    }

    public static void main(String[] args) {
        final int N = 10;
        Thread[] allSoldier = new Thread[N];
        boolean flag = false;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(N, new BarrierRun(flag, N));
        System.out.println("集合队伍!");
        for (int i = 0; i < N; i++) {
            System.out.println("士兵【" + i + "】报道!");
            allSoldier[i] = new Thread(new Soldier("士兵" + i, cyclicBarrier));
            allSoldier[i].start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
Last Updated: 2021/12/7 下午2:23:53