wait/notify
Object.wait()和Object.wait(long),以及Object.notify()和Object.notifyAll()可用于等待和通知。wait用于暂停一个线程,生命周期状态变为WAITING;notify用于唤醒一个暂停的线程。执行Object.wait的称为等待线程。Java中的任何对象都能实现等待和通知。
实现Object.wait的通用模板:
synchronized (someObject) {
// 等待通知线程更新
while (someCondition) {
someObject.wait();
}
doActioin();
}
someObject.wait会以原子的形式使执行线程暂停,并释放其持有的someObject内部锁,此时wait调用并未返回。notify方法可以唤醒任意一个线程,被唤醒的线程需再次申请someObject对应的内部锁。被唤醒线程再次持有someObject对应的内部锁的情况下继续执行wait剩余的指令,直到wait返回。
通知线程模板
synchronized (someObject) {
update();
someObject.notify();
}
等待线程被唤醒后继续占有处理器,该线程可能会被其它线程占用内部锁导致上下文切换。notify方法要尽量靠近临界区结束的地方。
JVM会为每个对象维护一个入口集(EntrySet)用于存储申请该对象内部锁的线程。此外还会维护一个等待集(WaitSet)的队列,用于存储对象上的等待线程。Object.wait将当前线程暂停并释放相应内部锁的同时将调用线程存入等待集。对象 的notify方法调用后,任意一个线程会被唤醒,被唤醒线程仍在等待集中,直到相应线程持有内部锁,wait会将线程从等待集中移除,接着wait方法返回。
wait/notify问题
- 过早唤醒。通知线程唤醒了等待在某个对象的线程,但此时等等待线程所需的条件并没有得到满足。可以利用Conditional解决。
- 信号丢失。等待线程没有结果条件判断直接调用wait,那么会出现一种情形:通知线程在等待线程进入临界区之前更新了共享变量,并调用了notify,然而等待线程直接调用wait,此时就没有其它线程进行通知。在需要调用notifyAll的地方调用了notify也会导致这种现象,如随机唤醒的线程使用了其它的保护条件。
- 欺骗性唤醒。在没有任何线程调用notify的情况下被唤醒
Thread.join(long)方法可以等待一个线程结束后继续执行,它的底层采用了wait/notify来实现。调用时,如果检测到目标线程未结束,就调用wait来暂停当前线程。JVM会在目标线程run方法结束后调用notifyAll通知所有等待线程。目标线程充当了同步对象(Object.wait),
条件变量
wait/notify过于底层,且存在过早唤醒的问题。Object.wait(long)也无法区分其返回是由于超时还是被通知线程唤醒等问题。
Conditional可以作为wait/notify的替代品来实现通知,解决了过早唤醒、区分超时等待的问题。Conditional接口的await/signal/signalAll相当于Object的wait/notify/notifyAll。
Conditional模板与wait/notify一样。
class ConditionalUsage {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public void action() throws InterruptException {
lock.lock();
try {
while (con) {
condition.await();
}
doAction();
} finally {
lock.unlock();
}
}
public void notify() throws InterruptException {
lock.lock();
try {
updateState();
condition.signal();
} finally {
lock.unlock();
}
}
}
await/signal执行时,执行线程需持有条件变量的 显示锁 。循环语句与目标动作在同一个显示锁引导的临界区内,防止欺骗性唤醒和信号丢失。await和wait类似,暂停当前线程的同时也使当前线程释放持有的锁,这是调用还没有返回。被唤醒的等待线程继续运行后,也需要申请相应的显示锁,被唤醒线程申请到锁后,await调用才返回。
通过在应用代码层为每个保护条件建立一个条件变量来解决过早唤醒的问题,让不同的线程在不同的条件变量上等待;并让通知线程尽唤醒相应保护条件的线程。Condition在唤醒时,能够分辨是超时还是正常的signal唤醒。Condition.awaitUntil(Date)返回true,则说明是正常唤醒,可以通过返回值做进一步的动作。
CountDownLatch
CountDownLatch可以实现一个或多个线程等待其它线程完成一组特定的操作(先决操作)之后才继续运行。
CountDownLatch内部维护一个 未完成先决操作 数量的计数器。CountDownLatch.countDown()每调用一次,计数器减1。CountDownLatch.await()方法相当于一个保护方法,保护条件为“计数器值为0”。当计数器不为0时,调用await的线程被暂停,countDown()相当于通知方法,当计数器值为0时,唤醒该实例上所有等待线程。
一个CountDownLatch实例只能实现一次等待和唤醒,当计数器达到0后,继续调用countDown不会有任何作用,也不会唤醒线程;同时继续调用await不会暂停线程。
调用await和countDown时无需加锁。CountDownLatch.await(long, TimeUnit)允许设置超时时间,返回值为true说明为正常唤醒。
CyclicBarrier
CyclicBarrier用于多个线程互相等到代码执行到某个地方,这是这些线程才能继续执行。Cyclic意思是该类可以重复使用。使用CyclicBarrier的线程称为参与方(party),参与方执行执行CyclicBarrier.await()就可以实现等待。CyclicBarrier内部维护了显示锁,可以区分最后一个调用await的参与方。除了最后一个线程外的任何参与方调用await都会被暂停(WATING)。最后一个线程执行await的时候,会使得所有参与方都被唤醒,但是最后一个线程本身不会被唤醒。
CyclicBarrier(int, Runnable)允许指定一个任务,在最后一个线程执行await时、其它线程唤醒前执行该任务。
阻塞队列
JDK1.5引入了juc.BlockingQueue接口,它定义了一种线程安全队列——阻塞队列。BlockingQueue的常见实现类有ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。阻塞队列可按容量是否受限划分为有界队列和无界队列。有界队列由应用程序指定,无界队列最大存储容量为Integer.MAX_VALUE。使用有界队列的好处是“反压”——当消费者能力跟不上时,队列积满,生产者会被暂停。
put操作将数据入队,生产者执行的put操作对后续消费者的take操作是可见的、有序的。
ArrayBlockingQueue在内部实现时,put和take操作用的是同一把锁,可能导致锁的高争用,造成过多的上下文切换。
LinkedBlockingQueue既能做有界队列,又能做无界队列。它的优点是在内部实现时,put、take操作时用的是两把显示锁,这降低了锁争用的可能性。
SynchronousQueue是空间为1的有界队列,它的内部不维护任何存储空间。生产者执行了SynchronousQueue.put时,该生产者被暂停,知道有消费线程执行take。类似,如果消费者执行了take,它会被暂停,知道生成者执行了put。
SynchronousQueue和ArrayBlockingQueue既支持非公平调度也支持公平调度。LinkedBlockingQueue只支持非公平调度。
阻塞队列支持非阻塞调用,offer(E)和poll。offer通过返回false表示入队失败,poll通过返回null表示队列为空。
信号量
信号量Semaphore用来控制对一个资源的访问次数,Semaphore(int)用来指定一个资源能同时被访问几次。Semaphore.acquire()用来申请配额,在调用成功后会使Semaphore内部的计数器减1,当配额不足时,会使当前线程睡眠。Semaphore。release会使计数器加1,并唤醒等待队列中的一个线程。
不像锁那样,acquire/release不必配合使用。
管道
双缓冲
用两个缓冲区来实现数据的并发交换。生产者将一个缓冲区填满后,可以被消费者消费。而另一个空的、使用过的缓冲区用力啊填充新数据。即消费者消费一个填满的缓冲区,生产者填充一个缓冲区。
Java的Exchanger类用来实现消息的交换,它相当于只有两个参与者的CyclicBarrier。只有生产者和消费者都调用Exchaner.exchange(V)后,该方法调用才能返回。
public V exchange(V x) throws InterruptedException
参数x相当于缓冲区,生产者的返回值为消费者的参数(即已消费的缓冲区),消费者的返回值为生产者的参数(即已填满的缓冲区)。非常像一手交钱,一手交货。
线程中断
一个线程请(发起线程)求另一个线程(目标线程)停止其正在执行的操作。中断是发起线程发送给目标线程的一种指示,指示目标线程停止其正在执行的操作。Java为每个线程维护一个中断标记,用于表示响应的线程是否收到中断。目标线程可以通过Thread.currentThread.isInterrupted()来获取该线程的中断标记,也可以通过Thread.interrupted()来获取并重置标记。
目标现场检查中断标记后所执行的操作,被称为中断响应。目标线程对中断的响应包括:
- 无影响。发起线程调用目标线程的interru()方法不会对目标线程产生任何影响。这种操作包括target正在调用InputStream.read()、ReentrantLock.lock()、申请内部锁等阻塞操作
- 取消任务运行,中断发生时的任务被取消,但是不会影响继续运行其它任务
- 工作者线程停止,生命周期变更为TERMINATED
InterruptedException
Java标准库对许多中断的处理方法都是抛出InterruptedException等异常。Java标准库中有些阻塞方法也无法响应中断,如InputStream.read()、Lock.lock()以及内部锁申请。
能够对中断做出相应的标准库:
- Object.wait()/Object.wait(long)/wait(long, int)
- Thread.sleep(long)/sleep(long, int)
- Thread.join()/join(long)/join(long, int)
- BlockingQueue.take()/put(E)
- Lock.lockInterruptibly()
- CountDownLatch.await()、CyclicBarrier.await()、Exchanger.exchang(V)
- channels.InterruptibleChannel。抛出ClosedByInterruptException
能够响应中断的方法通常在调用阻塞操作前判断中断标志位,若中断标志位为true,则抛出InterruptedException。如ReentrantLock.lockInterruptibly与ReentrantLock.lock()效果相似,但是前者能够对中断进行相应。凡是抛出InterruptedException的方法,通常会在其抛出异常前,会将当前标志位重置为false。
如果中断发生的那一刻,目标线程以及调用了阻塞方法而被暂停,那么JVM会设置目标线程会将相应的线程唤醒,并使其抛出InterruptedException。所以Jaba应用层代码可以通过对InterruptedException等异常的处理来响应中断。对InterruptedException的处理包括以下方式:
- 不捕获,可以在方法声明处加throw,抛给上层处理
- 捕获后重新抛出
- 捕获后中断当前线程。这种策略实际上在捕获异常后又恢复中断标志位。相当于告诉其它代码,当前保留了中断标志位,但不知如何处理
捕获异常又恢复中断标志位:
public class Foo() {
public void bar() {
try {
// ...
} catch(InterruptedException e) {
Thread.currentThread().interrupt(); // 保留中断标记
}
}
}