`
reino
  • 浏览: 1555 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
最近访客 更多访客>>
社区版块
存档分类
最新评论

ConcurrentHashMap源码分析

 
阅读更多

ConcurrentHashMap是util.concurrent 的重要成员,是Java 5中支持高并发、高吞吐量的线程安全HashMap实现。类似于Hashtable, ConcurrentHashMap是线程安全的;但是,区别在于ConcurrentHashMap使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。它并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这也就是分段锁(Lock Striping)。在这种机制下,任意数量的读取线程可以并发地访问HashMap,并且允许一定数量的写线程并发地修改HaspMap。

 

ConcurrentHashMap内部划分了多个Segments,默认的并发级别是16,所以划分出了16个Segment;而每个Segment相当于一个Hash Table。它的这种内部结构使得可以并发地锁定其中的某些Segments,对这些Segment并发地写;而读操作则是完全并发的,而且读操作之前不需要加锁。下面引用一张图来描述ConcurrentHashMap的这种分段结构:

 

 ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment。HashEntry 用来封装映射表的键 / 值对;Segment 用来充当锁的角色,每个 Segment 对象守护整个散列映射表的若干个桶。每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。

 

HashEntry类的定义

 

    static final class HashEntry<K,V> {
        final int hash; //声明hash值为final类型
        final K key;  //声明key为final类型
        volatile V value;  //声明value为volatile类型
        volatile HashEntry<K,V> next;  //声明next域为volatile类型

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * Sets next field with volatile write semantics. 
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }
    }

 这是JDK 1.7的定义(本文的源码全部来自JDK 1.7),它定义了hash值,key、value键值对和next域。这里的next也被声明成volatile变量了,而在JDK 1.6是final的。声明成volatile的变量可以保证此变量对所有线程的可见性,也就是说线程修改了volatile变量的值,其他线程可以立刻看到新的值。ConcurrentHashMap允许多个线程并发修改HashEntry、添加或删除HashEntry,由于value和next是volatile变量,所以这种修改对于其他线程是立即可见的,读线程可以立即得到最新的修改。

另一个跟JDK 1.6不同的地方是,setNext()方法调用了Unsafe类的putOrderedObject()。Unsafe.java是jdk并发类库java.util.concurrent包中使用的底层类,该类中的方法都是通过native方法调用操作系统命令。putOrderedObject()方法,只对volatile字段有用,但是它不能保证内存对其他线程的即时可见性。

 

Segment类

 

Segment其实是Hash Table的特殊版本,它继承了ReentrantLock,优化了锁操作。  Segments维护着一组Hash桶的头结点,通过这个头结点可以遍历整个Hash桶对应的链表。这个类的读操作不需要加锁,写操作则需要先获得锁,再执行写操作,然后释放锁。这里锁是可控的自旋机制,这种机制通过Segment的scanAndLock()和scanAndLockForPut()方法来实现,进一步减少了加锁的可能性。

 

先看一下scanAndLockForPut()和scanAndLock()这两个方法的实现:

 

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
}

 

在scanAndLockForPut()方法中,首先根据hash值得到Segment中Hash桶的头结点,然后在while循环中不断尝试加锁。这里完全有可能尝试一次就获得了锁,或者尚未遍历完Hash桶的链表并且没找到key对应的HashEntry的情况下就获得了锁,这种两种情况下会该方法会返回null。

如果遍历完Hash桶,但是没找到key对应的HashEntry,则创建一个新的HashEntry结点,将retries设置成0。如果在Hash桶中找到了key对应的HashEntry结点,同样会把retries设置成0。一旦retries从-1变成0,就表明对Hash桶中链表的遍历已经结束。接下来,会不断尝试加锁,直到尝试的次数达到了最大限制(MAX_SCAN_RETRIES),才会进入等待的状态,这就是所谓的自旋等待。MAX_SCAN_RETRIES是根据系统是否是核来决定的,单核系统最大限制次数为1,多核系统最大限制次数为64。这自旋等待到什么时候结束呢?直到真正获得了锁,才退出while循环,所以scanAndLockForPut()方法是可以保证获得了锁的。

另外要注意的是,如果在自旋的过程中检测到Hash桶的头结点发生了变化,需要获取新的Hash桶头结点,将retries设成-1,重新为获取锁而进行自旋遍历。

 

private void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null || key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -1;
                }
            }
}

 

scanAndLock()方法是用于Segment的remove()和replace()方法的,主要的思路跟scanAndLockForPut()基本一致,主要区别就是在没找到key对应的HashEntry结点时不需要创建新的HashEntry结点。无论是否遍历到key对应的HashEntry结点,最后都会将retries设置成-1,进入自旋等待。尝试加锁的过程,Hash桶头结点改变后为获取锁而重新进行自旋遍历的实现逻辑与scanAndLockForPut()是一样的。在这个方法返回的时候,它可以保证已经获得了锁,而且即使没找到key对应的HashEntry结点,也一样需要执行加锁操作,从而保证写操作的一致性。

 

接着分析一下Segment里面的put(), rehash(), remove(), replace()和clear()方法:

 

 put()操作

 

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
}

 put()方法一开始就会尝试获取锁,如果成功得到了锁,就不再需要执行scanAndLockForPut()了,否则就执行这个方法保证能够取到锁。然后根据hash值计算出Segment中相应的Hash桶的头结点。
注意,在这里Hash桶的index是通过(tab.length - 1) & hash计算出来的因为ConcurrentHashMap在计算Segment中数组长度时会保证该值是2的倍数,而且Segment在做rehash时也是每次增长一倍,所以只需要计算(tab.length - 1) & hash就可以得到Hash桶的index了。

 

table是volatile变量,它会禁止指令重排序优化,带来性能上的损失。在开始时将该引用赋值给tab变量,可以减少直接引用table变量,直接在tab局部变量上操作,可以实现编译、运行时的优化。为了保证每次读取的table中的数组项都是最新的值,因而调用entryAt()方法获取数组项的值,而不是通过tab[index]方式直接获取。在put操作更新节点链时,它采用Unsafe.putOrderedObject()操作(setEntryAt()),此时它对Hash桶的头结点的更新只局限与当前线程,为了保证接下来的put操作能够读取到上一次的更新结果,需要使用volatile读的语义去读取相应Hash桶的头结点,如下面enrtyAt()的代码所示。
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
 
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }
 根据hash值计算出Hash桶的头结点之后,就开始遍历Hash桶的链表。如果找到key对应的HashEntry结点,则将它的value值赋值给oldValue;如果这不是putIfAbsent,则修改该结点的value,同时modCount自增1,put操作完毕,释放锁。如果遍历的过程中找不到key对应的HashEntry结点,说明这是一个新结点,需要添加到这个Hash桶中。put()方法获取了锁,但是新结点可能在scanAndLockForPut()中创建出来了,当然也可能返回null。所以,在put()方法中,要检测一下这个新结点是不是null。如果不是null,说明它已经在scanAndLockForPut()准备好了,直接将它的next域指向当前Hash桶的头结点;否则,需要生成一个新的HashEntry结点,并将它的next域指向当前Hash桶的头结点。至此,插入新结的操作已经完成了一半。接着,计算Hash桶链表的结点个数,如果它超过了阀值并且tab数组长度没超过最大容量,就需要做rehash();如果个数不超过阀值,就把新的HashEntry结点当作头结点插入到当前Hash桶中。这里使用setEntryAt()操作以实现对链头的延时写,以提升性能,因为此时并不需要将该更新写入到内存,而在锁退出后该更新自然会写入内存。最后,modCount自增1,更新count变量和oldValue变量,退出循环,put操作完毕,释放锁。
rehash()操作
private void rehash(HashEntry<K,V> node) {
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
}
 在rehash()方法里面,将容器的容量扩大为原来的2倍,重新计算阀值,然后创建一个长度是原来的table数组2倍的新数组。接着遍历每一个table数组项,也就是Hash桶,以及Hash桶对应的HashEntry链表。对每个节点重新计算它的数组索引,然后创建一个新的节点插入到新数组。重新创建一个新节点,而不是直接修改已有节点的next域,其实是为了在做rehash过程中能够保证其他线程的遍历操作可以正常在原有的链表上正常进行。
如果希望理解rehash的过程,必须要明白一点,那就是Segment中的table数组是成倍地增长的,新数组的长度永远是旧数组长度的2倍,对原来的每个Hash桶的HashEntry结点重新计算得到的索引要不就跟原来的索引一样,要不就是原来的索引加上原数组的长度。例如,如果原数组长度是8,rahash后的长度就是16;如果原来的索引值是3,rehash重新计算到的索引就只可能是3或者是11(3+8)。
rehash()的过程减少了不必要的HashEntry结点创建。这种优化是基于识别原来的Hash桶链表中可重用的结点来实现的。首先,对于只有一个结点的链表,直接将它放入相应的Hash桶头结点位置。其次,对于有多个结点的链表,rehash()的内循环遍历每个Hash桶链表,搜索到第一个其后面所有节点的索引值不变的节点p,将这个结点p放在该索引位置的Hash桶头结点上,则这个p结点及其后面的所有链表结点都挂在相应的Hash桶上了。然后,从原来的Hash桶的头结点开始,重新创建原来的Hash桶上的每个结点,直到遇上p结点为止。每一个新创建的结点都会作为头结点插入到相应索引位置的Hash桶头结点位置。
处理完Segment的所有Hash桶及其链表后,将处理新添加进来的HashEntry结点,这个操作其实是比较简单的。先根据hash值计算出结点的索引位置,也就是计算出应该将它添加到哪个Hash桶上;然后在索引位置的Hash桶的头结点位置插入这个新结点。
remove()操作
final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> e = entryAt(tab, index);
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
}
 在JDK 1.6版本中,remove操作先找到key对应的Hash桶的头结点,然后遍历该Hash桶链表,如果在链表中找到key对应的结点,则为该结点之前的所有结点重新创建结点并组成一条新链表,将该新链表的尾结点指向找到结点的下一个结点。这样就会同时存在两条链表,即使有另一个线程正在该链表上遍历也不会出问题。
JDK 1.7版本的remove()方法,不再采用这种做法,它充分利用CPU缓存的特性,不再复制key结点之前的所有结点,而是在当前缓存中将链表中找到的待删除结点移除;而另一个遍历线程的缓存中继续存在原来的链表,它的遍历也不会出现问题。具体的做法:1) 如果待删除结点是Hash桶的头结点,则将待删除结点的下一个结点设置成Hash桶的头结点;2) 如果待删除结点不是Hash桶的头结点,则将待删除结点的上一个结点的next域指向待删除结点的下一个结点。可以看到,HashEntry结点的next域是可以被改变的,这也是JDK 1.7不再像JDK 1.6那样把next域声明为final的原因了。
当然,在进入remove()方法的时候也需要首先通过tryLock()或者scanAndLock()获得锁,与put()方法的区别之一是不需要创建新结点。remove操作完成后,同样需要释放锁。
replace()操作
final boolean replace(K key, int hash, V oldValue, V newValue) {
            if (!tryLock())
                scanAndLock(key, hash);
            boolean replaced = false;
            try {
                HashEntry<K,V> e;
                for (e = entryForHash(this, hash); e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        if (oldValue.equals(e.value)) {
                            e.value = newValue;
                            ++modCount;
                            replaced = true;
                        }
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return replaced;
}
 replace()是ConcurrentHashMap新添加的接口,它与put()的区别是: put()方法如果没找到与key相应的HashEntry结点,就会添加一个新的结点,而replace()方法则不会这样做,它只会在找与key相应的HashEntry结点才会更新它的值,如果找不到,就不会做任何操作,返回false。同样,replace()方法也通过tryLock()或者scanAndLock()获得锁,完成操作之后,释放锁。
clear()操作
 final void clear() {
            lock();
            try {
                HashEntry<K,V>[] tab = table;
                for (int i = 0; i < tab.length ; i++)
                    setEntryAt(tab, i, null);
                ++modCount;
                count = 0;
            } finally {
                unlock();
            }
 }
 clear()操作不像其他的put(), remove(), replace()那样通过scanAndLock()或者scanAndLockForPut()来加锁,它直接调用ReentrantLock的lock()方法获得锁,从而不存在自旋等待的过程。这有可能是因为它不像其他操作只是对table中的一个Hash桶链表操作,而是需要对整个table进行操作,因而需要等到所有在table上操作的线程退出后才能执行。对一个Hash桶链表操作的线程执行得比较快,因而自旋可以后获得锁的可能性比较大,对table操作的等待相对要比较久,因而自旋等待意义不大。clear()操作只是将table数组的每个项设置为null,将count变量重置为0。另外,注意到它使用了setEntryAt的延迟设置,从而能够保证其他读线程的正常工作。
ensureSegment()方法
private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
}
 该方法返回给定索引位置的Segment,如果Segment不存在,则参考Segment表中的第一个Segment的参数创建一个Segment并通过CAS操作将它记录到Segment表中去。
ConcurrentHashMap中的get()、containsKey()、put()、putIfAbsent()、replace()、Remove()、clear()操作
ConcurrentHashMap中的get、containsKey操作只需根据key计算出hash值,然后根据hash值找到相应的Segment,再通过Segment找到Hash桶结点链表,然后遍历结点链表便可以得到结果。
ConcurrentHashMap中的put、putIfAbsent、replace、remove、clear操作,都委托给Segment去实现 了。在这些方法中所需要做的事情其实很简单,只需要通过hash值找到相应Segment和其中的Hash桶,然后调用Segment的相应方法就行了。
ConcurrentHashMap中的size()操作
public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
}
 首先看看RETRIES_BEFORE_LOCK,它是在未获行锁之前非同步的情况下尝试计算的次数,这个常量在size()和containsValue()中都用到了。这是用来避免在有其他线程不断地对Hash桶及其链表进行修改的情况下无限次尝试计算而设计的。
因为size()操作需要扫瞄整个所有的Segments,正常情况下需要先获得所有Segments的锁,然后做相应的查找、计算得到结果,再解锁,最后返回结果。但是,可以看到在这个size()方法中,并不是直接就对所有Segment加锁,而是在还没获得锁、非同步的情况下计算size,如果前后两次计算的过程中都总的修改次数(modCount)都是一样的,那就认为在这个计算的过程中,Map没有被修改过,可以认为这个计算的结果是正确的。如果尝试了3次计算,依然未能得出size的结果(因为有其他线程在修改Map),那么就会对所有的Segment加锁,然后再去计算size的值,而这个计算的过程跟非同步下的过程是一样的,同样是比较前后两次计算过程中修改次数来决定是否得到正确的结果。
 containsValue()、contains()操作
public boolean containsValue(Object value) {
        // Same idea as size()
        if (value == null)
            throw new NullPointerException();
        final Segment<K,V>[] segments = this.segments;
        boolean found = false;
        long last = 0;
        int retries = -1;
        try {
            outer: for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                long hashSum = 0L;
                int sum = 0;
                for (int j = 0; j < segments.length; ++j) {
                    HashEntry<K,V>[] tab;
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null && (tab = seg.table) != null) {
                        for (int i = 0 ; i < tab.length; i++) {
                            HashEntry<K,V> e;
                            for (e = entryAt(tab, i); e != null; e = e.next) {
                                V v = e.value;
                                if (v != null && value.equals(v)) {
                                    found = true;
                                    break outer;
                                }
                            }
                        }
                        sum += seg.modCount;
                    }
                }
                if (retries > 0 && sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return found;
}
 containsValues()方法跟size()方法的设计思路是一致的,都是在非同步的情况下先尝试计算3次,如果未能得到结果,就在所有Segment加锁的情况下,再计算,直到获得正确的结果。跟size()不同的地方是其中的计算逻辑,它遍历了每个Hash桶的链表去查找HashEntry结点的value是否和给定的value相等。如果相等,则返回true。如果遍历完所有Hash桶都找不到给定的value,则返回false。contains()其实就是简单地调用了containsValue(),不再赘述。
isEmpty()操作
public boolean isEmpty() {
        long sum = 0L;
        final Segment<K,V>[] segments = this.segments;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum += seg.modCount;
            }
        }
        if (sum != 0L) { // recheck unless no modifications
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    if (seg.count != 0)
                        return false;
                    sum -= seg.modCount;
                }
            }
            if (sum != 0L)
                return false;
        }
        return true;
}
 跟containsValue()等方法一样,isEmpty()也汇总每个Segment的修改次数并比较前后两次的汇总结果,从而可以避免在查检一个Segment的时候另一个Segment添加或删除了HashEntry结点。如果在检查某个Segment的时候发现它的count不是0,就直接返回false。另外,这个方法并没有加锁操作,它是在非同步的状态下执行的。
get()操作
public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
}
 get()方法并不需要获得锁,ConcurrentHashMap的读操作是完全并发的。get()方法的实现比较简单,它首先根据key计算hash值,然后通过hash值计算Segment的索引,通过UnSafe.getObjectVolatile()方法找到Segment,最后对Hash桶的链表进行遍历查找key,找到就返回key对应的value。如果找不到,返回null。
HashIterator类的定义
abstract class HashIterator {
        int nextSegmentIndex;
        int nextTableIndex;
        HashEntry<K,V>[] currentTable;
        HashEntry<K, V> nextEntry;
        HashEntry<K, V> lastReturned;

        HashIterator() {
            nextSegmentIndex = segments.length - 1;
            nextTableIndex = -1;
            advance();
        }

        /**
         * Set nextEntry to first node of next non-empty table
         * (in backwards order, to simplify checks).
         */
        final void advance() {
            for (;;) {
                if (nextTableIndex >= 0) {
                    if ((nextEntry = entryAt(currentTable,
                                             nextTableIndex--)) != null)
                        break;
                }
                else if (nextSegmentIndex >= 0) {
                    Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
                    if (seg != null && (currentTable = seg.table) != null)
                        nextTableIndex = currentTable.length - 1;
                }
                else
                    break;
            }
        }

        final HashEntry<K,V> nextEntry() {
            HashEntry<K,V> e = nextEntry;
            if (e == null)
                throw new NoSuchElementException();
            lastReturned = e; // cannot assign until after null check
            if ((nextEntry = e.next) == null)
                advance();
            return e;
        }

        public final boolean hasNext() { return nextEntry != null; }
        public final boolean hasMoreElements() { return nextEntry != null; }

        public final void remove() {
            if (lastReturned == null)
                throw new IllegalStateException();
            ConcurrentHashMap.this.remove(lastReturned.key);
            lastReturned = null;
        }
}
 HashIterator是KeyIterator、ValueIterator、EntryIterator父类,其子类的方法大都是基于HashIterator进行的,比较简单。这里关键理解一下advance()方法。初始化HashIterator的时候,将nextSegmentIndex初始值设为最后一个Segment的index,将nextTableIndex的初始值设为-1,然后调用advance()方法,将nextEntry设为第一个非空Segment的第一个非空Hash桶的头结点。 在nextEntry()方法中,通过advance()方法找到下一个非空Hash桶的头结点。
如有错漏,欢迎拍砖:)
  • 大小: 21.4 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics