Executors及ExecutorService使用

Executor的UML图

20180530120605822.png Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize, // 1
                          int maximumPoolSize,  // 2
                          long keepAliveTime,  // 3
                          TimeUnit unit,  // 4
                          BlockingQueue<Runnable> workQueue, // 5
                          ThreadFactory threadFactory,  // 6
                          RejectedExecutionHandler handler ) { //7
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
名称含义备注
corePoolSize核心线程池大小有新任务进来小于corePoolSize时,会自动创建新线程
workQueue线程等待队列任务超过corePoolSize时,会加入到队列中,该队列可设置公平或不公平
threadFactory线程创建工厂可以自定义线程创建工厂
maximumPoolSize最大线程池大小当corePoolSize和workQueue都放满后,没有超过maximumPoolSize的情况下会创建新线程执行
keepAliveTime线程最大空闲时间当所有任务都执行完后,超过keepAliveTime时会释放
unit时间单位和keepAliveTime配合使用,可支持纳秒级别
handler拒绝策略单超过corePoolSize+workQueue+maximumPoolSize时,可以定义拒绝策略

Executors

Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

方法说明
newCachedThreadPool()缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse.如果没有,就建一个新的线程加入池中-缓存型池子通常用于执行一些生存期很短的异步型任务 因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。-能reuse的线程,必须是timeout IDLE内的池中线程,缺省     timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。  注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。
newFixedThreadPool(int)        newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子-和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器-从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:fixed池线程数固定,并且是0秒IDLE(无IDLE)    cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE  
newScheduledThreadPool(int)调度型线程池-这个池子里的线程可以按schedule依次delay执行,或周期执行
SingleThreadExecutor()单例线程,任意时间池中只能有一个线程-用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)

Executor VS ExecutorService VS Executors 三者间的区别

  • Executor 和 ExecutorService 这两个接口主要的区别是:ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口
  • Executor 和 ExecutorService 第二个区别是:Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。
  • Executor 和 ExecutorService 接口第三个区别是 Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。
  • Executor 和 ExecutorService 接口第四个区别是除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。可以通过 《Java Concurrency in Practice》 一书了解更多关于关闭线程池和如何处理 pending 的任务的知识。
  • Executors 类提供工厂方法用来创建不同类型的线程池。比如: newSingleThreadExecutor() 创建一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,newCachedThreadPool()可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程。

Executors创建线程池

import java.util.ArrayList;   
import java.util.List;   
import java.util.concurrent.*;   

public class CallableDemo{   
    public static void main(String[] args){   
        ExecutorService executorService = Executors.newCachedThreadPool();   
        List<Future<String>> resultList = new ArrayList<Future<String>>();   

        //创建10个任务并执行   
        for (int i = 0; i < 10; i++){   
            //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中   
            Future<String> future = executorService.submit(new TaskWithResult(i));   
            //将任务执行结果存储到List中   
            resultList.add(future);   
        }   

        //遍历任务的结果   
        for (Future<String> fs : resultList){   
                try{   
                    while(!fs.isDone);//Future返回如果没有完成,则一直循环等待,直到Future返回完成  
                    System.out.println(fs.get());     //打印各个线程(任务)执行的结果   
                }catch(InterruptedException e){   
                    e.printStackTrace();   
                }catch(ExecutionException e){   
                    e.printStackTrace();   
                }finally{   
                    //启动一次顺序关闭,执行以前提交的任务,但不接受新任务  
                    executorService.shutdown();   
                }   
        }   
    }   
}   


class TaskWithResult implements Callable<String>{   
    private int id;   

    public TaskWithResult(int id){   
        this.id = id;   
    }   

    /**  
     * 任务的具体过程,一旦任务传给ExecutorService的submit方法, 
     * 则该方法自动在一个线程上执行 
     */   
    public String call() throws Exception {  
        System.out.println("call()方法被自动调用!!!    " + Thread.currentThread().getName());   
        //该返回结果将被Future的get方法得到  
        return "call()方法被自动调用,任务返回的结果是:" + id + "    " + Thread.currentThread().getName();   
    }   
}  

自定义线程池

import java.util.concurrent.ArrayBlockingQueue;   
import java.util.concurrent.BlockingQueue;   
import java.util.concurrent.ThreadPoolExecutor;   
import java.util.concurrent.TimeUnit;   

public class ThreadPoolTest{   
    public static void main(String[] args){   
        //创建等待队列   
        BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);   
        //创建线程池,池中保存的线程数为3,允许的最大线程数为5  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
        for (int i = 0; i < 7; i++){ 
        	pool.execute(new MyThread());   
        }
        //关闭线程池   
        pool.shutdown();   
    }   
}   

class MyThread implements Runnable{   
    @Override   
    public void run(){   
        System.out.println(Thread.currentThread().getName() + "正在执行。。。");   
        try{   
            Thread.sleep(100);   
        }catch(InterruptedException e){   
            e.printStackTrace();   
        }   
    }   
}  

this逃逸问题

this逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误, 因此应该避免this逃逸的发生.this逃逸经常发生在构造函数中启动线程或注册监听器时, 如:

public class ThisEscape { 
  public ThisEscape() { 
    new Thread(new EscapeRunnable()).start(); 
    // ... 
  } 

  private class EscapeRunnable implements Runnable { 
    @Override 
    public void run() { 
      // 通过ThisEscape.this就可以引用外围类对象, 但是此时外围类对象可能还没有构造完成, 即发生了外围类的this引用的逃逸 
    } 
  } 
} 

解决方法:

public class ThisEscape { 
  private Thread t; 
  public ThisEscape() { 
    t = new Thread(new EscapeRunnable()); 
    // ... 
  } 

  public void init() { 
    t.start(); 
  } 

  private class EscapeRunnable implements Runnable { 
    @Override 
    public void run() { 
      // 通过ThisEscape.this就可以引用外围类对象, 此时可以保证外围类对象已经构造完成 
    } 
  } 
} 

已有 0 条评论

    欢迎您,新朋友,感谢参与互动!