前言
其实一开始没有接触过这个,但是看Netty源码的时候,发现Netty的对象池使用了MpscQueue,感觉还挺有意思的。
MpscQueue主要针对的是单消费者,多生产者的情况,实现上是LockFree的,这里的Lock Free,一般都是指用了CAS解决并发问题。
单消费者单生产者
我们先一步一步来,先考虑单消费者单生产者的情况。
这种情况下,除了Lock Free,还可以做到Wait Free。
数组存储
如果我们使用数组作为存储的话,维护一个ReadIndex和WriteIndex。
需要保证WriteIndex > ReadIndex就行。
每次读的时候,判断队列是否为空,每次写的时候判断队列是否已经满了。
链表存储
如果我们使用链表作为存储的话,原理和数组类似,可以维护一个Head节点和一个Tail节点。
每次写入的时候
1 | Node node = newNode(); |
每次读取的时候
1 | if (head == tail) { |
单消费者,多生产者
这里我只想对于链表的实现,因为这是Netty的默认实现方式。
对于数组的实现方式,等大家看了RingBuffer的实现方式之后,想必自然就懂了。
在链表的实现上,对于单消费者,多生产者,其实对于消费者端的代码而言,区别和单消费者单生产者不大。
但是由于生产者可能有多个,所以对于tail指针的操作,多线程下是不安全的。
在Netty的Mpsc中,引入CAS操作,对tail指针进行原子操作:
1 | MpscLinkedNode tail = new MpscLinkedNode(); |
这里的valueOffset
就是tail对象的地址。
replaceTail这个函数,接收一个新的Node节点,将Tail节点替换成新的Node阶段,同时返回旧的Tail节点。
Offer操作
1 | public boolean offer(E value) { |
生产者加元素的代码不多,算起来就2行。
我们用图例来演示这2步究竟做了什么,首先,我们有一个单链表的结构。
第一步,单线程
第一步,原子替换Tail节点,将新的节点设置为Tail节点。
但是这一步操作完之后,其实并没有改变之前的节点的Next指向,上一个节点的Next还是指向的之前的Tail节点。
第二步,单线程
第二步执行完,才完成整个Next指针的链接。
第一步,多线程
假设第一步被多线程并发了,现在有2个线程同时执行完了第一步。
这个时候,整个链表的结构看起来就是这样的。
后面两个接待是断开的。
第二步,多线程
但是没关系,因为对于每个线程而言,都有自己的newTail和oldTail,这些newTail和oldTail相互串联了起来。
这2个线程结束之后,又是一个完整的链表。
take
对于消费者而言,因为没有竞争,其实连CAS都不需要。
1 | private MpscLinkedQueueNode<E> peekNode() { |
先看peekNode方法,因为head不保存数据,同时在多线程并发的情况下,可能会出现节点之间还没有被串联起来的情况。
这里使用了for(;;)
去等待数据。
同时如果head == tail
,表示队列中没有数据。
看完了peek,我们再来看poll方法
1 | public E poll() { |
这里就是拿到就的Head节点,把他置空,将next节点置为新的Head节点。