您所在的位置:主页 > JAVA技术 >

JDK多任务执行框架(Executor框架)

时间:2018-02-09 17:09来源:未知 作者:os 点击:

 
 
为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效地进行线程控制。它们都在java.util.concurrent包中,是JDK开发包的核心。其中有一个重要的类:Executors,他扮演这线程工厂的角色,我们通过Executors可以创建特定功能的线程池。
 
newFixedThreadPool()方法,该方法返回一个固定数量的线程池,该方法的线程数始终不变,当有一个任务提交时,如线程池中有空闲,则立即执行,如没有,则会被暂缓在一个任务队列中等待有空闲的线程去执行。
newSingleThreadExecutor()方法,创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
newCachedThreadPool()方法,返回一个可以根据实际情况调整线程个数的线程池,不限制最大线程数量,若有空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动收回。
 newScheduleThreadPool()方法,返回一个ScheduledExecutorService对象,但该线程池可以指定线程的数量。
查看它们的源码:
 
 
复制代码
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
 
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
 
     public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
 
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        //这里super是ThreadPoolExecutor类
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    
复制代码
从源码看以看出它们都是由ThreadPoolExecutor构造出来的。
 
ThreadPoolExecutor构造方法概述
ThreadPoolExecutor(
 
  int corePoolSize,//核心线程数,线程池刚初始化的时候实例化线程个数
 
  int maximumPoolSize,//最大线程数
 
  long keepLongTime,//空闲时间,过时回收
 
  TimeUnit unit,//时间单位
 
  BlockingQueue<Runable> worker,//线程暂缓处
 
  ThreadFactory threadFactory,
 
  RejectExecuteHandle handle//拒绝执行的方法
 
)
 
所以上面几个方法主要是根据不同的参数来执行不同的行为。其中newScheduleThreadPool方法稍复杂一点,它的方法值是ScheduledExecutorService。
 
ScheduledExecutorService主要有两个方法scheduleWithFixedDelay,scheduleAtFixedRate
 
1、scheduleAtFixedRate 方法,以固定的频率来执行某项计划(任务),即固定的频率来执行某项计划,它不受计划执行时间的影响。到时间就执行。 它不受计划执行时间的影响。
2、scheduleWithFixedDealy,相对固定的延迟后,执行某项计划(任务), 即无论某个任务执行多长时间,等执行完了,我再延迟指定的时间去执行。它受计划执行时间的影响。
 
 
复制代码
class Temp extends Thread {
    public void run() {
        try {
            System.out.println( new Date().toString() + "..run start");
            Thread.sleep(3000);
            System.out.println( new Date().toString() + "...run end");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
 
public class ScheduledJob {
    
    public static void main(String args[]) throws Exception {
    
        Temp command = new Temp();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        System.out.println(new Date().toString() + "...start");
        //scheduler.scheduleWithFixedDelay(command, 5, 2, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(command, 5, 2, TimeUnit.SECONDS);
    }
}
复制代码
scheduleAtFixedRate 的执行结果:run方法延迟了5秒执行,run方法执行了3秒,已经超过了我们指定的间隔2秒,所以它在第一个run方法执行完后,立即执行了第二个run方法
 
复制代码
Tue Feb 06 15:48:01 CST 2018...start
Tue Feb 06 15:48:06 CST 2018..run start
Tue Feb 06 15:48:09 CST 2018...run end
Tue Feb 06 15:48:09 CST 2018..run start
Tue Feb 06 15:48:12 CST 2018...run end
Tue Feb 06 15:48:12 CST 2018..run start
复制代码
scheduleWithFixedDealy的执行结果:run方法延迟了5秒执行,run方法执行了3秒,已经超过了我们指定的间隔2秒,但是它还是等了2秒后,再执行了第二个run方法
 
复制代码
Tue Feb 06 15:53:04 CST 2018...start
Tue Feb 06 15:53:10 CST 2018..run start
Tue Feb 06 15:53:13 CST 2018...run end
Tue Feb 06 15:53:15 CST 2018..run start
Tue Feb 06 15:53:18 CST 2018...run end
Tue Feb 06 15:53:20 CST 2018..run start
Tue Feb 06 15:53:23 CST 2018...run end
复制代码
自定义线程池
如果Executors工厂类无法满足我们的需求,可以自己去创建线程池。在自己创建线程池时,这个构造方法对于队列是什么类型比较关键:
 
使用有届队列(ArrayBlockingQueue):若有新的任务需要执行时,如果线程池实际线程数小于corePoolSize,则优先创建线程,若大于corePoolSize,则会将任务加入队列中等待执行,若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建行的线程,诺线程数大于maximumPoolSize,则执行拒绝策略。
使用无界队列(LinkedBlockingQueue):除非系统资源耗尽,否则无界的任务队列不存在入队失败的情况。当有新任务来时,系统的线程数小于corePoolSize时,则新建线程执行任务,当达到corePoolSize后,就不会继续增加,若后续还有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大时,无界队列会保持快速增长,直到耗尽系统内存。注意:maximumPoolSize在无界队列时没有作用
JDK拒绝策略:
AbortPolicy:直接抛出异常组织系统正常工作 CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务
DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务
DiscardPolicy:丢弃无法处理的任务,不做任何处理。
如果需要自定义拒绝策略可以实现RejectedExecutionHandler接口
 
有届队列的例子
 
复制代码
public class UseThreadPoolExecutor1 {
    public static void main(String[] args) {
        /**
         * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
         * 若大于corePoolSize,则会将任务加入队列,
         * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
         * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
         * 
         */    
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                 //coreSize
                2,                 //MaxSize
                60,             //60
                TimeUnit.SECONDS, 
                new ArrayBlockingQueue<Runnable>(3)            //指定一种队列 (有界队列)
                //new LinkedBlockingQueue<Runnable>()
//                , new MyRejected()
                //, new DiscardOldestPolicy()
                );
        
        MyTask mt1 = new MyTask(1, "任务1");
        MyTask mt2 = new MyTask(2, "任务2");
        MyTask mt3 = new MyTask(3, "任务3");
        MyTask mt4 = new MyTask(4, "任务4");
        MyTask mt5 = new MyTask(5, "任务5");
        MyTask mt6 = new MyTask(6, "任务6");
        
        pool.execute(mt1);
        pool.execute(mt2);
        pool.execute(mt3);
        pool.execute(mt4);
        pool.execute(mt5);
        pool.execute(mt6);
        
        pool.shutdown();
        
    }
}
复制代码
只执行前四个任务(把任务5,6先不执行)
 
run taskId =1
run taskId =2
run taskId =3
run taskId =4
它是一个一个执行的,原因是第一个线程来的时候,立即执行了(coreSize=1),当后面三个线程来的时候会被加入到queue中(new ArrayBlockingQueue<Runnable>(3))
 
只执行前五个任务
 
run taskId =1
run taskId =5
run taskId =2
run taskId =3
run taskId =4
它是两个两个执行的,原因是第五个任务来的时候已经超出了queue的size(3),但是总线程数有小于MaxSize(2),所以任务5和任务1一起执行了,之后会从queue中拿两个再执行
 
执行留个任务
 
复制代码
run taskId =1
Exception in thread "main" run taskId =5
java.util.concurrent.RejectedExecutionException: Task 6 rejected from java.util.concurrent.ThreadPoolExecutor@4e25154f[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.bjsxt.height.concurrent018.UseThreadPoolExecutor1.main(UseThreadPoolExecutor1.java:46)
run taskId =2
run taskId =3
run taskId =4
复制代码
任务6来的时候,queue已经满了,总线程数也满了,这是会执行拒绝策略,JDK默认的是AbortPolicy,直接抛出异常,我们也可以改,就想代码里注释掉的一样
 
无界队列的例子
 
复制代码
public class UseThreadPoolExecutor2 implements Runnable{
 
    private static AtomicInteger count = new AtomicInteger(0);
    
    @Override
    public void run() {
        try {
            int temp = count.incrementAndGet();
            System.out.println("任务" + temp);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) throws Exception{
        //System.out.println(Runtime.getRuntime().availableProcessors());
        BlockingQueue<Runnable> queue = 
                new LinkedBlockingQueue<Runnable>();
        ExecutorService executor  = new ThreadPoolExecutor(
                    5,         //core
                    10,     //max
                    120L,     //2fenzhong
                    TimeUnit.SECONDS,
                    queue);
        
        for(int i = 0 ; i < 20; i++){
            executor.execute(new UseThreadPoolExecutor2());
        }
        Thread.sleep(1000);
        System.out.println("queue size:" + queue.size());        //10
        Thread.sleep(2000);
    }
}
复制代码
执行结果:
 
复制代码
任务1
任务2
任务3
任务4
任务5
queue size:15
任务6
任务7
任务9
任务8
任务10
任务11
任务12
任务13
任务14
任务15
任务16
任务17
任务19
任务20
任务18
复制代码
任务是5个5个执行的,而且把剩余的任务全部加到queue中。第二个参数没有作用,不会创建10个线程,如果是有届队列,就会起作用。
 
Task的代码:
 
 
复制代码
public class MyTask implements Runnable {
 
    private int taskId;
    private String taskName;
    
    public MyTask(int taskId, String taskName){
        this.taskId = taskId;
        this.taskName = taskName;
    }
    
    public int getTaskId() {
        return taskId;
    }
 
    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
 
    public String getTaskName() {
        return taskName;
    }
 
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
 
    @Override
    public void run() {
        try {
            System.out.println("run taskId =" + this.taskId);
            Thread.sleep(5*1000);
            //System.out.println("end taskId =" + this.taskId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        
    }
    
    public String toString(){
        return Integer.toString(this.taskId);
    }
 
}
复制代码
 MyRejected的代码:
 
 
复制代码
public class MyRejected implements RejectedExecutionHandler{
 
    
    public MyRejected(){
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("自定义处理..");
        System.out.println("当前被拒绝任务为:" + r.toString());
    }
 
}
复制代码