当前位置 博文首页 > 文章内容

    [爪洼ing]的博客:ConcurrentHashMap1.8 源码分析

    作者:shunshunshun18 栏目:未分类 时间:2021-12-02 10:39:35

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    一、容器初始化

    在jdk8的ConcurrentHashMap中一共有5个构造方法,这四个构造方法中都没有对内部的数组做初始化, 只是对一些变量的初始值做了处理

    jdk8的ConcurrentHashMap的数组初始化是在第一次添加元素时完成

    //没有维护任何变量的操作,如果调用该方法,数组长度默认是16
    public ConcurrentHashMap() {
    }
    
    //传递进来一个初始容量,ConcurrentHashMap会基于这个值计算一个比这个值大的2的幂次方数作为初始容量
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    

    注意,调用这个方法,得到的初始容量和我们之前讲的HashMap以及jdk7的ConcurrentHashMap不同,即使你传递的是一个2的幂次方数,该方法计算出来的初始容量依然是比这个值大的2的幂次方数

    //调用四个参数的构造
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }
    
    //计算一个大于或者等于给定的容量值,该值是2的幂次方数作为初始容量
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }
    
    //基于一个Map集合,构建一个ConcurrentHashMap
    //初始容量为16
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
    

    sizeCtl含义解释

    注意:以上这些构造方法中,都涉及到一个变量sizeCtl,这个变量是一个非常重要的变量,而且具有非常丰富的含义,它的值不同,对应的含义也不一样,这里我们先对这个变量不同的值的含义做一下说明,后续源码分析过程中,进一步解释

    sizeCtl为0,代表数组未初始化, 且数组的初始容量为16

    sizeCtl为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值

    sizeCtl为-1,表示数组正在进行初始化

    sizeCtl小于0,并且不是-1,表示数组正在扩容, -(1+n),表示此时有n个线程正在共同完成数组的扩容操作

    二、添加元素

    1、添加元素put/putVal方法

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //如果有空值或者空键,直接抛异常
        if (key == null || value == null) throw new NullPointerException();
        //基于key计算hash值,并进行一定的扰动
        int hash = spread(key.hashCode());
        //记录某个桶上元素的个数,如果超过8个,会转成红黑树
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //1、如果数组还未初始化,先对数组进行初始化
            if (tab == null || (n = tab.length) == 0)
                   
                tab = initTable();   //初始化操作!
            
    	    
            //2、如果hash计算得到的桶位置没有元素,利用cas将元素添加
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            
            //3、如果hash计算得到的桶位置元素的hash值为MOVED,证明正在扩容,那么协助扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            
            //4 、 //hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
            else {
                V oldVal = null;
                //对当前桶进行加锁,保证线程安全,执行元素添加操作
                				//普通链表 : 尾插法
                				//树化后 : 插入树节点
                
                synchronized (f) {				//加锁 为节点头元素添加重量级锁,如果触发hash冲突,引发线程同步机制)!
                    if (tabAt(tab, i) == f) {
                        //普通链表节点
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {	
                                			//拿到锁后对每个节点equals
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;			//存在相等覆盖
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;		//不存在就尾插
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //树节点,将元素添加到红黑树中
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //链表长度大于/等于8,将链表转成红黑树(这个8设计到泊松分布)
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);   //尝试树化,因为树化还有满足数组长度 >  64 !
                    //如果是重复键,直接将旧值返回
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //添加的是新元素,维护集合长度,并判断是否要进行扩容操作
        addCount(1L, binCount);    //链表长度加1
        return null;
    }
    

    通过以上源码,我们可以看到,当需要添加元素时,会针对当前元素所对应的桶位进行加锁操作,这样一方面保证元素添加时,多线程的安全,同时对某个桶位加锁不会影响其他桶位的操作,进一步提升多线程的并发效率

    2、数组初始化,initTable方法

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        //cas+自旋,保证线程安全,对数组进行初始化操作
        while ((tab = table) == null || tab.length == 0) {
            //如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        //sizeCtl为0,取默认长度16,否则去sizeCtl的值
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //基于初始长度,构建数组对象
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //计算扩容阈值,并赋值给sc
                        sc = n - (n >>> 2);    // sc = n - n/4 = 0.75n ;
                    }
                } finally {
                    //将扩容阈值,赋值给sizeCtl
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    

    3、Put加锁图解


    梳理一下上述流程:

    初始化:执行构造的时候是否传入参数,

    • 没有传参数:初始化容量为16,sizeCtl = 0 ;
    • 如果传入capacity : 数组table的则初始化容量为tableSize(capacity + capacity >>> 1 + 1)的结果,然后会将结果赋值给sizeCtl

    接下来容器创造好后,当我们想容器添加<k,v>的时候

    • 如果此时数组为空且长度为0,调用initTable方法,为数组初始化 (第一次都是为空,因为是懒汉模式)!
    • 如果数组不为空,通过扰动函数确定桶位,判断桶位是否位空
      • 如果位空直接cas + 自旋添加
      • 如果不为空,判断当前是链表还是红黑树(注意:此处是通过synchronized为每个桶的第一个元素加锁,实现安全 + 并发)!
        • 如果是链表就逐个个euqals比较,如果存在相等就覆盖,不等就尾插法插入链表末尾!
        • 如果是红黑树就按树的插入方式

    初始化并且插入的流程完成

    三、计数操作

    做一些赘述,方便理解

    参考:https://blog.csdn.net/u011392897/article/details/60479937

    1.7及以前的ConcurrentHashMap中使用了Segment,Segment能够分担所有的针对单个K-V的写操作,包括put/replace。并且Segment自带一些数据,比如Segment.count,用于处理Map的计数要求,这样就可以像put/repalce一样,分担整个Map的并发计数压力。但是1.8中没有再使用Segment来完成put/replace,虽然还是利用了锁分段的思想,但是使用的是自带的synchronized锁住hash桶中的第一个节点,没有新增别的数据。因此计数操作,被落下来了,它无法享受synchronized实现的变种分段锁带来的高效率,单独使用一个Map.size来计数,线程竞争可能会很大,比使用Segment是效率低很多

    为了处理这个问题,jdk1.8中使用了一个仿造LongAdder实现的计数器,让计数操作额外使用别的基于分段并发思想的实现的类。。ConcurrentHashMap中不直接使用LongAdder,而是自己拷贝代码实现一个内部的,主要为了方便。LongAdder的实现本身代码不是特别多,ConcurrentHashMap中的实现,基本和LongAdder一样,可以直接看做是LongAdder。


    参考:https://sylvanassun.github.io/2018/03/16/2018-03-16-map_family/

    • 在Java 7中ConcurrentHashMap对每个Segment单独计数,想要得到总数就需要获得所有Segment的锁,然后进行统计。由于Java 8抛弃了Segment,显然是不能再这样做了,而且这种方法虽然简单准确但也舍弃了性能
    • Java 8声明了一个volatile变量baseCount(可以看作LongAdder计数器)用于记录元素的个数,对这个变量的修改操作是基于CAS的,每当插入元素或删除元素时都会调用addCount()函数进行计数。加不进去就加数组上,保证每个线程添加元素都能统计上,最后把加上的和没加上的汇总

    1、addCount方法

    1. 记录ConcurrentHashMap元素数量,会调用fullAddCount具体执行
    2. 扩容ConcurrentHashMap ,会调用transer方法具体执行扩容
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //当CounterCell数组不为空,则优先利用数组中的CounterCell记录数量
        //或者当baseCount的累加操作失败,会利用数组中的CounterCell记录数量
        if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            //标识是否有多线程竞争
            boolean uncontended = true;
            //当as数组为空
            //或者当as长度为0
            //或者当前线程对应的as数组桶位的元素为空
            //或者当前线程对应的as数组桶位不为空,但是累加失败
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //以上任何一种情况成立,都会进入该方法,传入的uncontended是false
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            //计算元素个数
            s = sumCount();  //我们在这里得到元素的个数
        }
        
        
        //接着判断是否需要扩容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //当元素个数达到扩容阈值
            //并且数组不为空
            //并且数组长度小于限定的最大值
            //满足以上所有条件,执行扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                //这个是一个很大的正数
                int rs = resizeStamp(n);
                //sc小于0,说明有线程正在扩容,那么会协助扩容
                if (sc < 0) {
                    //扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //扩容线程加1,成功后,进行协助扩容操作
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        //协助扩容,newTable不为null
                        transfer(tab, nt);
                }
                //没有其他线程在进行扩容,达到扩容阈值后,给sizeCtl赋了一个很大的负数
                //1+1=2 --》 代表此时有一个线程在扩容
                
                //rs << RESIZE_STAMP_SHIFT)是一个很大的负数
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    //扩容,newTable为null
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    

    2、fullAddCount方法

    需要添加的个数分别累加到baseCount 上、或者累加到其他CountCell数组中的每个对象中的value属性上

    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
       
        if ((h = ThreadLocalRandom.getProbe()) == 0) {  //获取当前线程的hash值(就是确定加到具体哪一个累加单元!)
            ThreadLocalRandom.localInit();     
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        
     //注意:我们的辅助数组CountCell中保存的是CountCell对象,而我们value是这个对象的有一个属性,所以找到位置,还需要判断是否需要创建对象,如果需要就创建,如果不需要就直接在其value属性的基础上进行添加!
        
        //标识是否有冲突,如果最后一个桶不是null,那么为true
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            //数组不为空,优先对数组中CouterCell的value累加
            if ((as = counterCells) != null && (n = as.length) > 0) {
                //线程对应的桶位为null
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        //创建CounterCell对象
                        CounterCell r = new CounterCell(x); // Optimistic create
                        //利用CAS修改cellBusy状态为1,成功则将刚才创建的CounterCell对象放入数组中
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                //桶位为空, 将CounterCell对象放入数组
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    //表示放入成功
                                    created = true;
                                }
                            }