Administrator
发布于 2022-12-28 / 65 阅读
0
0

Java Executor Best Practise

内存模型

Java内存模型的主要目的是定义程序中 各种变量的访问规则,即关注在虚拟机中 把变量值存储到内存从内存中取出变量值 这样的底层细节

此处的变量(Variables)与Java编程中所说的变量有所区别,它包括了实例字段、静态字段和构成数组对象的元素,但是不包括局部变量与方法参数,因为后
者是线程私有的[1],不会被共享,自然就不会存在竞争问题。

Java内存模型规定了所有的变量都存储在主内存(Main Memory)中(此处的主内存与介绍物理硬件时提到的主内存名字一样,两者也可以类比,但物理上它仅是虚拟机内存的一部分)。

每条线程还有自己的工作内存(Working Memory,可与前面讲的处理器高速缓存类比),线程的工作内存中保存了被该线程使用的变量的主内存副本[2],线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的数据[3]。

不同的线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递均需要通过主内存来完成,线程、主内存、工作内存三者的交互关系如图12-2所示,注意与图12-1进行对比。

image-20221011105922352

注意:

如果局部变量是一个reference类型,它引用的对象在Java堆中可被各个线程共享,但是reference本身在Java栈的局部变量表中是线程私有的

根据《Java虚拟机规范》的约定,volatile变量依然有工作内存的拷贝,但是由于它特殊的操作顺序性规定(后文会讲到),所以看起来如同直接在主内存中读写访问一般,因此这里的描述对于volatile也并不存在例外。

内存间交互操作

一个变量如何从主内存拷贝到工作内存、如何从工作内存同步回主内存这一类的实现细节,Java内存模型中定义了以下8种操作来完成

Java虚拟机实现时必须保证下面提及的每一种操作都是原子的、不可再分的

·lock(锁定):作用于主内存的变量,它把一个变量标识为一条线程独占的状态。

·unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。

·read(读取):作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用。

·load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。

·use(使用):作用于工作内存的变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。

·assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。

·store(存储):作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便随后的write操作使用。

·write(写入):作用于主内存的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中。

如果要把一个变量从主内存拷贝到工作内存,那就要按顺序执行read和load操作,如果要把变量从工作内存同步回主内存,就要按顺序执行store和write操作。

注意,Java内存模型只要求上述两个操作必须按顺序执行,但不要求是连续执行。

也就是说read与load之间、store与write之间是可插入其他指令的,如对主内存中的变量a、b进行访问时,一种可能出现的顺序是read a、read b、load b、load a。

注意:

·一个变量在同一个时刻只允许一条线程对其进行lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。

·如果对一个变量执行lock操作,那将会清空工作内存中此变量的值,在执行引擎使用这个变量前,需要重新执行load或assign操作以初始化变量的值。

·如果一个变量事先没有被lock操作锁定,那就不允许对它执行unlock操作,也不允许去unlock一个被其他线程锁定的变量。

·对一个变量执行unlock操作之前,必须先把此变量同步回主内存中(执行store、write操作)。

volatile—最轻量级的同步机制

当一个变量被定义成volatile之后,它将具备两项特性:

第一项是保证此变量对所有线程的可见性,这里的“可见性”是指当一条线程修改了这个变量的值,新值对于其他线程来说是可以立即得知的。而普通变量并不能做到这一点,普通变量的值在线程间传递时均需要通过主内存来完成。

比如,线程A修改一个普通变量的值,然后向主内存进行回写,另外一条线程B在线程A回写完成了之后再对主内存进行读取操作,新变量值才会对线程B可见。

关于volatile变量的可见性,经常会被开发人员误解,他们会误以为下面的描述是正确的:“volatile变量对所有线程是立即可见的,对volatile变量所有的写操作都能立刻反映到其他线程之中。

换句话说,volatile变量在各个线程中是一致的,所以基于volatile变量的运算在并发下是线程安全的”。这句话的论据部分并没有错,但是由其论据并不能得出“基于volatile变量的运算在并发下是线程安全的”这样的结论。

**volatile变量在各个线程的工作内存中是不存在一致性问题的(**从物理存储的角度看,各个线程的工作内存中volatile变量也可以存在不一致的情况,但由于每次使用之前都要先刷新,执行引擎看不到不一致的情况,因此可以认为不存在一致性问题),但是Java里面的运算操作符并非原子操作,这导致volatile变量的运算在并发下一样是不安全的,我们可以通过一段简单的演示来说明原因,请看代码清单12-1中演示的例子。

package com.wpvolatile;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TestVolatile {

    private static volatile int race = 0;

    private static final int THREAD_COUNT = 20;

    // 创建一个线程池
    private ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT, new ThreadFactory() {
        private final AtomicInteger poolNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            String threadName = String.valueOf(poolNum.getAndIncrement());
            return new Thread(r, "wptest-threadPool-" + threadName);

        }
    });


    public static void increase() {
        race++;
    }

    public static void main(String[] args) throws InterruptedException {
        TestVolatile testVolatile = new TestVolatile();
        for (int j = 0; j < THREAD_COUNT; j++) {
            testVolatile.executorService.execute(() -> {
                for (int i = 0; i < 1000; i++) {
                    increase();
                }
            });
        }


        TimeUnit.SECONDS.sleep(8);
        System.out.println(race);
    }

}

这段代码发起了20个线程,每个线程对race变量进行10000次自增操作,如果这段代码能够正确并发的话,最后输出的结果应该是200000。读者运行完这段代码之后,并不会获得期望的结果,而且会发现每次运行程序,输出的结果都不一样,都是一个小于200000的数字。这是为什么呢?

问题就出在自增运算“race++”之中,我们用Javap反编译这段代码后会得到代码清单12-2所示,发现只有一行代码的increase()方法在Class文件中是由4条字节码指令构成(return指令不是由race++产生的,这条指令可以不计算),

从字节码层面上已经很容易分析出并发失败的原因了:当getstatic指令把race的值取到操作栈顶时,volatile关键字保证了race的值在此时是正确的,但是在执行iconst_1、iadd这些指令的时候,其他线程可能已经把race的值改变了,而操作栈顶的值就变成了过期的数据,所以putstatic指令执行后就可能把较小的race值同步回主内存之中。

public static void increase();
Code:
Stack=2, Locals=0, Args_size=0
0: getstatic #13; //Field race:I
3: iconst_1
4: iadd
5: putstatic #13; //Field race:I
8: return
LineNumberTable:
line 14: 0
line 15: 8

由于volatile变量只能保证可见性,在不符合以下两条规则的运算场景中,我们仍然要通过加锁(使用synchronized、java.util.concurrent中的锁或原子类)来保证原子性:

·运算结果并不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值。
·变量不需要与其他的状态变量共同参与不变约束。

其实总结下:就是,如果总体来看,只更改volatile变量 一次,那么在并发操作下,就是安全的。比如,把一个变量,从false改成true,后面就不再更改了,只会做基于这个变量的值,做判断,这种场景下,这个volatile变量 ,就是并发安全的。

比如,下面的代码,这类场景中就很适合使用volatile变量来控制并发,当shutdown()方法被调用时,能保证所有线程中执行的doWork()方法都立即停下来。

private volatile boolean shutdownRequested = false;

    public void doWork() {
        while (!shutdownRequested) {
            System.out.println("gogogo");
        }
        System.out.println("dowork  finish------");
    }

    public void shutdown() {
        shutdownRequested = true;
    }

完整代码如下:

package com.wpvolatile;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TestVolatile {


    private static final int THREAD_COUNT = 20;

    // 创建一个线程池
    private ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT, new ThreadFactory() {
        private final AtomicInteger poolNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            String threadName = String.valueOf(poolNum.getAndIncrement());
            return new Thread(r, "wptest-threadPool-" + threadName);

        }
    });

    private volatile boolean shutdownRequested = false;

    public void doWork() {
        while (!shutdownRequested) {
            System.out.println("gogogo");
        }
        System.out.println("dowork  finish------");
    }

    public void shutdown() {
        shutdownRequested = true;
    }

    public static void main(String[] args) throws InterruptedException {
        TestVolatile testVolatile = new TestVolatile();
        for (int j = 0; j < THREAD_COUNT - 1; j++) {
            testVolatile.executorService.execute(() -> {
                testVolatile.doWork();
            });
        }

        TimeUnit.SECONDS.sleep(2);
        testVolatile.executorService.execute(() -> {
            testVolatile.shutdown();
        });

        TimeUnit.SECONDS.sleep(2);
    }

}

使用volatile变量的第二个语义是禁止指令重排序优化,普通的变量仅会保证在该方法的执行过程中所有依赖赋值结果的地方都能获取到正确的结果,而不能保证变量赋值操作的顺序与程序代码中的执行顺序一致。

上面描述仍然比较拗口难明,我们还是继续通过一个例子来看看为何指令重排序会干扰程序的并发执行。演示程序如代码清单12-4所示。

代码清单12-4 指令重排序

Map configOptions;
char[] configText;
// 此变量必须定义为volatile
volatile boolean initialized = false;


// 假设以下代码在线程A中执行
// 模拟读取配置信息,当读取完成后
// 将initialized设置为true,通知其他线程配置可用
configOptions = new HashMap();
configText = readConfigFile(fileName);
processConfigOptions(configText, configOptions);
initialized = true;

// 假设以下代码在线程B中执行
// 等待initialized为true,代表线程A已经把配置信息初始化完成
while (!initialized) {
	sleep();
}

// 使用线程A中初始化好的配置信息
doSomethingWithConfig();

代码清单12-4中所示的程序是一段伪代码,其中描述的场景是开发中常见配置读取过程,只是我们在处理配置文件时一般不会出现并发,所以没有察觉这会有问题。

读者试想一下,如果定义initialized变量时没有使用volatile修饰,就可能会由于指令重排序的优化,导致位于线程A中最后一条代码“initialized=true”被提前执行(这里虽然使用Java作为伪代码,但所指的重排序优化是机器级的优化操作,提前执行是指这条语句对应的汇编代码被提前执行),这样在线程B中使用配置信息的代码就可能出现错误,而volatile关键字则可以避免此类情况的发生

笔者再举一个可以实际操作运行的例子来分析volatile关键字是如何禁止指令重排序优化的

清单12-5所示是一段标准的双锁检测(Double Check Lock,DCL)单例[3]代码,可以观察加入volatile和未加入volatile关键字时所生成的汇编代码的差别

public class Singleton {
    
    private volatile static Singleton instance;
    
    public static Singleton getInstance() {
   		if (instance == null) {
    		synchronized (Singleton.class) {
    			if (instance == null) {
    				instance = new Singleton();
    			}
    		}
    	}
    	return instance;
    }
    
    public static void main(String[] args) {
    	Singleton.getInstance();
    }
}

在众多保障并发安全的工具中选用volatile的意义——它能让我们的代码比使用其他的同步工具更快吗?在某些情况下,volatile的同步机制的性能确实要优于锁
(使用synchronized关键字或java.util.concurrent包里面的锁),但是由于虚拟机对锁实行的许多消除和优化,使得我们很难确切地说volatile就会比synchronized快上多少。

如果让volatile自己与自己比较,那可以确定一个原则:volatile变量读操作的性能消耗与普通变量几乎没有什么差别,但是写操作则可能会慢上一些,因为它需要在本地代码中插入许多内存屏障指令来保证处理器不发生乱序执行。不过即便如此,大多数场景下volatile的总开销仍然要比锁来得更低。

我们在volatile与锁中选择的唯一判断依据仅仅是volatile的语义能否满足使用场景的需求。

原子性、可见性与有序性

原子性(Atomicity)

由Java内存模型来直接保证的原子性变量操作包括read、load、assign、use、store和write这六个,我们大致可以认为,基本数据类型的访问、读写都是具备原子性的(例外就是long和double的非原子性协定,读者只要知道这件事情就可以了,无须太过在意这些几乎不会发生的例外情况)。

如果应用场景需要一个更大范围的原子性保证(经常会遇到),Java内存模型还提供了lock和unlock操作来满足这种需求,尽管虚拟机未把lock和unlock操作直接开放给用户使用,但是却提供了更高层次的字节码指令monitorenter和monitorexit来隐式地使用这两个操作。这两个字节码指令反映到Java代码中就是同步块——synchronized关键字,因此在synchronized块之间的操作也具备原子性

可见性(Visibility)

可见性就是指当一个线程修改了共享变量的值时,其他线程能够立即得知这个修改。

上文在讲解volatile变量的时候我们已详细讨论过这一点。

Java内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值这种依赖主内存作为传递媒介的方式来实现可见性的,无论是
普通变量还是volatile变量都是如此。

普通变量与volatile变量的区别是,volatile的特殊规则保证了新值能立即同步到主内存,以及每次使用前立即从主内存刷新。因此我们可以说volatile保证了多线程操作时变量的可见性,而普通变量则不能保证这一点。

除了volatile之外,Java还有两个关键字能实现可见性,它们是synchronized和final。

同步块的可见性是由“对一个变量执行unlock操作之前,必须先把此变量同步回主内存中(执行store、write操作)”这条规则获得的。

而final关键字的可见性是指:被final修饰的字段在构造器中一旦被初始化完成,并且构造器没有把“this”的引用传递出去(this引用逃逸是一件很危险的事情,其他线程有可能通过这个引用访问到“初始化了一半”的对象),那么在其他线程中就能看见final字段的值。

如代码清单12-7所示,变量i与j都具备可见性,它们无须同步就能被其他线程正确访问。

public static final int i;
public final int j;

static {
        i = 0;
        // 省略后续动作
    }

    {
        // 也可以选择在构造函数中初始化
        j = 0;
        // 省略后续动作
    }

有序性(Ordering)

Java语言提供了volatile和synchronized两个关键字来保证线程之间操作的有序性,volatile关键字本身就包含了禁止指令重排序的语义,

而synchronized则是由“一个变量在同一个时刻只允许一条线程对其进行lock操作”这条规则获得的,这个规则决定了持有同一个锁的两个同步块只能串行地进入

先行发生原则

如果Java内存模型中所有的有序性都仅靠volatile和synchronized来完成,那么有很多操作都将会变得非常啰嗦,但是我们在编写Java并发代码的时候并没有察觉到这一点,这是因为Java语言中有一个“先行发生”(Happens-Before)的原则。

这个原则非常重要,它是判断数据是否存在竞争,线程是否安全的非常有用的手段。依赖这个原则,我们可以通过几条简单规则一揽子解决并发环境下两个操
作之间是否可能存在冲突的所有问题,而不需要陷入Java内存模型苦涩难懂的定义之中。

现在就来看看“先行发生”原则指的是什么。先行发生是Java内存模型中定义的两项操作之间的偏序关系,比如说操作A先行发生于操作B,其实就是说在发生操作B之前,操作A产生的影响能被操作B观察到,“影响”包括修改了内存中共享变量的值、发送了消息、调用了方法等。这句话不难理解,但它意味着什么呢?

我们可以举个例子来说明一下。如代码清单12-8所示的这三条伪代码。

// 以下操作在线程A中执行
i = 1;
// 以下操作在线程B中执行
j = i;
// 以下操作在线程C中执行
i = 2;

假设线程A中的操作“i=1”先行发生于线程B的操作“j=i”,那我们就可以确定在线程B的操作执行后,变量j的值一定是等于1,得出这个结论的依据有两个:一是根据先行发生原则,“i=1”的结果可以被观察到;二是线程C还没登场,线程A操作结束之后没有其他线程会修改变量i的值。

现在再来考虑线程C,我们依然保持线程A和B之间的先行发生关系,而C出现在线程A和B的操作之间,但是C与B没有先行发生关系,那j的值会是多少呢?答案是不确定!1和2都有可能,因为线程C对变量i的影响可能会被线程B观察到,也可能不会,这时候线程B就存在读取到过期数据的风险,不具备多线程安全性。

下面是Java内存模型下一些“天然的”先行发生关系,这些先行发生关系无须任何同步器协助就已经存在,可以在编码中直接使用。如果两个操作之间的关系不在此列,并且无法从下列规则推导出来,则它们就没有顺序性保障,虚拟机可以对它们随意地进行重排序。

·程序次序规则(Program Order Rule):在一个线程内,按照控制流顺序,书写在前面的操作先行发生于书写在后面的操作。注意,这里说的是控制流顺序而不是程序代码顺序,因为要考虑分支、循环等结构。

·管程锁定规则(Monitor Lock Rule):一个unlock操作先行发生于后面对同一个锁的lock操作。这里必须强调的是“同一个锁”,而“后面”是指时间上的先后。

·volatile变量规则(Volatile Variable Rule):对一个volatile变量的写操作先行发生于后面对这个变量的读操作,这里的“后面”同样是指时间上的先后。

·线程启动规则(Thread Start Rule):Thread对象的start()方法先行发生于此线程的每一个动作。

·线程终止规则(Thread Termination Rule):线程中的所有操作都先行发生于对此线程的终止检测,我们可以通过Thread::join()方法是否结束Thread::isAlive()的返回值等手段检测线程是否已经终止执行。

·线程中断规则(Thread Interruption Rule):对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通Thread::interrupted()方法检测到是否有中断发生。

·对象终结规则(Finalizer Rule):一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。

·传递性(Transitivity):如果操作A先行发生于操作B,操作B先行发生于操作C,那就可以得出操作A先行发生于操作C的结论。

Java语言无须任何同步手段保障就能成立的先行发生规则有且只有上面这些,下面演示一下如何使用这些规则去判定操作间是否具备顺序性,对于读写共享变量的操作来说,就是线程是否安全。

读者还可以从下面这个例子中感受一下“时间上的先后顺序”与“先行发生”之间有什么不同。

演示例子如代码清单12-9所示。

private int value = 0;

pubilc void setValue(int value){
	this.value = value;
}

public int getValue(){
	return value;
}

代码清单12-9中显示的是一组再普通不过的getter/setter方法,假设存在线程A和B,线程A先(时间上的先后)调用了setValue(1),然后线程B调用了同一个对象的getValue(),那么线程B收到的返回值是什么?

我们依次分析一下先行发生原则中的各项规则。由于两个方法分别由线程A和B调用,不在一个线程中,所以程序次序规则在这里不适用;由于没有同步块,自然就不会发生lock和unlock操作,所以管程锁定规则不适用;由于value变量没有被volatile关键字修饰,所以volatile变量规则不适用;后面的线程启动、终止、中断规则和对象终结规则也和这里完全没有关系。因为没有一个适用的先行发生规则,所以最后一条传递性也无从谈起,

因此我们可以判定,尽管线程A在操作时间上先于线程B,但是无法确定线程B中getValue()方法的返回结果,换句话说,这里面的操作不是线程安全的。

那怎么修复这个问题呢?我们至少有两种比较简单的方案可以选择:要么把getter/setter方法都定义为synchronized方法,这样就可以套用管程锁定规则;要么把value定义为volatile变量,由于setter方法对value的修改不依赖value的原值,满足volatile关键字使用场景,这样就可以套用volatile变量规则来实现先行发生关系。

通过上面的例子,我们可以得出结论:一个操作“时间上的先发生”不代表这个操作会是“先行发生”。那如果一个操作“先行发生”,是否就能推导出这个操作必定是“时间上的先发生”呢?很遗憾,这个推论也是不成立的

一个典型的例子就是多次提到的“指令重排序”,

// 以下操作在同一个线程中执行
int i = 1;
int j = 2;

代码清单12-10所示的两条赋值语句在同一个线程之中,根据程序次序规则,“int i=1”的操作先行发生于“int j=2”,但是“int j=2”的代码完全可能先被处理器执行,这并不影响先行发生原则的正确性,因为我们在这条线程之中没有办法感知到这一点。

上面两个例子综合起来证明了一个结论:时间先后顺序与先行发生原则之间基本没有因果关系,所以我们衡量并发安全问题的时候不要受时间顺序的干扰,一切必须以先行发生原则为准。

Java多线程

状态转换

Java语言定义了6种线程状态,在任意一个时间点中,一个线程只能有且只有其中的一种状态,并且可以通过特定的方法在不同状态之间转换。这6种状态分别是:

·新建(New):创建后尚未启动的线程处于这种状态。

·运行(Runnable):包括操作系统线程状态中的Running和Ready,也就是处于此状态的线程有可能正在执行,也有可能正在等待着操作系统为它分配执行时间。

·无限期等待(Waiting):处于这种状态的线程不会被分配处理器执行时间,它们要等待被其他线程显式唤醒。以下方法会让线程陷入无限期的等待状态:
■没有设置Timeout参数的Object::wait()方法;
■没有设置Timeout参数的Thread::join()方法;
■LockSupport::park()方法。

·限期等待(Timed Waiting):处于这种状态的线程也不会被分配处理器执行时间,不过无须等待被其他线程显式唤醒,在一定时间之后它们会由系统自动唤醒。以下方法会让线程进入限期等待状态:

■Thread::sleep()方法;
■设置了Timeout参数的Object::wait()方法;
■设置了Timeout参数的Thread::join()方法;
■LockSupport::parkNanos()方法;
■LockSupport::parkUntil()方法。

·阻塞(Blocked):线程被阻塞了,“阻塞状态”与“等待状态”的区别是“阻塞状态”在等待着获取到一个排它,这个事件将在另外一个线程放弃这个锁的时候发生;而“等待状态”则是在等待一段时间,或者唤醒动作的发生。在程序等待进入同步区域的时候,线程将进入这种状态。

·结束(Terminated):已终止线程的线程状态,线程已经结束执行。

上述6种状态在遇到特定事件发生的时候将会互相转换,它们的转换关系如图12-6所示。

image-20221011174454653

线程安全 & 锁优化

在软件业发展的初期,程序编写都是以算法为核心的,程序员会把数据和过程分别作为独立的部分来考虑,数据代表问题空间中的客体,程序代码则用于处理这些数据,这种思维方式直接站在计算机的角度去抽象问题和解决问题,被称为面向过程的编程思想。

与此相对,面向对象的编程思想则站在现实世界的角度去抽象和解决问题,它把数据和行为都看作对象的一部分,这样可以让程序员能以符合现实世界的思维方式来编写和组织程序。

线程安全

笔者认为《Java并发编程实战(Java Concurrency In Practice)》的作者Brian Goetz为“线程安全”做出了一个比较恰当的定义:“当多个线程同时访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那就称这个对象是线程安全的。”

这个定义就很严谨而且有可操作性,它要求线程安全的代码都必须具备一个共同特征:代码本身封装了所有必要的正确性保障手段(如互斥同步等),令调用者无须关心多线程下的调用问题,更无须自己实现任何措施来保证多线程环境下的正确调用。

这点听起来简单,但其实并不容易做到,在许多场景中,我们都会将这个定义弱化一些。如果把“调用这个对象的行为”限定为“单次调用”,这个定义的其他描述能够成立的话,那么就已经可以称它是线程安全了。

不可变

在Java语言里面(特指JDK 5以后,即Java内存模型被修正之后的Java语言),不可变(Immutable)的对象一定是线程安全的,无论是对象的方法实现还是方法的调用者,都不需要再进行任何线程安全保障措施

在第10章里我们讲解“final关键字带来的可见性”时曾经提到过这一点:只要一个不可变的对象被正确地构建出来(即没有发生this引用逃逸的情况),那其外部的可见状态永远都不会改变,永远都不会看到它在多个线程之中处于不一致的状态

Java语言中,如果多线程共享的数据是一个基本数据类型,那么只要在定义时使用final关键字修饰它就可以保证它是不可变的。

如果共享数据是一个对象,由于Java语言目前暂时还没有提供值类型的支持,那就需要对象自行保证其行为不会对其状态产生任何影响才行

如果读者没想明白这句话所指的意思,不妨类比java.lang.String类的对象实例,它是一个典型的不可变对象,用户调用它的substring()、replace()和concat()这些方法都不会影响它原来的值,只会返回一个新构造的字符串对象。

保证对象行为不影响自己状态的途径有很多种,最简单的一种就是把对象里面带有状态的变量都声明为final,这样在构造函数结束之后,它就是不可变的,例如代码清单13-1中所示的java.lang.Integer构造函数,它通过将内部状态变量value定义为final来保障状态不变。

JDK中Integer类的构造函数:

/**
* The value of the <code>Integer</code>.
* @serial
*/
private final int value;
/**
* Constructs a newly allocated <code>Integer</code> object that
* represents the specified <code>int</code> value.
*
* @param value the value to be represented by the
* <code>Integer</code> object.
*/
public Integer(int value) {
this.value = value;
}

线程安全的实现方法

Mutual Exclusion & Synchronization (互斥同步)

互斥同步(Mutual Exclusion & Synchronization)是一种最常见也是最主要的并发正确性保障手段。

同步是指在多个线程并发访问共享数据时,保证共享数据在同一个时刻只被一条(或者是一些,当使用信号量的时候)线程使用

而互斥是实现同步的一种手段,临界区(Critical Section)、互斥量(Mutex)和信号量(Semaphore)都是常见的互斥实现方式。

因此在“互斥同步”这四个字里面,互斥是因,同步是果;互斥是方法,同步是目的

在Java里面,最基本的互斥同步手段就是synchronized关键字,这是一种块结构(Block Structured)的同步语法。

synchronized关键字经过Javac编译之后,会在同步块的前后分别形成monitorenter和monitorexit这两个字节码指令。

这两个字节码指令都需要一个reference类型的参数来指明要锁定和解锁的对象。

如果Java源码中的synchronized明确指定了对象参数,那就以这个对象的引用作为reference;如果没有明确指定,那将根据synchronized修饰的方法类型(如实例方法或类方法),来决定是取代码所在的对象实例还是取类型对应的Class对象来作为线程要持有的锁。

根据《Java虚拟机规范》的要求,在执行monitorenter指令时,首先要去尝试获取对象的锁。如果这个对象没被锁定,或者当前线程已经持有了那个对象的锁,就把锁的计数器的值增加一,而在执行monitorexit指令时会将锁计数器的值减一。

一旦计数器的值为零,锁随即就被释放了。

如果获取对象锁失败,那当前线程就应当被阻塞等待直到请求锁定的对象被持有它的线程释放为止

从功能上看,根据以上《Java虚拟机规范》对monitorenter和monitorexit的行为描述,我们可以得出两个关于synchronized的直接推论,这是使用它时需特别注意的:

·被synchronized修饰的同步块对同一条线程来说是可重入的。这意味着同一线程反复进入同步块也不会出现自己把自己锁死的情况

·被synchronized修饰的同步块在持有锁的线程执行完毕并释放锁之前,会无条件地阻塞后面其他线程的进入。这意味着无法像处理某些数据库中的锁那样,强制已获取锁的线程释放锁;也无法强制正在等待锁的线程中断等待或超时退出

自JDK 5起(实现了JSR 166[1]),Java类库中新提供了java.util.concurrent包(下文称J.U.C包),其中java.util.concurrent.locks.Lock接口便成了Java的另一种全新的互斥同步手段

基于Lock接口,用户能够以非块结构(Non-Block Structured)来实现互斥同步,从而摆脱了语言特性的束缚,改为在类库层面
去实现同步,这也为日后扩展出不同调度算法、不同特征、不同性能、不同语义的各种锁提供了广阔的空间。

重入锁(ReentrantLock)是Lock接口最常见的一种实现[2],顾名思义,它与synchronized一样是可重入[3]的

在基本用法上,ReentrantLock也与synchronized很相似,只是代码写法上稍有区别而已。

不过,ReentrantLock与synchronized相比增加了一些高级功能,主要有以下三项:等待可中断、可实现公平锁及锁可以绑定多个条件

·等待可中断:是指当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。可中断特性对处理执行时间非常长的同步块很有帮助。

·公平锁:是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁;而非公平锁则不保证这一点,在锁被释放时,任何一个等待锁的线程都有机会获得锁。synchronized中的锁是非公平的,ReentrantLock在默认情况下也是非公平的,但可以通过带布尔值的构造函数要求使用公平锁。不过一旦使用了公平锁,将会导致ReentrantLock的性能急剧下降,会明显影响吞吐量。

·锁绑定多个条件:是指一个ReentrantLock对象可以同时绑定多个Condition对象。在synchronized中,锁对象的wait()跟它的notify()或者notifyAll()方法配合可以实现一个隐含的条件,如果要和多于一个的条件关联的时候,就不得不额外添加一个锁;而ReentrantLock则无须这样做,多次调用newCondition()方法即可

Non-Blocking Synchronization(非阻塞同步)

互斥同步面临的主要问题是进行线程阻塞和唤醒所带来的性能开销,因此这种同步也被称为阻塞同步(Blocking Synchronization)。

从解决问题的方式上看,互斥同步属于一种悲观的并发策略,其总是认为只要不去做正确的同步措施(例如加锁),那就肯定会出现问题,无论共享的数据是否真的会出现竞争,它都会进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加
锁),这将会导致用户态到核心态转换、维护锁计数器和检查是否有被阻塞的线程需要被唤醒等开销

随着硬件指令集的发展,我们已经有了另外一个选择:基于冲突检测的乐观并发策略,通俗地说就是不管风险,先进行操作,如果没有其他线程争用共享数据,那操作就直接成功了;如果共享的数据的确被争用,产生了冲突,那再进行其他的补偿措施,最常用的补偿措施是不断地重试,直到出现没有竞争的共享数据为止

这种乐观并发策略的实现不再需要把线程阻塞挂起,因此这种同步操作被称为非阻塞同步(Non-Blocking Synchronization),使用这种措施的代码也常被称为无锁(Lock-Free)编程。

为什么笔者说使用乐观并发策略需要“硬件指令集的发展”?因为我们必须要求操作和冲突检测这两个步骤具备原子性

靠什么来保证原子性?如果这里再使用互斥同步来保证就完全失去意义了,所以我们只能靠硬件来实现这件事情,硬件保证某些从语义上看起来需要多次操作的行为可以只通过一条处理器指令就能完成,这类指令常用的有:

·测试并设置(Test-and-Set);
·获取并增加(Fetch-and-Increment);
·交换(Swap);
·比较并交换(Compare-and-Swap,下文称CAS);
·加载链接/条件储存(Load-Linked/Store-Conditional,下文称LL/SC)。

CAS指令需要有三个操作数,分别是内存位置(在Java中可以简单地理解为变量的内存地址,用V表示)、旧的预期值(用A表示)和准备设置的新值(用B表示)。

CAS指令执行时,当且仅当V符合A时,处理器才会用B更新V的值,否则它就不执行更新。

但是,不管是否更新了V的值,都会返回V的旧值,上述的处理过程是一个原子操作,执行期间不会被其他线程中断。

在JDK 5之后,Java类库中才开始使用CAS操作,该操作由sun.misc.Unsafe类里面的compareAndSwapInt()和compareAndSwapLong()等几个方法包装提供。

HotSpot虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令,没有方法调用的过程,
或者可以认为是无条件内联进去了[5]。

不过由于Unsafe类在设计上就不是提供给用户程序调用的类(Unsafe::getUnsafe()的代码中限制了只有启动类加载器(Bootstrap ClassLoader)加载的Class才能访问它),因此在JDK 9之前只有Java类库可以使用CAS,譬如J.U.C包里面的整数原子类,其中的
compareAndSet()和getAndIncrement()等方法都使用了Unsafe类的CAS操作来实现。

而如果用户程序也有使用CAS操作的需求,那要么就采用反射手段突破Unsafe的访问限制,要么就只能通过Java类库API来间接使用它。

直到JDK 9之后,Java类库才在VarHandle类里开放了面向用户程序使用的CAS操作。

CAS的经典就是AtomicInteger,他的incrementAndGet()方法,同时保证了原子性、可见性、有序性这3个特性。

尽管CAS看起来很美好,既简单又高效,但显然这种操作无法涵盖互斥同步的所有使用场景,并且CAS从语义上来说并不是真正完美的,它存在一个逻辑漏洞:如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然为A值,那就能说明它的值没有被其他线程改变过了吗?

这是不能的,因为如果在这段期间它的值曾经被改成B,后来又被改回为A,那CAS操作就会误认为它从来没有被改变过。这个漏洞称为CAS操作的“ABA问题”。

J.U.C包为了解决这个问题,提供了一个带有标记的原子引用类AtomicStampedReference,它可以通过控制变量值的版本来保证CAS的正确性。不过目前来说这个类处于相当鸡肋的位置,大部分情况下ABA问题不会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更为高效。

无同步方案

  1. 可重入代码(Reentrant Code):

可重入代码有一些共同的特征,例如,不依赖全局变量、存储在堆上的数据和公用的系统资源,用到的状态量都由参数中传入,不调用非可重入的方法等。我们可以通过一个比较简单的原则来判断代码是否具备可重入性:如果一个方法的返回结果是可以预测的,只要输入了相同的数据,就都能返回相同的结果,那它就满足可重入性的要求,当然也就是线程安全的。

  1. 线程本地存储(Thread Local Storage):如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。

符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程限制在一个线程中消费完,其中最重要的一种应用实例就是经典Web交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多Web服务端应用都可以使用线程本地存储来解决线程安全问题。

Java语言中,如果一个变量要被多线程访问,可以使用volatile关键字将它声明为“易变的”;如果一个变量只要被某个线程独享,Java中就没有类似C++中__declspec(thread)[7]这样的关键字去修饰,不过我们还是可以通过java.lang.ThreadLocal类来实现线程本地存储的功能。

每一个线程的Thread对象中都有一个ThreadLocalMap对象,这个对象存储了一组以ThreadLocal.threadLocalHashCode为键,以本地线程变量为值的K-V值对,ThreadLocal对象就是当前线程的ThreadLocalMap的访问入口,每一个ThreadLocal对象都包含了一个独一无二的threadLocalHashCode值,使用这个值就可以在线程K-V值对中找回对应的本地线程变量。

锁优化

自旋锁与自适应自旋

前面我们讨论互斥同步的时候,提到了互斥同步对性能最大的影响是阻塞的实现,挂起线程和恢复线程的操作都需要转入内核态中完成,这些操作给Java虚拟机的并发性能带来了很大的压力。同时,虚拟机的开发团队也注意到在许多应用上,共享数据的锁定状态只会持续很短的一段时间,为了这段时间去挂起和恢复线程并不值得。现在绝大多数的个人电脑和服务器都是多路(核)处理器系统,如果物理机器有一个以上的处理器或者处理器核心,能让两个或以上的线程同时并行执行,

我们就可以让后面请求锁的那个线程“稍等一会”,但不放弃处理器的执行时间,看看持有锁的线程是否很快就会释放锁。为了让线程等待,我们只须让线程执行一个忙循环(自旋),这项技术就是所谓的自旋锁。

自旋等待不能代替阻塞,且先不说对处理器数量的要求,自旋等待本身虽然避免了线程切换的开销,但它是要占用处理器时间的,所以如果锁被占用的时间很短,自旋等待的效果就会非常好,反之如果锁被占用的时间很长,那么自旋的线程只会白白消耗处理器资源,而不会做任何有价值的工作,这就会带来性能的浪费。

因此自旋等待的时间必须有一定的限度,如果自旋超过了限定的次数仍然没有成功获得锁,就应当使用传统的方式去挂起线程。自旋次数的默认值是十次,用户也可以使用参数-XX:PreBlockSpin来自行更改。

不过无论是默认值还是用户指定的自旋次数,对整个Java虚拟机中所有的锁来说都是相同的。在JDK 6中对自旋锁的优化,引入了自适应的自旋。

自适应意味着自旋的时间不再是固定的了,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定的

如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也很有可能再次成功,进而允许自旋等待持续相对更长的时间,比如持续100次忙循环。

另一方面,如果对于某个锁,自旋很少成功获得过锁,那在以后要获取这个锁时将有可能直接省略掉自旋过程,以避免浪费处理器资源。

有了自适应自旋,随着程序运行时间的增长及性能监控信息的不断完善,虚拟机对程序锁的状况预测就会越来越精准,虚拟机就会变得越来越“聪明”了。

lock & condition

double-check & Initialization On Demand Holder idiom

  1. double-check----->volatile
  2. Initialization On Demand Holder idiom----->基于类初始化的解决方案(holder):类的初始化阶段(即在Class被加载后,且被线程使用之前),会执行类的初始化。在执行类的初始化期间,JVM会去获取一个锁。这个锁可以同步多个线程对同一个类的初始化

但基于volatile的双重检查锁定的方案有一个额外的优势:除了可以对静态字段实现延迟初始化外,还可以对实例字段实现延迟初始化

ConcurrentHashMap

锁分段技术

在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非
常低下,基于以上两个原因,便有了ConcurrentHashMap的登场机会

1.线程不安全的HashMap
在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所
以在并发情况下不能使用HashMap

HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表
形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获
取Entry。

2.效率低下的HashTable
HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable
的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同
步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方
法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁

那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率

这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。

这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。

1614215357127

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。

一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

不变(Immutable)和易变(Volatile)

ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。

ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点,其结构如下所示:

 static final class HashEntry<K,V> {
     final K key;
     final int hash;
     volatile V value;
     final HashEntry next;
 }

可以看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next 引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。

但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。

segmentMask和segmentShift

final Segment segmentFor(int hash) {
     return segments[(hash >>> segmentShift) & segmentMask];
 }

segmentMask和segmentShift主要是为了定位段

可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。

再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。

get操作

V get(Object key, int hash) {
     if (count != 0) { // read-volatile 当前桶的数据个数是否为0
         HashEntry e = getFirst(hash);  得到头节点
         while (e != null) {
             if (e.hash == hash && key.equals(e.key)) {
                 V v = e.value;
                 if (v != null)
                     return v;
                 return readValueUnderLock(e); // recheck
             }
             e = e.next;
         }
     }
     returnnull;
 }

get操作不需要锁。

除非读到的值是空的才会加锁重读,我们知道HashTable容器的get方法是需要加锁的,那么ConcurrentHashMap的get操作是如何做到不加锁的呢?原因是它的get方法里将要使用的共享变量都定义成volatile

如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。

定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。

之所以不会读到过期的值,是因为根据Java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景。

接下来就是根据hash和key对hash链进行遍历找到要获取的结点,如果没有找到,直接访回null。对hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的,这是通过getFirst(hash)方法返回,也就是存在 table数组中的值。

这使得getFirst(hash)可能返回过时的头结点,例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。

最后,如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为 put的时候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为 HashEntry中的value不是final的,非同步读取有可能读取到空值。

仔细看下put操作的语句:tab[index] = new HashEntry(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空。

这里当v为空时,可能是一个线程正在改变节点,而之前的get操作都未进行锁定,根据bernstein条件,读后写或写后读都会引起数据的不一致,所以这里要对这个e重新上锁再读一遍,以保证得到的是正确值。

V readValueUnderLock(HashEntry e) {
     lock();
     try {
         return e.value;
     } finally {
         unlock();
     }
 }

如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。

之所以不会读到过期的值,是根据java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景

put操作

同样地put操作也是委托给段的put方法。下面是段的put方法:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
     lock();
     try {
         int c = count;
         if (c++ > threshold) // ensure capacity
             rehash();
         HashEntry[] tab = table;
         int index = hash & (tab.length - 1);
         HashEntry first = tab[index];
         HashEntry e = first;
         while (e != null && (e.hash != hash || !key.equals(e.key)))
             e = e.next;
         V oldValue;
         if (e != null) {
             oldValue = e.value;
             if (!onlyIfAbsent)
                 e.value = value;
         }
         else {
             oldValue = null;
             ++modCount;
             tab[index] = new HashEntry(key, hash, first, value);
             count = c; // write-volatile
         }
         return oldValue;
     } finally {
         unlock();
     }
 }

由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必
须加锁。

put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个
步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位
置,然后将其放在HashEntry数组里。

(1)是否需要扩容
在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阈
值,则对数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap
是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容
之后没有新元素插入,这时HashMap就进行了一次无效的扩容。

(2)如何扩容

在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进
行再散列后插入到新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只
对某个segment进行扩容。

接着是找是否存在同样一个key的结点,如果存在就直接替换这个结点的值。否则创建一个新的结点并添加到hash链的头部,这时一定要修改modCount和count的值,同样修改count的值一定要放在最后一步。

删除操作remove(key)

public V remove(Object key) {
   hash = hash(key.hashCode());
   return segmentFor(hash).remove(key, hash, null);
}

整个操作是先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。

下面是Segment的remove方法实现:

V remove(Object key, int hash, Object value) {
     lock();
     try {
         int c = count - 1;
         HashEntry[] tab = table;
         int index = hash & (tab.length - 1);
         HashEntry first = tab[index];
         HashEntry e = first;
         while (e != null && (e.hash != hash || !key.equals(e.key)))
             e = e.next;
         V oldValue = null;
         if (e != null) {
             V v = e.value;
             if (value == null || value.equals(v)) {
                 oldValue = v;

                 // All entries following removed node can stay
                 // in list, but all preceding ones need to be
                 // cloned.
                 ++modCount;
                 HashEntry newFirst = e.next;
                 *for (HashEntry p = first; p != e; p = p.next)
                     *newFirst = new HashEntry(p.key, p.hash,
                                                   newFirst, p.value);
                 tab[index] = newFirst;
                 count = c; // write-volatile
             }
         }
         return oldValue;
     } finally {
         unlock();
     }
 }

整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。接下来,如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。

中间那个for循环是做什么用的呢?**(*号标记)**从代码来看,就是将定位之后的所有entry克隆并拼回前面去,但有必要吗?每次删除一个元素就要将那之前的元素克隆一遍?

这点其实是由entry的不变性来决定的,仔细观察entry定义,发现除了value,其他所有属性都是用final来修饰的,这意味着在第一次设置了next域之后便不能再改变它,取而代之的是将它之前的节点全都克隆一次。

至于entry为什么要设置为不变性,这跟不变性的访问不需要同步从而节省时间有关

1614216348823

整个remove实现并不复杂,但是需要注意如下几点。

  • 第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。
  • 第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是 volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。

size()操作

如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?

不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加前使用的count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。

因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。

那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

CopyOnWrite

参考:https://blog.csdn.net/shiyuezhong/article/details/90726406

阻塞队列

CountDownLatch & CyclicBarrier & Semaphore

CountDownLatch 让一个线程A,等待其他所有的线程B C D,都结束后,线程A才能继续执行

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

cyclicBarrier相比于countdownLatch的优势在于,cyclicBarrier的屏障可以多次重用,即当屏障被突破了1次后,这个屏障又可以重用,作为下次的屏障,等待被突破。

await() 源码注释,描述了方法功能:调用该方法的线程进入等待,在CyclicBarrier上进行阻塞等待,直到发生以下情形之一:

  • 在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。—— 正常情形
  • 当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
  • 其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
  • 其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
  • 其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。

除了第一种属于正常的情形,其他的都会导致 BrokenBarrierException。
带时限的await() 会抛出 TimeoutException;

public int await(long timeout, TimeUnit unit) throws InterruptedException,
                                                             BrokenBarrierException,
                                                             TimeoutException

当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。
当前线程抛出 TimeoutException 异常时,其他线程会抛出 BrokenBarrierException 异常。

img

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯。

比如××马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入××马路,但是如果前一百辆中有5辆车已经离开了××马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

1.应用场景
Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制

在代码中,虽然有30个线程在执行,但是只允许10个并发执行。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。

中断专题

中断的含义

中断是一种协作机制。当一个线程中断另一个线程时,被中断的线程不一定要立即停止正在做的事情。相反,中断是礼貌地请求另一个线程在它愿意并且方便的时候停止它正在做的事情。有些方法,例如 Thread.sleep(),很认真地对待这样的请求,但每个方法不是一定要对中断作出响应。对于中断请求,不阻塞但是仍然要花较长时间执行的方法可以轮询中断状态,并在被中断的时候提前返回。 您可以随意忽略中断请求,但是这样做的话会影响响应。

中断的协作特性所带来的一个好处是,它为安全地构造可取消活动提供更大的灵活性。我们很少希望一个活动立即停止;如果活动在正在进行更新的时候被取消,那么程序数据结构可能处于不一致状态。中断允许一个可取消活动来清理正在进行的工作,恢复不变量,通知其他活动它要被取消,然后才终止。

也就是说,中断是优雅的,并不是强制性的,而是礼貌的请求。那么,被中断的线程 可以对中断请求立即响应,也可以 过一会儿再响应,甚至不响应,这些都是被中断线程自己决定的。

即中断 和 响应中断是2件事,可以分别处理

阻塞方法如何处理中断标识

阻塞方法,如TimeUnit.seconds.sleep(3),当线程A处于阻塞过程中,在外部将线程A中断,那么会抛出InterruptedException ,表明 线程A的阻塞方法,将尝试停止它正在做的事情而提前返回,并通过抛出 InterruptedException 表明它提前返回。

非阻塞方法如何处理中断标识

仅仅因为一个任务是可取消的,并不意味着需要立即 对中断请求作出响应。

对于执行一个循环中的代码的任务,通常只需为每一个循环迭代检查一次中断。取决于循环执行的时间有多长,任何代码可能要花一些时间才能注意到线程已经被中断(或者是通过调用 Thread.isInterrupted() 方法轮询中断状态,或者是调用一个阻塞方法)。 如果任务需要提高响应能力,那么它可以更频繁地轮询中断状态。

通常可以在if条件 或者 while循环头部进行判断

 public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted()){
                queue.put(p = p.nextProbablePrime());
            }
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }

实际处理中断方式

如果当处于阻塞方法时,出现了中断异常,这个时候,有2中处理方式。

第一种:捕获到了中断异常后,添加中断标识,重新抛出中断异常 或者其他异常,目的是为了不继续执行剩余代码了。

第二种:捕获到了中断异常后,添加中断标识,不再抛出任何异常,目的是 让剩余代码可以继续执行。

那么在实际代码中,应该选择哪种方式呢?

判断标准: 看 如果不抛出异常,继续执行剩余代码,会不会出现问题,比如,会不会出现空指针 或者数据错误等。 如果继续执行剩余代码,会出现问题,那么选择第一种处理方式;如果不会出现问题,那么选择第二种。

示例:

public HriDbHelper getWriteHelper() throws SQLException {
        HriDbHelper writeHelper = null;
        try {
            writeHelper = writeHelperQueue.poll(GET_HELPER_TIMEOUT, TimeUnit.SECONDS);
            if (writeHelper == null) {
                // 没有取到writeHelper
                LOG.error("getWriteHelper time out");
                throw new SQLException("getWriteHelper time out");
            }
            writeHelper.setLastUseTime(System.nanoTime());
            LOG.debug("getWriteHelper,time:{}", System.nanoTime());
            return writeHelper;
        } catch (InterruptedException e) {
            LOG.error("getWriteHelper occue exception:{}", e);
            Thread.currentThread().interrupt();
            return writeHelper;
        }
    }

我们可以看到上面的代码,当处于poll阻塞状态时,如果这个时候线程被中断了,那么就会捕获到中断异常。这个时候怎么处理呢?

根据判断标准,如果出现中断异常,执行剩余代码,就会出现null.getHriSqlDataBase()这种情况,如下:

@Override
    public boolean updateDataGenericInterface(String table, Map<String, ?> value, Map<String, ?> condition) throws SQLException {
      
        HriSqlDataBase hriSqlDataBase = HriDbHelperPool.instance().getWriteHelper().getHriSqlDataBase();
     
    }

出现null.getHriSqlDataBase()这种情况,明显是不对的,

所以继续执行剩余代码是有问题的,所以应该选择第一种处理方式

那么代码应该怎么修改呢?修改后的代码如下:

  public HriDbHelper getWriteHelper() throws SQLException {
        HriDbHelper writeHelper = null;
        try {
            writeHelper = writeHelperQueue.poll(GET_HELPER_TIMEOUT, TimeUnit.SECONDS);
            if (writeHelper == null) {
                // 没有取到writeHelper
                LOG.error("getWriteHelper time out");
                throw new SQLException("getWriteHelper time out");
            }
            writeHelper.setLastUseTime(System.nanoTime());
            LOG.debug("getWriteHelper,time:{}", System.nanoTime());
            return writeHelper;
        } catch (InterruptedException e) {
            LOG.error("getWriteHelper occue exception:{}", e);
            Thread.currentThread().interrupt();
            throw new SQLException("when getWriteHelper method block,occue InterruptedException");
        }
    }

我们可以看到,捕获到中断异常后,添加中断标识,又重新抛出SQLException,这样剩余的代码就不会再继续执行,就不会出现null.getHriSqlDataBase()这种情况了。

那么什么时候,会采用第二种处理方式呢?

如果getWriterHelper方式的返回值不是HriDbHelper,而是void,那么就应该采用第二种处理方式了。

interrupt & interrupted & isInterrupted

interrupt方法,是给线程打上一个中断标识

interrupted 方法 和 isInterrupted 方法,都是检测这个线程,是否存在中断标识。

需要注意的是:interrupted 方法,在检测中断标识后,会将中断标识改为false;而isInterrupted 方法,在检测中断标识后,不会修改中断标识

Future#cancel

// java.util.concurrent.Future#cancel
boolean cancel(boolean mayInterruptIfRunning);

如何处理 InterruptedException

有3中方式:

1.不捕捉 InterruptedException,将它传播给调用者

2.在重新抛出 InterruptedException 之前执行特定于任务的清理工作

3.捕捉 InterruptedException 后恢复中断状态

下面分别讲述:

清单 1. 不捕捉 InterruptedException,将它传播给调用者
public class TaskQueue {
    private static final int MAX_TASKS = 1000;
 
    private BlockingQueue<Task> queue 
        = new LinkedBlockingQueue<Task>(MAX_TASKS);
 
    public void putTask(Task r) throws InterruptedException { 
        queue.put(r);
    }
 
    public Task getTask() throws InterruptedException { 
        return queue.take();
    }
}

有时候需要在传播异常之前进行一些清理工作。在这种情况下,可以捕捉 InterruptedException,执行清理,然后抛出异常。清单 2 演示了这种技术,该代码是用于匹配在线游戏服务中的玩家的一种机制。 matchPlayers() 方法等待两个玩家到来,然后开始一个新游戏。如果在一个玩家已到来,但是另一个玩家仍未到来之际该方法被中断,那么它会将那个玩家放回队列中,然后重新抛出 InterruptedException,这样那个玩家对游戏的请求就不至于丢失。

清单 2. 在重新抛出 InterruptedException 之前执行特定于任务的清理工作
public class PlayerMatcher {
    private PlayerSource players;
 
    public PlayerMatcher(PlayerSource players) { 
        this.players = players; 
    }
 
    public void matchPlayers() throws InterruptedException { 
        try {
             Player playerOne, playerTwo;
             while (true) {
                 playerOne = playerTwo = null;
                 // Wait for two players to arrive and start a new game
                 playerOne = players.waitForPlayer(); // could throw IE
                 playerTwo = players.waitForPlayer(); // could throw IE
                 startNewGame(playerOne, playerTwo);
             }
         }
         catch (InterruptedException e) {  
             // If we got one player and were interrupted, put that player back
             if (playerOne != null)
                 players.addFirst(playerOne);
             // Then propagate the exception
             throw e;
         }
    }
}

有时候抛出 InterruptedException 并不合适,例如当由 Runnable 定义的任务调用一个可中断的方法时,就是如此。在这种情况下,不能重新抛出 InterruptedException,但是您也不想什么都不做。当一个阻塞方法检测到中断并抛出 InterruptedException 时,它清除中断状态。如果捕捉到 InterruptedException 但是不能重新抛出它,那么应该保留中断发生的证据,以便调用栈中更高层的代码能知道中断,并对中断作出响应。该任务可以通过调用 interrupt() 以 “重新中断” 当前线程来完成,如清单 3 所示。至少,每当捕捉到 InterruptedException 并且不重新抛出它时,就在返回之前重新中断当前线程。

清单 3. 捕捉 InterruptedException 后恢复中断状态
public class TaskRunner implements Runnable {
    private BlockingQueue<Task> queue;
 
    public TaskRunner(BlockingQueue<Task> queue) { 
        this.queue = queue; 
    }
 
    public void run() { 
        try {
             while (true) {
                 Task task = queue.take(10, TimeUnit.SECONDS);
                 task.execute();
             }
         }
         catch (InterruptedException e) { 
             // Restore the interrupted status
             Thread.currentThread().interrupt();
         }
    }
}

异常专题

setUncaughtExceptionHandler

针对Thread和Runnable,异常处理,使用setUncaughtExceptionHandler

由于在多线程中,run()方法无法继续向上显式抛出异常。所以这就使得Thread线程中的异常处理变得棘手。

首先对于受检异常,其应对办法非常通用:就是直接在run()方法体中捕获异常,然后进行对应的处理。对于非受检异常,JVM会帮助我们捕获到。那么我们如何处理JVM捕获的异常呢?答案是Thread.UncaughtExceptionHandler类。正如JDK文档所介绍的一样:

“当一个线程由于发生了非受检异常而终止时,JVM会使用Thread.gerUncaughtExceptionHandler()方法查看该线程上的UncaughtExceptionHandler,并调用他的uncaughtException()方法”——翻译自JDK7 API文档。

下面我们来尝试使用hread.UncaughtExceptionHandler类,来处理线程内部的非受检异常(受检异常直接在run()方法体内部的catch子句中处理)。

public class Demo {
    private static final AtomicInteger num = new AtomicInteger();
 
    public static void main(String[] args) throws InterruptedException {
        final Thread thread = new Thread(new ThreadTask());
        //此处使用thread.setUncaughtExceptionHandler(new ThreadExceptionHandler)可以代替ThreadTask中
        //的Thread.currentThread().setUncaughtExceptionHandler(new ThreadExceptionHandler(),但这样不好
        thread.start();
    }
 
    private static final class ThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            System.out.println("One uncaught exception was got:");
            System.out.println("Thread id=" + t.getId());
            System.out.println("Thread name=" + t.getName());
            e.printStackTrace(System.out);
            // 当捕获到非受检异常时,可以断定原来的线程已经被JVM终止。
            // 此时可以新建一个线程继续执行原来的任务
            if (num.get() < 5) {
                new Thread(new ThreadTask()).start();
            }
        }
    }
 
    private static final class ThreadTask implements Runnable {
        @Override
        public void run() {
            num.incrementAndGet();
            Thread.currentThread().setUncaughtExceptionHandler(new ThreadExceptionHandler());//设置非受检异常的ExceptionHandler
            try {
                for (int i = 4; i >= 0; i--) {
                    TimeUnit.SECONDS.sleep(1);
                    // 当i=0时候抛出的非受检异常,将导致当前线程被JVM杀死
                    // 但异常会被在上面设置的ThreadExceptionHandler捕获到,进而被处理
                    System.out.println(12 / i);
                }
            } catch (InterruptedException e) {//受检异常直接在run方法体内部处理
                e.printStackTrace();
            }
        }
    }
}

afterExecute

上面介绍的异常处理,使用setUncaughtExceptionHandler,这个只在Thread和Runnable体系中生效的。

如果在Executors 体系和 future callable体系中,setUncaughtExceptionHandler不再生效了

参考:https://blog.csdn.net/yizhenn/article/details/70455606

在线程池中,线程池Executor会Catch住所有运行时异常,当使用Future.get()获取其结果时,才会抛出这些运行时异常。我们看Future类的get()方法就可见一斑。

从JDK对Future.get()方法的定义可见:Callable线程中抛出的非受检异常会被Executor框架捕获到,然后通过Future类的get()方法传递给调用者。get() 方法的ExecutionException异常就是Runnable线程或者Callable线程抛出的异常。

我们可以这样总结:当使用Executor线程池相关框架来执行线程任务时,UncaughtExceptionHandler线程异常处理机制就是不生效的,这种情况下,线程内部的异常由线程池框架统一管理。当程序调用Future类的get()方法时,将感知到线程内部的运行时异常。当程序不使用Future类获取结果时,运行时异常将被线程池框架隐藏。

import java.util.concurrent.*;
 
public class Demo {
    private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
 
    public static void main(String[] args) {
        executorService.submit(new MyTask());
 
        executorService.shutdown();
    }
 
    private static final class MyTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
 
            int sum = 0;
            for (int i = 4; I >= 0; i--) {
                sum = sum + (12 / i);
            }
            return sum;
        }
    }
}

此程序后,在控制台没有任何输出。可见不使用future.get()的时候,运行时异常被线程池框架给“吃了”。

既然是由线程池框架统一管理,那么是够可以修改修改线程池框架的异常处理机制呢?答案是可以的。此时我们要自定义线程池框架,使用ThreadPoolExecutor类即可。ThreadPoolExecutor类提供了很多可调整的参数和钩子函数,可以很方便的构造一个线程池。具体介绍参见笔者之前的博客ThreadPoolExecutor线程池的使用。在这篇博客中,介绍了一个afterExecute(Runnable, Throwable)方法,在该方法中可以定义当前线程池中执行的线程发生内部异常时的处理方案。例子如下:

 private static final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)) {
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            printException(r, t);
        }
 
        private void printException(Runnable r, Throwable t) {
 
            if (t == null && r instanceof Future<?>) {
                try {
                    Object result = ((Future<?>) r).get();
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                t.printStackTrace(System.out);
                executeTask();
            }
        }
    };

在上面的代码中,我们使用ThreadPoolExecutor类自定义了一个线程池对象,对于所有使用该线程池执行的任务,其异常都会通过afterExecute()方法捕获到并进行处理。

需要注意的是,如果是在ScheduledThreadPoolExecutor 线程池中,不能像上面这样写。

因为,ScheduledThreadPoolExecutor中有很多 每隔一段时间 就执行的 循环任务,如果使用上面的future.get就会到 线程不断阻塞。

阻塞的原因是:因为任务 是每隔一段时间 就 循环执行的,因此,这个任务第一次执行完成后,其实并没有完成,因此调用future.get, 线程就会阻塞住。

但是 ScheduledThreadPoolExecutor也有另一种任务,就是,延时一段时间,然后只执行一次,这种任务就可以使用future.get,并不会阻塞

区分两种任务,只需要判断任务 是否是 周期性任务

private void recordExceptionToLog(Runnable runnable, Throwable throwable) {
			System.out.println("weipeng recordExceptionToLog runnable:{},throwable:{}");
			Throwable allThrowable = throwable;

			if (throwable == null && runnable instanceof RunnableScheduledFuture<?>) {
				System.out.println("weipeng enter RunnableScheduledFuture");
				RunnableScheduledFuture runnableScheduledFuture = (RunnableScheduledFuture<?>) runnable;
				if (!runnableScheduledFuture.isPeriodic()) {
					// 周期性任务,如果使用get会阻塞,非周期性人物,可以使用get
					try {
						runnableScheduledFuture.get();
					} catch (CancellationException ce) {
						allThrowable = ce;
					} catch (ExecutionException ee) {
						allThrowable = ee.getCause();
					} catch (InterruptedException ie) {
						Thread.currentThread().interrupt();
					} finally {
						System.out.println("weipeng enter finally");
					}
				}

				System.out.println("weipeng finish RunnableScheduledFuture");
			}

			if (allThrowable != null) {
				String name = Thread.currentThread().getName();
				System.out.println("thread:{} occur exception:{}");
			}
		}

线程池的组成

线程池,主要由以下几个部分组成:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
  • corePool:核心线程池的大小。
  • maximumPool:最大线程池的大小。
  • BlockingQueue:用来暂时保存任务的工作队列。
  • RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler。

下面,我们来看下,线程池创建的bestPractise

private static final int TASK_COUNT = 100;

private final AtomicInteger poolNum = new AtomicInteger(1);

private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(TASK_COUNT);

 private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
            2 * Runtime.getRuntime().availableProcessors(), 20L,
            TimeUnit.SECONDS, taskQueue, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            String threadName = String.valueOf(poolNum.getAndIncrement());
            return new Thread(r, "Monitor-Backend-ThreadPool-" + threadName);
        }
    });

上面的corePool,取的是Runtime.getRuntime().availableProcessors(),而maximumPool是2倍的cpu core。

这里指定了workQueue,是capacity为100的LinkedBlockingQueue,防止Jvm内存被撑爆了。

通过ThreadFactory,来给线程池中,所有线程起了一个统一的名字Monitor-Backend-ThreadPool-




考虑2个:线程数量 提交任务的数量

ThreadPoolExecutor执行execute方法分下面4种情况。
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤
需要获取全局锁)。

2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执
行这一步骤需要获取全局锁)。

4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法。

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

·AbortPolicy:直接抛出异常。
·CallerRunsPolicy:只用调用者所在线程来运行任务。
·DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
·DiscardPolicy:不处理,丢弃掉。
当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录
日志或持久化存储不能处理的任务。

核心线程数 和 最大线程数的设置

计算机,分为CPU密集型 和 IO密集型。

CPU密集型,顾名思义,任务一般是需要大量计算的,需要耗费大量的CPU资源的。因此,CPU的计算能力,成为了瓶颈。


IO密集型,顾名思义,任务一般涉及到大量的io操作,比如网络传输、数据库交互、文件下载上传等等,而一般io操作时,线程一般都处于空闲状态,等待IO操作完成。因此,IO的带宽,成为了瓶颈。



对于CPU密集型,因为CPU的计算能力,已经被大量占用了。所以,此时增加线程数,也没有多余CPU时间片,分给增加的线程了,所以,增加线程数,并不能提高处理速度,反而,频繁的切换线程或进程也是要消耗时间的,会导致任务效率下降。

对于IO密集型,一个线程在进行IO操作,等待IO操作完成,这个时间段内,是不需要CPU的,即CPU处于空闲状态。所以,增加线程数,让新增加的线程 ,将空闲状态的CPU利用起来。这样,就能提高处理速度,提高效率。

对于CPU密集型,线程数 = cpu core + 1
对于IO密集型,线程数 = n * cpu core。具体是多少,需要根据场景来计算,常用的tomcat的默认最大线程数,是200。

shotdown() showdownNow()区别

可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。
shutdown() 方法在终止前允许执行以前提交的任务,
shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。关闭未使用的 ExecutorService 以允许回收其资源。
一般分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务

`// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。``    ``threadPool.shutdown();`

ExecuteService 服务的关闭:
当使用 ExecutorService 完毕之后,我们应该关闭它。 例如:你的程序通过 main() 方法启动,并且主线程退出了你的程序,如果你还有壹個活动的 ExecutorService 存在于你的程序中,那么程序将会继续保持运行状态。存在于 ExecutorService 中的活动线程会阻止Java虚拟机关闭。
为了关闭在 ExecutorService 中的线程,你需要调用 ***shutdown() 方法。executorService.shutdown();ExecutorService 并不会马上关闭,而是不再接收新的任务,壹但所有的线程结束执行当前任务,ExecutorServie 才会真的关闭。所有在调用 shutdown() 方法之前提交到 ExecutorService 的任务都会执行。如果你希望立即关闭 ExecutorService,你可以调用 shutdownNow() ***方法。executorService.shutdownNow();这個方法会尝试马上关闭所有正在执行的任务,并且跳过所有已经提交但是还没有运行的任务。但是对于正在执行的任务,是否能够成功关闭它是无法保证的。
awaitTermination方法,方法调用会被阻塞,直到所有任务执行完毕并且shutdown请求被调用,或者参数中定义的timeout时间到达,或者当前线程被打断,这几种情况任意一个发生了就会导致该方法的执行。接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。(若超时未关闭,程序也继续往下进行。传入Long.MAX_VALUE, TimeUnit.DAYS参数,可确保子进程完成。)一般情况下会和shutdown方法组合使用。

// 关闭启动线程 
service.shutdown(); 
// 等待子线程结束,再继续执行下面的代码 
service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);  //这里表示近似永远等待
//service.awaitTermination(1, TimeUnit.MINUTES);//等待一分钟
//service.awaitTermination(2, TimeUnit.SECONDS);//两秒钟
System.out.println("all thread complete");

Thread与Runnable的区别

Runnable只是一个工作单元,而Thread不仅包含runnable,更是一种状态机。Thread存储了,当前线程的状态、是否中断、异常处理器、本地变量副本(threadLocal)等。也就是说,Thread类似于一个上下文容器,存储了很多东西。

Executor

Executor框架主要由3大部分组成如下。

·任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口。

·任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口
(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

·异步计算的结果。包括接口Future和实现Future接口的FutureTask类。


评论