Java并发容器ConcurrentLinkedQueue源码解析

Java提供了多种并发队列,具体如下:

  1. 无锁非阻塞队列:ConcurrentLinkedQueue & ConcurrentLinkedDeque
  2. 普通阻塞队列:ArrayBlockingQueue & LinkedBlockingQuene & LinkedBlockingDeque

1、ConcurrentLinkedQueue

ConcurrentLinkedQueue和ConcurrentLinkedDeque是无锁非阻塞的,其实现方式是通过循环CAS(wait-free算法)。内部的数据结构是链表,都没有大小限制,容量是无限的。其 size() 遍历整个队列统计个数,不是一个O(1)时间的方法,并且返回的值不能准确的反应队列的实际大小。

源码分析

head & tail 节点

head 和 tail 并不是一定指向整个队列的首部,和尾部。其满足一些规则:不变性和可变性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* A node from which the first live (non-deleted) node (if any)
* can be reached in O(1) time.
* Invariants(不变性):
* - all live nodes are reachable from head via succ() (所有的节点都可以通过 head 使用 succ() 方法取得)
* - head != null (head 节点不为 null)
* - (tmp = head).next != tmp || tmp != head (其实就是 head.next != head)
* Non-invariants:
* - head.item may or may not be null. (head 节点的值可能为null)
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head! (允许 tail 滞后于 head, 也就是调用 succ() 方法, 从 head 不可达tail)
*/
private transient volatile Node<E> head;

/**
* A node from which the last node on list (that is, the unique
* node with node.next == null) can be reached in O(1) time.
* Invariants:
* - the last node is always reachable from tail via succ() (tail 节点通过succ()方法一定到达队列中的最后一个节点(node.next = null))
* - tail != null
* Non-invariants:
* - tail.item may or may not be null. (tail 节点的值可能为null)
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head! (允许 tail 滞后于 head, 也就是调用 succ() 方法, 从 head 不可达tail)
* - tail.next may or may not be self-pointing to tail. (tail.next 可能指向 tail)
*/
private transient volatile Node<E> tail;

入队列offer

元素入队的主要的工作:

  1. 将入队节点设置成当前队列尾节点的下一个节点
  2. 更新 tail 节点(tail节点不总是为队尾节点,减少了CAS更新 tail 节点的次数,提高入队效率):
    • 若 tail 节点的next节点不为空,则将入队节点设置成 tail 节点
    • 若 tail 节点的next节点为空,则将入队节点设置成 tail 的next节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public boolean offer(E e) {
checkNotNull(e);
// 根据传入的值构造节点
final Node<E> newNode = new Node<E>(e);
// 通过 tail 节点找到队列的尾节点
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 定位p为队列的尾节点
if (q == null) {
// 使用cas将新节点加入到队尾
// 比较p的next是否为null,为null才会设置成功。这里实现了同步,防止了别人已经修改了队尾,而设置了错误的位置
// if如果失败,则会循环再次执行
if (p.casNext(null, newNode)) {
// 插入成功之后,判断队尾节节点的前一个节点是否是tail节点,若不是,则将tail节点指向队尾节点
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
// 这里的情况是 p == p.next,并且t为tail节点。表示这个队列刚初始化,正在准备添加节点,所以返回head节点
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 重新赋值p结点,让p成为尾节点
p = (p != t && t != (t = tail)) ? t : q;
}
}

出队poll

元素出队的主要工作:

  1. 根据 head 节点找到队列首个不为空的节点,并CAS让该节点出队
  2. 更新 head 节点(head 节点不总是为队首节点,减少了CAS更新 head 节点的次数,提高入队效率):
    • 若 head 节点有元素,不会更新head节点
    • 若 head 节点没有元素,将 head 指向队列的首节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 若头节点的值不为空,并且cas设置头节点值为null成功
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// 若当前节点不是 head 节点
if (p != h) // hop two nodes at a time
// 若当前节点的下一个节点不为空,则将 head 节点指向当前节点的下一个节点,否者指向当前节点
updateHead(h, ((q = p.next) != null) ? q : p);
// 返回出队的值
return item;
}
// 若当前节点的下一个节点为空,则该队列已空,重新设置 head 节点,并返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 该情况为,p是已经删除的 head 节点,继续循环
else if (p == q)
continue restartFromHead;
else
// 重新赋值p节点
p = q;
}
}
}

使用场景

因为其采用的是无锁的方式,所以能够满足高并发情况下的需求,其性能优于BlockingQueue
。缺点在于其size()方法为一个O(n)时间复杂度的方法,并且返回的值并不是一个准确的值,所以在实际应用中,无法有效获取队列中节点的个数。