线程通信
- 通过内存共享实现通信。一个进程包含多个线程,线程之间共享同一块内存
- 通过wait/notify机制实现通信: 基于某个条件来等待或者唤醒
- 依赖于Monitor对象,所以必须配合Synchronized关键字使用
- 依赖于Monitor对象,所以必须配合Synchronized关键字使用
- 通过join实现通信。join也是基于wait/notify来实现,wait的调用实在java代码中,notify调用是在线程销毁之后通过C语言调用的的。
- 通过Condition实现通信
- Condition 是
JUC(java.util.concurrent)
包下用来实现线程通信的机制,等价于JUC
包下的wait/notify。由于wait/notify依赖于Synchronized关键字,而JUC
包下使用锁是Lock
,所以JUC
包使用Condition通信。 - 原理跟wait/notify相似,使用的方法是
await
和signal
。线程A和线程B基于同一把锁实现Condition通信步骤- 线程A抢占到了锁,
AQS
的state=1
,独占锁的线程是A线程。B线程位于AQS
等待队列中 - **线程A调用Condition的 **
await()
方法后,将自己加入到Condition中的等待队列(单向链表),完整释放自己持有的锁(重入锁情况),使自己处于阻塞状态。 - **线程A释放锁之后,等待队列中的线程B抢占到锁。线程B调用 **
signal()
方法,唤醒Condition等待队列中的线程A。把等待队列中被唤醒的线程A转移到AQS
队列中 - 线程B执行完毕释放锁之后,线程A从
AQS
队列中抢占到锁,然后继续执行线程A的逻辑
- 线程A抢占到了锁,
- Condition 是
JUC
工具包
阻塞队列
- **队列是一种只允许在一端进行删除操作,在另一端进行插入操作的线性表,允许插入的一端称为队尾、 允许删除的一端称为队头。 那么阻塞队列,实际上是在队列的基础上增加了两个操作。 **
- 支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。 当阻塞队列满了之后的不同的处理策略,针对不同策略提供了不同的通用方法
- **add -> 如果队列满了,抛出异常 **
- **offer -> true/false , 添加成功返回true,否则返回false **
- **put -> 如果队列满了,则一直阻塞 **
- offer(timeout) , 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout 时长,超过阻塞时长,返回false。
- 支持阻塞移除:队列为空的情况下,会阻塞从队列中获取元素的线程,直到队列添加了新的元素。当队列空了还要移除元素的时候,也提供了不同的策略
- **element-> 队列为空,抛异常 **
- **peek -> true/false , 移除成功返回true,否则返回false **
- **take -> 一直阻塞 **
- poll(timeout) -> 如果超时了,还没有元素,则返回null
- 支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。 当阻塞队列满了之后的不同的处理策略,针对不同策略提供了不同的通用方法
ArrayBlockingQueue
:存储元素的结构为Object数组LinkedBlockingQueue
:存储元素的结构为链表LinkedBlockingDeque
:存储元素的结构双向链表,支持双向插入和移除。 在一定程度上能够解决多线程的竞争问题。PriorityBlcokingQueue
:存储元素的结构为Object数组,可以传入比较器,以此来对元素进行排序DelayQueue
:延时队列,队列存储的元素必须为Delay接口
的实现类 ,会根据实现类的具体实现,决定多长时间之后执行任务SynchronousQueue
:没有任何存储结构的的队列,通过消息传递阻塞生产者和消费者。如果没有生产者,那么阻塞所有消费者,如果没有消费者那么阻塞所有生产者
CountDownLatch
- **实际应用 :在启动应用的时候,去对第三方的应用做健康检测 **
- 实现原理 :
- 使用
new CountDownLatch(3)
创建时,会将AQS
中的state
设为3。 - 当线程调用
await()
方法时,判断AQS
中的state
是否为0,不为0会将当前线程放到AQS队列
中,并进行阻塞。 - 多个线程调用时,都会在
AQS队列
中阻塞。 - 当线程调用
countDown()
时,会对AQS
中的state
进行减一操作,如果state
值变为0。就会唤醒所有AQS队列
中的阻塞线程 - 该实现原理可以允许多个线程同时抢占到锁,阻塞等待,然后等到计数器归零的时候,同时唤醒属于是共享锁的实现
- 使用
Semaphore
-
实际应用:可以作为信号灯、 限流器、限制资源的访问.。
-
实现原理:
- 使用
Semaphore semaphore = new Semaphore(20);
创建时会将AQS
中的state
设为20。 - 当线程调用
acquire(x)
方法时,操作AQS
中的state
减x,如果减x之后state<0
,则将当前线程加入到AQS队列
中,并进行阻塞。 - 当线程调用
release(x)
方法时,操作AQS
中的state
加x,并且唤醒所有AQS队列
中的阻塞线程。被唤醒的线程会继续尝试获取令牌,即操作AQS
中的state
减x,此x是阻塞之前线程调用acquire(x)
命令的x。- 如果减x之后
state<0
,那么继续阻塞 - 如果
state=0
,那么就继续执行
- 如果减x之后
- 本质上通过令牌抢占,初始化的时候约定有多少令牌,抢占到令牌就允许执行,执行完之后归还令牌。没抢到就阻塞。
- 使用
CyclicBarrier
- 代码示例:
public class CyclicBarrierExample { public static void main(String[] args) { CyclicBarrier cb = new CyclicBarrier(4, () -> { System.out.println("4个都执行完毕"); }); for (int i = 0; i < 4; i++) { int finalI = i; new Thread(() -> { try { for (int j = 0; j < finalI; j++) { Thread.sleep(1000); } cb.await(); System.out.println(Thread.currentThread().getName() + "线程执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }, i + "").start(); } } }
ConcurrentHashMap
- 采用数组+链表的存储结构
- 数组默认长度为16,
- 链表在数组长度64,链表长度为8的时候转为红黑树
- 初始化代码
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
- 扩容的时候会使用多线程锁定数组的某一段进行扩容
Fork/Join
- Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
- 原理
- 创建一个
ForkJoinTask
,ForkJoinTask
是一个抽象类,封装了一个双端阻塞的工作队列,执行ForkJoinTask.fork()
方法时会将这个任务加入到这个工作队列 - 将
ForkJoinTask
交给ForkJoinPool
去执行。ForkJoinPool
本质是一个线程池的实现,专门用来执行ForkJoinTask
。在线程去执行任务时,会有个工作窃取的思想,当前线程执行完成之后,会从其他的任 - 将
ForkJoinTask
交给ForkJoinPool
去执行后,会返回一个ForkJoinTask
,调用这个get()
方法,完成结果获取
- 创建一个
- 代码示例
package com.gupaoedu.concurrent; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; /** * 咕泡学院,只为更好的你 * 咕泡学院-Mic: 2082233439 * http://www.gupaoedu.com **/ public class ForkJoinExample { //java8 parallStream //针对一个数字,做计算。 private static final Integer MAX=200; static class CalcForJoinTask extends RecursiveTask<Integer> { private Integer startValue; //子任务的开始计算的值 private Integer endValue; //子任务结束计算的值 public CalcForJoinTask(Integer startValue, Integer endValue) { this.startValue = startValue; this.endValue = endValue; } @Override protected Integer compute() { //如果当前的数据区间已经小于MAX了,那么接下来的计算不需要做拆分 if(endValue-startValue<MAX){ System.out.println("开始计算:startValue:"+startValue+" ; endValue:"+endValue); Integer totalValue=0; for(int i=this.startValue;i<=this.endValue;i++){ totalValue+=i; } return totalValue; } CalcForJoinTask subTask=new CalcForJoinTask(startValue,(startValue+endValue)/2); subTask.fork(); CalcForJoinTask calcForJoinTask=new CalcForJoinTask((startValue+endValue)/2+1,endValue); calcForJoinTask.fork(); return subTask.join()+calcForJoinTask.join(); } } public static void main(String[] args) { CalcForJoinTask calcForJoinTask=new CalcForJoinTask(1,10000); ForkJoinPool pool=new ForkJoinPool(); ForkJoinTask<Integer> taskFuture=pool.submit(calcForJoinTask); try { Integer result=taskFuture.get(); System.out.println("result:"+result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
代码流程
- 为了更清晰的了解fork/join的原理,我们通过一个图形来理解。整体思想其实就是拆分与合并
- 图中最顶层的任务使用submit方式被提交到Fork/Join框架中,
- Fork/Join把这个任务放入到某个线程 中运行,工作任务中的compute方法的代码开始对这个任务T1进行分析。
- 如果当前任务需要累加的数 字范围过大(代码中设定的是大于200),则将这个计算任务拆分成两个子任务(T1.1和T1.2),每个 子任务各自负责计算一半的数据累加,请参见代码中的fork方法。
- 如果当前子任务中需要累加的数字范 围足够小(小于等于200),就进行累加然后返回到上层任务中。
- 采用了工作窃取的思想
FutureTask/Callable
- 提供了一个带返回值的线程封装
- 实现原理
- 本质上还是对于线程的封装和使用,在
FuturTask
中的线程没有执行完成时调用get()
方法,会将调用线程放入到一个等待队列中并使用LockSupport.park()
进行阻塞,等到线程执行完成返回结果时,将等待队列中阻塞的线程进行唤醒。
- 本质上还是对于线程的封装和使用,在
CompletableFuture
CompletableFuture
属于对于Future接口的增强,添加了许多增强的方法,支持异步回调。- 代码结构
- 通过Future同步等待执行结果
- 通过
CompletionStage
,增强异步回调的功能。
- 构建方法
CompletableFuture.supplyAsync(runnable)
:异步执行一个任务,提供返回值CompletableFuture.supplyAsync(runnable,Executor executor)
:异步执行一个任务,可以自定义线程池,提供返回值CompletableFuture.runAsync(runnable,Executor executor)
:异步执行一个任务,可以自定义线程池,没有返回值CompletableFuture.runAsync(runnable)
: 异步执行一个任务,没有返回值new CompletableFuture()
- 实现自
CompletionStage
的方法- **纯消费类型的方法 **
- 纯消费类型的方法,指依赖上一个异步任务的结果作为当前函数的参数进行下一步计算,它的特点 是不返回新的计算值
- 在
CompletionStage
中包含9个方法,又可以分为三类:依赖单个CompletionStage
任务完成,依赖两 个CompletionStage
任务都完成,依赖两个CompletionStage
中的任何一个完成。 -
//当前线程同步执行 public CompletionStage<Void> thenAccept(Consumer<? super T> action); //使用ForkJoinPool.commonPool线程池执行action public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); //使用自定义线程池执行action public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor); public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,Executor executor); public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
- 有返回值类型的方法
- 有返回值类型的方法,就是用上一个异步任务的执行结果进行下一步计算,并且会产生一个新的有 返回值的
CompletionStage
对象。 - 在
CompletionStage
中,定义了9个带有返回结果的方法,同样也可以分为三个类型:依赖单个CompletionStage
任务完成,依赖两个CompletionStage
任务都完成,依赖两个CompletionStage
中的任何一个完成。 -
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor); public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
- 有返回值类型的方法,就是用上一个异步任务的执行结果进行下一步计算,并且会产生一个新的有 返回值的
- 不消费也不返回的方法
-
public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor); public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
-
- 多任务组合
-
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);
-
- **纯消费类型的方法 **
- 异常处理方法
whenComplete()
:表示当任务执行完成后,会触发的方法。-
public static void main(String[] args) { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("异常"); }).whenComplete((r, th) -> { System.out.println(th); }); }
-
handle()
:表示前置任务执行完成后,不管前置任务执行状态是正常还是异常,都会执行handle中的函数,它和whenComplete
的作用几乎一致,不同点在于,handle是一个有返回值类型的方 法。-
public static void main(String[] args) { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("异常"); }).handle((r, th) -> { System.out.println(th); return th; }); }
-
exceptionally()
:exceptionally()
接受一个fn
函数,当上一个CompletionStage
出现异常时,会把该异常作为参数传 递到fn
函数-
public static void main(String[] args) { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("异常"); }).exceptionally(e -> { System.out.println(e); return e; }); }
-
- 基于
Treiber stack
结构,实现任务的存储。
线程池
- 池化技术的核心是复用,线程池通过 ThreadPoolExecutor 实现,多个线程的复用:
public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //存活时间 TimeUnit unit, //存活单位 BlockingQueue<Runnable> workQueue, //阻塞队列 ThreadFactory threadFactory, //线程工厂,用来创建工作线程的。 默认实现(自定义线程池中线程的名字) RejectedExecutionHandler handler) { //拒绝执行策略 。默认实现 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- 运行步骤
- 检查当前运行线程有没有超过核心线程数,如果没有则创建核心线程
- 如果核心线程数已满,则检查阻塞队列的容量,如果还有容量则直接加入队列
- 如果队列已满,检查当前线程有没有超过最大线程数,如果没有超过,则创建新线程执行
- 如果超过最大线程数,执行拒绝策略
- 线程池参数设置
- IO密集型,CPU利用率不高。可以设为
核心数*2+1
- CPU密集型,CPU利用率很高,线程过多会增加上下文切换。可以设为
核心数+1
- IO密集型,CPU利用率不高。可以设为