和HashMap采用类似的数据结构,主要实现使用了Unsafe方法提供的原子操作和sychronized关键字实现
用来返回节点数组的指定位置的节点的原子操作
@SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }cas原子操作,在指定位置设定值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }原子操作,在指定位置设定值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }get操作支持并发
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //1.计算哈希值 int h = spread(key.hashCode()); //2.判断table不为空,长度不为0,第一个节点不为空 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //3.如果第一个节点的哈希值相等并且值也相等,直接返回. if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))){ return e.val; } } //4.如果哈希值小于0,从当前节点一直查找,找到返回对应值或者null else if (eh < 0){ return (p = e.find(h, key)) != null ? p.val : null; } //5.如果上述两种情况不满足,则遍历链表进行查找. while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; //这边看来,get操作是支持并发的. }put方法这边实现
利用UnSafe的CAS方法添加第一个节点(如果为空)如果检测到哈希值为MOVED,会启动帮助扩容设值节点(第一个节点不为空)操作通过sychronized方法保证同步. 图示: public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { //1.不允许key或者value为null的情况 if (key == null || value == null) { throw new NullPointerException(); } //2.计算哈希值 int hash = spread(key.hashCode()); int binCount = 0; //3.进入循环 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //4.初始化table if (tab == null || (n = tab.length) == 0){ tab = initTable(); } //5.如果第一个节点为null,通过cas的方式去尝试设定值,成功则退出循环.这边没有加锁,失败则继续往下 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //6.如果检测出当前哈希值是MOVED,说明此时正在扩容,这边会帮助扩容 else if ((fh = f.hash) == MOVED){ tab = helpTransfer(tab, f); } //7.进入设值阶段,同步操作 else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; //8.链表的处理--这边直接进行赋值 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //9.红黑树结构的操作 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //10.判断长度是否超过,超过则转换为树结构,否则直接返回 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //11.计数---不是很理解为什么这么做 addCount(1L, binCount); return null; }这边有一步,检测到MOVED,帮助扩容,这边我的理解是需要清空所有扩容的tab,不然会有内存隐患
public void clear() { long delta = 0L; // negative number of deletions int i = 0; Node<K,V>[] tab = table; //1.循环直到所有节点为空 while (tab != null && i < tab.length) { int fh; Node<K,V> f = tabAt(tab, i); //2.略过空桶 if (f == null) ++i; //3.帮助扩容? else if ((fh = f.hash) == MOVED) { tab = helpTransfer(tab, f); i = 0; // restart } //4.同步块处理,清空当前桶所有节点 else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> p = (fh >= 0 ? f : (f instanceof TreeBin) ? ((TreeBin<K,V>)f).first : null); while (p != null) { --delta; p = p.next; } setTabAt(tab, i++, null); } } } } if (delta != 0L) addCount(delta, -1); }remove这边直接调用了replaceNode方法 这边也有一步是帮助扩容的操作
final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); //1.进入循环 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //2.头结点为空,说明没有元素,直接退出循环,返回 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; //3.如果在做扩容操作,则帮助扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; //4.锁定该节点 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { validated = true; //5.循环链表查找,找到对应元素设值,否则继续往下 for (Node<K,V> e = f, pred = null;;) { K ek; //6.找到对应节点 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; //7.设值操作 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; //8.达到最后节点,退出 if ((e = e.next) == null) break; } } //9.红黑树的替换处理 else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }该部分参考:ConcurrentHashMap1.7源码分析
jdk1.7的concurrentHashMap使用一组segment对象存储对应的HashEntry数组,每个HashEntry对象上又挂着一组链表,代表真正的key-value数据对象。
下面可以看出segment是继承了ReentrantLock ,猜测这边的同步操作依赖于锁
static final class Segment<K,V> extends ReentrantLock implements Serializable { //table数组 transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; //临界值 transient int threshold; //负载因子 final float loadFactor; }HashEntry节点和HashMap的Entry节点类似,不过这边对value和next节点加了volatitle关键字
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }put方法直接使用了Segment的方法,在put之前尝试加锁,加锁成功则进入操作,加锁失败则循环cas尝试获取锁,当达到一定次数后,锁住等待其他线程释放锁.
这边主要涉及到三个方法如下:
remove操作同样用到了有限次自旋CAS锁加锁,对整个的remove操作结束之后会释放锁
public V remove(Object key) { int hash = hash(key); Segment<K,V> s = segmentForHash(hash); //查找segment,如果segment不为空,则调用它的remove方法 return s == null ? null : s.remove(key, hash, null); } final V remove(Object key, int hash, Object value) { //1.没有拿到锁,就进入循环cas锁获取、达到一定次数挂起 if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; //2.通过哈希值获取的HashEntry不为空 while (e != null) { K k; HashEntry<K,V> next = e.next; //3.获取到对应的key if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { //4.当前值为空,直接设置next值在当前位置 if (pred == null) setEntryAt(tab, index, next); //5.否则直接把指针指向e节点的下一个节点(1.e = next;2.next=e.next),也就是略过该节点(删除) else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { //解锁 unlock(); } return oldValue; }replace和remove道理差不多类似
public V replace(K key, V value) { //1.找到对应hash的segment int hash = hash(key); if (value == null) throw new NullPointerException(); Segment<K,V> s = segmentForHash(hash); //2.进行replace操作或者直接返回null return s == null ? null : s.replace(key, hash, value); } final V replace(K key, int hash, V value) { //1.获取锁和有限次cas获取锁 if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V> e; //2.遍历对应的HashEntry节点 for (e = entryForHash(this, hash); e != null; e = e.next) { K k; //3.找到匹配的进行换值操作 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; e.value = value; ++modCount; break; } } } finally { //4.解锁 unlock(); } return oldValue; }clear会循环所有segment进行清除操作,并且清除时锁住每个segment,防止其他线程进入
public void clear() { final Segment<K,V>[] segments = this.segments; //1.循环所有的segment进行移除操作 for (int j = 0; j < segments.length; ++j) { Segment<K,V> s = segmentAt(segments, j); if (s != null) s.clear(); } } final void clear() { //1.锁定当前segment,这样应该也是为了get、put操作不能直接进来 lock(); try { //2.清除所有元素 HashEntry<K,V>[] tab = table; for (int i = 0; i < tab.length ; i++) setEntryAt(tab, i, null); ++modCount; count = 0; } finally { //3.解锁 unlock(); } } }