# concurrent **Repository Path**: wh543/concurrent ## Basic Information - **Project Name**: concurrent - **Description**: 并发编程学习 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-01-18 - **Last Updated**: 2021-04-14 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 多线程核心知识 ## 1. 实现多线程的方法 ​ 官网api中的描述:There are two ways to create a new thread of execution. One is to declare a class to be a subclass of Thread. The other way to create a thread is to declare a class that implements the Runnable interface. (https://docs.oracle.com/javase/8/docs/api/ 找到java.lang.Thread) ### 1.1 继承Thread的方式 ```java package threadcorekownledage.createtheads; public class ThreadStyle extends Thread{ public static void main(String[] args) { ThreadStyle threadStyle = new ThreadStyle(); threadStyle.start(); } @Override public void run() { System.out.println(Thread.currentThread().getName() + " do something"); } } ``` ### 1.2 实现Runnable的方式 ```java package threadcorekownledage.createtheads; public class RunnableStyle implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + " do something"); } public static void main(String[] args) { new Thread(new RunnableStyle()).start(); } } ``` ### 1.3 两种方式的对比 结论:优先选择Runnable的方式创建线程。 - 架构角度:通过Runnable方式实现,可以将业务代码和Thread类解耦; - 性能损耗:使用Runnable可以通过线程池的方式减少线程频繁的创建和销毁带来的性能损耗; - 单继承机制:通过Runnable创建线程可以避免java的单继承机制带来的限制。 两种方法的**本质**对比 - Thread方式:调用的是被Thread子类重写的run方法 - Runnable方式:调用的是target.run() ```java /** * If this thread was constructed using a separate * Runnable run object, then that * Runnable object's run method is called; * otherwise, this method does nothing and returns. *

* Subclasses of Thread should override this method. * * @see #start() * @see #stop() * @see #Thread(ThreadGroup, Runnable, String) */ @Override public void run() { if (target != null) { target.run(); } } ``` 同时使用两种方式会怎样? ```java package threadcorekownledage.createtheads; public class BothRunnableThread { public static void main(String[] args) { new Thread(() -> System.out.println("runnable run")){ @Override public void run() { System.out.println("thread run"); } }.start(); } } ``` ​ 最终执行打印的结果是`thread run`。 ​ 由于Thread类的run方法被重写了,所以不存在`if(target != null){target.run();}`这段代码。 ​ 总结:准确来说,创建线程只有一种方式那就是构造Thread类,而实现线程的执行单元有两种方式。 1. 实现Runnable接口的run方法,并把Runnable实例传给Thread类。 2. 继承Thread类并重写Thread的run方法 ## 2. 启动线程的方法 ### 2.1 start()和run()方法 ```java package threadcorekownledage.starttheads; /** * 对比start和run两种启动线程的方式 */ public class StartAndRunMethod { public static void main(String[] args) { Runnable runnable = () -> { System.out.println(Thread.currentThread().getName()); }; runnable.run(); new Thread(runnable).run(); new Thread(runnable).start(); } } ``` ​ 打印的结果为: ``` main main Thread-0 ``` start()方法含义 - 主线程启动新线程 ```java /** * Causes this thread to begin execution; the Java Virtual Machine * calls the run method of this thread. *

* The result is that two threads are running concurrently: the * current thread (which returns from the call to the * start method) and the other thread (which executes its * run method). *

* It is never legal to start a thread more than once. * In particular, a thread may not be restarted once it has completed * execution. * * @exception IllegalThreadStateException if the thread was already * started. * @see #run() * @see #stop() */ public synchronized void start() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this); boolean started = false; try { // 实际开启线程的方法,调用的是native方法,底层是C++写的代码。 start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } } private native void start0(); ``` ### 2.2 常见面试题 - 一个线程**两次**调用**start()**方法会出现什么情况?为什么? - 会抛出IllegalThreadStateException,因为start方法执行的时候会判断当前线程状态是否为0(NEW),如果不是则抛出异常。 - 既然start()方法会调用run()方法,为什么我们选择调用start()方法,而不是直接调用run()方法? - start()方法会调用native方法开启子线程,而直接调用run()方法实际上只是主线程进行的普通方法的调用。 ## 3. 停止线程的方法 ### 3.1 如何正确停止线程 ​ 使用**interrupt**来通知,而不是强制停止。 ​ 如果强制停止线程,则线程中所使用的资源,例如文件描述符、网络连接等无法正常关闭。 #### 3.1.1 没有阻塞的情况 ```java package threadcorekownledage.stopthread; /** * run方法内没有sleep和wait方法时,停止线程 */ public class RightwayStopThreadWithoutsleep implements Runnable{ @Override public void run() { int i = 0; // 接收interrupt信号 while (!Thread.currentThread().isInterrupted()) { System.out.println(i++); } System.out.println("------线程停止-------"); } public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new RightwayStopThreadWithoutsleep()); // 启动线程 thread.start(); Thread.sleep(1000); // 1s钟后,停止线程 thread.interrupt(); } } ``` #### 3.1.2 线程阻塞的情况 ```java package threadcorekownledage.stopthread; /** * run方法内有sleep或wait方法时,停止线程 */ package threadcorekownledage.stopthread; /** * run方法内有sleep或wait方法时,停止线程 */ public class RightwayStopThreadWithsleep{ public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(()->{ int i = 0; try { // 接收interrupt信号 while (i < 1000000 && !Thread.currentThread().isInterrupted()) { System.out.println(i++); } System.out.println("开始休眠"); Thread.sleep(1000); System.out.println("这句讲道理不会被打出来"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------线程停止-------"); }); // 启动线程 thread.start(); Thread.sleep(500); // 1s钟后,停止线程 thread.interrupt(); } } ``` #### 3.1.3 线程在每次迭代后都会阻塞的情况 ```java package threadcorekownledage.stopthread; /** * run方法内每次循环都有sleep或wait方法时,停止线程 */ public class RightwayStopThreadWithsleepEveryLoop { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(()->{ int i = 0; try { // 接收interrupt信号 while (i < 1000) {// 不需要 && !Thread.currentThread().isInterrupted() System.out.println(i++); Thread.sleep(10); } } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------线程停止-------"); }); // 启动线程 thread.start(); Thread.sleep(1000); // 1s钟后,停止线程 thread.interrupt(); } } ``` ​ 这里不需要通过Thread.currentThread().isInterrupted()判断线程中断标志,因为sleep被唤醒直接走的catch代码块。 - while内try catch的问题 ```java package threadcorekownledage.stopthread; /** * 注意try catch的位置 */ public class CantInterrupt { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(()->{ int i = 0; while (i < 100000 && !Thread.currentThread().isInterrupted()) {// 程序抛出InterruptedException后中断标志位将被清除 System.out.println(i++); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("------线程停止-------"); }); // 启动线程 thread.start(); Thread.sleep(1000); // 1s钟后,停止线程 thread.interrupt(); } } ``` ​ 上面的代码会出现中断失效。try catch之后重新会进入while循环,这个时候获取到的中断标志拿到的却是false。这是因为程序抛出InterruptedException后中断标志位将被清除。 #### 3.1.4 最佳实践 - 优先选择:传递中断 - 不想或无法传递:恢复中断 - 不应屏蔽中断 ```java package threadcorekownledage.stopthread; /** * catch了InterruptedException之后优先选择:在方法签名中抛出异常 */ public class RightwayStopThreadInProd implements Runnable { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new RightwayStopThreadInProd()); thread.start(); Thread.sleep(10); thread.interrupt(); } @Override public void run() { while (true && !Thread.currentThread().isInterrupted()) { System.out.println("--------"); // 处理传递中断 // try { // throwInMethodRightway(); // } catch (InterruptedException e) { // // 代码块抛出异常,让上层代码决定需要记录信息或者中断线程 // System.out.println("--记录错误信息--"); // e.printStackTrace(); // } // 处理恢复中断 throwInMethodRightway2(); } } /** * 把异常吞了,只打日志很难排查问题(屏蔽中断) */ private void throwInMethodWrongway() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 传递中断,让顶层代码捕获 * @throws InterruptedException */ public void throwInMethodRightway() throws InterruptedException { Thread.sleep(1000); } /** * 捕获异常,并恢复当前线程的中断状态 */ public void throwInMethodRightway2() { try { Thread.sleep(1000); } catch (InterruptedException e) { // 恢复中断 Thread.currentThread().interrupt(); e.printStackTrace(); } } } ``` #### 3.1.5 响应中断的方法总结 - Object.wait()/wait(long)/wait(long,int) - Thread.sleep(long)/sleep(long,int) - Thread.join()/join(long)/join(long,int) - java.util.concurrent.BlockingQueue.take()/put(E) - java.util.concurrent.locks.Lock.lockInterruptibly() - java.util.concurrent.CountDownLatch.await() - java.util.concurrent.CyclicBarrier.await() - java.util.concurrent.Exchanger.exchange(V) - java.nio.channels.InterruptibleChannel相关方法 - java.nio.channels.Seletor的相关方法 ### 3.2 错误的停止方法 - 被弃用的stop、suspend和resume方法 - 用volatile设置boolean标记位 volatile设置boolean貌似可行的情况 ```java package threadcorekownledage.stopthread.volatiledemo; /** * 演示volatile的局限性 */ public class WrongwayVolatile implements Runnable { // 该变量线程可见 private volatile boolean canceled = false; public static void main(String[] args) throws InterruptedException { WrongwayVolatile wrongwayVolatile = new WrongwayVolatile(); Thread thread = new Thread(wrongwayVolatile); thread.start(); Thread.sleep(5000); wrongwayVolatile.canceled = true; } @Override public void run() { while (!canceled) { System.out.println("-----"); } } } ``` volatile设置boolean却停不了线程的情况 ```java package threadcorekownledage.stopthread.volatiledemo; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; /** * 演示volatile的局限性 */ public class WrongwayVolatileCantstop { public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue integers = new ArrayBlockingQueue(10); Producer producer = new Producer(integers); new Thread(new Producer(integers)).start(); Thread.sleep(1000); // 假设消费者想通知生产者停止向队列添加元素 producer.canceled = true; } } class Producer implements Runnable { private ArrayBlockingQueue arrayBlockingQueue; public volatile boolean canceled; public Producer(ArrayBlockingQueue arrayBlockingQueue) { this.arrayBlockingQueue = arrayBlockingQueue; } @Override public void run() { try { // 由于阻塞发生在while循环内,所以canceled的值无法起到任何作用 while (true && !canceled) { arrayBlockingQueue.put(new Random().nextInt(10)); } } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("producer quit"); } } } ``` 修复后的方案 ```java package threadcorekownledage.stopthread.volatiledemo; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; /** * 演示volatile的局限性 */ public class WrongwayVolatileCantstopFix { public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue integers = new ArrayBlockingQueue(10); Producer2 producer = new Producer2(integers); Thread thread = new Thread(producer); thread.start(); Thread.sleep(1000); // 假设消费者想通知生产者停止向队列添加元素 // 通过interrupt的方式终止线程 thread.interrupt(); } } class Producer2 implements Runnable { private ArrayBlockingQueue arrayBlockingQueue; public Producer2(ArrayBlockingQueue arrayBlockingQueue) { this.arrayBlockingQueue = arrayBlockingQueue; } @Override public void run() { try { while (true) { arrayBlockingQueue.put(new Random().nextInt(10)); } } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("producer quit"); } } } ``` ### 3.3 重要函数源码解析 - interrupt方法 - 判断是否已被中断相关的方法 - static boolean interrupted() - boolean isInterrupted() - Thread.interrupted()的目标对象 ```java /** * Interrupts this thread. * *

Unless the current thread is interrupting itself, which is * always permitted, the {@link #checkAccess() checkAccess} method * of this thread is invoked, which may cause a {@link * SecurityException} to be thrown. * *

If this thread is blocked in an invocation of the {@link * Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link * Object#wait(long, int) wait(long, int)} methods of the {@link Object} * class, or of the {@link #join()}, {@link #join(long)}, {@link * #join(long, int)}, {@link #sleep(long)}, or {@link #sleep(long, int)}, * methods of this class, then its interrupt status will be cleared and it * will receive an {@link InterruptedException}. * *

If this thread is blocked in an I/O operation upon an {@link * java.nio.channels.InterruptibleChannel InterruptibleChannel} * then the channel will be closed, the thread's interrupt * status will be set, and the thread will receive a {@link * java.nio.channels.ClosedByInterruptException}. * *

If this thread is blocked in a {@link java.nio.channels.Selector} * then the thread's interrupt status will be set and it will return * immediately from the selection operation, possibly with a non-zero * value, just as if the selector's {@link * java.nio.channels.Selector#wakeup wakeup} method were invoked. * *

If none of the previous conditions hold then this thread's interrupt * status will be set.

* *

Interrupting a thread that is not alive need not have any effect. * * @throws SecurityException * if the current thread cannot modify this thread * * @revised 6.0 * @spec JSR-51 */ public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); } ``` #### 3.3.1 如何分析native方法 - 进github/gitee 查找openjdk(https://gitee.com/ckl111/openjdk-jdk8u) - 进行文件搜索,Thread.c ![](.muke-learn_images/b4c244f5.png) https://gitee.com/ckl111/openjdk-jdk8u/blob/master/jdk/src/share/native/java/lang/Thread.c ``` static JNINativeMethod methods[] = { {"start0", "()V", (void *)&JVM_StartThread}, {"stop0", "(" OBJ ")V", (void *)&JVM_StopThread}, {"isAlive", "()Z", (void *)&JVM_IsThreadAlive}, {"suspend0", "()V", (void *)&JVM_SuspendThread}, {"resume0", "()V", (void *)&JVM_ResumeThread}, {"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority}, {"yield", "()V", (void *)&JVM_Yield}, {"sleep", "(J)V", (void *)&JVM_Sleep}, {"currentThread", "()" THD, (void *)&JVM_CurrentThread}, {"countStackFrames", "()I", (void *)&JVM_CountStackFrames}, {"interrupt0", "()V", (void *)&JVM_Interrupt}, {"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted}, {"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock}, {"getThreads", "()[" THD, (void *)&JVM_GetAllThreads}, {"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads}, {"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName}, }; ``` 上述代码能看到native代码中方法对应c++代码的方法。 https://github.com/openjdk-mirror/jdk7u-hotspot 搜索JVM_Interrupt ![](.muke-learn_images/f780d979.png) ![](.muke-learn_images/ac8f606a.png) 定位到 https://github.com/openjdk-mirror/jdk7u-hotspot/blob/50bdefc3afe944ca74c3093e7448d6b889cd20d1/src/share/vm/prims/jvm.cpp ```c++ JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread)) JVMWrapper("JVM_Interrupt"); // Ensure that the C++ Thread and OSThread structures aren't freed before we operate oop java_thread = JNIHandles::resolve_non_null(jthread); MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock); // We need to re-resolve the java_thread, since a GC might have happened during the // acquire of the lock JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)); if (thr != NULL) { Thread::interrupt(thr); } JVM_END ``` 定位到Thread.cpp(https://gitee.com/czr27/openjdk8-hotspot/blob/master/runtime/thread.cpp) ```c++ void Thread::interrupt(Thread* thread) { trace("interrupt", thread); debug_only(check_for_dangling_thread_pointer(thread);) os::interrupt(thread); } ``` 定位到os_linux.cpp (https://github.com/openjdk-mirror/jdk7u-hotspot/blob/50bdefc3afe944ca74c3093e7448d6b889cd20d1/src/os/linux/vm/os_linux.cpp) ```c++ void os::interrupt(Thread* thread) { assert(Thread::current() == thread || Threads_lock->owned_by_self(), "possibility of dangling Thread pointer"); OSThread* osthread = thread->osthread(); if (!osthread->interrupted()) { // 设置interrupted状态位true osthread->set_interrupted(true); // More than one thread can get here with the same value of osthread, // resulting in multiple notifications. We do, however, want the store // to interrupted() to be visible to other threads before we execute unpark(). OrderAccess::fence(); // _SleepEvent就是对应Thread.sleep方法 ParkEvent * const slp = thread->_SleepEvent ; if (slp != NULL) slp->unpark() ; } // For JSR166. Unpark even if interrupt status already was set if (thread->is_Java_thread()) // 对应LockSupport.park ((JavaThread*)thread)->parker()->unpark(); // _ParkEvent对应synchronized同步块以及Object.wait方法 ParkEvent * ev = thread->_ParkEvent ; if (ev != NULL) ev->unpark() ; } ``` #### 3.3.2 interrupted方法 ```java /** * Tests whether the current thread has been interrupted. The * interrupted status of the thread is cleared by this method. In * other words, if this method were to be called twice in succession, the * second call would return false (unless the current thread were * interrupted again, after the first call had cleared its interrupted * status and before the second call had examined it). * *

A thread interruption ignored because a thread was not alive * at the time of the interrupt will be reflected by this method * returning false. * * @return true if the current thread has been interrupted; * false otherwise. * @see #isInterrupted() * @revised 6.0 */ public static boolean interrupted() { // 这里拿到的是当前执行的线程的中断标志位,并且会把标志位重置 return currentThread().isInterrupted(true); } ``` #### 3.3.3 isInterrupted方法 ```java /** * Tests whether this thread has been interrupted. The interrupted * status of the thread is unaffected by this method. * *

A thread interruption ignored because a thread was not alive * at the time of the interrupt will be reflected by this method * returning false. * * @return true if this thread has been interrupted; * false otherwise. * @see #interrupted() * @revised 6.0 */ public boolean isInterrupted() { // 不清除中断标志位 return isInterrupted(false); } /** * Tests if some Thread has been interrupted. The interrupted state * is reset or not based on the value of ClearInterrupted that is * passed. */ private native boolean isInterrupted(boolean ClearInterrupted); ``` #### 3.3.4 interrupted方法小测试 ```java package threadcorekownledage.stopthread; /** * 注意Thread.interrupted()方法的目标对象是"当前线程",而不管本方法来自哪个对象 */ public class RightWayInterrupted { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { for (; ; ) { } }); // 启动线程 thread.start(); // 设置中断标志 thread.interrupt(); // 获取中断标志 System.out.println("isInterrupted:" + thread.isInterrupted()); // true // 获取中断标志并重置 System.out.println("isInterrupted:" + thread.interrupted()); // false 实际执行的是 currentThread().isInterrupted(true) 这里也就是获取main线程的中断标志位 // 获取中断标志并重置 System.out.println("isInterrupted:" + Thread.interrupted()); // false 同上也是获取main线程的中断标志位 // 获取中断标志 System.out.println("isInterrupted:" + thread.isInterrupted()); // true thread.join(); System.out.println("main thread is over"); } } ``` ## 4. 线程的声明周期 ### 4.1 线程的6个状态 - New - Runnable - Blocked - Waiting - Timed Waiting - Terminated ### 4.2 线程状态转换 ![](.README_images/2694c6e2.png) ​ 其中New->Runnable->Terminated之间是不可逆的。Block、Waiting、Timed_Waiting之间不能直接转换,必须同Runnable这个中间状态进行转换。 #### 4.2.1 NEW RUNNABLE TERMINATED 状态演示 ```java package threadcorekownledage.threadstatus; /** * 演示NEW RUNNABLE TERMINATED这三个线程状态 */ public class NewRunnableTerminated { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { System.out.println(Thread.currentThread().getState()); // RUNNABLE }); System.out.println(thread.getState()); // NEW thread.start(); thread.join(); System.out.println(thread.getState()); // TERMINATED } } ``` #### 4.2.2 BLOCKED WAITING TIMED_WAITING 状态演示 ```java package threadcorekownledage.threadstatus; /** * 演示BLOCKED WAITING TIMED_WAITING这三个线程状态 */ public class BlockedWaitingTimedwaiting implements Runnable{ public static void main(String[] args) throws InterruptedException { BlockedWaitingTimedwaiting runnable = new BlockedWaitingTimedwaiting(); Thread thread1 = new Thread(runnable); thread1.start(); Thread thread2 = new Thread(runnable); thread2.start(); System.out.println("thread1 state: " + thread1.getState()); // TIMED_WAITING System.out.println("thread2 state: " + thread2.getState()); // BLOCKED Thread.sleep(1300); System.out.println("thread1 state: " + thread1.getState()); // WAITING } public synchronized void blockedMethod() { try { Thread.sleep(1000); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void run() { blockedMethod(); } } ``` ## 5. Thread和Object类中线程相关的方法 方法概览 | 类 | 方法名 | 简介 | | ------ | --------------------------- | ------------------------------------------------------------ | | Thread | sleep相关: | 本表的“相关”,指的是重载方法,也就是方法名相同,但是参数不同,例如sleep有多个方法,只是参数不同,实际作用大同小异。 | | | join | 等待其他线程执行完毕 | | | yield相关 | 放弃已经获取到的CPU资源 | | | currentThread | 获取当前执行线程的引用 | | | start,run相关 | 启动线程相关 | | | interrupt相关 | 中断线程 | | | stop(),suspend,resume()相关 | 已废弃 | | Object | wait/notify/notifyAll相关 | 让线程暂时休眠和唤醒 | ### 5.1 wait、notify、notifyAll方法详解 #### 5.1.1 wait的作用和用法 作用:阻塞当前线程并释放monitor锁 简单用法: ```java package threadcorekownledage.blockcoremethod; /** * wait和notify基本用法 */ public class Wait { public static void main(String[] args) throws InterruptedException { new Thread1().start(); Thread.sleep(100); new Thread2().start(); } public static Object object = new Object(); static class Thread1 extends Thread { @Override public void run() { synchronized (object) { System.out.println(Thread.currentThread().getName() + " start"); try { object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 获取到了锁"); } } } static class Thread2 extends Thread { @Override public void run() { System.out.println(Thread.currentThread().getName() + "准备获取锁进入同步代码块"); synchronized (object) { System.out.println(Thread.currentThread().getName() + "获取到了锁,准备执行notify。这里能获取到锁也说明了wait方法调用后会释放锁"); object.notify(); System.out.println(Thread.currentThread().getName() + "执行了notify。注意:synchronized代码块执行完毕线程才会被唤醒"); } } } } ``` 线程被唤醒的四种情况: - 另一个线程调用了这个对象的notify方法且刚好唤醒的是本线程 - 另一个线程调用了这个对象的notifyAll方法 - 过了wait(long timeout)设置的超时时间,如果传入0就是永久等待 - 线程自身调用了interrupt方法 #### 5.1.2 notify和notifyAll的区别 notify只会唤醒一个等待该monitor锁线程,notifyAll会唤醒所有等待该monitor锁的所有线程。 ```java package threadcorekownledage.blockcoremethod; /** * 3个线程,线程1和线程2首先被阻塞,线程3唤醒它们。notify notifyAll */ public class WaitNotifyAll implements Runnable{ private static final Object resouceA = new Object(); public static void main(String[] args) throws InterruptedException { WaitNotifyAll r = new WaitNotifyAll(); new Thread(r).start(); new Thread(r).start(); Thread.sleep(100); new Thread(()->{ synchronized (resouceA) { System.out.println(Thread.currentThread().getName() + " 获取到resouceA,并唤醒其他线程"); // resouceA.notify(); resouceA.notifyAll(); } }).start(); } @Override public void run() { synchronized (resouceA) { try { System.out.println(Thread.currentThread().getName() + " 获取到resouceA,阻塞当前线程并释放resouceA"); resouceA.wait(); System.out.println(Thread.currentThread().getName() + " 被唤醒"); } catch (InterruptedException e) { e.printStackTrace(); } } } } ``` ​ notifyAll把线程1和线程2都唤醒了,但是notify只唤醒了一个线程,另一个线程仍然处于等待 ``` "Thread-1" #14 prio=5 os_prio=31 tid=0x00007ff9f4865000 nid=0xa603 in Object.wait() [0x00007000054c1000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x000000076adf2dc0> (a java.lang.Object) at java.lang.Object.wait(Object.java:502) at threadcorekownledage.blockcoremethod.WaitNotifyAll.run(WaitNotifyAll.java:31) - locked <0x000000076adf2dc0> (a java.lang.Object) at java.lang.Thread.run(Thread.java:748) ``` wait/notify的原理、特点 用wait/notify实现生产者消费者模式 ​ 为什么要使用生产者和消费者模式? ​ 消息(数据)的生产和消费的速率的会有所不同,会出现消息的消费方不知道何时有数据可以进行消费,而消息的生产方不知道消费放是否能够消费掉已经生产的消息。而生产者和消费者模式将解决这种信息不对称的问题。该模式将采用一个阻塞队列用来存储消息。 ![](.muke-learn_images/d895bb1b.png) #### 5.1.3 wait高频面试题 - 用程序实现**两个线程交替**打印0-100的奇偶数 ```java package threadcorekownledage.blockcoremethod; import java.util.concurrent.atomic.AtomicInteger; /** * 两个线程交替打印奇偶 */ public class PrintOddEvenByturn { public static void main(String[] args) { // printOddEventByturnUseSynchorized(); printOddEventByturnUseWaitAndNotify(); } private static void printOddEventByturnUseSynchorized() { AtomicInteger i = new AtomicInteger(); new Thread(()->{ // 奇数 while (i.get() < 100) { synchronized (i) { if ((i.get() & 1) == 1 ) { System.out.println(Thread.currentThread().getName() + " print " + i.getAndIncrement()); } } } }).start(); new Thread(() -> { // 偶数 while (i.get() <= 100) { synchronized (i) { if ((i.get() & 1) == 0) { System.out.println(Thread.currentThread().getName() + " print " + i.getAndIncrement()); } } } }).start(); } public static void printOddEventByturnUseWaitAndNotify() { AtomicInteger i = new AtomicInteger(); Runnable runnable = () -> { while (i.get() < 101) { synchronized (i) { System.out.println(Thread.currentThread().getName() + " print " + i.getAndIncrement()); // 唤醒另一个线程进行打印 i.notify(); if (i.get() < 101) { try { // 释放锁,进入阻塞 i.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }; new Thread(runnable).start(); new Thread(runnable).start(); } } ``` - 手写**生产者消费者设计模式** - 使用wait/notify的方法实现 ```java package threadcorekownledage.producerconsumermodel; import java.util.Date; import java.util.LinkedList; public class ProducerConsumerModel { public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(); new Thread(new Producer(myBlockingQueue)).start(); new Thread(new Consumer(myBlockingQueue)).start(); } } class Producer implements Runnable { private MyBlockingQueue storage; public Producer(MyBlockingQueue myBlockingQueue) { this.storage = myBlockingQueue; } @Override public void run() { for (int i = 0; i < 100; i++) { storage.put(new Date()); } } } class Consumer implements Runnable { private MyBlockingQueue storage; public Consumer(MyBlockingQueue myBlockingQueue) { this.storage = myBlockingQueue; } @Override public void run() { for (int i = 0; i < 100 ; i++) { storage.take(); } } } class MyBlockingQueue { private int maxSize; private LinkedList storage; public MyBlockingQueue() { this.maxSize = 10; this.storage = new LinkedList<>(); } public synchronized void put(Date integer) { // 如果队列满了,所有put线程进入阻塞 while (storage.size() == maxSize) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } storage.add(integer); System.out.println("队列中有了" + storage.size() + "个元素"); // 通知唤醒其他线程 notify(); } public synchronized void take() { // 当队列为空,则阻塞所有take线程 while (storage.size() == 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("获取到了" + storage.poll() + "元素,还剩下" + storage.size()); notify(); } } ``` - 使用阻塞队列实现 ```java package threadcorekownledage.producerconsumermodel; import java.util.concurrent.ArrayBlockingQueue; /** * 使用阻塞队列实现生产者消费者模式 */ public class ProducerConsumerModel2 { public static void main(String[] args) { ArrayBlockingQueue queue = new ArrayBlockingQueue(10); new Thread(new Producer2(queue)).start(); new Thread(new Consumer2(queue)).start(); } } class Producer2 implements Runnable { private ArrayBlockingQueue queue; public Producer2(ArrayBlockingQueue queue) { this.queue = queue; } @Override public void run() { try { for (int i = 0; i < 1000; i++) { System.out.println(Thread.currentThread().getName() + " 生产 " + i);; queue.put(i); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Consumer2 implements Runnable { private ArrayBlockingQueue queue; public Consumer2(ArrayBlockingQueue queue) { this.queue = queue; } @Override public void run() { try { for (int i = 0; i < 1000; i++) { System.out.println(Thread.currentThread().getName() + " 消费掉 " + queue.take());; } } catch (InterruptedException e) { e.printStackTrace(); } } } ``` - 为什么wait()需要在**同步代码块内**使用,而sleep()不需要 - **还没想好怎么回答** - 为什么线程通信的方法wait(),notify()和notifyAll()被定义在Object类里?而Sleep定义在Thread类里? - wait(),notify()和notifyAll()是琐级别的操作。 ### 5.2 sleep方法详解 作用:让线程进入waiting状态,释放CPU资源但不释放锁,直到规定时间后再执行,休眠期间如果被中断,会抛出异常并清除中断状态。 特点:不释放锁,包括synchronized和lock。这与wait不同。 #### 5.2.1 wait和sleep方法的异同 - 相同 - wait和sleep都能使线程阻塞,对应线程状态是waiting或者time_waiting - wait和sleep都可以响应中断Thread.interrupt() - 不同 - wait方法的执行必须在同步方法中进行,而sleep则不需要 - 在同步方法里执行sleep方法时,不会释放monitor锁,但是wait方法会释放monitor锁 - sleep方法短暂休眠后会主动退出阻塞,而没有指定时间和wait方法则需要被其他线程中断后才能退出阻塞 - wait、notify、notifyAll都是Object类的方法,sleep和yield时Thread类的方法 ### 5.3 join方法 作用:加入新的线程,也就意味着需要等待新的线程执行完,再继续main线程后续的操作。(main线程等待其他线程执行完毕) 注意:JUC中的CountDownLatch或CyclicBarrier类可以完成join。优先考虑使用JUC中现成的工具类。 #### 5.3.1 join的简单用法 ```java package threadcorekownledage.blockcoremethod; public class Join { public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "执行完毕"); }); Thread thread2 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "执行完毕"); }); thread1.start(); thread2.start(); System.out.println(Thread.currentThread().getName() + "开始等待子线程执行完毕"); thread1.join(); thread2.join(); System.out.println("所有子线程执行完毕"); } } ``` #### 5.3.2 join遇到中断的处理 ```java package threadcorekownledage.blockcoremethod; public class JoinInterrupt { public static void main(String[] args) { Thread thread = Thread.currentThread(); Thread thread1 = new Thread(() -> { try { thread.interrupt(); Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + "执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); try { thread1.join(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + "线程中断了"); // 将中断标志传递给子线程,否则子线程会正常执行完 thread1.interrupt(); e.printStackTrace(); } System.out.println("子线程运行完毕"); } } ``` #### 5.3.3 探索join过程中main线程的状态 ```java package threadcorekownledage.blockcoremethod; /** * 先join再 mainThread.getState */ public class JoinThreadState { public static void main(String[] args) throws InterruptedException { Thread mainThread = Thread.currentThread(); Thread thread1 = new Thread(() -> { try { Thread.sleep(1000); // 这里获取到的main线程的状态为WAIT System.out.println(mainThread.getName() + "线程状态为:" + mainThread.getState()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "执行完毕"); }); thread1.start(); System.out.println(Thread.currentThread().getName() + "开始等待子线程执行完毕"); thread1.join(); System.out.println("所有子线程执行完毕"); } } ``` #### 5.3.4 join源码分析 ​ ##### 5.3.4.1 join源码分析 ```java public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { // 实际调用的是wait方法 wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } } ``` ​ join方法中并没有notify这样的唤醒方法,那么线程是怎么被唤醒的呢?实际上每个Thread在执行run方法后会自动的调用notifyAll方法。 ```c++ void JavaThread::exit(booldestory_vm,ExitTypeexit_type); // JavaThread::exit中调用了 static void ensure_join(JavaThread*thread){ Handle threadObj(thread, thread->threadObj()); ObjeckLocker lock(threadObj,thread); thread->clear_pending_exception(); java_lang_Thread::set_thread_status(threadObj(),java_lang_Thread::TERMINATED); java_lang_Thread::set_thread(threadObj(),NULL); lock.notify_all(thread);// 这里执行了notify_all,进行了wait的唤醒 thread->clear_pending_exception(); } ``` ##### 5.3.4.2 join的等价写法 ```java package threadcorekownledage.blockcoremethod; public class JoinPrinciple { public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "执行完毕"); }); thread1.start(); System.out.println(Thread.currentThread().getName() + "开始等待子线程执行完毕"); // thread1.join(); 等价写法 // 主线程拿到锁执行wait方法。子线程执行完后调用notifyAll方法将唤醒主线程 synchronized (thread1) { thread1.wait(); } System.out.println("所有子线程执行完毕"); } } ``` ### 5.4 yield方法 ​ 作用:让线程释放所占用CPU时间片,但是JVM不保证遵循,CPU资源比较充裕的话可能会忽略yield请求。 ## 6. 线程各属性 线程各属性纵览 | 属性名称 | 用途 | 注意事项 | | -------------------------- | ------------------------------------------------------------ | --------------------------------------------------------- | | 编号(ID) | 每个线程有自己的ID,用于标识不同的线程 | 被后续创建的线程使用;唯一性;不允许被修改 | | 名称(Name) | 让用户或程序员在开发、调试和运维过程中,更容易区分每个不同的线程以便于问题定位 | 尽量取清晰有意义的名字 | | 是否是守护线程(isDaemon) | true代表线程是【守护线程】,false表示线程为非守护线程,也就是【用户线程】 | 该属性默认继承父线程;也可以通过setDaemon设置 | | 优先级(Priority) | 优先级这个属性的目的是告诉线程调度器,用户希望哪些线程相对多运行、哪些少运行。通常使用默认的优先级。这个属性生效取决于操作系统的支持。优先级也可能被操作系统改变。 | 默认和父线程的优先级相等,共10个等级,默认值为5;不应依赖 | ## 7. 线程异常处理 思考题 - Java异常体系图 - 实际工作中,如何全局处理异常?为什么要做全局处理?不处理行不行? 线程的未捕获异常UncaughtException应该如何处理? - 为什么需要UncaughtExceptionHandler? - 主线程可以轻松发现异常,子线程却不行 - 子线程异常无法用传统方法捕获 - 不能直接捕获会有不好的后果 - 两种解决办法 - 方案一(不推荐):手动在每个run方法里进行try catch。 - 方案二(推荐):利用Uncaught ExceptionHandler 以下就是子线程异常处理默认的实现 ```java public class ThreadGroup implements Thread.UncaughtExceptionHandler { public void uncaughtException(Thread t, Throwable e) { if (parent != null) { parent.uncaughtException(t, e); } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh != null) { ueh.uncaughtException(t, e); } else if (!(e instanceof ThreadDeath)) { System.err.print("Exception in thread \"" + t.getName() + "\" "); e.printStackTrace(System.err); } } } } ``` 几种实现方式 - 给程序**统一**设置 - 给每个线程**单独**设置 - 给**线程池**设置 ```java package threadcorekownledage.uncaughtexception; import java.util.logging.Level; import java.util.logging.Logger; /** * 自定义UncaughtExceptionHandler */ public class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { private String name; public MyUncaughtExceptionHandler(String name) { this.name = name; } @Override public void uncaughtException(Thread t, Throwable e) { // 统一异常逻辑处理 Logger logger = Logger.getAnonymousLogger(); logger.log(Level.WARNING, "线程异常,终止了 " + t.getName(), e); System.out.println(name + "捕获了异常"); } } ``` ```java package threadcorekownledage.uncaughtexception; public class UseMyUncaughtExceptionHandler implements Runnable { public static void main(String[] args) throws InterruptedException { // try catch捕获不到子线程抛出的异常 // 设置异常处理方法 Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler("MyUncaughtExceptionHandler")); try { new Thread(new UseMyUncaughtExceptionHandler(), "mythread-1").start(); Thread.sleep(200); new Thread(new UseMyUncaughtExceptionHandler(), "mythread-2").start(); Thread.sleep(200); new Thread(new UseMyUncaughtExceptionHandler(), "mythread-3").start(); Thread.sleep(200); new Thread(new UseMyUncaughtExceptionHandler(), "mythread-4").start(); } catch (RuntimeException e) { System.out.println("catch RuntimeException"); } } @Override public void run() { throw new RuntimeException(); } } ``` ## 8. 多线程带来的问题 ### 8.1 线程安全问题 什么是线程安全 ​ 当多个线程访问一个对象时,如果**不考虑**这些线程在运行时环境的调度和交替执行,也**不需要进行额外的同步**,或者在调用进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那这个对象是线程安全的。 什么时候会出现线程安全问题 #### 8.1.1 数据竞争 ​ 如果两个或者多个任务在临界段之外对一个共享变量进行写入操作,也就是说没有使用任何同步机制,那么应用程序可能存在**数据竞争**(也叫做**竞争条件**)。 ​ 在这些情况下,应用程序的最终结果可能取决于任务的执行顺序。 ```java package part1.concurrent_matter; public class ResourceCompete { private int ii; public void add(int add) { ii += add; } public static void main(String[] args) throws InterruptedException { ResourceCompete resourceCompete = new ResourceCompete(); Thread thread1 = new Thread(() -> { for (int i = 0; i < 1000; i++) { resourceCompete.add(10); } }); Thread thread2 = new Thread(() -> { for (int i = 0; i < 1000; i++) { resourceCompete.add(10); } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); // 这里得到的值可能会小于2000 System.out.println(resourceCompete.ii); } } ``` ​ 假设有两个不同的任务执行了同一个add方法。add方法不是原子的,ResourceCompete也不是线程安全的。 #### 8.1.2 死锁 ​ 当两个(或多个)任务正在等待必须有另一线程释放的某个共享资源,而该线程又正在等待必须由前述任务之一释放的另一共享资源时,并发应用程序就出现了死锁。当系统中同时出现如下四种条件时,就会导致这种情形。我们将其称为Coffman条件。 - 互斥:死锁中涉及的资源必须是不可共享的。一次只有一个任务可以使用该资源 - 占用并等待条件:一个任务在占有某一互斥的资源时由请求另一互斥的资源。当它在等待时,不会释放任何资源 - 不可剥夺:资源只能被那些持有它们的任务释放 - 循环等待:任务1正在等待任务2所占用的资源,而任务2又正在等待任务3所占有的资源,以此类推,最终任务n又在等待又任务1所占的资源,这样就出现了循环等待 ​ 有些机制可以用来避免死锁: - 忽略它们:这是最常用的机制。你可以假设自己的系统绝对不会出现死锁。如果发生死锁,结果就是你可以停止应用程序并且重新执行它 - 检测:系统中有一项专门分析系统状态的任务,可以检测是否发生死锁,如果检测到了死锁,可以采取一些措施来修复该问题,例如,结束某个任务或者强制释放某一资源。 - 预防:如果你想防止系统出现死锁,就必须预防Coffman条件中一条或者多条出现 - 规避:如果你可以在某一任务执行之前得倒该任务所使用资源的相关信息,那么死锁是可以规避的。当一个任务要开始执行时,你可以对系统中空闲的资源和任务所需的资源进行分析,这样就可以判断任务是否能够开始执行。 ##### 8.1.2.1 最简单的死锁 ```java package deadlock; /** * 最简单的死锁案例 */ public class MustDeadLock implements Runnable { int flag = 1; static Object o1 = new Object(); static Object o2 = new Object(); public static void main(String[] args) { MustDeadLock r1 = new MustDeadLock(); MustDeadLock r2 = new MustDeadLock(); r1.flag = 1; r2.flag = 2; Thread thread1 = new Thread(r1); Thread thread2 = new Thread(r2); thread1.start(); thread2.start(); // ThreadMXBean检查死锁 Thread.sleep(1000); ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); long[] deadlockedThreads = threadMXBean.findDeadlockedThreads(); if (deadlockedThreads != null && deadlockedThreads.length > 0) { for (int i = 0; i < deadlockedThreads.length; i++) { ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]); System.out.println("发现死锁" + threadInfo.getThreadName()); } } } @Override public void run() { System.out.println("flag=" + flag); if (flag == 1) { synchronized (o1) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o2) { System.out.println(Thread.currentThread().getName() + "成功拿到两把锁"); } } } if (flag == 2) { synchronized (o2) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o1) { System.out.println(Thread.currentThread().getName() + "成功拿到两把锁"); } } } } } ``` ```c++ Found one Java-level deadlock: ============================= "Thread-1": waiting to lock monitor 0x00007fcf3c01af68 (object 0x000000076adf1350, a java.lang.Object), which is held by "Thread-0" "Thread-0": waiting to lock monitor 0x00007fcf3c01d8a8 (object 0x000000076adf1360, a java.lang.Object), which is held by "Thread-1" Java stack information for the threads listed above: =================================================== "Thread-1": at deadlock.MustDeadLock.run(MustDeadLock.java:48) - waiting to lock <0x000000076adf1350> (a java.lang.Object) - locked <0x000000076adf1360> (a java.lang.Object) at java.lang.Thread.run(Thread.java:748) "Thread-0": at deadlock.MustDeadLock.run(MustDeadLock.java:35) - waiting to lock <0x000000076adf1360> (a java.lang.Object) - locked <0x000000076adf1350> (a java.lang.Object) at java.lang.Thread.run(Thread.java:748) Found 1 deadlock. ``` ##### 8.1.2.2 多人转账案例 ```java package deadlock; import java.util.Random; /** * 多人转账的时候遇到死锁 */ public class MultiTransferMoney { private static final int NUM_ACCOUNTS = 500; private static final int NUM_MONEY = 1000; private static final int NUM_ITERATIONS = 1000000; private static final int NUM_THREAD = 20; int flag = 1; public static void main(String[] args) throws InterruptedException { Random random = new Random(); TransferMoney.Account[] accounts = new TransferMoney.Account[NUM_ACCOUNTS]; for (int i = 0; i < accounts.length; i++) { accounts[i] = new TransferMoney.Account(NUM_MONEY); } class TransferThread extends Thread { @Override public void run() { for (int i = 0; i < NUM_ITERATIONS; i++) { int fromAcct = random.nextInt(NUM_ACCOUNTS); int toAcct = random.nextInt(NUM_ACCOUNTS); int amount = random.nextInt(200); TransferMoney.Account from = accounts[fromAcct]; TransferMoney.Account to = accounts[toAcct]; synchronized (from) { synchronized (to) { if (from.balance - amount < 0) { System.out.println("余额不足,转账失败!"); return; } from.balance -= amount; to.balance += amount; System.out.println("转账成功" + amount + "元"); } } } } } for (int i = 0; i < NUM_THREAD; i++) { new TransferThread().start(); } } } ``` 思路:避免相反的获取锁的顺序 ```java package deadlock; import java.util.Random; /** * 多人转账的时候遇到死锁 */ public class MultiTransferMoneyFix { private static final int NUM_ACCOUNTS = 500; private static final int NUM_MONEY = 1000; private static final int NUM_ITERATIONS = 1000000; private static final int NUM_THREAD = 20; private static Object lock = new Object(); public static void main(String[] args) { Random random = new Random(); TransferMoney.Account[] accounts = new TransferMoney.Account[NUM_ACCOUNTS]; for (int i = 0; i < accounts.length; i++) { accounts[i] = new TransferMoney.Account(NUM_MONEY); } class TransferThread extends Thread { @Override public void run() { for (int i = 0; i < NUM_ITERATIONS; i++) { int fromAcct = random.nextInt(NUM_ACCOUNTS); int toAcct = random.nextInt(NUM_ACCOUNTS); int amount = random.nextInt(200); TransferMoney.Account from = accounts[fromAcct]; TransferMoney.Account to = accounts[toAcct]; transferMoney(from, to, amount); } } } for (int i = 0; i < NUM_THREAD; i++) { new TransferThread().start(); } } public static void transferMoney(TransferMoney.Account from, TransferMoney.Account to, int amount) { class Helper { public void transfer() { if (from.balance - amount < 0) { System.out.println("余额不足,转账失败!"); return; } from.balance -= amount; to.balance += amount; System.out.println("转账成功" + amount + "元"); } } // 根据对象锁的id大小给定获取锁的顺序 int fromHash = System.identityHashCode(from); int toHash = System.identityHashCode(to); if (fromHash < toHash) { synchronized (from) { synchronized (to) { new Helper().transfer(); } } } else if (fromHash > toHash) { synchronized (to) { synchronized (from) { new Helper().transfer(); } } } // 如果hash值一样的话,加同步锁 else { synchronized (lock) { synchronized (from) { synchronized (to) { new Helper().transfer(); } } } } } } ``` ##### 8.1.2.3 哲学家就餐 要吃饭得先拿起左边的筷子再拿起。要思考的时候先放下右边的筷子再放下左边的筷子 ```java // 伪代码 while(true){ // thinking about life think(); // prepare to eat pick_up_left_fork(); pick_up_right_fork(); eat(); put_down_right_fork(); put_down_left_fork(); // not hungry,go to thinking } ``` ![](.muke-learn_images/20dc8ae4.png) 解决方案 - 服务员检查(避免策略) - 改变一个哲学家拿筷子的顺序(避免策略) - 餐票(避免策略) - 领导调节(检测与恢复策略) ```java package deadlock; /** * 哲学家就餐问题导致的死锁 */ public class DiningPhilosophers { public static class Philosopher implements Runnable { private Object leftChopstick; private Object rightChopstick; public Philosopher(Object leftChopstick, Object rightChopstick) { this.leftChopstick = leftChopstick; this.rightChopstick = rightChopstick; } @Override public void run() { while (true) { // thinking try { doAction("thinking"); // get leftChopstick synchronized (leftChopstick) { doAction("get leftChopstick"); //Picked up right chopstick; synchronized (rightChopstick) { doAction("get rightChopstick"); doAction("put down rightChopstick"); } doAction("put down leftChopstick"); } } catch (InterruptedException e) { e.printStackTrace(); } } } private void doAction(String action) throws InterruptedException { System.out.println(Thread.currentThread().getName() + " " + action); Thread.sleep((long)(Math.random()*10)); } } public static void main(String[] args) { // 定义5个哲学家 Philosopher[] philosophers = new Philosopher[5]; Object[] chopsticks = new Object[philosophers.length]; for (int i = 0; i < chopsticks.length; i++) { chopsticks[i] = new Object(); } // 5根筷子 for (int i = 0; i < philosophers.length; i++) { Object leftChopstick = chopsticks[i]; Object rightChopstick = chopsticks[(i + 1) % chopsticks.length]; // if (i == philosophers.length - 1) { // philosophers[i] = new Philosopher(rightChopstick, leftChopstick); // } else { // philosophers[i] = new Philosopher(leftChopstick, rightChopstick); // } philosophers[i] = new Philosopher(leftChopstick, rightChopstick); new Thread(philosophers[i], "哲学家" + (i + 1) + "号").start(); } } } ``` 改变一个哲学家拿筷子的顺序 ```java public static void main(String[] args) { // 定义5个哲学家 Philosopher[] philosophers = new Philosopher[5]; Object[] chopsticks = new Object[philosophers.length]; for (int i = 0; i < chopsticks.length; i++) { chopsticks[i] = new Object(); } // 5根筷子 for (int i = 0; i < philosophers.length; i++) { Object leftChopstick = chopsticks[i]; Object rightChopstick = chopsticks[(i + 1) % chopsticks.length]; // 改变一个哲学家拿筷子的顺序 if (i == philosophers.length - 1) { philosophers[i] = new Philosopher(rightChopstick, leftChopstick); } else { philosophers[i] = new Philosopher(leftChopstick, rightChopstick); } new Thread(philosophers[i], "哲学家" + (i + 1) + "号").start(); } } ``` 检测与恢复策略 - 允许发生死锁 - 每次调用锁都记录 - 定时检查“锁的调用链路图”中是否存在环路 - 一旦发现死锁,就用死锁恢复机制进行恢复 - 进程终止:逐个终止线程,直到死锁消除 - 终止顺序: - 优先级(是前台交互还是后台处理) - 已占用资源,还需要的资源 - 已经运行时间 - 资源抢占 - 把已经分发出去的锁收回来 - 让线程回退几步,这样就不用结束整个线程,成本比较低 ##### 8.1.2.4 实际工程中如何避免死锁 tips1:设置超时时间 - Lock的tryLock(long timeout, TimeUnit unit) - synchronized不具备尝试锁的能力 - 造成超时的可能性多:发生了死锁、线程陷入死循环、线程执行很慢 - 获取锁失败:打日志、发警报邮件、重启等 tips2:多使用并发类而不是自己设计锁 tips3:尽量降低锁的粒度,用不同的锁而不是一个锁 tips4:如果能使用同步代码块,就不使用同步方法,可以自己指定锁对象 tips5:给线程起有意义的名字,排查问题事半功倍 tips6:避免锁的嵌套 tips7:分配资源钱看能不能回收回来 Tips8:尽量不要几个功能使用同一把锁:专锁专用 #### 8.1.3 活锁 ​ 虽然线程并没有阻塞,也始终在运行,但是程序却得不到进展,因为线程始终重复做同样的事。 ​ 如果系统中有两个任务,它们总是因为对方的行为而改变自己的状态,那么就出现了活锁。最终结果是它们陷入了状态变更的循环而无法继续向下执行。 ​ 例如,有两个任务:任务1和任务2,它们都需要用到两个资源:资源1和资源2。假设任务1对资源1加了一个锁,而任务2对资源2加了一个锁。当它们无法访问所需的资源时,就会释放自己的资源并且重新开始循环。这种情况可以无限地持续下去,所以这两个任务都不会结束自己的执行过程。 ```java package deadlock; import java.util.Random; /** * 演示活锁问题 */ public class LiveLock { static class Spoon { private Diner owner; public Spoon(Diner owner) { this.owner = owner; } public Diner getOwner() { return owner; } public void setOwner(Diner owner) { this.owner = owner; } public synchronized void use() { System.out.printf("%s吃完了!", owner.name); } } static class Diner { private String name; private boolean isHungry; public Diner(String name) { this.name = name; isHungry = true; } public void eatWith(Spoon spoon, Diner spouse) { while (isHungry) { if (spoon.owner != this) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } continue; } Random random = new Random(); // if (spouse.isHungry && random.nextInt(10) < 9) { 添加随机因素避免活锁 if (spouse.isHungry) { System.out.println(name + ": 亲爱的" + spouse.name + "你先吃吧"); spoon.setOwner(spouse); continue; } spoon.use(); isHungry = false; System.out.println(name + ": 我吃完了"); spoon.setOwner(spouse); } } } public static void main(String[] args) { Diner husband = new Diner("牛郎"); Diner wife = new Diner("织女"); Spoon spoon = new Spoon(husband); new Thread(() -> husband.eatWith(spoon, wife)).start(); new Thread(() -> wife.eatWith(spoon, husband)).start(); } } ``` #### 8.1.4 资源不足(饥饿) ​ 当某个任务在系统中无法获取维持其继续执行所需的资源时,就会出现资源不足。当有多个任务在等待某一资源且该资源被释放时,系统需要选择下一个可以使用该资源的任务。如果你的系统中没有设计良好的算法,那么系统中有些线程很可能要为获取该资源而等待很长时间。 ​ 要解决这一问题就要确保公平原则。所有等待某一资源的任务必须在某一给定时间之内占有该资源。可选方案之一就是实现一个算法,在选择下一个将占有某一资源的任务时,对任务已等待该资源的时间因素加以考虑。然而,实现锁的公平需要增加额外的开销,这可能会降低程序的吞吐量。 #### 8.1.5 优先权反转 ​ 当一个低优先权的任务持有了一个高优先级任务所需的资源时,就会发生优先权反转。这样的话,低优先权的任务就会在高优先权的任务之前执行。 ### 8.2 性能问题 上下问切换问题 # JUC部分 ### 1 线程池 #### 1.1 什么是线程池 ​ 线程池就是创建一个缓冲池存放线程,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等候下次任务来临,这使得线程池比手动创建线程有着更多的优势: - 降低系统资源消耗:通过重用已存在的线程,降低线程创建和销毁造成的消耗; - 提高系统响应速度:当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行; - 方便线程并发数的管控:因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM; - 节省cpu切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场); - 提供更强大的功能:比如延时定时线程池。 Timer vs ScheduledThreadPoolExecutor #### 1.2 线程池构造函数分析 线程池构造函数的参数 | 参数名 | 类型 | 含义 | | ------------- | ------------------------ | ------------------------------------------------------- | | corePoolSize | int | 核心线程数 | | maxPoolSize | int | 最大线程数 | | keepAliveTime | long | 存活时间 | | workQueue | BlockingQueue | 任务存储队列 | | threadFactory | ThreadFactory | 当线程需要新的线程时,会使用threadFactory来生成新的线程 | | Handler | RejectedExecutionHandler | 由于线程池无法接受你所提交的任务的拒绝策略 | - corePoolSize指的是核心线程数:线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新的线程去执行任务; - 线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新的线程数有一个上限,这个上限就是maxPoolSize; - 如果线程池当前的线程数多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被销毁 - 新的线程时有ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否时守护线程等。 - 有三种常见类型的队列 - 直接交接:SynchronousQueue - 无界队列:LinkedBlockingQueue - 有界队列:ArrayBlockingQueue #### 1.3 添加线程的规则 1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。 2. 如果线程数等于(或大于)corePoolSize但少于maxPoolSize,则将任务放入队列。 3. 如果队列满了,且线程数小于maxPoolSize,则创建一个新线程来运行任务。 4. 如果队列满了,且线程数大于或等于maxPoolSize则拒绝该任务。 #### 1.4 线程池应该手动创建还是自动创建 结论:手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽的风险。 阿里巴巴规范提示: ```xml 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors返回的线程池对象的弊端如下: 1)FixedThreadPool和SingleThreadPool:   允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。 2)CachedThreadPool:   允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。 Positive example 1: //org.apache.commons.lang3.concurrent.BasicThreadFactory ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()); Positive example 2: ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); //Common Thread Pool ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); pool.execute(()-> System.out.println(Thread.currentThread().getName())); pool.shutdown();//gracefully shutdown Positive example 3: //in code userThreadPool.execute(thread); ``` #### 1.5 线程池核心线程数的设定 - CPU密集型(加密/计算hash等):最佳线程数为CPU核心数的1-2倍左右。 - 耗时IO型(读取数据库、文件、网络读写等):最佳线程数一般大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法: - 线程数=CPU核心数*(1 + 平均等待时间/平均工作时间) #### 1.6 常见线程池的对比 ```java public class Executors { // 固定线程数的线程池,队列时用的无界队列。任务数过多会导致OOM public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } // 单个线程的线程池,队列时用的无界队列。任务数过多会导致OOM public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } // 缓存线程池,最大线程数为Integer.MAX_VALUE,线程过多容易发生OOM public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } // 定时器线程池(可以延迟/时间间隔 执行任务),最大线程数为Integer.MAX_VALUE,线程过多容易发生OOM public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } } ``` #### 1.7 停止线程池 shutdown #### 1.8 拒绝任务 线程池拒绝任务的时机有两个: - 当Executor关闭时,提交新任务会被拒绝 - 当Executor对最大线程的工作队列容量使用有边界并且已饱和时 线程池拒绝策略有以下4种 - AbortPolicy (丢弃任务,并抛出异常) - DiscardPolicy (丢弃任务,不抛异常) - DiscardOldestPolicy (丢弃最老的任务) - CallerRunsPolicy (让调用方执行该任务) #### 1.9 线程池的钩子方法 ```java package threadpool; import java.util.concurrent.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 每个任务执行前后可以执行钩子函数 */ public class PauseableThreadPool extends ThreadPoolExecutor { private boolean isPaused; private Lock lock = new ReentrantLock(); private Condition unPaused = lock.newCondition(); public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } /** * 复写beforeExecute或者afterExecute,书写自己的钩子函数 */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); lock.lock(); try { while (isPaused) { unPaused.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void pause() { lock.lock(); try { isPaused = true; } finally { lock.unlock(); } } private void resume() { lock.lock(); try { isPaused = false; unPaused.signalAll(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); Runnable runnable = () -> { System.out.println(Thread.currentThread().getName() + " 执行"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }; for (int i = 0; i < 10000; i++) { pauseableThreadPool.execute(runnable); } Thread.sleep(1500L); pauseableThreadPool.pause(); System.out.println("线程池被暂停了"); Thread.sleep(1500L); pauseableThreadPool.resume(); } } ``` #### 1.10 实现原理 线程池组成部分 - 线程池管理器 - 工作线程 - 任务队列 - 任务接口(Task) Executor家族 线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors等这么多和线程池相关的类,它们之间有什么关系。 Executor Executor是一个接口 ``` public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); } ``` ExecutorService继承了Executor,并拥有一定管理线程池的功能 ![](.muke-learn_images/ae306d69.png) Executors是创建线程池的工具类 ![](.muke-learn_images/48eadedc.png) 线程池实现任务复用的原理 相同线程执行不同任务 ```java ExecutorService executorService = Executors.newFixedThreadPool(4); executorService.submit(new Task()); // 实际调用的是AbstractExecutorService.submit(Runnable runnable) // 上面的方法又调用了ThreadPoolExecutor.execute(Runnable runnable) public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); // 池中线程数少于核心线程数,则新增work线程 if (workerCountOf(c) < corePoolSize) { // true表示跟corePoolSize比较,false表示跟maximumPoolSize比较 if (addWorker(command, true)) return; c = ctl.get(); } // 线程池运行中,且队列还能添加任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 双重判断线程池状态,如果状态不是RUNNING,则移除队列中的任务删除并拒收任务 if (! isRunning(recheck) && remove(command)) reject(command); // 如果work线程损耗导致线程数为0,则添加一个工作线程保证线程池的正常运行 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 线程池运行中,但是队列满了;或者线程池停止运行了 else if (!addWorker(command, false)) reject(command); } // 以下是添加线程的逻辑 /** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // Worker对象实现了Runnable接口,重写了run方法. w = new Worker(firstTask); // Worker创建会创建一个实现自身的一个工作线程。这样做就可以调用runWoker方法 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // Worker对象实际调用的run方法 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 不断的从阻塞队列获取并执行任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } ``` 所以线程池是通过Worker对象(线程)去消费传进来的Runnable任务。 #### 1.11 线程池的状态 - RUNNING:接受新任务并处理排队任务 - SHUTDOWN:不接受新任务,但是处理排队任务 - STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务 - TIDYING:中文是整理,所有任务都已终止,workerCount为零时,线程会转换到TIDYING状态,并将运行terminate()钩子方法。 - TERMINATED:terminate()运行完成 1.12 使用线程池的注意点 - 避免任务堆积 - 避免线程数过度增加 - 排查线程泄漏(线程回收不了,一般是任务逻辑问题) ### 2 ThreadLocal 两大使用场景 - 典型场景1:每个线程需要一个独立的对象(通常是工具类,典型需要使用的类有SimpleDateFormat和Random),即处理线程安全问题 - 典型场景2:每个线程内需要保存全局变量(例如在拦截器中获取用户信息),可以让不同方法直接使用,避免参数传递的麻烦,即处理线程内共享变量 #### 2.1 ThreadLocal处理线程安全 ```java package threadlocal; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 使用线程池打印日期,演示线程安全问题 */ public class ThreadLocalNormalUsage01 { // 所有线程都使用同一个SimpleDateFormat对象,会产生线程安全问题 static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); public String date(int seconds) { // 参数的单位是毫秒 从 1970.1.1 00:00:00 GMT计时 Date date = new Date(1000 * seconds); return dateFormat.format(date); } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 1000; i++) { int finalI = i; executorService.submit(() -> { String date = new ThreadLocalNormalUsage01().date(finalI); System.out.println(date); }); } } } ``` 使用ThreadLocal解决线程安全问题 ```java package threadlocal; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 使用ThreadLocal避免线程安全问题 */ public class ThreadLocalNormalUsage02 { public String date(int seconds) { // 参数的单位是毫秒 从 1970.1.1 00:00:00 GMT计时 Date date = new Date(1000 * seconds); return ThreadSafeFormatter.dateFormatThreadLocal.get().format(date); } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 1000; i++) { int finalI = i; executorService.submit(() -> { String date = new ThreadLocalNormalUsage02().date(finalI); System.out.println(date); }); } } } class ThreadSafeFormatter { public static ThreadLocal dateFormatThreadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")); } ``` #### 2.2 ThreadLocal实现线程共享变量 ```java package threadlocal; public class ThreadLocalNormalUsage03 { public static void main(String[] args) { Service1 service1 = new Service1(); service1.process(); } } class Service1 { public void process() { User user = new User("nihao"); UserContextHolder.holder.set(user); new Service2().process(); } } class Service2 { public void process() { User user = UserContextHolder.holder.get(); System.out.println(user); } } class UserContextHolder { public static ThreadLocal holder = new ThreadLocal<>(); } class User { String name; public User(String name) { this.name = name; } @Override public String toString() { return "User{" + "name='" + name + '\'' + '}'; } } ``` 总结: - 让某个需要用到的对象在**线程间隔离**(每个线程都有自己的独立的对象) - 让任何方法中都能轻松的获取到该对象 #### 2.3 ThreadLocal原理 ```java // 每个Thread对象中都持有ThreadLocalMap成员 public class Thread implements Runnable { /* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null; } public class ThreadLocal { // ... static class ThreadLocalMap { /** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ static class Entry extends WeakReference> { /** The value associated with this ThreadLocal. */ Object value; // map结构体现,key是ThreadLocal的引用,value是要保持的对象 Entry(ThreadLocal k, Object v) { super(k); value = v; } } private Entry[] table; /** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */ public T get() { Thread t = Thread.currentThread(); // 获取当前线程的 ThreadLocalMap ThreadLocalMap map = getMap(t); if (map != null) { // this是ThreadLocal, 上例中的“UserContextHolder.holder”.get(); ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } ThreadLocalMap getMap(Thread t) { return t.threadLocals; } public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) // 这里是为当前线程的ThreadLocalMap map.set(this, value); else createMap(t, value); } } ``` ​ Thread对象中有`threadLocals`成员变量其类型为`ThreadLocal.ThreadLocalMap`。`ThreadLocal.set`方法存储 ThreadLocalMap,这里的map是当前线程中的ThreadLocalMap对象,key为当前ThreadLocal,value为存入的对象。`ThreadLocal.get`方法获取当前线程的ThreadLocalMap,并通过key(当前ThreadLocal)获取值。 #### 2.4 ThreadLocal使用注意点 ##### 2.4.1 内存泄漏 ​ ThreadLocalMap 中定义的 Entry 由`(ThreadLocal k, Object v)`两个变量组成。Entry继承了WeakReference,在给`k`赋值时实际上是给ThreadLocal对象贴上弱引用的标签,当发生GC时这个对象会被回收掉。但是给`v`赋值则用的是强引用,所以内存泄漏可能会出现在`v`上。正常情况下线程执行完退出,`v`会被垃圾回收掉。但是,如果线程不终止(比如线程需要保持很久),那么key对应的value就不能被回收,因为有以下的调用链: ``` Thread -> ThreadLocalMap -> Entry(key为null) -> value ``` ​ 实际上,java中已经考虑过这个问题了,所以在set、remove、rehash方法中会扫描key为null的Entry,并把对应的value设置为null,这样value对象就可以被回收。 ```java /** * Re-pack and/or re-size the table. First scan the entire * table removing stale entries. If this doesn't sufficiently * shrink the size of the table, double the table size. */ private void rehash() { expungeStaleEntries(); // Use lower threshold for doubling to avoid hysteresis if (size >= threshold - threshold / 4) resize(); } /** * Double the capacity of the table. */ private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal k = e.get(); // 这里会坚持k是否是null,如果是则将value置为null if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } setThreshold(newLen); size = count; table = newTab; } ``` ​ 但是如果一个ThreadLocal不被使用,那么实际上set、remove、rehash方法也不会被调用,如果同时线程又不停止,那么调用链就一直存在,那么就导致了value的内存泄漏。 ​ 那么如何避免呢,阿里规约中提到,调用remove方法,就会删除对应的Entry对象,可以避免内存泄漏,所以使用完ThreadLocal之后,应该调用remove方法。 ##### 2.4.2 共享变量 ​ 如果在每个线程中ThreadLocal.set进去的东西本来就是多线程共享的同一个对象,比如static对象,那么多线程的ThreadLocal.get()获取到的还是这个共享变量本身,还是有并发访问问题。 ##### 2.4.3 优先使用框架的支持 优先使用框架的支持,而不是自己创造,例如在Spring中,如果可以使用RequestContextHolder,那么就不需要自己维护ThreadLocal,因为自己可能忘记调用remove方法等,造成内存泄漏。 ThreadLocal在Spring中的应用: - RequestContextHolder - DateTimeContextHolder ### 3 Lock ​ Lock和synchronized,这两个是最常见的锁,它们都可以达到线程安全的目的,但是使用上和功能上又有比较大的不同。Lock并不是代替synchronized的,而是synchronized不适用的情况下,来提供高级功能的。 ​ synchronized的一些局限性: - 效率低:锁的释放情况少,试图获得锁是不能设定超时时间、不能中断一个正在试图获取锁的线程。 - 不够灵活(读写锁更灵活):加锁和释放的时机单一,每个锁仅有单一的条件(某个对象),可能是不够的。 - 无法知道是否成功获取到锁,只能等待。 ​ 在Lock中声明了四个方法来获取锁 - lock() lock()就是最普通的获取锁。如果锁已被其他线程获取,则进行等待。Lock不会像synchronized一样在异常是自动释放锁。因此最佳实践是,在finally中释放锁,以保证发生异常时一定被释放。lock()方法不能被中断,这会带来很大的隐患:一旦陷入死锁lock()就会陷入永久等待。 tryLock()用来尝试获取锁,如果当前锁没有其他线程占用,则获取成功返回true,反之则返回false表示获取锁失败。相比于lock(),这样的方法显得更加的优秀,我们可以根据是否能获取到锁来决定后续程序的行为。该方法会直接返回获取锁的结果,不会被一直阻塞。 - tryLock/tryLock(long time, TimeUnit unit) tryLock(long time, TimeUnit unit):超时就放弃。 tryLock可以避免死锁问题 ```java package lock.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TryLocalDeadLock implements Runnable { int flag = 1; private static Lock lock1 = new ReentrantLock(); private static Lock lock2 = new ReentrantLock(); public static void main(String[] args) { TryLocalDeadLock r1 = new TryLocalDeadLock(); TryLocalDeadLock r2 = new TryLocalDeadLock(); r1.flag = 1; r2.flag = 2; Thread thread1 = new Thread(r1); Thread thread2 = new Thread(r2); thread1.start(); thread2.start(); } @Override public void run() { System.out.println("flag=" + flag); while (true) { if (flag == 1) { try { if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) { try { System.out.println(Thread.currentThread().getName() + "获取锁1成功"); if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) { try { System.out.println(Thread.currentThread().getName() + "成功拿到两把锁"); break; } finally { lock2.unlock(); } } else { System.out.println(Thread.currentThread().getName() + "获取锁2失败"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock1.unlock(); } } else { System.out.println(Thread.currentThread().getName() + "获取锁1失败"); } } catch (InterruptedException e) { e.printStackTrace(); } } if (flag == 2) { try { // 调整获取锁的时间,避免活锁 if (lock2.tryLock(200, TimeUnit.MILLISECONDS)) { System.out.println(Thread.currentThread().getName() + "获取锁2成功"); try { if (lock1.tryLock(200, TimeUnit.MILLISECONDS)) { try { System.out.println(Thread.currentThread().getName() + "成功拿到两把锁"); break; } finally { lock1.unlock(); } } else { System.out.println(Thread.currentThread().getName() + "获取锁1失败"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock2.unlock(); } } else { System.out.println(Thread.currentThread().getName() + "获取锁2失败"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } } ``` - lockInterruptibly() lockInterruptibly():相当于tryLock(long time, TimeUnit unit)把超时时间设置为无限。在等待锁的过程中,线程可以被中断。 ```java package deadlock; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LockInterruptibly implements Runnable{ private Lock lock = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { LockInterruptibly lockInterruptibly = new LockInterruptibly(); Thread thread1 = new Thread(lockInterruptibly); Thread thread2 = new Thread(lockInterruptibly); thread1.start(); thread2.start(); Thread.sleep(2000); thread2.interrupt(); } @Override public void run() { System.out.println(Thread.currentThread().getName() + " 尝试获取锁"); try { lock.lockInterruptibly(); try { System.out.println(Thread.currentThread().getName() + " 获取了锁"); Thread.sleep(5000L); } catch (InterruptedException e) { // 这里是获取到了锁,但是在睡眠期间被打断 java.lang.InterruptedException: sleep interrupted System.out.println(Thread.currentThread().getName() + " 睡眠期间被中断"); e.printStackTrace(); } finally { lock.unlock(); System.out.println(Thread.currentThread().getName() + " 释放锁"); } } catch (InterruptedException e) { // 这里是获取锁期间被打断 java.lang.InterruptedException System.out.println(Thread.currentThread().getName() + " 获取锁期间被中断"); e.printStackTrace(); } } } ``` lock可见性保证 #### 3.1 锁的分类 ![](.muke-learn_images/3734f3ff.png) #### 3.2 乐观锁和悲观锁(非互斥同步锁) 互斥同步锁的劣势: - 阻塞和唤醒会带来性能的损耗 - 可能永久阻塞 - 可能出现优先级反转 互斥锁的代表是ReentrantLock,ReetrantLock相关类之间的继承关系,如下图所示 ![](.muke-learn_images/61f303db.png) ​ ReentrantLock本身没有代码逻辑,实现都在内部类Sync中: ```java public class ReentrantLock implements Lock, java.io.Serializable { /** Synchronizer providing all implementation mechanics */ private final Sync sync; public void lock() { sync.lock(); } public void unlock() { sync.release(1); } } ``` ​ ​ 乐观锁的思想是不锁住被操作的对象,在更新的时候,去对比在我修改的期间数据有没有被其他线程改变,如果没有被改过就说明只有我在操作数据,那么就正常去修改数据。如果数据在更新数据前发现数据不是我之间拿到的,说明其他线程修改过数据,那么就选择放弃修改、报错、重试等策略。乐观锁的实现一般都是利用CAS算法来实现。比如AtomicInteger类的一些方法用的就是乐观锁。 悲观锁:适合并发写入多的情况,适用于临界区持锁时间比较长的情况,悲观锁可以避免大量的无用的自旋等消耗,典型情况: - 临界区有IO操作 - 临界区代码复杂或者循环量大 - 临界区竞争非常激烈 乐观锁:适用于并发写入少,大部分都是读取的场景,不加锁的能让读取性能大幅提高。 #### 3.3 可重入锁和非可重入锁 重入锁,表示支持重新进入的锁,也就是说,如果当前线程t1通过调用lock方法获取了锁之后,再次调用lock,是不会再阻塞去获取锁的,直接增加重试次数就行了。synchronized和ReentrantLock都是可重入锁。 ```java package lock.lock; public class ReentrantLockDemo { public synchronized void m1(){ System.out.println("m1 run"); m2(); } public synchronized void m2() { System.out.println("m2 run"); m3(); } public void m3() { synchronized (this) { System.out.println("m3 run"); } } public static void main(String[] args) { ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo(); reentrantLockDemo.m1(); } } ``` 如果synchronized不是可重入锁的话,上面的方法调用必然会生产死锁。 源码对比:可重入锁ReetrantLock以及非可重入锁ThreadPoolExecutor的Worker类. 非可重入锁 ```java protected boolean tryAcquire(int unused) { // 直接尝试获取锁 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); // 释放时也是直接修改状态 setState(0); return true; } ``` 可重入锁 ```java /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 线程首次进入把自己设置为占用锁的线程,并将status改为acquires(通常是1) if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果锁状态不是0,获取锁时先判断线程是否是占用锁的线程,如果是则status+1,并返回true else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 释放锁时也判断当前线程是否是已经占用锁的线程,然后再判断status,如果status等于0,才是正真的释放锁 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } ``` ReentrantLock其他方法介绍 - isHeldByCurrentThread可以看出锁是否被当前线程持有 - getQueueLength可以返回当前正在等待这把锁的队列有多长,一般这两个方法是开发和调试时候使用。 #### 3.4 公平锁和非公平锁 ​ 公平指的是按照线程请求的顺序,来分配锁;非公平指的是,不完全按照请求顺序,在一定情况下可以插队。非公平也同样不提倡“插队”行为,这里的非公平,指的是“在合适的时机”插队,而不是盲目插队。插队的线程获取锁如果没有获取到则乖乖的去排队,如果获取到了就直接往下执行。Java设计者这样设计的目的是为了提高效率。在锁释放后,唤醒线程是需要时间的,而在这个时间如果“插队”线程能拿到锁执行完再释放锁的话,就避免了唤醒带来的空档期。 ```java package lock.lock; import java.util.Random; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class FairLock { public static void main(String[] args) throws InterruptedException { PrintQueue printQueue = new PrintQueue(); Thread thread[] = new Thread[10]; Job job = new Job(printQueue); for (Thread t : thread) { t = new Thread(job); t.start(); Thread.sleep(100); } } } class Job implements Runnable { PrintQueue printQueue; public Job(PrintQueue printQueue) { this.printQueue = printQueue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "准备打印"); printQueue.printJob(new Object()); System.out.println(Thread.currentThread().getName() + "打印完毕"); } } class PrintQueue { private Lock queueLock = new ReentrantLock(true); public void printJob(Object document) { // 这个for循环表示一个线程会两次获取锁,如果是公平锁的话,他会排在第二轮去执行 for (int i = 0; i < 2; i++) { queueLock.lock(); try { int duration = new Random().nextInt(10) + 1; System.out.println(Thread.currentThread().getName() + " 正在打印, 需要:" + duration + "s"); Thread.sleep(duration * 1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { queueLock.unlock(); } } } } ``` 公平锁和非公平锁的优缺点 | | 优势 | 劣势 | | -------- | ---------------------------------------------------- | ---------------------------------------------------------- | | 公平锁 | 各线程平等,每个线程在等待一段时间后,总有执行的机会 | 更慢,吞吐量更小 | | 不公平锁 | 更快,吞吐量更大 | 有可能产生线程饥饿,也就是某些线程在长时间内始终得不到执行 | ```java // 非公平锁 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 公平锁 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 先判断队列中是否有线程在等待,如果没有才让获取锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ``` #### 3.5 共享锁和排他锁 ​ 排它锁,又称独占锁,独享锁。共享锁,又称为度锁,获得共享锁后,可以查看但是无法修改和删除数据,其他线程此时也可以获取到共享锁,也可以查看但是无法修改和删除数据。共享锁和排它锁的典型是读写锁ReentrantReadWriteLock,其中读锁是共享锁,写锁是独享锁。 读写锁的规则 - 多个线程只申请读锁,都可以申请到 - 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。 - 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。 ```java package lock.lock; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockDemo { static ReentrantReadWriteLock rwl=new ReentrantReadWriteLock(); static Lock read = rwl.readLock(); static Lock write = rwl.writeLock(); private static void read() { read.lock(); try { System.out.println(Thread.currentThread().getName() + "获取到读锁,正在读取"); TimeUnit.SECONDS.sleep(1L); } catch (InterruptedException e) { e.printStackTrace(); } finally { read.unlock(); System.out.println(Thread.currentThread().getName() + "释放读锁"); } } private static void write() { write.lock(); try { System.out.println(Thread.currentThread().getName() + "获取到写锁,正在写入"); TimeUnit.SECONDS.sleep(1L); } catch (InterruptedException e) { e.printStackTrace(); } finally { write.unlock(); System.out.println(Thread.currentThread().getName() + "释放写锁"); } } public static void main(String[] args) { new Thread(()->read(), "thread-1").start(); new Thread(()->read(), "thread-2").start(); new Thread(()->write(), "thread-3").start(); new Thread(()->write(), "thread-4").start(); new Thread(()->read(), "thread-5").start(); } } ``` 读锁插队策略 ​ 假设线程2和线程4正在同时读取,线程3想要写入但拿不到锁于是进入等待队列,线程5不在队列里,现在过来获取读锁。现在有两种情况,情况一:如果允许插队,能提高程序整体的运行效率,但是如果连连不断的线程来获取读锁,那么线程3迟迟拿不到写锁产生饥饿。情况二:不允许插队,将后续像获取读锁的线程放入等待队列,等前面的写锁获取并释放后再执行后续队列中的读锁。 ```java /** * Nonfair version of Sync */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ // 判断队列头部是否是需要获取独占锁的线程(写锁) return apparentlyFirstQueuedIsExclusive(); } } /** * Fair version of Sync */ static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; // 无论读锁还是写锁都得排队 final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } } ``` 总结: - ReentrantReadWriteLock公平锁:不允许插队 - ReentrantReadWriteLock非公平锁: - 写锁可以随时插队 - 读锁仅在等待队列头部不是想获取写锁的线程是可以插队 读锁插队案例 ```java package lock.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class NonfairBargeDemo { static ReentrantReadWriteLock rwl=new ReentrantReadWriteLock(); static Lock read = rwl.readLock(); static Lock write = rwl.writeLock(); private static void read() { System.out.println(Thread.currentThread().getName() + " try get read lock"); read.lock(); try { System.out.println(Thread.currentThread().getName() + "获取到读锁,正在读取"); TimeUnit.MILLISECONDS.sleep(20L); } catch (InterruptedException e) { e.printStackTrace(); } finally { read.unlock(); } } private static void write() { System.out.println(Thread.currentThread().getName() + " try get write lock"); write.lock(); try { System.out.println(Thread.currentThread().getName() + "获取到写锁,正在写入"); TimeUnit.MILLISECONDS.sleep(20L); } catch (InterruptedException e) { e.printStackTrace(); } finally { write.unlock(); } } public static void main(String[] args) { new Thread(()->write(), "thread-1").start(); // 子线程先于下面4个线程获取到锁,则说明读锁插队成功 new Thread(()->read(), "thread-2").start(); new Thread(()->read(), "thread-3").start(); new Thread(()->read(), "thread-4").start(); new Thread(()->read(), "thread-5").start(); // 不断的产生子线程去尝试获取读锁 new Thread(() -> { Thread thread[] = new Thread[1000]; for (int i = 0; i < 1000; i++) { thread[i] = new Thread(() -> read(), "chird thread " + i); } for (Thread t : thread) { t.start(); } }).start(); } } ``` ##### 3.5.1 锁的升级和降级 ​ 锁降级指当前线程把持住写锁再获取到读锁,随后释放先前拥有的写锁的过程。 ```java package lock.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class LockUpgrading { static ReentrantReadWriteLock rwl=new ReentrantReadWriteLock(); static Lock read = rwl.readLock(); static Lock write = rwl.writeLock(); private static void read() { read.lock(); try { System.out.println(Thread.currentThread().getName() + "获取到读锁,正在读取"); TimeUnit.SECONDS.sleep(1L); // 这里发生阻塞,说明读锁不能升级为写锁 write.lock(); try { System.out.println(Thread.currentThread().getName() + "获取到读锁的同时,拿到了写锁"); } finally { write.unlock(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { read.unlock(); System.out.println(Thread.currentThread().getName() + "释放读锁"); } } private static void write() { write.lock(); try { System.out.println(Thread.currentThread().getName() + "获取到写锁,正在写入"); TimeUnit.SECONDS.sleep(1L); // 这里能获取到读锁并往下执行,说明写锁能降级称为读锁 read.lock(); try { System.out.println(Thread.currentThread().getName() + "获取到写锁的同时,拿到了读锁"); } finally { read.unlock(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { write.unlock(); System.out.println(Thread.currentThread().getName() + "释放写锁"); } } public static void main(String[] args) { new Thread(()->write(), "thread-2").start(); new Thread(()->read(), "thread-1").start(); } } ``` ​ 为什么不支持读锁升级呢? ​ 读写锁的原则是读和写这两个操作不能同时进行。如果多个读锁同时想升级为写锁,那势必要等到其他线程释放读锁,那么就会出现死锁的情况。 ##### 3.5.2 总结 - ReentrantReadWriteLock实现了ReadWriteLock接口,最主要有两个方法:readLock()和writeLock()用来获取读锁和写锁 - 锁申请和释放策略 - 多个线程只申请读锁,都可以申请到 - 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁 - 如果有一个线程已经占用了写锁,则此时其他线程池如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。 - 总结:要么一个或者多个线程同时有读锁,要么是一个线程有写锁,但是两者不会同时出现。 - 插队策略:为了防止饥饿,读锁不能插队 - 升降级策略:只能降级,不能升级 - ReentrantLock适用于一般场合,ReentrantReadWriteLock适用于读多写少的情况,合理使用可以提高并发效率。 #### 3.6 自旋锁和阻塞锁 ​ 阻塞或者唤醒一个java线程需要操作系统切换CPU状态来完成,这中状态转换是需要耗费处理器时间的。如果同步代码块中的内容过于简单,状态转换耗时可能比用户代码执行的时间还要长,这种情况下挂起线程和恢复线程的花费将得不尝失。所以有一种做法就是让这种线程不放弃CPU的执行时间,通过自旋的方式不断的尝试获取锁。这时线程就不必阻塞而是直接获取锁,从而避免切换线程的开销。这就是自旋锁。而阻塞锁就是如果没拿到锁的情况就直接进入阻塞状态直到被唤醒。 ​ 当然自旋锁缺点也很明显,如果等待的锁迟迟不释放,那么自旋的线程只会白白浪费处理器资源。在自旋的过程中,一直消耗CPU,所以虽然自旋锁开销低于悲观锁,但是随着自旋的时间的增长,开销也是线性增长的。 ​ 在java1.5版本及以上的并发框架juc中的atmoic包下的类基本都是自旋锁的实现。AtomicInteger的实现:自旋塑的实现原理是CAS,AtomicInteger中调用unsafe进行自增操作的源码中的do-while循环就是一个自旋操作,如果修改过程中遇到其他线程竞争导致修改失败,就在while里死循环,直至修改成功。 ```java public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } ``` ##### 3.6.1 自定义自旋锁 ```java package lock.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class SpinLock { private AtomicReference sign = new AtomicReference<>(); public void lock() { Thread current = Thread.currentThread(); while (!sign.compareAndSet(null, current)) { System.out.println(Thread.currentThread().getName() + " 获取自旋锁失败"); } } public void unlock() { Thread current = Thread.currentThread(); sign.compareAndSet(current, null); } public static void main(String[] args) { SpinLock spinLock = new SpinLock(); Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 开始尝试获取自旋锁"); spinLock.lock(); System.out.println(Thread.currentThread().getName() + " 获取自旋锁成功"); try { TimeUnit.MILLISECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } finally { spinLock.unlock(); System.out.println(Thread.currentThread().getName() + " 释放了自旋锁"); } } }; new Thread(runnable).start(); new Thread(runnable).start(); } } ``` 自旋锁使用的场景 - 自旋锁一般用于多核服务器,在并发度不是特别高的情况下,比阻塞锁的效率高 - 另外,自旋锁适用于临界区较小的情况,否则如果临界区很大,线程一旦拿到锁很久后才释放,那么其他线程自旋的损耗就特别的大。 #### 3.7 可中断锁 ​ 在java中,synchronized时不可中断锁,而Lock时可中断锁,因为tryLock(time)和lockInterruptibly()都能响应中断。如果某个线程A在执行锁中的代码,另一个线程B正在等待获取锁,可能由于等待时间过长,线程B不想等待了,那么我们就可以中断它,这就是可中断锁。 ```java // Lock 接口定义 public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); } ``` 3.8 锁实现的基本原理 ​ Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),这个类非常重要,该类的父类时AbstractOwnableSynchronizer #### 3.8 Condition 3.8.1 Condition 与 Lock之间的关系 3.8.2 Condition 的使用场景 3.8.3 Condition 的实现原理 3.9 StampedLock 3.9.1 为什么引入 StampedLock 3.9.2 使用场景 3.9.3 “乐观锁”的实现原理 ### 4 Atomic类 #### 4.1 AtomicInteger 和 AtomicLong ​ 通常并行条件下对整数的加减操作需要考虑线程安全,需要通过加锁的方式进行。但是有了Concurrent包的Atomic相关的类之后,我们可以通过不加锁的方式对整数进行运算。 ```java public class MyClass { private int count1 = 0; private AtomicInteger count2 = new AtomicInteger(0); // 需要将加减操作放在同步方法中 public void synchronized increment() { count1++; } public void add(){ count2.getAndIncrement(); } } ``` 源码分析 ```java /** * Atomically increments by one the current value. * * @return the previous value */ public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } // Unsafe对象中的方法实现 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { // 拿到更新的值 var5 = this.getIntVolatile(var1, var2); // 判断要更新的值是否有变动,有变动则进行自旋,直到修改成功为止 } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } ``` CAS方法中 1. 第一个参数表示要修改哪个对象的属性 2. 第二个参数时对象属性在内存的偏移量 3. 第三个参数表示期望值 4. 第四个参数表示要设置为的目标值 #### 4.2 AtomicBoolean 和 AtomicReference ​ 对于int或者long类型变量,需要进行加减操作,所以要加锁;但对于一个boolean类型来说,true or false的赋值操作,加上volatile关键字就够了,为什么还需要AtomicBoolean? ​ 原因在于volatile修饰的boolean在赋值操作确实是线程安全的,但是volatile并不保证原子性。如果要实现以下功能,则不能保证原子性导致线程安全。而CAS正好能保证原子性。 ```java if(!flag) { flag = true; } ``` ​ 在Unsafe类中,只提供了三种类型的CAS操作:int、long、Object(也就是引用类型)。如下所示。其他类型的CAS操作都要转换为这三种之一进行操作。 ```java public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); ``` #### 4.3 AtomicStampedReference 和 AtomicMarkableReference ​ 这两个类是为了处理ABA问题。 ```java // AtomicStampedReference的实现 /** * Atomically sets the value of both the reference and stamp * to the given update values if the * current reference is {@code ==} to the expected reference * and the current stamp is equal to the expected stamp. * * @param expectedReference the expected value of the reference * @param newReference the new value for the reference * @param expectedStamp the expected value of the stamp * @param newStamp the new value for the stamp * @return {@code true} if successful */ public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); } // 当expectedReference != 对象当前的reference时,说明该数据肯定被其他线程修改过;当expectedReference == 对象当前的reference时,再进一步比较expectedStamp是否等于对象当前的版本号,以此判断是否被其他线程修改过。 // AtomicReference的实现 /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update); } ``` ```java public class AtomicStampedReference { // 传进来的参数会转换成Pair对象 private static class Pair { final T reference; // 这个字段可以理解为版本号 final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static Pair of(T reference, int stamp) { return new Pair(reference, stamp); } } private volatile Pair pair; /** * Creates a new {@code AtomicStampedReference} with the given * initial values. * * @param initialRef the initial reference * @param initialStamp the initial stamp */ public AtomicStampedReference(V initialRef, int initialStamp) { pair = Pair.of(initialRef, initialStamp); } } ``` ​ AtomicMarkableReference与AtomicStampedReference原理类似,只是Pair里面的版本号是 boolean类型的,而不是整型的累加变量,如下所示: ```java public class AtomicMarkableReference { private static class Pair { final T reference; // 这里只有true、false两个版本号,所以并不能完全避免ABA问题,只是降低了ABA发生的概率 final boolean mark; private Pair(T reference, boolean mark) { this.reference = reference; this.mark = mark; } static Pair of(T reference, boolean mark) { return new Pair(reference, mark); } } private volatile Pair pair; /** * Creates a new {@code AtomicMarkableReference} with the given * initial values. * * @param initialRef the initial reference * @param initialMark the initial mark */ public AtomicMarkableReference(V initialRef, boolean initialMark) { pair = Pair.of(initialRef, initialMark); } } ``` #### 4.4 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater ​ 如果一个类是自己编写的,则可以在编写的时候把成员变量定义为Atomic类型。但如果是一个已经有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater。 ```java package atomic; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; public class AtomicIntegerFieldUpdaterDemo implements Runnable{ static Demo demo1; static Demo demo2; AtomicIntegerFieldUpdater atomici = AtomicIntegerFieldUpdater.newUpdater(Demo.class, "score"); public static void main(String[] args) throws InterruptedException { demo1 = new Demo(); demo2 = new Demo(); AtomicIntegerFieldUpdaterDemo atomicIntegerFieldUpdaterDemo = new AtomicIntegerFieldUpdaterDemo(); Thread thread1 = new Thread(atomicIntegerFieldUpdaterDemo); Thread thread2 = new Thread(atomicIntegerFieldUpdaterDemo); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("demo1:" + demo1.score); System.out.println("demo2:" + demo2.score); } @Override public void run() { for (int j = 0; j < 1000; j++) { demo2.score++; atomici.incrementAndGet(demo1); } } public static class Demo { // 该变量需要进行反射,所以不能声明为private。 // IllegalAccessException: Class atomic.AtomicIntegerFieldUpdaterDemo can not access a member of class atomic.AtomicIntegerFieldUpdaterDemo$Demo with modifiers "private" // IllegalArgumentException: Must be volatile type volatile int score; } } ``` ​ 限制条件说明: ```java // 变量必须为int,不能是Integer if (field.getType() != int.class) throw new IllegalArgumentException("Must be integer type"); // 必须用volatile修饰 if (!Modifier.isVolatile(modifiers)) throw new IllegalArgumentException("Must be volatile type"); ``` #### 4.5 AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray ​ Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray 三个数组元素的原子操作。注意,这里并不是说对整个数组的操作是原子的,而是针对数组中的一个元素的原子操作而言。 ```java package atomic; import java.util.concurrent.atomic.AtomicIntegerArray; /** * 演示 AtomicIntegerArray的基本使用 */ public class AtomicIntegerArrayDemo implements Runnable{ private AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(100); private int[] array = new int[100]; @Override public void run() { increment(); decrement(); } public void increment() { for (int i = 0; i < 100; i++) { // getAndIncrement方法需要传数组下角标 atomicIntegerArray.getAndIncrement(i); array[i]++; } } public void decrement() { for (int i = 0; i < 100; i++) { atomicIntegerArray.getAndDecrement(i); array[i]--; } } public static void main(String[] args) throws InterruptedException { AtomicIntegerArrayDemo atomicIntegerArrayDemo = new AtomicIntegerArrayDemo(); Thread[] threads = new Thread[500]; for (int i = 0; i < 500; i++) { threads[i] = new Thread(atomicIntegerArrayDemo); } for (int i = 0; i < 500; i++) { threads[i].start(); } // 这里的join不能放在上面的循环中,不然线程会按顺序执行。 for (int i = 0; i < 500; i++) { threads[i].join(); } for (int i = 0; i < atomicIntegerArrayDemo.array.length; i++) { if (atomicIntegerArrayDemo.array[i] != 0) { // 多线程中这里有可能被打印出来 System.out.println("array["+ i +"] is not zore"); } if (atomicIntegerArrayDemo.atomicIntegerArray.get(i) != 0) { System.out.println("atomicIntegerArray["+i+"] is not zore"); } } } } ``` #### 4.6 Striped64 与 LongAdder ​ 从JDK8开始,针对Long型的原子操作,Java又提供了LongAdder、LongAccumulator;针对Double类型,Java提供了DoubleAdder、DoubleAccumulator。Striped64相关的类的继承层次如下图所示: ![](.muke-learn_images/75d47cee.png) ​ AtomicLong内部是一个volatile long型变量,由多个线程对这个变量进行CAS操作。多个线程同时对一个变量进行CAS操作,在高并发的场景下仍然不够快,如果要提高性能,该怎么做? ​ 把变量拆分多份,变为多个变量。把Long类型拆成一个base变量外加多个Cell,每个Cell包装了一个Long型变量。当多个线程并发累加的时候,如果并发度低,就直接加到base变量上;如果并发度高,冲突大,平摊到这些Cell上。在最后取值的时候,再把base和这些Cell求sum运算。 #### 4.7 Accumulator累加器 ```java package atomic; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.LongAccumulator; import java.util.stream.IntStream; public class LongAccumulatorDemo { public static void main(String[] args) { // 可以传入表达式 LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 100); ExecutorService executorService = Executors.newFixedThreadPool(8); IntStream.range(1, 10).forEach(i -> executorService.submit(() -> longAccumulator.accumulate(i))); executorService.shutdown(); while (!executorService.isTerminated()) { } System.out.println(longAccumulator.getThenReset()); } } ``` 适用于并发计算 ### 5 final关键词 #### 5.1 什么是不变性(Immutable) ​ 如果对象在被创建后,状态就不能被修改,那么它就是不可变的。 #### 5.2 final的作用 - 类防止被继承、方法防止被重写、变量防止被修改 - 天生是线程安全的,而不需要额外的同步开销 #### 5.3 3种用法:修饰变量、方法、类 final修饰:3种变量 - final instance variable(类中的final属性) - final static variable(类中的static final属性) - final local variable(方法中的final变量) final修饰变量的赋值时机 - final instance variable(类中的final属性) - 第一种是在声明变量的等号右边直接赋值 - 第二种是在构造函数中赋值 - 第三种是在类的初始化代码块中赋值(不常用) - 注意:final语法规定必须要为变量赋值 - final static variable(类中的static final属性) - 两个赋值时机:除了在声明变量的等号右边直接赋值外,static final变量还可以用static初始代码块赋值,但不能用普通代码块进行赋值 - final local variable(方法中的final变量) - 因为变量在方法里,所以没有构造函数也不存在初始化代码块。所以final local variabel不规定赋值时机,只要求在使用前必须赋值,这和方法中的非final变量的要求是一样的。 final修饰方法 - 构造方法不允许final修饰 - 不可被重写,也就是不能被override,即便子类有同样名字的方法,那也不是override,这个和static方法是一个道理 final修饰类 - 不可被继承 final注意点 - final修饰对象的时候,只是对象引用不可变,而对象本身的属性是可以变化的 - final使用原则:良好的编程习惯 #### 5.4 不变性和final的关系 ​ 不变性并不意味着,简单的利用final修饰就不可变。对于基本数据类型,确实final修饰后就具有不变性,但是对于对象类型,需要该对象保证自身被创建后,状态永远不会变才可以。 ​ 那么如何利用final实现对象不可变? - 对象创建后,其状态就不能修改 - 所有属性都是final修饰 - 对象创建过程中没有发生逸出 ```java package immutable; import java.util.HashSet; import java.util.Set; public class ImmutableDemo { // private final修饰 private final Set students = new HashSet<>(); public ImmutableDemo() { students.add("李小妹"); students.add("李中妹"); students.add("李大妹"); } // 方法中不提供修改类中的变量 public boolean isStudent(String name) { return students.contains(name); } } ``` #### 5.5 栈封闭 ​ 在方法新建的局部变量,实际上存储在每个线程私有的栈空间,而每个栈的栈空间是不能被其他线程所访问的,所以不会有线程安全问题。这就是著名的“栈封闭”技术,是“线程封闭”技术的一种情况。 ```java package immutable; /** * 栈封闭的两种情况,基本变量和对象 * 先演示线程争抢带来的错误结果,然后吧变量放到方法内,情况就变了 */ public class StackConfinement implements Runnable{ int index = 0; public void inThread() { int neverGoOut = 0; for (int i = 0; i < 10000; i++) { neverGoOut++; } System.out.println("栈内保护的数据是线程安全的:" + neverGoOut); } @Override public void run() { for (int i = 0; i < 10000; i++) { index++; } inThread(); } public static void main(String[] args) throws InterruptedException { StackConfinement stackConfinement = new StackConfinement(); Thread thread1 = new Thread(stackConfinement); Thread thread2 = new Thread(stackConfinement); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(stackConfinement.index); } } ``` ```java package immutable; public class Mst { public static void main(String[] args) { String a = "abc2"; final String b = "abc"; String d = "abc"; String c = b + 2; // b变量会被认为是常量,在编译时就完成字符串拼接 String e = d + 2; System.out.println(a == c); // true System.out.println(a == e); // false } } // 用IDEA打开Mst.class文件。var1 表示 a,var3 表示 d,var4 表示 c,var5 表示 e //public class Mst { // public Mst() { // } // // public static void main(String[] var0) { // String var1 = "abc2"; // String var3 = "abc"; // String var4 = "abc2"; // String var5 = var3 + 2; // System.out.println(var1 == var4); // System.out.println(var1 == var5); // } //} ``` ### 6 并发容器 #### 6.1 集合类的演进过程 Vector和Hashtable Vector是ArrayList的线程安全版本,而HashTable是HashMap的线程安全版本。看源码我们会发现所有的方法都加上了Synchronized关键字修饰。他们两的性能会比较差。 ArrayList和HashMap 虽然这两个类不是线程安全的,但是可以用Collections.synchronizedList(new ArrayList()) 和 Collections.synchronizedMap(new HashMap()) 使之变成线程安全的。但是看源码发现对元素的操作还是加上了synchronized同步代码块。本质上跟Vector和HashTable并没有多大区别。 ConcurrentHashMap和CopyOnWriteArrayList 用于取代同步的HashMap和同步的ArrayList,绝大多数并发情况下,ConcurrentHashMap和CopyOnWriteArrayList的性能都更好。 #### 6.2 ConcurrentHashMap ##### 6.2.1 Map简介 - HashMap - Hashtable - LinkedHashMap - TreeMap ![各map之间的关系](.muke-learn_images/76bfe5fe.png) HashMap是线程不安全的 - 同时put**碰撞**导致数据丢失(hash碰撞,可能导致多个线程的key相同,那么一些线程想设置的value可能会被其他线程覆盖) - 同时put**扩容**导致数据丢失(多个线程操作Map导致map需要扩容,只有其中的一次扩容会被保存下来) - 死循环造成(死循环)CPU100% (仅在JDK7及以前存在 https://coolshell.cn/articles/9606.html) ##### 6.2.2 HashMap HashMap 主要用来存放键值对,它基于哈希表的 Map 接口实现,是常用的 Java 集合之一。 JDK1.8 之前 HashMap 由 数组+链表 组成的,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的(“拉链法(链表法)”解决冲突)。 ![](.muke-learn_images/ef23ba18.png) JDK1.8 之后 HashMap 的组成多了红黑树,在满足下面两个条件之后,会执行链表转红黑树操作,以此来加快搜索速度。 - 链表长度大于阈值(默认为 8) - HashMap 数组长度超过 64 ![](.muke-learn_images/fc42ebd6.png) ##### 6.2.3 ConcurrentHashMap ​ 在jdk1.7中ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,仍然是数组和链表组成的拉链法。每个segment独立上ReentrantLock,每个segment之间互不影响,提高了并发效率。默认有16个segment,所以最多同时支持16个线程并发写(操作分别分布在不同的Segment上)。这个默认值可以在初始化的时候设置为其他值,但是一旦初始化后,是不可以扩容的。 ![](.muke-learn_images/213b8f06.png) ​ 在jdk1.8中ConcurrentHashMap的结构如下。不再是之前的 **Segment 数组 + HashEntry 数组 + 链表**,而是 **Node 数组 + 链表 / 红黑树**。当冲突链表达到一定长度时,链表会转换成红黑树。 ![](.muke-learn_images/a4feb903.png) ConcurrentHashMap升级说明 - 数据结构 hash碰撞处理由拉链法变为拉链法+红黑树,防止链表过长导致查询效率的降低。 - 保证并发安全 jdk1.7中ConcurrentHashMap采用segment的方式引入了分段锁默认的写并发为16,而jdk1.8没有沿用segment,而是通过CAS+synchronized的方式来保证线程安全,这样做提高了并发度。 - 为什么在链表长度超过8时转红黑树 红黑树占用的存储空间比链表大,所以刚开始hash冲突还是用拉链法处理。通常链表的长度超过8的概率低于万分之一,在这种极端情况下将链表转成红黑树以达到提高查询效率的作用。 ![](.muke-learn_images/380e706f.png) #### 6.3 CopyOnWriteArrayList ​ CopyOnWriteArrayList代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样。Vector和SynchronizedList的锁粒度比较大,并发效率相对比较低,并且迭代时无法编辑。Copy-On-Write并发容器还包括CopyOnWriteArraySet,用来代替同步Set。 ​ 使用场景: - 读操作可以尽可能的块,而写即使慢一些也没多大关系 - 读多写少:黑名单,每日更新;监听器:迭代操作远多于修改操作 CopyOnWriteArrayList读写规则:读取时完全不用加锁的,并且更厉害的是,写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。 ```java package concurrent.collectors.copyonwrite; import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.CopyOnWriteArrayList; /** * 演示CopyOnWriteArrayList可以在迭代过程中修改数组类型,但是ArrayList不行 */ public class CopyOnWriteArrayListDemo1 { public static void main(String[] args) { // ArrayList arrayList = new ArrayList<>(); CopyOnWriteArrayList arrayList = new CopyOnWriteArrayList<>(); arrayList.add("1"); arrayList.add("2"); arrayList.add("3"); arrayList.add("4"); arrayList.add("5"); Iterator iterator = arrayList.iterator(); while (iterator.hasNext()) { System.out.println("list is " + arrayList); String next = iterator.next(); System.out.println(next); if ("2".equals(next)) { arrayList.remove("5"); } if (next.equals("3")) { arrayList.add("333"); } // 如果是ArrayList 会抛出java.util.ConcurrentModificationException,不能在迭代的时间里进行数据操作 } } } // 打印结果 //list is [1, 2, 3, 4, 5] //1 //list is [1, 2, 3, 4, 5] //2 //list is [1, 2, 3, 4] //3 //list is [1, 2, 3, 4, 333] //4 //list is [1, 2, 3, 4, 333] //5 ``` ​ 上面的打印结果说明迭代时的数据并不会受到迭代过程中修改的影响,仍然是原先的值。 ​ CopyOnWrite的含义是拷贝一个副本用于写入操作,再通过悲观锁或者乐观锁的方式写回。这种读写分离的方式也就解释了上面的数据过期的问题。 ​ CopyOnWriteArrayList的缺点: - 数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。 - 内存占用问题:因为CopyOnWrite的写是复制机制,所以进行写操作的时候,内存里会同时驻扎两个对象的内存。 源码分析 ​ 和ArrayList一样,CopyOnWriteArrayList的核心数据结构也是一个数组,代码如下: ```java public class CopyOnWriteArrayList implements List, RandomAccess, Cloneable, java.io.Serializable { // ... private volatile transient Object[] array; } ``` ​ 下面是类中几个读取的方法: ```java private E get(Object[] a, int index) { return (E) a[index]; } public E get(int index) { return get(getArray(), index); } public boolean isEmpty() { return size() == 0; } final Object[] getArray() { return array; } ``` ​ 并没有加锁操作,那它又是怎样保证线程安全的呢? ```java public class CopyOnWriteArrayList implements List, RandomAccess, Cloneable, java.io.Serializable { // ... private volatile transient Object[] array; public boolean add(E e) { // 加锁 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; // 拷贝数组一个新数组 Object[] newElements = Arrays.copyOf(elements, len + 1); // 在新数组尾部加入元素 newElements[len] = e; // 将修改后的数组覆盖原数组 setArray(newElements); return true; } finally { lock.unlock(); } } final void setArray(Object[] a) { array = a; } } ``` ​ 写的操作是拷贝一个副本然后在副本上进行数据操作,操作完成后再将操作后的数据赋值给原数组。这样做使得其他线程在不加锁的情况读取时并不需要考虑计算是否完成,未完成则拿到的是原来未被修改的组数,完成了则是获取新的数组。 ​ CopyOnWriteArraySet原理跟CopyOnWriteArrayList一样。只是保证了数组中数据不重复。 #### 6.4 并发队列 ​ 用队列可以在线程间传递数据:生产者消费者模式、银行转账。考虑锁等线程安全问题的重任从“你”转移到了“队列”上。 ​ 各并发队列关系图: ![](.muke-learn_images/e70fe61c.png) ​ 在所有的并发容器中,BlockingQueue是最常见的一种。BlockingQueue是一个带阻塞功能的队 列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者。 ​ ​ 在Concurrent包中,BlockingQueue是一个接口,有许多个不同的实现类 | 常见的阻塞队列 | 特点 | | --------------------- | ------------------------------------------------------------ | | ArrayBlockingQueue | 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则 对元素进行排序。 | | LinkedBlockingQueue | 链表实现的有界阻塞队列, 此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行 排序 | | PriorityBlockingQueue | 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序 升序排列。也可以自定义类实现 compareTo()方法来指定元素 排序规则,或者初始化 PriorityBlockingQueue 时,指定构造 参数 Comparator 来对元素进行排序。 | | DelayQueue | 优先级队列实现的无界阻塞队列 | | SynchronousQueue | 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操 作,否则不能继续添加元素。 | | LinkedTransferQueue | 链表实现的无界阻塞队列 | | LinkedBlockingDeque | 链表实现的双向阻塞队列 | ​ 阻塞队列中为插入、移除元素各提供四种处理方法。 1. 插入操作 - add(e):添加元素到队列中,如果队列满了,继续插入元素会报错,IllegalStateException。 - offer(e):添加元素到队列,同时会返回元素是否插入成功的状态,如果成功则返回true。 - put(e):当阻塞队列满了以后,生产者继续通过put添加元素,队列会一直阻塞生产线程,直到队列可用。 - offer(e,time,unit):当阻塞队列满了以后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出。 2. 移除操作 - remove():当队列为空时,调用remove会返回false,如果元素移除成功,则返回true。 - poll():当队列中存在元素,则从队列中取出一个元素,如果队列为空,则直接返回null。 - take():基于阻塞的方法获取队列中的元素,如果队列为空,则take方法会一直阻塞,直到队列中有新的数据可以消费 - poll(time,unit)带超时机制的获取数据,如果队列为空,则会等待指定的时间再去获取元素返回。 ##### 6.4.1 ArrayBlockingQueue ###### 6.4.1.1 基本用法 ```java package queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class ArrayBlockingQueueDemo { public static void main(String[] args) { ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3); Interviewer interviewer = new Interviewer(queue); Consumer consumer = new Consumer(queue); new Thread(interviewer).start(); new Thread(consumer).start(); } } class Interviewer implements Runnable { BlockingQueue queue; public Interviewer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { for (int i = 0; i < 10; i++) { String candidate = "candidate" + i; try { queue.put(candidate); } catch (InterruptedException e) { e.printStackTrace(); } } try { queue.put("end"); } catch (InterruptedException e) { e.printStackTrace(); } } } class Consumer implements Runnable { BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { TimeUnit.SECONDS.sleep(1L); } catch (InterruptedException e) { e.printStackTrace(); } try { String msg; while (!"end".equals(msg = queue.take())) { System.out.println(msg + "面试完毕"); } System.out.println("all面试完毕!"); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` ###### 6.4.1.2 原理分析 核心数据结构 ```java public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { //... final Object[] items; // 队头指针 int takeIndex; // 队尾指针 int putIndex; // 元素个数 int count; // 核心为1个锁外加两个条件 final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; //... } ``` 构造方法 ```java // capacity:表示数组的长度,也就是队列的长度 public ArrayBlockingQueue(int capacity) { this(capacity, false); } // fair:表示是否为公平阻塞队列,默认是非公平的阻塞队列 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; // 重入锁,出队入队持有这把锁 lock = new ReentrantLock(fair); // 初始化非空等待队列 notEmpty = lock.newCondition(); // 初始化非满等待队列 notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } ``` add方法 ```java public boolean add(E e) { // 实际上调用的offer方法,但是在添加元素失败后会抛出IllegalStateException if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } ``` offer方法 ```java public boolean offer(E e) { checkNotNull(e); // 添加重入锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 队列满了直接返回false if (count == items.length) return false; else { // 调用enqueue将元素添加到队列中 enqueue(e); return true; } } finally { lock.unlock(); } } ``` enqueue方法 ```java private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 通过putIndex对数据赋值 items[putIndex] = x; // 当putIndex等于数组长度时,将putIndex重置为0(ArrayBlockingQueue是FIFO的队列) if (++putIndex == items.length) putIndex = 0; // 记录队列元素的个数 count++; // 唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素 notEmpty.signal(); } ``` put和take方法 ```java public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 获取锁,但这跟lock的区别在于这个方法允许在等待时有其他线程调用等待线程的interrupt方法来中断等待直接返回。而lock方法则是尝试获得锁成功后才响应中断。 lock.lockInterruptibly(); try { while (count == items.length) // 队列满了的情况下,当前线程将会被notFull条件对象挂起加到等待队列中 notFull.await(); enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 如果队列为空则通过await方法阻塞。一旦有元素加入enqueue的notEmpty.signal()将唤醒take线程获取元素。 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } ``` dequeue方法 ```java private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // 将takeIndex位置的元素置空 items[takeIndex] = null; // 拿了数组最末端的元素后重新从数组头部开始获取元素 if (++takeIndex == items.length) takeIndex = 0; // 记录队列中元素个数 count--; if (itrs != null) // 更新迭代器中的元素数据 itrs.elementDequeued(); // 唤醒添加元素被阻塞的线程 notFull.signal(); return x; } ``` remove方法 ```java public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { // 获取下个要添加元素的索引 final int putIndex = this.putIndex; // 获取当前要被移除的元素的索引 int i = takeIndex; do { // 从takeIndex下标开始查找要被删除的元素 if (o.equals(items[i])) { // 移除指定元素 removeAt(i); return true; } // 如果遍历完,将i置为0 if (++i == items.length) i = 0; } while (i != putIndex); //继续查找,直到找到最后一个元素 } return false; } finally { lock.unlock(); } } ``` ##### 6.4.2 LinkedBlockingQueue ​ LinkedBlockingQueue是一种基于**单向链表**的阻塞队列。因为队头和队尾时2个指针分开操作的,所以用了2把锁和2个条件,同时有一个AtomicInteger的原子变量记录count数。 ```java public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { // ... private final int capacity; // 原子变量 private final AtomicInteger count = new AtomicInteger(0); // 单向链表的头部 private transient Node head; // 单向链表的尾部 private transient Node last; // 两把锁,两个条件 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFUll = putLock.newCondition(); // ... } ``` ###### 6.4.2.1 原理分析 ```java /** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) // 如果队列还有剩余空间则通知其他put线程 notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) // 如果还有元素,则通知其他take线程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } ``` ###### 6.4.2.2 LinkedBlockingQueue和ArrayBlockingQueue的差异 - 为了提高并发,用了2把锁,分别控制队头、队尾的操作。意味着在put和put之间、take和take之间是互斥的,put和take之间并不互斥。但对于count变量,双方都需要操作,所以必须是原子类型。 - 因为各自拿了一把锁,所以当需要调用对方的condition的signal时,还必须再加上对方的锁,就是signalNotEmpty和signalNotFull方法。 ```java /** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; // 获取到takeLock才可以调用notEmpty.signal() takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } ``` ```java /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; // 获取到putLock才可以调用notFull.signal() putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } ``` - 不仅put会通知take,take也会通知put。当put发现非满的时候,也会通知其他put线程;当take发现非空的时候,也会通知其他take线程。 ##### 6.4.3 PriorityBlockingQueue ​ 队列通常是FIFO,而PriorityBlockingQueue是按照元素的优先级从小到大出队列的。所以PriorityBlockingQueue可以接收Comparable接口的实现来对元素进行优先级排序。 ```java public class PriorityBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { //... // 用数组实现的二插小根堆 private transient Object[] queue; private transient int size; private transient Comparator comparator; // 1个锁+一个条件,没有非满条件 private final ReentrantLock lock; private final Condition notEmpty; //... } ``` ##### 6.4.4 DelayQueue ​ DelayQueue即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue。所谓延迟时间就是“未来将要执行的时间” 减去 “当前时间”。为此,放入DelayQueue中的元素,必须实现Delayed接口。 ##### 6.4.5 SynchronousQueue ​ SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调用put(...)线程会阻塞,直到另外一个线程调用了take(),两个线程才都解锁。对于多个线程而言,例如3个线程调用3次put,3个线程都会阻塞,直到另外的线程调用3次take,6个线程才全部解锁。 ### 7 同步工具类 ​ 控制并发流程的工具类(同步工具类),作用就是帮助我们更容易的协调线程之间的合作,从而达到满足业务逻辑的要求。比如让线程A等待线程B执行完后再执行等合作策略。 ​ 常用同步工具类汇总: | 类 | 作用 | 说明 | | -------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | | Semaphore | 信号量,可以通过控制“许可证”的数量,来保证线程之间的配合 | 线程只有在拿到“许可证”后才能继续运行。相比于其他的同步器,更灵活 | | CyclicBarrier | 线程会等待,直到足够多线程达到了事先规定的数目。一旦达到触发条件,就可以进行下一步的动作 | 适合于线程之间相互等待处理结果就绪的场景 | | Phaser | 和CyclicBarrier类似,但是计数可变 | Java7加入的 | | CountDownLatch | 和CyclicBarrier类似,数量递减至0,触发动作 | 不可重复使用 | | Exchanger | 让两个线程在合适时交换对象 | 使用场景:当两个线程工作在同一个类的不同实例上时,用于交换数据 | | Condition | 可以控制线程的“等待”和“唤醒” | 是Object.wait()的升级版 | ​ #### 7.1 CountDownLatch倒计门闩 ​ 倒数结束前,一直处于等待状态。知道“倒计时”结束,此线程才继续工作。 关键方法介绍: - CountDownLatch(int count):仅有一个构造函数,参数count为需要倒数的数值 - await():调用await()方法的线程会被挂起,它会等待知道count值为0才继续执行 - countDown():将count值减1,直到减至0,等待线程会被唤醒。 应用场景一:异步完成工作后,进入后续的工作 ```java package flowcontrol.countdownlatch; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 工厂中,质检,5道工序完成后再继续后面的工作 */ public class CountDownLatchDemo1 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + no + " 检验完成!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } }; executorService.submit(runnable); } System.out.println("等待5道工序执行---"); latch.await(); System.out.println("所有工序都执行完毕"); } } ``` 场景二:所有的异步工作都等待开始信号 ```java package flowcontrol.countdownlatch; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 工厂中,质检比赛,比赛人员等待开始指令 */ public class CountDownLatchDemo2 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch2 = new CountDownLatch(5); ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println("No." + no + " 准备完毕!"); latch.await(); Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + no + " 完成比赛!"); latch2.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } }; executorService.submit(runnable); } // 2s 后比赛开始 Thread.sleep(2000); latch.countDown(); System.out.println("比赛开始---"); latch2.await(); System.out.println("比赛结束--"); executorService.shutdown(); } } ``` ​ CountDownLatch是不能够重用的,如果需要重新计数,可以考虑使用CyclicBarrier或者创建新的CountDownLatch实例。 #### 7.2 Semaphore信号量 ​ Semaphore可以用来**限制**或管理数量**有限的资源**的使用情况。 ```java // 一开始有5份共享资源。第二个参数表示是否是公平 Semaphore myResources = new Semaphore(5, true); // 工作线程每获取一份资源,就在该对象上记下来 // 在获取的时候是按照公平的方式还是非公平的方式,就要看上一行代码的第二个参数了。 // 一般非公平抢占效率较高。 myResources.acquire(); // 工作线程每归还一份资源,就在该对象上记下来 // 此时资源可以被其他线程使用 myResources.release(); /* 释放指定数目的许可,并将它们归还给信标。 可用许可数加上该指定数目。如果线程需要获取N个许可,在有N个许可可用之前,该线程阻塞。 如果线程获取了N个许可,还有可用的许可,则依次将这些许可赋予等待获取许可的其他线程。 */ semaphore.release(2); /* 从信标获取指定数目的许可。如果可用许可数目不够,则线程阻塞,直到被中断。 该方法效果与循环相同, for (int i = 0; i < permits; i++) acquire(); 只不过该方法是原子操作。 如果可用许可数不够,则当前线程阻塞,直到:(二选一) 1. 如果其他线程释放了许可,并且可用的许可数满足当前线程的请求数字; 2. 其他线程中断了当前线程。 permits – 要获取的许可数 */ semaphore.acquire(3); ``` 使用案例: ```java package flowcontrol.semaphore; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreDemo { static Semaphore semaphore = new Semaphore(3, true); public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(50); for (int i = 0; i < 100; i++) { service.submit(new Task()); } } static class Task implements Runnable { @Override public void run() { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "拿到了信号量"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "释放了信号量"); semaphore.release(); } } } ``` #### 7.3 Condition接口(又称条件对象) ​ Condition本身也是一个接口,其功能和wait/notify类似,如下所示: ```java public interface Condition { void await() throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; long awaitNanos(long nanosTimeout) throws InterruptedException; void awaitUninterruptibly(); boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); } ``` wait()/notify()必须和synchronized一起使用,Condition也必须和Lock一起使用。因此,在Lock的接口中,有一个与Condition相关的接口: ```java public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; // 所有的Condition都是从Lock中构造出来的 Condition newCondition(); boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); } ``` #### 7.4 CyclicBarrier循环栅栏 ​ CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程。当有大量线程相互配合,分别计算不同任务,并且需要最后统一汇总的时候,我们可以使用CyclicBarrier。CyclicBarrier可以等待,直到所有线程都到了集合点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务。 ```java package flowcontrol.cyclicbarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(5, () -> { // 每5个线程到达栅栏,开始执行后续操作 System.out.println("5个线程都集结完毕,let's dosomething"); }); for (int i = 0; i < 10; i++) { new Thread(new Task(i, barrier)).start(); } } static class Task implements Runnable { private int id; private CyclicBarrier cyclicBarrier; public Task(int id, CyclicBarrier cyclicBarrier) { this.id = id; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程" + id + "前往集合地"); try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + id + "到达集合地,等待其他线程到达"); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } } ``` CyclicBarrier和CountDownLatch的区别 - 作用不同:CyclicBarrier要等固定数目的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch用于事件,但是CyclicBarrier是用于线程的。 - 可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例;而CyclicBarrier可以重复使用。 #### 7.5 Exchanger ​ Exchanger用于线程之间交换数据,其使用代码很简单,是一个exchange(...)方法,使用示例如下: ```java package flowcontrol.exchanger; import java.util.Random; import java.util.concurrent.Exchanger; public class ExchangerDemo { public static void main(String[] args) { Exchanger exchanger = new Exchanger<>(); new Thread(()->{ while (true) { try { String data = exchanger.exchange("thread1 data"); System.out.println(Thread.currentThread().getName() + "get " + data); Thread.sleep((long)(Math.random()*10000)); } catch (InterruptedException e) { e.printStackTrace(); } } }, "thread1").start(); new Thread(()->{ while (true) { try { String data = exchanger.exchange("thread2 data"); System.out.println(Thread.currentThread().getName() + "get " + data); Thread.sleep((long)(Math.random()*10000)); } catch (InterruptedException e) { e.printStackTrace(); } } }, "thread2").start(); new Thread(()->{ while (true) { try { String data = exchanger.exchange("thread3 data"); System.out.println(Thread.currentThread().getName() + "get " + data); Thread.sleep((long)(Math.random()*10000)); } catch (InterruptedException e) { e.printStackTrace(); } } }, "thread3").start(); } } // thread2get thread1 data // thread1get thread2 data // thread2get thread3 data // thread3get thread2 data // thread3get thread2 data // thread2get thread3 data // thread2get thread1 data // thread1get thread2 data // thread1get thread3 data // thread3get thread1 data ``` ​ 两两线程都可能进行数据交换。 #### 7.6 Phaser ​ 从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。 ##### 7.6.1 用Phaser替代CountDownLatch ```java package flowcontrol.phaser; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; /** * 工厂中,质检,5道工序完成后再继续后面的工作 */ public class PhaserDemo1 { public static void main(String[] args) { Phaser phaser = new Phaser(5); ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = () -> { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + no + " 检验完成!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { phaser.arrive(); } }; executorService.submit(runnable); } System.out.println("等待5道工序执行---"); phaser.awaitAdvance(phaser.getPhase()); System.out.println("所有工序都执行完毕"); } } ``` ##### 7.6.2 用Phaser替代CyclicBarrier ```java package flowcontrol.phaser; import java.util.concurrent.Phaser; public class PhaserDemo2 { public static void main(String[] args) { Phaser phaser = new Phaser(5); for (int i = 0; i < 5; i++) { new Thread(new Task(i, phaser)).start(); } } static class Task implements Runnable { private int id; private Phaser phaser; public Task(int id, Phaser phaser) { this.id = id; this.phaser = phaser; } @Override public void run() { System.out.println("线程" + id + "前往集合地"); try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + id + "到达集合地,等待其他线程到达"); // 到达同步点等待其他线程 phaser.arriveAndAwaitAdvance(); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + id + "到达集合地,开始dosomething1"); phaser.arriveAndAwaitAdvance(); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + id + "到达集合地,开始dosomething2"); } catch (InterruptedException e) { e.printStackTrace(); } } } } ``` ##### 7.6.3 Phaser新特性 动态调整线程个数 ​ CyclicBarrier 所要同步的线程个数是在构造方法中指定的,之后不能更改,而 Phaser 可以在运行期间动态地调整要同步的线程个数。Phaser 提供了下面这些方法来增加、减少所要同步的线程个数。 - register() // 注册一个 - bulkRegister(int parties) // 注册多个 - arriveAndDeregister() // 解除注册 ```java package flowcontrol.phaser; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; /** * 工厂中,质检,5道工序完成后再继续后面的工作 */ public class PhaserDemo3 { public static void main(String[] args) { Phaser phaser = new Phaser(5); ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = () -> { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + no + " 检验完成!"); } catch (InterruptedException e) { e.printStackTrace(); } finally { phaser.arrive(); } }; executorService.submit(runnable); } new Thread(() -> { try { // 再注册一个 phaser.register(); Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " 检验完成"); } catch (InterruptedException e) { e.printStackTrace(); } finally { phaser.arrive(); } }).start(); System.out.println("等待6道工序执行---"); phaser.awaitAdvance(phaser.getPhase()); System.out.println("所有工序都执行完毕"); } } ``` ### 8 AQS(AbstractQueuedSynchronizer) ​ 锁和协作类由共同点:闸门。事实上,不仅是ReentranLock和Semaphore,包括CountDownLatch、ReetrantReadWriteLock都有这样类似的“协作”(或者叫“同步”)功能,其实它们底层都用了一个共同的基类,这就是AQS。以上的协作类,它们有很多工作都是类似的,所以如果能提取出一个工具类,那么就可以直接用,对于ReentrantLock和Semaphore而言就可以屏蔽很多细节,只关注它们自己的“业务逻辑”就可以了。 AQS最核心的3大部分: - state - 控制线程**抢锁和配合**的FIFO队列 - 期望协作工具类去实现的获取/释放等重要方法 state ``` /** * The synchronization state. */ private volatile int state; ``` ​ 这里的state的具体含义,会根据具体实现类的不同而不同,比如在Semaphore里,它表示“剩余的许可证的数量”,而在CountDownLatch里,它表示“还需要倒数的数量”。在ReetrantLock中state表示“锁”的占用情况,包括可重入计数,当state的值为0时,表示该Lock不被任何线程所占有。 FIFO队列 ​ 这个队列用来存放“等待的线程”,AQS就是“排队管理器”,当多个线程征用同一把锁匙,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程占有这个刚刚释放的锁。AQS会维护一个等待的线程队列,把线程都放在这个队列里。 ![](.muke-learn_images/e719eeff.png) AQS的应用 AQS用法 1. 写一个类,想好协作的逻辑,实现获取/释放方法。 2. 内部写一个Sync类继承AbstractQueuedSynchronizer。 3. 根据是否独占来重写tryAcquire/tryRelease或tryAcquireShared(int acquires)和tryReleaseShared(int release)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或者Shared方法。 CountDownLatch源码分析 CountDownLatch的构造方法 ```java /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } ``` getCount方法 ```java /** * Returns the current count. * *

This method is typically used for debugging and testing purposes. * * @return the current count */ public long getCount() { return sync.getCount(); } ``` ```java /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // 当geState==0时返回正数,反之则返回负数 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // state--操作 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 自旋+cas,state减至0返回true for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } ``` await 方法解析 ```java public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 注意:该方法在AQS中实现 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // tryAcquireShared方法是CountDownLatch.Sync内部类重写AQS的方法 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 注意:该方法在AQS中实现 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 构建一个节点放入队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } ``` countDown 方法解析 ```java public void countDown() { sync.releaseShared(1); } // 注意:该方法在AQS中实现 public final boolean releaseShared(int arg) { // state--操作,如果state为0,则执行doReleaseShared if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // 注意:该方法在AQS中实现 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } ``` ### 9 Future获取子线程执行结果 #### 9.1 Future和Callable Runnable的缺陷 - 不能返回一个返回值 - 不能抛出checked Exception Callable接口 ​ 类似于Runnable,被其它线程执行的任务。实现call方法。有返回值。 Future类的作用:Future相当于一个存储器,它存储了call()这个任务的结果而这个任务的执行时间是无法提前确定的,因为这完全取决于call()方法执行的情况。 Callable和Future的关系:我们可以通过Future.get来获取Callable接口返回的执行结果,还可以通过Future.isDone()来判断任务是否已经执行完了,以及取消这个任务,限时获取任务的结果等。在call()未执行完毕之前,调用get()的线程(假定此时是主线程)会被阻塞,直到call()方法返回了结果后,此时future.get()才会得到该结果,然后主线程才会切换到runnable状态。 Future类的重要方法: - get()方法的行为取决于Callable任务的状态,只有以下5种情况: 1. 任务正常完成:get方法会立即返回结果 2. 任务尚未完成(任务还没开始货进行中):get将阻塞并直到任务完成。 3. 任务执行过程中抛出Exception:get方法会抛出ExecutionException:这里的抛出异常,是call()执行时产生的那个异常,看到这个异常类型是`java.util.concurrent.ExecutionException`。不论call()执行时抛出的异常类型是什么,最后get方法抛出的异常类型都是ExecutionException。 4. 任务被取消:get方法会抛出CancellationException 5. 任务超时:get方法有个重载方法,是传入一个延迟时间的,如果时间到了还没有获取到结果,get(long timeout, TimeUnit unit)方法就会抛出TimeoutException。(但是任务其实还是在执行的,需要手动取消。cancel方法用于取消任务的执行) - cancel()方法:用于取消任务的执行 - isDone()方法:判断线程是否执行完毕(不代表任务成功与否) - isCancelled()方法:判断任务是否被取消 #### 9.2 线程池获取Future ```java public class OneFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> { TimeUnit.SECONDS.sleep(3); return new Random().nextInt(); }); Integer integer = future.get(); System.out.println(integer); executorService.shutdown(); } } ``` ```java public class MultiFutures { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); Callable callable = () -> { TimeUnit.SECONDS.sleep(3); return new Random().nextInt(); }; ArrayList futures = new ArrayList<>(); for (int i = 0; i < 20; i++) { futures.add(executorService.submit(callable)); } for (int i = 0; i < 20; i++) { System.out.println(futures.get(i).get()); } executorService.shutdown(); } } ``` ##### 9.2.1 Future.get方法超时处理 ```java package future; import java.util.Random; import java.util.concurrent.*; /** * get的超时方法 */ public class Timeout { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); Future future = executorService.submit(() -> { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { // 执行cancel(true) 时,这里会接受到中断信号,下边的语句将被打印 System.out.println("执行被中断"); // 注意:被中断的值不会被用作最终的结果 return 1; } return new Random().nextInt(); }); Integer integer = null; try { integer = future.get(2, TimeUnit.SECONDS); } catch (InterruptedException e) { System.out.println("----------InterruptedException---------"); } catch (ExecutionException e) { System.out.println("----------ExecutionException---------"); } catch (TimeoutException e) { System.out.println("----------TimeoutException---------"); integer = 0; System.out.println("init new integer for result"); // 取消任务的执行。避免不必要的等待时间 boolean cancel = future.cancel(true); System.out.println("cancel结果:" + cancel); } executorService.shutdown(); System.out.println("result:" + integer); } } ``` ##### 9.2.2 Future.cancel方法使用原则 cancel方法执行可能遇到的情况 1. 如果这个任务还没有开始执行,那么任务会被正常的取消,未来也不会被执行,方法返回true。 2. 如果任务已完成,或者已取消。那么cancel方法会执行失败,方法返回false。 3. 如果这个任务已经开始执行了,那么这个取消方法将不会直接取消该任务,而是会根据我们填的参数mayInterruptIfRunning做判断。 Future.cancel(true)适用于: 1. 任务能处理interrupt Future.cancel(false)仅用于避免启动尚未启动的任务,适用于: 1. 未能处理interrupt的任务 2. 不清楚任务是否支持取消 3. 需要等待已经开始的任务执行完成 #### 9.3 FutureTask来创建Future ​ 用FutureTask来获取Future和任务的结果。FutureTask是一种包装器,可以把Callable转化成Future和Runnable,它同时实现两者的接口。把Callable实例当作参数,生成FutureTask的对象,然后把这个对象当作一个Runnable对象,用线程池或另起线程去执行这个Runnable对象,最后通过FutureTask获取刚才执行的结果。 ![](.muke-learn_images/f011b532.png) ```java package future; import java.util.concurrent.*; public class FutureTaskDemo { public static void main(String[] args) { Task task = new Task(); FutureTask futureTask = new FutureTask<>(task); // new Thread(futureTask).start(); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(futureTask); try { System.out.println("futureTask:" + futureTask.get().intValue()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } class Task implements Callable { @Override public Integer call() throws Exception { System.out.println("子线程正在计算"); Thread.sleep(3000); int sum = 0; for (int i = 0; i < 100; i++) { sum += i; } return sum; } } ```