Java Concurrency [2]

PREFACE

第一部分讲述了Java Concurrency的基础知识,第二部分来简单的介绍一下Java的java.util.concurrent类库。这篇写着写着还是觉得官方文档比较清楚详细23333,建议还是把整个concurrent package的官方文档读了。

LIBRARY

ConcurrentHashMap

ConcurrentHashMap提供和HashMap相同的功能,正如名字所表现的一样,ConcurrentHashMap同时还提供了同步机制。ConcurrentHashMap和对整个方法使用synchronized来达到同步的效果不同,它采用了更加细粒度的锁,以提高并发和可扩展性。

查询retrieve操作(包括get)通常不会被阻塞,因此在A线程对Map进行更新update操作(包括put、remove)的时候,线程B可以完成查询操作。查询操作得到的结果是最近在查询操作之前完成的更新操作所改变的状态,也就是说所有更新操作满足happens-before查询操作。但对于聚合aggregate操作(putAll、clear),并发的查询操作可能会得到一些“中间状态”,比如查到部分加入或者删除的entries。

在通过iterator对ConcurrentHashMap的entries进行遍历的同时,我们还可以对ConcurrentHashMap进行更新操作,而不会抛出ConcurrentModificationException的异常(注意,在设计中iterators一次只被一个线程所使用??不知道指的是一个iterator只被一个线程所使用(个人倾向)?还是在同一时间只能有一个iterator被使用)

See also:

CopyOnWriteArrayList

CopyOnWriteArrayList是synchronized list的替代版本,它在某些常见的情形(iteration的频率远远大于更新的频率)下提供了更好的并发行,且省去了在iteration时候对锁的需要,和ConcurrentHashMap类似,在iterator的过程中,CopyOnWriteArrayList的修改并不会引起ConcurrentModificationException。

CopyOnWriteArrayList的每次更新操作都会创建一个新的副本。Iterators保留的数组引用是iteration开始时CopyOnWriteArrayList底层数组的快照,因为该数组永远都不会改变,所以我们也无需额外的同步。

Queue

BlockingQueue提供put/take和offer/poll。如果Queue是满的,put操作会阻塞直到有空间;如果Queue是空的take操作会阻塞直到Queue之中有元素。而offer(E e)会在Queue为满时,直觉返回false,也可以offer(E e, long timeout, TimeUnit unit)等待一段时间,同理pool。Queue通常会用在生产者消费者模式上,一边生产put,一边消费take,Queue简直天生为此。

See also:

  • LinkedBlockingQueue
  • ArrayBlockingQueue
  • DelayQueue
  • PriorityBlockingQueue

Latch

Latch中文名字门闩,常常被用在线程C等待其他线程A、B条件后再继续执行的情形。CountDownLatch。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// in somewhere
CountDownLatch latch = new CountDownLatch(2);

// thread A
// ... do something
latch.countDown();

// thread B
// ... do something
latch.countDown();

// thread C
// ... do something
latch.await(); // waiting util latch count down to zero
// ... do something

还有另外一种类似的情形,不过不是Latch,一组线程都互相等待直到大家都达到某个边界情况。当A、B都执行到barrier.await()时,线程A、B才会向下继续执行。还可以通过添加Runnable的形式CyclicBarrier(int parties, Runnable barrierAction),增加后置的处理工作,当线程A、B都执行到await时,barrierAction会被执行。CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
// in somewhere
CyclicBarrier barrier = new CyclicBarrier(2);

// thread A
// ... do something
latch.await();

// thread B
// ... do something
latch.await();
Semaphore

信号量Semaphore这个词让我回想起了那一天曾经被操作系统的过桥问题支配深深的恐惧。Semaphore适合控制资源的线程访问数量的场景,通过acquire来获得权限,release来释放权限。如下面程序所示,当线程A、B分别acquire,且还未release之前,线程C会被阻塞在acquire上,直到A或B调用release。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// in somewhere
Semaphore available = new Semaphore(2);

// thread A
available.acquire();
// ...do something
available.release();

// thread B
available.acquire();
// ...do something
available.release();

// thread C
available.acquire();
// ...do something
available.release();

Remaining…

还是把官方文档全都看一遍吧,写起来太无聊!!!

HOW TO WRITE A CONCURRENT PROGRAM

Explicitly Creating Threads for Tasks

最简单的方式就是显式的创建Thread并且调用start()开始任务并发的执行。

通常通过显式的创建Thread执行并发任务有两种方式。

继承Thread

继承Thread,将所需执行的任务重写入run()方法中。使用start()方法来启动线程。

注意是调用start()来启动线程,而不是run()。调用run()只会像正常的方法调用一样,顺序执行run()中的代码,执行完毕后继续执行调用run()之后的代码。

1
2
3
4
5
6
7
8
9
10
11
public class MyTask extends Thread{
public void run() {
// your task ...
}
}

public class Main {
public static void main(String ... args) {
new MyTask().start();
}
}

实现Runnable

实现Runnable接口,将所需执行的任务写入run()方法中。再用实现Runnable接口的对象作为参数传入Thread的构造函数来实例化Thread对象。最后调用Thread的start()来启动线程。

1
2
3
4
5
6
7
8
9
10
11
class MyTask implements Runnable {
public void run() {
// you task ...
}
}

public class Main {
public static void main(String ... args) {
new Thread(new MyTask()).start();
}
}

两种方式都最终需要显示的创建Thread来执行任务。这些Threads执行完毕以后就会被销毁。然而Thread的创建和销毁都需要较多的时间,这就给程序带来了延迟和资源负担。如果执行的任务都是轻量级的,且很频繁,正如很多服务器程序一样,为每个请求都创建一个新的Thread会消耗系统大量的计算资源。

The Executor Framework

前面两种方式线程执行完任务就被销毁了,不免会有点浪费。为了减少线程的创建和销毁,让线程活得重于泰山,线程池应运而生。线程池有两个重要的组成部分:Queue(任务寄存处),Worker(处理任务的工人)。最简单的线程池实现如下。提交任务#submit就是把任务放入任务寄存处中。处理任务的工人不断的从任务寄存处中取出任务#take,然后执行任务#run。呃,这样最简单的线程池就实现了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class NaiveThreadPool {
private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
private Thread[] workers = new Worker[4];

public NaiveThreadPool() {
for (Thread worker : workers) {
worker.start();
}
}

public void submit(Runnable task) {
tasks.add(task);
}

class Worker extends Thread {
public void run() {
for (;;) {
try {
Runnable task = tasks.take();
task.run();
} catch (InterruptedException e) {
return;
} catch (Exception ex) {
// 任务执行异常
}
}
}
}

其余的什么corePoolSize、maximumPoolSize、不同种类的Queue、线程过期策略、任务拒绝策略,建议自行看文档,已经很详细了。