Java入门系列之线程池ThreadPoolExecutor原理剖析思索(十五)

前言

关于线程池原理剖析请参看《http://objcoding.com/2019/04/25/threadpool-running/》,建议对原理不太领会的童鞋先看下此文然后再来看本文,这里通过对原理的学习我谈谈对线程池的明白,若有错误之处,还望批评指正。

线程池思索

线程池我们可以为是准备好执行应用程序级义务的预先实例化的备用线程聚集,线程池通过同时运行多个义务来提高性能,同时防止线程建立过程中的时间和内存开销,例如,一个Web服务器在启动时实例化线程池,这样当客户端请求进入时,它就不会花时间建立线程,与为每个义务都建立线程相比,线程池通过制止一次无限建立线程来制止资源(处置器,内核,内存等)用尽,建立一定数目的线程后,通常将多余的义务放在守候行列中,直到有线程可用于新义务。下面我们通过一个简朴的例子来归纳综合线程池原理,如下:

    public static void main(String[] args) {

        ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(5);

        ThreadPoolExecutor poolExecutor =
                new ThreadPoolExecutor(2,
                        5, Long.MAX_VALUE, TimeUnit.NANOSECONDS, arrayBlockingQueue);

        for (int i = 0; i < 11; i++) {
            try {
                poolExecutor.execute(new Task());
            } catch (RejectedExecutionException ex) {
                System.out.println("拒绝义务 = " + (i + 1));
            }
            printStatus(i + 1, poolExecutor);
        }
    }

    static void printStatus(int taskSubmitted, ThreadPoolExecutor e) {
        StringBuilder s = new StringBuilder();
        s.append("事情池巨细 = ")
                .append(e.getPoolSize())
                .append(", 焦点池巨细 = ")
                .append(e.getCorePoolSize())
                .append(", 行列巨细 = ")
                .append(e.getQueue().size())
                .append(", 行列剩余容量 = ")
                .append(e.getQueue().remainingCapacity())
                .append(", 最大池巨细 = ")
                .append(e.getMaximumPoolSize())
                .append(", 提交义务数 = ")
                .append(taskSubmitted);

        System.out.println(s.toString());
    }

    static class Task implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1000000);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }

Java入门系列之线程池ThreadPoolExecutor原理剖析思索(十五)

如上例子很好的论述了线程池基本原理,我们声明一个有界行列(容量为5),实例化线程池的焦点池巨细为2,最大池巨细为10,建立线程没有自界说实现,默认通过线程池工厂建立,拒绝计谋为默认,提交11个义务。在启动线程池时,默认情形下它将以无线程启动,当我们提交第一个义务时,将发生第一个事情线程,并将义务移交给该线程,只要当前事情线程数小于设置的焦点池巨细,纵然某些先前建立的焦点线程可能处于空闲状态,也会为每个新提交的义务天生一个新的事情线程(注重:当事情线程池巨细未跨越焦点池巨细时以建立的Worker中的第一个义务执行即firstTask,而绕过了壅闭行列),若跨越焦点池巨细会将义务放入壅闭行列,一旦壅闭行列满后将重新建立线程义务,若义务跨越最大线程池巨细将执行拒绝计谋。当壅闭行列为无界行列(如LinkedBlockingQueue),很显然设置的最大池巨细将无效。我们再来论述下,当事情线程数到达焦点池巨细时,若此时提交的义务越来越多,线程池的具体表现行为是什么呢?

1、只要有任何空闲的焦点线程(先前建立的事情线程,但已经完成分配的义务),它们将接受提交的新义务并执行。

2、若是没有可用的空闲焦点线程,则每个提交的新义务都将进入已界说的事情行列中,直到有一个焦点线程可以处置它为止。若是事情行列已满,但仍然没有足够的空闲焦点线程来处置义务,那么线程池将恢复而建立新的事情线程,新义务将由它们来执行。 一旦事情线程数到达最大池巨细,线程池将再次住手建立新的事情线程,而且在此之后提交的所有义务都将被拒绝。

由上述2我们知道,一旦到达焦点线程巨细就会进入壅闭行列(壅闭行列未满),我们可以为这是一种执行壅闭行列优先的机制,那我们是不是可以思索一个问题:何不建立非焦点线程来扩展线程池巨细而不是进入壅闭行列,当到达最大池巨细时才进入壅闭行列举行排队,这种方式和默认实现方式在效率和性能上是不是可能会更好呢? 然则从另外一个层面来讲,既然不想很快进入壅闭行列,那么何不将指定的焦点池巨细举行扩展大一些呢?我们知道线程数越多那么将导致显著的数据争用问题,也就是说在非峰值系统中的线程数会许多,以是在峰值系统中通过建立非焦点线程理论上是不是能够比默认立刻进入壅闭行列具有支持规模化的义务加倍具有性能上的优势呢?那么我们怎样才能修改默认操作呢?我们首先来看看在执行义务时的操作

【高并发】不废话,言简意赅介绍BlockingQueue

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
    }
}

第一步获得当前事情线程数若小于焦点池巨细,那么将建立基于焦点池的线程然后执行义务,这一点我们没偏差,第二步若事情线程巨细跨越焦点池巨细,若当前线程正处于运行状态且将其义务放到壅闭行列中,若失败举行第三步建立非焦点池线程,通过源码剖析得知,若焦点池中线程纵然有空闲线程也会建立线程执行义务,那么我们是不是可以获得焦点池中是否有空闲的线程呢,若有然后才实验使其进入壅闭行列,以是我们需要重写壅闭行列中的offer方式,添加一个是否有空闲焦点池的线程,让其接待义务。以是我们继续上述有界壅闭行列,如下:

public class CustomArrayBlockingQueue<E> extends ArrayBlockingQueue {

    private final AtomicInteger idleThreadCount = new AtomicInteger();

    public CustomArrayBlockingQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(Object o) {
        return idleThreadCount.get() > 0 && super.offer(o);
    }
}

然则不幸的是,通过对线程池源码的剖析,我们并不能够获得空闲的焦点池的线程,然则我们可以跟踪焦点池中的空闲线程,在获取义务方式中如下:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null;
    continue;
}

try {
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
    if (r != null)
        return r;
    timedOut = true;
} catch (InterruptedException retry) {
    timedOut = false;
}

如上截取获取义务的焦点,若事情线程巨细大于焦点池巨细时,默认情形下会进入壅闭行列此时通过pool获取壅闭行列中的义务,若事情线程巨细小于焦点池巨细时,此时会挪用take方式获从壅闭行列中获取可用的义务,此时说明当前焦点池线程处于空闲状态,若是行列中没有义务,则线程将在此挪用时会壅闭,直到有可用的义务为止,因此焦点池线程仍然处于空闲状态,以是我们增添上述计数器,否则,挪用方式返回,此时该线程不再处于空闲状态,我们可以削减计数器,重写take方式,如下:

@Override
public Object take() throws InterruptedException {
    idleThreadCount.incrementAndGet();
    Object take = super.take();
    idleThreadCount.decrementAndGet();
    return take;
}

接下来我们再来思量timed为true的情形,在这种情形下,线程将使用poll方式,很显然,进入poll方式的任何线程当前都处于空闲状态,因此我们可以在事情行列中重写此方式的实现,以在开始时增添计数器,然后,我们可以挪用现实的poll方式,这可能导致以下两种情形之,若是行列中没有义务,则线程将守候此挪用以提供所提供的超时,然后返回null。到此时,线程将超时,并将很快从池中退出,从而将空闲线程数削减1,因此我们可以在此时削减计数器,否则由方式挪用返回,因此该线程不再处于空闲状态,此时我们也可以削减计数器。

@Override
public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
    idleThreadCount.incrementAndGet();
    Object poll = super.poll(timeout, unit);
    idleThreadCount.decrementAndGet();
    return poll;
}

通过上述我们对offer、pool、take方式的重写,使得在没有基于焦点池的空闲线程举行扩展非焦点线程,还未竣事,若到达了最大池巨细,此时我们需要将其添加到壅闭行列中排队,以是最终使用我们自界说的壅闭行列,并使用自界说的拒绝计谋,如下:

CustomArrayBlockingQueue<Runnable> arrayBlockingQueue = new CustomArrayBlockingQueue<>(5);

ThreadPoolExecutor poolExecutor =
        new ThreadPoolExecutor(10,
                100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, arrayBlockingQueue
                , Executors.defaultThreadFactory(), (r, executor) -> {
            if (!executor.getQueue().add(r)) {
                System.out.println("拒绝义务");
            }
        });

for (int i = 0; i < 150; i++) {
    try {
        poolExecutor.execute(new Task());
    } catch (RejectedExecutionException ex) {
        System.out.println("拒绝义务 = " + (i + 1));
    }
    printStatus(i + 1, poolExecutor);
}

上述我们实现自界说的拒绝计谋,将拒绝的义务放入到壅闭行列中,若壅闭行列已满而不能再吸收新的义务,我们将挪用默认的拒绝计谋或者是其他处置程序,以是在将义务添加到壅闭行列中即挪用add方式时,我们还需要重写add方式,如下:

@Override
public boolean add(Object o) {
    return super.offer(o);
}

Java入门系列之线程池ThreadPoolExecutor原理剖析思索(十五)

总结

以上详细内容只是针对线程池的默认实现而引发的思索,通过如上方式是否能够对于规模化的义务处置起来在性能上有一定改善呢?可能也有思虑不周全的地方,暂且剖析于此。

原创文章,作者:2d28新闻网,如若转载,请注明出处:https://www.2d28.com/archives/5113.html