Future及ForkJoin框架

Future

1. Future的应用场景

在并发编程中,我们经常用到非阻塞的模型,在之前的多线程的三种实现中,不管是继承thread类还是实现runnable接口,都无法保证获取到之前的执行结果。通过实现Callback接口,并用Future可以来接收多线程的执行结果。

2. Future的类图结构

Future接口定义了主要的5个接口方法,有RunnableFuture和SchedualFuture继承这个接口,以及CompleteFuture和ForkJoinTask继承这个接口。

1.png

RunnableFuture

  • 这个接口同时继承Future接口和Runnable接口,在成功执行run()方法后,可以通过Future访问执行结果。这个接口都实现类是FutureTask,一个可取消的异步计算,这个类提供了Future的基本实现,后面我们的demo也是用这个类实现,它实现了启动和取消一个计算,查询这个计算是否已完成,恢复计算结果。计算的结果只能在计算已经完成的情况下恢复。如果计算没有完成,get方法会阻塞,一旦计算完成,这个计算将不能被重启和取消,除非调用runAndReset方法。

  • FutureTask能用来包装一个Callable或Runnable对象,因为它实现了Runnable接口,而且它能被传递到Executor进行执行。为了提供单例类,这个类在创建自定义的工作类时提供了protected构造函数。

SchedualFuture

  • 这个接口表示一个延时的行为可以被取消。通常一个安排好的future是定时任务SchedualedExecutorService的结果

CompleteFuture

  • 一个Future类是显示的完成,而且能被用作一个完成等级,通过它的完成触发支持的依赖函数和行为。当两个或多个线程要执行完成或取消操作时,只有一个能够成功。

ForkJoinTask

  • 基于任务的抽象类,可以通过ForkJoinPool来执行。一个ForkJoinTask是类似于线程实体,但是相对于线程实体是轻量级的。大量的任务和子任务会被ForkJoinPool池中的真实线程挂起来,以某些使用限制为代价。

3. Future的主要方法

Future接口主要包括5个方法:

  • get()方法可以当任务结束后返回一个结果,如果调用时,工作还没有结束,则会阻塞线程,直到任务执行完毕

  • get(long timeout,TimeUnit unit)做多等待timeout的时间就会返回结果

  • cancel(boolean mayInterruptIfRunning)方法可以用来停止一个任务,如果任务可以停止(通过mayInterruptIfRunning来进行判断),则可以返回true,如果任务已经完成或者已经停止,或者这个任务无法停止,则会返回false.

  • isDone()方法判断当前方法是否完成

  • isCancel()方法判断当前方法是否取消

4.Future实例

public static void main(String[] args) throws InterruptedException, ExecutionException {
	long start = System.currentTimeMillis();
	
	// 等凉菜 
	Callable ca1 = new Callable(){

		@Override
		public String call() throws Exception {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return "凉菜准备完毕";
		}
	};
	FutureTask<String> ft1 = new FutureTask<String>(ca1);
	new Thread(ft1).start();
	
	// 等包子 -- 必须要等待返回的结果,所以要调用join方法
	Callable ca2 = new Callable(){

			@Override
			public Object call() throws Exception {
				try {
					Thread.sleep(1000*3);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				return "包子准备完毕";
		}
	};
	FutureTask<String> ft2 = new FutureTask<String>(ca2);
	new Thread(ft2).start();
	
	System.out.println(ft1.get());
	System.out.println(ft2.get());
	
	long end = System.currentTimeMillis();
	System.out.println("准备完毕时间:"+(end-start));
}

ForkJoin

JDK1.7提供了一个将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果或者不合并的功能:RecursiveAction:用于没有返回结果的任务(类似Runnable)、RecursiveTask:用于有返回结果的任务(类似Callable) 2.png

1、变量说明

ForkJoin框架中,深入理解上面的3个类就可以基本对框架有更好的认识。下面先来大致介绍一下每个类中我们需要关注的字段和相关联系。

ForkJoinPool类

  • ADD_WORKER: 100000000000000000000000000000000000000000000000 -> 1000 0000 0000 0000,用来配合ctl在控制线程数量时使用

  • ctl: 控制ForkJoinPool创建线程数量,(ctl & ADD_WORKER) != 0L 时创建线程,也就是当ctl的第16位不为0时,可以继续创建线程

  • defaultForkJoinWorkerThreadFactory: 默认线程工厂,默认实现是DefaultForkJoinWorkerThreadFactory

  • runState: 全局锁控制,全局运行状态

  • workQueues: 工作队列数组WorkQueue[]

  • config: 记录并行数量和ForkJoinPool的模式(异步或同步)

ForkJoinTask类

  • status: 任务的状态,对其他工作线程和pool可见,运行正常则status为负数,异常情况为正数

WorkQueue类

  • qlock: 并发控制,put任务时的锁控制

  • array: 任务数组ForkJoinTask<?>[]

  • pool: ForkJoinPool,所有线程和WorkQueue共享,用于工作窃取、任务状态和工作状态同步

  • base: array数组中取任务的下标

  • top: array数组中放置任务的下标

  • owner: 所属线程,ForkJoin框架中,只有一个WorkQueue是没有owner的,其他的均有具体线程owner

ForkJoinWorkerThread类

  • pool: ForkJoinPool,所有线程和WorkQueue共享,用于工作窃取、任务状态和工作状态同步

  • workQueue: 当前线程的任务队列,与WorkQueue的owner呼应

3.png

ForkJoinPool作为最核心的组件,维护了所有的任务队列WorkQueues,workQueues维护着所有线程池的工作线程,工作窃取算法就是在这里进行的。每一个WorkQueue对象中使用pool保留对ForkJoinPool的引用,用来获取其WorkQueues来窃取其他工作线程的任务来执行。同时WorkQueue对象中的owner是ForkJoinWorkerThread工作线程,绑定ForkJoinWorkerThread和WorkQueue的一对一关系,每个工作线程会优先完成自己队列的任务,当自己队列中的任务为空时,才会通过工作窃取算法从其他任务队列中获取任务。WorkQueue中的ForkJoinTask<?>[] array,是每一个具体的任务,插入array中的第一个任务是最大的任务。

2、实例代码

package com.example.forkjoinpool;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ForkjoinpoolApplication {


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        PrintAction printAction = new PrintAction(1, 100);
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(printAction);
        //线程阻塞,等待所有任务完成
        pool.awaitTermination(2, TimeUnit.SECONDS);
        pool.shutdown();

//        PrintTask printTask = new PrintTask(1, 100);
//        ForkJoinPool pool = new ForkJoinPool();
//        pool.submit(printTask);
//        System.out.println( printTask.get());
        //线程阻塞,等待所有任务完成
//        pool.awaitTermination(2, TimeUnit.SECONDS);
//        pool.shutdown();

    }
}

class PrintAction extends RecursiveAction {
    private final int THRESHOLD = 50; //最多只能打印50个数
    private int start;
    private int end;
    public AtomicInteger atomicInteger = new AtomicInteger(0);

    public PrintAction(int start, int end) {
        super();
        this.start = start;
        this.end = end;
    }


    @Override
    protected void compute() {
        if (end - start < THRESHOLD) {
            for (int i = start; i < end; i++) {
                atomicInteger.addAndGet(i);
                System.out.println(Thread.currentThread().getName() + "的i值:" + i);
                System.out.println(Thread.currentThread().getName() + "的atomicInteger值:" + atomicInteger);
            }
        } else {
            int middle = (start + end) / 2;
            PrintAction left = new PrintAction(start, middle);
            PrintAction right = new PrintAction(middle, end);
            //并行执行两个“小任务”
            left.fork();
            right.fork();
        }

    }

}

class PrintTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 50; //最多只能打印50个数
    private int start;
    private int end;


    public PrintTask(int start, int end) {
        super();
        this.start = start;
        this.end = end;
    }


    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start < THRESHOLD) {
            for (int i = start; i < end; i++) {
                sum += i;
                System.out.println(Thread.currentThread().getName() + "的i值:" + i);
            }
            return sum;
        } else {
            int middle = (start + end) / 2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            //并行执行两个“小任务”
            left.fork();
            right.fork();
            return left.join() + right.join();
        }
    }
}

3、实现亮点

Work-Steal算法

相比其他线程池实现,这个是ForkJoin框架中最大的亮点。当空闲线程在自己的WorkQueue没有任务可做的时候,会去遍历其他的WorkQueue,并进行任务窃取和执行,提高程序响应和性能。

取2的n次幂作为长度的实现

//代码位于java.util.concurrent.ForkJoinPool#externalSubmit
if ((rs & STARTED) == 0) {
    U.compareAndSwapObject(this, STEALCOUNTER, null,
                           new AtomicLong());
    // create workQueues array with size a power of two
    int p = config & SMASK; // ensure at least 2 slots
    int n = (p > 1) ? p - 1 : 1;
    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
    workQueues = new WorkQueue[n];
    ns = STARTED;
}

这里的p其实就是设置的并行线程数,在为ForkJoinPool创建WorkQueue[]数组时,会对传入的p进行一系列位运算,最终得到一个大于等于2p的2的n次幂的数组长度

内存屏障

//代码位于java.util.concurrent.ForkJoinPool#externalSubmit
if ((a != null && a.length > s + 1 - q.base) ||
    (a = q.growArray()) != null) {
    int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
    //通过Unsafe进行内存值的设置,高效,且屏蔽了处理器和Java编译器的指令乱序问题
    U.putOrderedObject(a, j, task);
    U.putOrderedInt(q, QTOP, s + 1);
    submitted = true;
}

这里在对单个WorkQueue的array进行push任务操作时,先后使用了putOrderedObject和putOrderedInt,确保程序执行的先后顺序,同时这种直接操作内存地址的方式也会更加高效。

高并发:细粒度WorkQueue的锁

//代码位于java.util.concurrent.ForkJoinPool#externalSubmit
//如果qlock为0,说明当前没有其他线程操作改WorkQueue
//尝试CAS操作,修改qlock为1,对这个WorkQueue进行加锁
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
    ForkJoinTask<?>[] a = q.array;
    int s = q.top;
    boolean submitted = false; // initial submission or resizing
    try {                      // locked version of push
        if ((a != null && a.length > s + 1 - q.base) ||
            (a = q.growArray()) != null) {
            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            submitted = true;
        }
    } finally {
    	  //finally将qlock置为0,进行锁的释放,其他线程可以使用
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    if (submitted) {
        signalWork(ws, q);
        return;
    }
}

已有 0 条评论

    欢迎您,新朋友,感谢参与互动!