Java 线程间的通信机制详解
目录
1、volatile 关键字
2、等待唤醒(等待通知)机制
(1)wait() 和 notify() 等待唤醒
(2)park() 和 unpark() 等待唤醒
3、join() 方法——线程排队
4、管道输入输出流
5、ThreadLocal类和InheritableThreadLocal类
线程间通信使线程成为一个整体,提高系统之间的交互性,在提高CPU利用率的同时可以对线程任务进行有效的把控与监督。
1、volatile 关键字
volatile有两大特性,一是可见性,二是有序性,禁止指令重排序。其中可见性就是可以让线程之间进行通信。
以下两个线程会轮流执行
public class VolatileTest {
// 保证可见性和有序性
private static volatile boolean flag = true;
public static void main(String[] args) {
new Thread(new Runnable() { // 线程一
@Override
public void run() {
while (true) {
if (flag) {
System.out.println(Thread.currentThread().getName()+": turn on");
flag = false;
}
}
}
}, "thread-1").start();
new Thread(new Runnable() { // 线程二
@Override
public void run() {
while (true) {
if (!flag) {
System.out.println(Thread.currentThread().getName()+": turn off");
flag = true;
}
}
}
}, "thread-2").start();
}
}
2、等待唤醒(等待通知)机制
(1)wait() 和 notify() 等待唤醒
等待唤醒机制可以基于 wait() 和 notify() 方法来实现,在一个线程内调用该线程锁对象的 wait() 方法, 线程将进入等待队列等待直到被唤醒。
wait()/notify() 方法只能在同步方法或同部块中调用
调用 wait() 方法,线程需要获取该对象的对象级别锁,如果没有持锁,将会抛出IllegalMonitorStateException,执行 wait() 方法后,当前线程立即释放锁
执行 notify() 方法后,当前线程不会立即释放锁,需要等到执行 notify() 方法的线程将程序执行完,也就是退出synchronized代码块后,当前线程才会释放锁,此时 wait() 状态所在的线程才可以获取该对象的锁。
public class ThreadTest {
// 对象锁
private static final Object lock = new Object();
public static void main(String[] args) {
new Thread(new Runnable() { // 线程一
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + ":获取锁");
try {
lock.wait(); // 线程进入等待
System.out.println(Thread.currentThread().getName() + ":执行wait()方法结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":执行结束");
}
}
}, "thread-1").start();
new Thread(new Runnable() { // 线程二
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + "获取锁");
lock.notify(); // 唤醒一个线程
System.out.println(Thread.currentThread().getName() + ":执行notify()方法结束");
try {
Thread.sleep(1000); // 模拟业务逻辑执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":执行结束");
}
}
}, "thread-2").start();
}
}
notify() 方法可以随机唤醒等待队列中等待同一共享资源的一个(随机的,仅仅一个)线程,并使该线程退出等待队列,进入可运行状态。
notifyAll() 方法可以使所有正在等待队列中等待同一资源的全部线程从等待状态进入可运行状态。
wait() 方法和 sleep() 方法都会响应 interrupt() 方法,抛出中断异常
需要注意的是 sleep() 方法即使抛出中断异常后也不会释放锁
public class ThreadTest {
// 对象锁
private static final Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() { // 线程一
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + ":获取锁");
try {
// lock.wait(); // 线程进入等待->释放锁
Thread.sleep(5000); // 即使中断也不会释放锁
System.out.println(Thread.currentThread().getName() + ":执行wait()方法结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(3000);// 模拟业务逻辑时间
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":执行结束");
}
}
}, "thread-1");
Thread t2 = new Thread(() -> { // 线程二
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + ":获取锁");
try {
Thread.sleep(3000);// 模拟业务逻辑时间
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":执行结束");
}
}, "thread-2");
t1.start();
t2.start();
Thread.sleep(1000);
t1.interrupt(); // 给线程1打上中断标记
}
}
notify() 通知过早问题
如果通知 notify() 方法先执行,等待 wait() 方法后执行,那么,执行 wait() 的线程将永远的等待下去,出现类似死锁的情况。
这个问题出现的原因,是等待和通知是分别由两个不同线程去实现的,在并发量大的情况下,两者的执行顺序是得不到保证的,也就是说,等待并不总是出现在通知前。
怎么去解决过早通知问题呢?
解决方案:在等待和通知方法中添加条件!
比如:等待线程A需要在条件true下才会执行等待,通知线程B在执行通知后会把条件更改为false。这样做的好处是,如果通知线程B先执行,那么等待线程A拿不到等待的条件,就不会进入等待状态,从而避免出现死锁。
就好比你去火车站乘火车,如果火车已经开走了(通知已经执行),那么车站会便提示你本次列车已经开出(修改条件),等你到车站一看,火车已经开走了(条件发生变化),那你就无需再去等此次列车了(不执行等待)。
如下情况,不会让等待线程一直等待下去
public class ThreadTest {
// 对象锁
private static final Object lock = new Object();
// 等待条件
private static volatile boolean condition = true;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (condition) { // 等待是需要条件的
try {
lock.wait(); // 执行等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "执行结束");
}
}, "thread-1");
Thread t2 = new Thread(() -> {
synchronized (lock) {
lock.notify();
// 已经通知过了,为避免通知先执行,等待线程等不到通知,所以修改条件
condition = false;
System.out.println(Thread.currentThread().getName() + "执行结束");
}
}, "thread-2");
t2.start(); // 通知先执行
Thread.sleep(1000);
t1.start();
}
}
因为等待线程执行等待是有条件的,通知线程执行完通知都会去修改等待的条件,所以,不管是哪个线程先执行,都不会出现类似死锁的情况。
(2)park() 和 unpark() 等待唤醒
LockSupport 是JDK中用来实现线程阻塞和唤醒的工具,线程调用 park() 则等待“许可”,调用 unpark() 则为指定线程提供“许可”。使用它可以在任何场合使线程阻塞,可以指定任何线程进行唤醒,并且不用担心阻塞和唤醒操作的顺序,但要注意连续多次唤醒的效果和一次唤醒是一样的。
public class LockSupportTest {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "开始执行");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "执行完成");
}, "thread-1");
t1.start();
Thread.sleep(1000);
System.out.println("唤醒thread-1线程");
LockSupport.unpark(t1);
}
}
3、join() 方法——线程排队
join() 可以理解成线程合并,当在一个线程调用另一个线程的 join() 方法时,当前线程阻塞等待被调用 join() 方法的线程执行完毕才能继续执行,所以 join() 的好处能够保证线程的执行顺序,具有使线程排队的作用,但是如果调用线程的 join() 方法其实已经失去了并行的意义,虽然存在多个线程,但是本质上还是串行的,另外,join() 的实现其实也是基于等待通知机制。
public class ThreadTest {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "开始执行");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行结束");
}, "thread-1");
t1.start();
t1.join(); // 需要等待线程1执行完
System.out.println(Thread.currentThread().getName() + "执行结束");
}
}
join(long) 和 sleep(long) 的区别
join(long) 方法释放锁,sleep(long) 方法不释放锁。这是因为 join(long) 方法内部是使用 wait(long) 方法来实现的,所以该方法具有释放锁的特点。同样的原因,当 join(long) 在执行过程中遇到 interrupt() 方法时,也会抛出中断异常。// join(long) 基于 wait() 实现
4、管道输入输出流
管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下4种具体实现:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前两种面向字节, 而后两种面向字符。
public class Piped {
public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输出流和输入流进行连接,否则在使用时会抛出IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
@Override
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException ex) {
}
}
}
}
5、ThreadLocal类和InheritableThreadLocal类
(1)ThreadLocal类
ThreadLocal类用来存放线程的数据,每个线程都可以通过ThreadLocal来绑定自己的值,ThreadLocal使变量在线程之间具有隔离性,也就是说ThreadLocal存的变量值是私有的。
public class ThreadLocalTest {
public final static ThreadLocal threadLocal = new ThreadLocal();
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> { // 线程2存3次
for (int i = 0; i < 3; i++) {
threadLocal.set(Thread.currentThread().getName() + "存入" + i);
}
System.out.println(Thread.currentThread().getName() + "取出:" + threadLocal.get());
}, "t1");
Thread t2 = new Thread(() -> { // 线程2存5次
for (int i = 0; i < 5; i++) {
threadLocal.set(Thread.currentThread().getName() + "存入" + i);
}
System.out.println(Thread.currentThread().getName() + "取出:" + threadLocal.get());
}, "t2");
t1.start();
t2.start();
}
}
从上边的结果看到,虽然两个线程同时向 threadLocal (threadLocal 是唯一的)存储值,但是最后取出来的都是各自线程存放的值,其他线程并不会取到别的线程存放的数据,验证了ThreadLocal类的隔离性。
同时,当同一个线程往ThreadLocal里边多次设置值时,只会保留最后设置的值。这说明,ThreadLocal只会存储唯一的值,存多个值时,前边的值都会被替换掉。
这是因为ThreadLocal的底层存储使用的是Map(键值对),通过源码,可以看到这个map存值的键是this(当前线程)。这也是为什么不同线程具有不同值,相同线程只有一个值的原因,因为不同线程this对象的值是不一样的,而相同线程this值都是相同的。
(2)InheritableThreadLocal类
ThreadLocal 不能实现父子线程间数据的传递(局限),但是,InheritableThreadLocal类可以使子线程能够获取到父线程设置的值。
在InheritableThreadLocal中,如果父线程修改了值,那么子线程将获取到父线程修改后的值。但是子线程修改了值,对父线程的值并无影响。
public class ThreadLocalTest {
public static InheritableThreadLocal inheritableThreadLocal = new InheritableThreadLocal();
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> { // 父线程
inheritableThreadLocal.set("Thread-T1");
System.out.println("父线程取出的值:" + inheritableThreadLocal.get());
Thread t2 = new Thread(()->{ // 子线程
System.out.println("子线程取出的值:" + inheritableThreadLocal.get());
inheritableThreadLocal.set("Thread-T2");
System.out.println("子线修改后的值:" + inheritableThreadLocal.get());
});
t2.start();
try {
Thread.sleep(1000); // 等待子线程修改数据
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("父线程取出的值:" + inheritableThreadLocal.get());
}, "t1");
t1.start();
}
}
注意:如果子线程在取值的同时,父线程修改了InheritableThreadLocal里的值,子线程可能取到修改之前的旧值。