凌月风的个人博客

记录精彩人生

Open Source, Open Mind,
Open Sight, Open Future!
  menu

Java笔记系列——05-并发编程(2)

0 浏览

线程通信

  • 通过内存共享实现通信。一个进程包含多个线程,线程之间共享同一块内存

  • 通过wait/notify机制实现通信: 基于某个条件来等待或者唤醒
    • 依赖于Monitor对象,所以必须配合Synchronized关键字使用
      image-20220609185708742

  • 通过join实现通信。join也是基于wait/notify来实现,wait的调用实在java代码中,notify调用是在线程销毁之后通过C语言调用的的。

  • 通过Condition实现通信
    • Condition 是JUC(java.util.concurrent)包下用来实现线程通信的机制,等价于JUC包下的wait/notify。由于wait/notify依赖于Synchronized关键字,而JUC包下使用锁是Lock,所以JUC包使用Condition通信。
    • 原理跟wait/notify相似,使用的方法是awaitsignal。线程A和线程B基于同一把锁实现Condition通信步骤
      1. 线程A抢占到了锁,AQSstate=1,独占锁的线程是A线程。B线程位于AQS等待队列中
      2. **线程A调用Condition的 **await()方法后,将自己加入到Condition中的等待队列(单向链表),完整释放自己持有的锁(重入锁情况),使自己处于阻塞状态。
      3. **线程A释放锁之后,等待队列中的线程B抢占到锁。线程B调用 **signal()方法,唤醒Condition等待队列中的线程A。把等待队列中被唤醒的线程A转移到AQS队列中
      4. 线程B执行完毕释放锁之后,线程A从AQS队列中抢占到锁,然后继续执行线程A的逻辑image-20220610121423822image-20220609202526085

JUC工具包

阻塞队列

  • **队列是一种只允许在一端进行删除操作,在另一端进行插入操作的线性表,允许插入的一端称为队尾、 允许删除的一端称为队头。 那么阻塞队列,实际上是在队列的基础上增加了两个操作。 **
    • 支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。 当阻塞队列满了之后的不同的处理策略,针对不同策略提供了不同的通用方法
      • **add -> 如果队列满了,抛出异常 **
      • **offer -> true/false , 添加成功返回true,否则返回false **
      • **put -> 如果队列满了,则一直阻塞 **
      • offer(timeout) , 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout 时长,超过阻塞时长,返回false。
    • 支持阻塞移除:队列为空的情况下,会阻塞从队列中获取元素的线程,直到队列添加了新的元素。当队列空了还要移除元素的时候,也提供了不同的策略
      • **element-> 队列为空,抛异常 **
      • **peek -> true/false , 移除成功返回true,否则返回false **
      • **take -> 一直阻塞 **
      • poll(timeout) -> 如果超时了,还没有元素,则返回null
  • ArrayBlockingQueue :存储元素的结构为Object数组
  • LinkedBlockingQueue :存储元素的结构为链表
  • LinkedBlockingDeque:存储元素的结构双向链表,支持双向插入和移除。 在一定程度上能够解决多线程的竞争问题。
  • PriorityBlcokingQueue :存储元素的结构为Object数组,可以传入比较器,以此来对元素进行排序
  • DelayQueue:延时队列,队列存储的元素必须为Delay接口的实现类 ,会根据实现类的具体实现,决定多长时间之后执行任务
  • SynchronousQueue :没有任何存储结构的的队列,通过消息传递阻塞生产者和消费者。如果没有生产者,那么阻塞所有消费者,如果没有消费者那么阻塞所有生产者

CountDownLatch

  • **实际应用 :在启动应用的时候,去对第三方的应用做健康检测 **
  • 实现原理 :
    1. 使用new CountDownLatch(3)创建时,会将AQS中的state设为3。
    2. 当线程调用await()方法时,判断AQS中的state是否为0,不为0会将当前线程放到AQS队列中,并进行阻塞。
    3. 多个线程调用时,都会在AQS队列中阻塞。
    4. 当线程调用countDown()时,会对AQS中的state进行减一操作,如果state值变为0。就会唤醒所有AQS队列中的阻塞线程
    5. 该实现原理可以允许多个线程同时抢占到锁,阻塞等待,然后等到计数器归零的时候,同时唤醒属于是共享锁的实现

Semaphore

  • 实际应用:可以作为信号灯、 限流器、限制资源的访问.。

  • 实现原理:

    1. 使用Semaphore semaphore = new Semaphore(20);创建时会将AQS中的state设为20。
    2. 当线程调用acquire(x)方法时,操作AQS中的state减x,如果减x之后state<0,则将当前线程加入到AQS队列中,并进行阻塞。
    3. 当线程调用release(x)方法时,操作AQS中的state加x,并且唤醒所有AQS队列中的阻塞线程。被唤醒的线程会继续尝试获取令牌,即操作AQS中的state减x,此x是阻塞之前线程调用acquire(x)命令的x。
      • 如果减x之后state<0,那么继续阻塞
      • 如果state=0,那么就继续执行
    • 本质上通过令牌抢占,初始化的时候约定有多少令牌,抢占到令牌就允许执行,执行完之后归还令牌。没抢到就阻塞。

CyclicBarrier

  • 代码示例:
    public class CyclicBarrierExample {
    
     public static void main(String[] args) {
         CyclicBarrier cb = new CyclicBarrier(4, () -> {
             System.out.println("4个都执行完毕");
         });
    
         for (int i = 0; i < 4; i++) {
             int finalI = i;
             new Thread(() -> {
                 try {
                     for (int j = 0; j < finalI; j++) {
                         Thread.sleep(1000);
                     }
                     cb.await();
                     System.out.println(Thread.currentThread().getName() + "线程执行完毕");
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 } catch (BrokenBarrierException e) {
                     e.printStackTrace();
                 }
             }, i + "").start();
         }
     }
    }
    

ConcurrentHashMap

  • 采用数组+链表的存储结构
  • 数组默认长度为16,
  • 链表在数组长度64,链表长度为8的时候转为红黑树
  • 初始化代码
    private final Node<K,V>[] initTable() {
      Node<K,V>[] tab; int sc;
      while ((tab = table) == null || tab.length == 0) {
          if ((sc = sizeCtl) < 0)
              Thread.yield(); // lost initialization race; just spin
          else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
              try {
                  if ((tab = table) == null || tab.length == 0) {
                      int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                      @SuppressWarnings("unchecked")
                      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                      table = tab = nt;
                      sc = n - (n >>> 2);
                  }
              } finally {
                  sizeCtl = sc;
              }
              break;
          }
      }
      return tab;
    }
    
  • 扩容的时候会使用多线程锁定数组的某一段进行扩容

Fork/Join

  • Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
  • 原理
    • 创建一个ForkJoinTaskForkJoinTask是一个抽象类,封装了一个双端阻塞的工作队列,执行ForkJoinTask.fork()方法时会将这个任务加入到这个工作队列
    • ForkJoinTask交给 ForkJoinPool去执行。ForkJoinPool本质是一个线程池的实现,专门用来执行ForkJoinTask。在线程去执行任务时,会有个工作窃取的思想,当前线程执行完成之后,会从其他的任
    • ForkJoinTask交给 ForkJoinPool去执行后,会返回一个ForkJoinTask,调用这个get()方法,完成结果获取
  • 代码示例
    package com.gupaoedu.concurrent;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * 咕泡学院,只为更好的你
     * 咕泡学院-Mic: 2082233439
     * http://www.gupaoedu.com
     **/
    public class ForkJoinExample {
    
        //java8 parallStream
    
        //针对一个数字,做计算。
        private static final Integer MAX=200;
    
        static class CalcForJoinTask extends RecursiveTask<Integer> {
            private Integer startValue; //子任务的开始计算的值
            private Integer endValue; //子任务结束计算的值
    
            public CalcForJoinTask(Integer startValue, Integer endValue) {
                this.startValue = startValue;
                this.endValue = endValue;
            }
            @Override
            protected Integer compute() {
                //如果当前的数据区间已经小于MAX了,那么接下来的计算不需要做拆分
                if(endValue-startValue<MAX){
                    System.out.println("开始计算:startValue:"+startValue+" ; endValue:"+endValue);
                    Integer totalValue=0;
                    for(int i=this.startValue;i<=this.endValue;i++){
                        totalValue+=i;
                    }
                    return totalValue;
                }
                CalcForJoinTask subTask=new CalcForJoinTask(startValue,(startValue+endValue)/2);
                subTask.fork();
                CalcForJoinTask calcForJoinTask=new CalcForJoinTask((startValue+endValue)/2+1,endValue);
                calcForJoinTask.fork();
                return subTask.join()+calcForJoinTask.join();
            }
        }
    
        public static void main(String[] args) {
            CalcForJoinTask calcForJoinTask=new CalcForJoinTask(1,10000);
            ForkJoinPool pool=new ForkJoinPool();
            ForkJoinTask<Integer> taskFuture=pool.submit(calcForJoinTask);
            try {
                Integer result=taskFuture.get();
                System.out.println("result:"+result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    

    代码流程

    • 为了更清晰的了解fork/join的原理,我们通过一个图形来理解。整体思想其实就是拆分与合并
    • 图中最顶层的任务使用submit方式被提交到Fork/Join框架中,
    • Fork/Join把这个任务放入到某个线程 中运行,工作任务中的compute方法的代码开始对这个任务T1进行分析。
    • 如果当前任务需要累加的数 字范围过大(代码中设定的是大于200),则将这个计算任务拆分成两个子任务(T1.1和T1.2),每个 子任务各自负责计算一半的数据累加,请参见代码中的fork方法。
    • 如果当前子任务中需要累加的数字范 围足够小(小于等于200),就进行累加然后返回到上层任务中。

    image-20220610221017910

  • 采用了工作窃取的思想

FutureTask/Callable

  • 提供了一个带返回值的线程封装
  • 实现原理
    • 本质上还是对于线程的封装和使用,在FuturTask中的线程没有执行完成时调用get()方法,会将调用线程放入到一个等待队列中并使用LockSupport.park()进行阻塞,等到线程执行完成返回结果时,将等待队列中阻塞的线程进行唤醒。 image-20220613151147303

CompletableFuture

  • CompletableFuture属于对于Future接口的增强,添加了许多增强的方法,支持异步回调。
  • 代码结构image-20220613155026404
    • 通过Future同步等待执行结果
    • 通过CompletionStage,增强异步回调的功能。
  • 构建方法
    • CompletableFuture.supplyAsync(runnable) :异步执行一个任务,提供返回值
    • CompletableFuture.supplyAsync(runnable,Executor executor):异步执行一个任务,可以自定义线程池,提供返回值
    • CompletableFuture.runAsync(runnable,Executor executor):异步执行一个任务,可以自定义线程池,没有返回值
    • CompletableFuture.runAsync(runnable): 异步执行一个任务,没有返回值
    • new CompletableFuture()
  • 实现自CompletionStage的方法
    • **纯消费类型的方法 **
      • 纯消费类型的方法,指依赖上一个异步任务的结果作为当前函数的参数进行下一步计算,它的特点 是不返回新的计算值
      • CompletionStage中包含9个方法,又可以分为三类:依赖单个CompletionStage任务完成,依赖两 个CompletionStage任务都完成,依赖两个CompletionStage中的任何一个完成。
      • //当前线程同步执行
        public CompletionStage<Void> thenAccept(Consumer<? super T> action);
        //使用ForkJoinPool.commonPool线程池执行action
        public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
        //使用自定义线程池执行action
        public CompletionStage<Void> thenAcceptAsync(Consumer<? super T>
                                                     action,Executor executor);
        public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U>
                                                        other,BiConsumer<? super T, ? super U> action);
        public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<?
                                                             extends U> other,BiConsumer<? super T, ? super U> action);
        public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<?
                                                             extends U> other,BiConsumer<? super T, ? super U> action,Executor executor);
        public CompletionStage<Void> acceptEither(CompletionStage<? extends T>
                                                  other,Consumer<? super T> action);
        public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T>
                                                       other,Consumer<? super T> action);
        public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T>
                                                       other,Consumer<? super T> action,Executor executor);
        
    • 有返回值类型的方法
      • 有返回值类型的方法,就是用上一个异步任务的执行结果进行下一步计算,并且会产生一个新的有 返回值的CompletionStage对象。
      • CompletionStage中,定义了9个带有返回结果的方法,同样也可以分为三个类型:依赖单个CompletionStage任务完成,依赖两个CompletionStage任务都完成,依赖两个CompletionStage 中的任何一个完成。
      • public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
        public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U>
                                                     fn);
        public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U>
                                                     fn,Executor executor);
        public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U>
                                                    other,BiFunction<? super T,? super U,? extends V> fn);
        public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends
                                                         U> other,BiFunction<? super T,? super U,? extends V> fn);
        public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends
                                                         U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
        public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T>
                                                    other,Function<? super T, U> fn);
        public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends
                                                         T> other,Function<? super T, U> fn);
        public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends
                                                         T> other,Function<? super T, U> fn,Executor executor);
        
    • 不消费也不返回的方法
      • public CompletionStage<Void> thenRun(Runnable action);
        public CompletionStage<Void> thenRunAsync(Runnable action);
        public CompletionStage<Void> thenRunAsync(Runnable action,Executor
                                                  executor);
        public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable
                                                  action);
        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?>
                                                       other,Runnable action);
        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?>
                                                       other,Runnable action,Executor executor);
        public CompletionStage<Void> runAfterEither(CompletionStage<?>
                                                    other,Runnable action);
        public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?>
                                                         other,Runnable action);
        public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?>
                                                         other,Runnable action,Executor executor);
        
    • 多任务组合
      • public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends
                                                  CompletionStage<U>> fn);
        public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends
                                                       CompletionStage<U>> fn);
        public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends
                                                       CompletionStage<U>> fn,Executor executor);
        
  • 异常处理方法
    • whenComplete():表示当任务执行完成后,会触发的方法。
      • public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("异常");
            }).whenComplete((r, th) -> {
                System.out.println(th);
            });
        }
        
    • handle():表示前置任务执行完成后,不管前置任务执行状态是正常还是异常,都会执行handle中的函数,它和whenComplete的作用几乎一致,不同点在于,handle是一个有返回值类型的方 法。
      • public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("异常");
            }).handle((r, th) -> {
                System.out.println(th);
                return th;
            });
        }
        
    • exceptionally()exceptionally()接受一个 fn 函数,当上一个CompletionStage出现异常时,会把该异常作为参数传 递到 fn 函数
      • public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("异常");
            }).exceptionally(e -> {
                System.out.println(e);
                return e;
            });
        }
        
  • 基于Treiber stack结构,实现任务的存储。

线程池

  • 池化技术的核心是复用,线程池通过 ThreadPoolExecutor 实现,多个线程的复用:
    public ThreadPoolExecutor(int corePoolSize, //核心线程数
                           int maximumPoolSize, //最大线程数
                           long keepAliveTime, //存活时间
                           TimeUnit unit, //存活单位
                           BlockingQueue<Runnable> workQueue, //阻塞队列
                           ThreadFactory threadFactory, //线程工厂,用来创建工作线程的。 默认实现(自定义线程池中线程的名字)
                           RejectedExecutionHandler handler) { //拒绝执行策略 。默认实现
         if (corePoolSize < 0 ||
             maximumPoolSize <= 0 ||
             maximumPoolSize < corePoolSize ||
             keepAliveTime < 0)
             throw new IllegalArgumentException();
     if (workQueue == null || threadFactory == null || handler == null)
         throw new NullPointerException();
     this.acc = System.getSecurityManager() == null ?
         null :
     AccessController.getContext();
     this.corePoolSize = corePoolSize;
     this.maximumPoolSize = maximumPoolSize;
     this.workQueue = workQueue;
     this.keepAliveTime = unit.toNanos(keepAliveTime);
     this.threadFactory = threadFactory;
     this.handler = handler;
    }
    
  • 运行步骤
    1. 检查当前运行线程有没有超过核心线程数,如果没有则创建核心线程
    2. 如果核心线程数已满,则检查阻塞队列的容量,如果还有容量则直接加入队列
    3. 如果队列已满,检查当前线程有没有超过最大线程数,如果没有超过,则创建新线程执行
    4. 如果超过最大线程数,执行拒绝策略
  • 线程池参数设置
    • IO密集型,CPU利用率不高。可以设为核心数*2+1
    • CPU密集型,CPU利用率很高,线程过多会增加上下文切换。可以设为核心数+1
image/svg+xml