This thread has been locked.

If you have a related question, please click the "Ask a related question" button in the top right corner. The newly created question will be automatically linked to this question.

Ring Buffer 有什么特别?

首先 - Ring Buffer。我对 Disruptor 的最初印象只有 Ring Buffer。后来我渐渐明白 Ring Buffer 结构是这个模式的中心,关键之处是 Disruptor 如何控制对它的访问。
Ring Buffer 究竟是什么?
正如名字描述那样 - 它是一个环 (圆形,首尾相接的),你可以把它当作一个缓存 (buffer),用来在一个线程上下文与另一个线程上下文之间传递数据。


(好吧,我是用 Paint 画的。我尝试画草图,希望强迫症没有掺和进来要求我画出完美的圆和直线)。 
所以基本上 Ring Buffer 就是拥有一个序号指向下一个可用元素的数组。

如果你持续向 buffer 中写入数据(应该也会从里面读数据),这个序号会一直增长,直到绕过整个环。

要找到数组中当前序号指向的元素,你可以用 mod 运算。
sequence mod array length = array index
因此对于上面的 Ring Buffer,这个算法就是(用 JAVA 的 mod 语法):12 % 10 = 2。很简单。
其实图片里画着 10 个元素完全是一个意外。2 的 N 次方个元素会更好,因为计算机是用二进制思考的。    
接下来呢?
如果你从 Wikipedia 查到 Circular Buffers​,你会看到它与我们的实现方式有一个重要的差别-没有指向末尾的指针。我们只有下一个可用的序号。这是刻意的-选择 Ring Buffer 的根本原因是需要支持可靠的消息通信。我们需要把服务发出的消息存储起来,那么当另一个服务发来一个 NAK (拒绝应答信号)​​ 说他们没有收到消息的时候,我们可以重新发送给他们。
Ring Buffer 看起来很理想。它用序号来指出 buffer 的末尾在哪里,而且当它收到一个 NAK 信号的时候,可以重发从那一点到当前序号之间的所有消息:

我们所实现的 Ring Buffer 与传统队列的区别是:buffer 里的对象不会被销毁-它们留在那儿直到下次被覆盖写入。这是与 Wikipedia 上的版本相比我们的实现不需要尾指针的原因。在我们的实现中,确定 Ring Buffer 是否重叠的工作,是由数据结构之外来完成的(这是生产者与消费者行为的一部分-如果你来不及等我写博客说明它,可以自己检出 Disruptor 代码​​)。
Ring Buffer 这么棒是因为...?

我们使用 Ring Buffer 这种数据结构,是因为它给我们提供了可靠的消息传递特性。这个理由就足够了,不过它还有一些其他的优点。
首先,Ring Buffer 比链表要快,因为它是数组,而且有一个容易预测的访问模式。这很不错,对 CPU 高速缓存友好 (CPU-cache-friendly)-数据可以在硬件层面预加载到高速缓存,因此 CPU 不需要经常回到主内存 RAM 里去寻找 Ring Buffer 的下一条数据。
第二点,Ring Buffer 是一个数组,你可以预先分配内存,并保持数组元素永远有效。这意味着内存垃圾收集(GC)在这种情况下几乎什么也不用做。此外,也不像链表那样每增加一条数据都要创建对象-当这些数据从链表里删除时,这些对象都要被清理掉。
文章缺少的部分
我没有提到如何避免环重叠,以及怎么向 Ring Buffer 读、写数据的细节。你也会注意到我在拿它和链表那样的数据结构相比较,我想没人会认为链表是实际问题的解决方案。    
有趣的部分来自于拿 Disruptor 和队列之类的实现相比较。队列通常关注于维护队列的头和尾,添加和消费消息一类的东西。所有这些东西我还没有在 Ring Buffer 一节真正提到。这是因为 Ring Buffer 本身并不负责这些事情,我们把这些问题挪到了数据结构的外部。     

-------————————————————————————————————————————————————————————————————

你也可以把你的理解和相关代码跟帖发来,大家一起学习

  • 圆形缓冲区(circular buffer),也称作圆形队列(circular queue),循环缓冲区(cyclic buffer),环形缓冲区(ring buffer),是一种数据结构用于表示一个固定尺寸、头尾相连的缓冲区,适合缓存数据流

    用法

    圆形缓冲区的一个有用特性是:当一个数据元素被用掉后,其余数据元素不需要移动其存储位置。相反,一个非圆形缓冲区(例如一个普通的队列)在用掉一个数据元素后,其余数据元素需要向前搬移。换句话说,圆形缓冲区适合实现先进先出缓冲区,而非圆形缓冲区适合后进先出缓冲区。


    圆形缓冲区适合于事先明确了缓冲区的最大容量的情形。扩展一个圆形缓冲区的容量,需要搬移其中的数据。因此一个缓冲区如果需要经常调整其容量,用链表实现更为合适。

    写操作覆盖圆形缓冲区中未被处理的数据在某些情况下是允许的。特别是在多媒体处理时。例如,音频的生产者可以覆盖掉声卡尚未来得及处理的音频数据。

    工作过程

    一个圆形缓冲区最初为空并有预定的长度。例如,这是一个具有七个元素空间的圆形缓冲区,其中底部的单线与箭头表示“头尾相接”形成一个圆形地址空间:

    Circular buffer - empty.svg

    假定1被写入缓冲区中部(对于圆形缓冲区来说,最初的写入位置在哪里是无关紧要的):

    Circular buffer - XX1XXXX.svg

    再写入2个元素,分别是2 & 3 — 被追加在1之后:

    Circular buffer - XX123XX.svg

    如果两个元素被处理,那么是缓冲区中最老的两个元素被卸载。在本例中,1 & 2被卸载,缓冲区中只剩下3:

    Circular buffer - XXXX3XX.svg

    如果缓冲区中有7个元素,则是满的:

    Circular buffer - 6789345.svg

    如果缓冲区是满的,又要写入新的数据,一种策略是覆盖掉最老的数据。此例中,2个新数据— A & B — 写入,覆盖了3 & 4:

    Circular buffer - 6789AB5.svg

    也可以采取其他策略,禁止覆盖缓冲区的数据,采取返回一个错误码或者抛出异常

    最终,如果从缓冲区中卸载2个数据,不是3 & 4 而是 5 & 6 。因为 A & B 已经覆盖了3 & 4:

    Circular buffer - X789ABX.svg

    圆形缓冲区工作机制

    由于计算机内存是线性地址空间,因此圆形缓冲区需要特别的设计才可以从逻辑上实现。

    读指针与写指针

    一般的,圆形缓冲区需要4个指针:

    • 在内存中实际开始位置;
    • 在内存中实际结束位置,也可以用缓冲区长度代替;
    • 存储在缓冲区中的有效数据的开始位置(读指针);
    • 存储在缓冲区中的有效数据的结尾位置(写指针)。

    读指针、写指针可以用整型值来表示。

    下例为一个未满的缓冲区的读写指针:

    Circular buffer - XX123XX with pointers.svg

    下例为一个满的缓冲区的读写指针:

    Circular buffer - 6789AB5 with pointers.svg

    区分缓冲区满或者空

    缓冲区是满、或是空,都有可能出现读指针与写指针指向同一位置:

    250px

    有多种策略用于检测缓冲区是满、或是空.

    总是保持一个存储单元为空

    缓冲区中总是有一个存储单元保持未使用状态。缓冲区最多存入 个数据。如果读写指针指向同一位置,则缓冲区为空。如果写指针位于读指针的相邻后一个位置,则缓冲区为满。这种策略的优点是简单、鲁棒;缺点是语义上实际可存数据量与缓冲区容量不一致,测试缓冲区是否满需要做取余数计算。

    使用数据计数

    这种策略不使用显式的写指针,而是保持着缓冲区内存储的数据的计数。因此测试缓冲区是空是满非常简单;对性能影响可以忽略。缺点是读写操作都需要修改这个存储数据计数,对于多线程访问缓冲区需要并发控制

    镜像指示位

    缓冲区的长度如果是n,逻辑地址空间则为0至n-1;那么,规定n至2n-1为镜像逻辑地址空间。本策略规定读写指针的地址空间为0至2n-1,其 中低半部分对应于常规的逻辑地址空间,高半部分对应于镜像逻辑地址空间。当指针值大于等于2n时,使其折返(wrapped)到ptr-2n。使用一位表 示写指针或读指针是否进入了虚拟的镜像存储区:置位表示进入,不置位表示没进入还在基本存储区。

    Circular buffer - mirror solution full and empty

    在读写指针的值相同情况下,如果二者的指示位相同,说明缓冲区为空;如果二者的指示位不同,说明缓冲区为满。这种方法优点是测试缓冲区满/空很简 单;不需要做取余数操作;读写线程可以分别设计专用算法策略,能实现精致的并发控制。 缺点是读写指针各需要额外的一位作为指示位。

    如果缓冲区长度是2的,则本方法可以省略镜像指示位。如果读写指针的值相等,则缓冲区为空;如果读写指针相差n,则缓冲区为满,这可以用条件表达式(写指针 == (读指针 异或 缓冲区长度))来判断。

    /* This approach adds one bit to end and start pointers */
     
    /* Circular buffer object */
    typedef struct {
        int         size;   /* maximum number of elements */
        int         start;  /* index of oldest element */
        int         end;    /* index at which to write new element */
        ElemType   *elems;  /* vector of elements */
    } CircularBuffer;
     
    void cbInit(CircularBuffer *cb, int size) {
        cb->size  = size;
        cb->start = 0;
        cb->end   = 0;
        cb->elems = (ElemType *)calloc(cb->size, sizeof(ElemType));
    }
     
    void cbPrint(CircularBuffer *cb) {
        printf("size=0x%x, start=%d, end=%d\n", cb->size, cb->start, cb->end);
    }
     
    int cbIsFull(CircularBuffer *cb) {
        return cb->end == (cb->start ^ cb->size); /* This inverts the most significant bit of start before comparison */ }
     
    int cbIsEmpty(CircularBuffer *cb) {
        return cb->end == cb->start; }
     
    int cbIncr(CircularBuffer *cb, int p) {
        return (p + 1)&(2*cb->size-1); /* start and end pointers incrementation is done modulo 2*size */
    }
     
    void cbWrite(CircularBuffer *cb, ElemType *elem) {
        cb->elems[cb->end&(cb->size-1)] = *elem;
        if (cbIsFull(cb)) /* full, overwrite moves start pointer */
            cb->start = cbIncr(cb, cb->start);
        cb->end = cbIncr(cb, cb->end);
    }
     
    void cbRead(CircularBuffer *cb, ElemType *elem) {
        *elem = cb->elems[cb->start&(cb->size-1)];
        cb->start = cbIncr(cb, cb->start);
    }
    

    读/写 计数

    用两个有符号整型变量分别保存写入、读出缓冲区的数据数量。其差值就是缓冲区中尚未被处理的有效数据的数量。这种方法的优点是读线程、写线程互不干扰;缺点是需要额外两个变量。

    记录最后的操作

    使用一位记录最后一次操作是读还是写。读写指针值相等情况下,如果最后一次操作为写入,那么缓冲区是满的;如果最后一次操作为读出,那么缓冲区是空。 这种策略的缺点是读写操作共享一个标志位,多线程时需要并发控制。

    POSIX优化实现

    #include <sys/mman.h>
    #include <stdlib.h>
    #include <unistd.h>
     
    #define report_exceptional_condition() abort ()
     
    struct ring_buffer
    {
      void *address;
     
      unsigned long count_bytes;
      unsigned long write_offset_bytes;
      unsigned long read_offset_bytes;
    };
     
    //Warning order should be at least 12 for Linux
    void
    ring_buffer_create (struct ring_buffer *buffer, unsigned long order)
    {
      char path[] = "/dev/shm/ring-buffer-XXXXXX";
      int file_descriptor;
      void *address;
      int status;
     
      file_descriptor = mkstemp (path);
      if (file_descriptor < 0)
        report_exceptional_condition ();
     
      status = unlink (path);
      if (status)
        report_exceptional_condition ();
     
      buffer->count_bytes = 1UL << order;
      buffer->write_offset_bytes = 0;
      buffer->read_offset_bytes = 0;
     
      status = ftruncate (file_descriptor, buffer->count_bytes);
      if (status)
        report_exceptional_condition ();
     
      buffer->address = mmap (NULL, buffer->count_bytes << 1, PROT_NONE,
                              MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
     
      if (buffer->address == MAP_FAILED)
        report_exceptional_condition ();
     
      address =
        mmap (buffer->address, buffer->count_bytes, PROT_READ | PROT_WRITE,
              MAP_FIXED | MAP_SHARED, file_descriptor, 0);
     
      if (address != buffer->address)
        report_exceptional_condition ();
     
      address = mmap (buffer->address + buffer->count_bytes,
                      buffer->count_bytes, PROT_READ | PROT_WRITE,
                      MAP_FIXED | MAP_SHARED, file_descriptor, 0);
     
      if (address != buffer->address + buffer->count_bytes)
        report_exceptional_condition ();
     
      status = close (file_descriptor);
      if (status)
        report_exceptional_condition ();
    }
     
    void
    ring_buffer_free (struct ring_buffer *buffer)
    {
      int status;
     
      status = munmap (buffer->address, buffer->count_bytes << 1);
      if (status)
        report_exceptional_condition ();
    }
     
    void *
    ring_buffer_write_address (struct ring_buffer *buffer)
    {
      /*** void pointer arithmetic is a constraint violation. ***/
      return buffer->address + buffer->write_offset_bytes;
    }
     
    void
    ring_buffer_write_advance (struct ring_buffer *buffer,
                               unsigned long count_bytes)
    {
      buffer->write_offset_bytes += count_bytes;
    }
     
    void *
    ring_buffer_read_address (struct ring_buffer *buffer)
    {
      return buffer->address + buffer->read_offset_bytes;
    }
     
    void
    ring_buffer_read_advance (struct ring_buffer *buffer,
                              unsigned long count_bytes)
    {
      buffer->read_offset_bytes += count_bytes;
     
      if (buffer->read_offset_bytes >= buffer->count_bytes)
        { /*如果读指针大于等于缓冲区长度,那些读写指针同时折返回[0, buffer_size]范围内 */
          buffer->read_offset_bytes -= buffer->count_bytes;
          buffer->write_offset_bytes -= buffer->count_bytes;
        }
    }
     
    unsigned long
    ring_buffer_count_bytes (struct ring_buffer *buffer)
    {
      return buffer->write_offset_bytes - buffer->read_offset_bytes;
    }
     
    unsigned long
    ring_buffer_count_free_bytes (struct ring_buffer *buffer)
    {
      return buffer->count_bytes - ring_buffer_count_bytes (buffer);
    }
     
    void
    ring_buffer_clear (struct ring_buffer *buffer)
    {
      buffer->write_offset_bytes = 0;
      buffer->read_offset_bytes = 0;
    }
     
    /*Note, that initial anonymous mmap() can be avoided - after initial mmap() for descriptor fd, you can try mmap() with hinted address as (buffer->address + buffer->count_bytes) and if it fails - another one with hinted address as (buffer->address - buffer->count_bytes). Make sure MAP_FIXED is not used in such case, as under certain situations it could end with segfault. The advantage of such approach is, that it avoids requirement to map twice the amount you need initially (especially useful e.g. if you want to use hugetlbfs and the allowed amount is limited) and in context of gcc/glibc - you can avoid certain feature macros (MAP_ANONYMOUS usually requires one of: _BSD_SOURCE, _SVID_SOURCE or _GNU_SOURCE).*/
    

    Linux内核的kfifo

    在Linux内核文件kfifo.h和kfifo.c中,定义了一个先进先出圆形缓冲区实现。如果只有一个读线程、一个写线程,二者没有共享的被修改的控制变量,那么可以证明这种情况下不需要并发控制。kfifo就满足上述条件。kfifo要求缓冲区长度必须为2的幂。读、写指针分别是无符号整型变量。把读写指针变换为缓冲区内的索引值,仅需要“按位与”操作:(指针值 按位与 (缓冲区长度-1))。这避免了计算代价高昂的“求余”操作。且下述关系总是成立:

    读指针 + 缓冲区存储的数据长度 == 写指针

    即使在写指针达到了无符号整型的上界,上溢出后写指针的值小于读指针的值,上述关系仍然保持成立(这是因为无符号整型加法的性质)。 kfifo的写操作,首先计算缓冲区中当前可写入存储空间的数据长度:

    len = min{待写入数据长度, 缓冲区长度 - (写指针 - 读指针)}

    然后,分两段写入数据。第一段是从写指针开始向缓冲区末尾方向;第二段是从缓冲区起始处写入余下的可写入数据,这部分可能数据长度为0即并无实际数据写入。

  • 学习了,谢谢分享!

  • 如何从Ringbuffer读取

    ConsumerBarrier与消费者

    这里我要稍微反过来介绍,因为总的来说读取数据这一过程比写数据要容易理解。假设通过一些“魔法”已经把数据写入到Ring Buffer了,怎样从Ring Buffer读出这些数据呢?

    (好,我开始后悔使用Paint/Gimp 了。尽管这是个购买绘图板的好借口,如果我继续写下去的话… UML界的权威们大概也在诅咒我的名字了。)

    消费者(Consumer)是一个想从Ring Buffer里读取数据的线程,它可以访问ConsumerBarrier对象——这个对象由RingBuffer创建并且代表消费者与RingBuffer进行交互。就像Ring Buffer显然需要一个序号才能找到下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了Ring Buffer里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9

    消费者可以调用ConsumerBarrier对象的waitFor()方法,传递它所需要的下一个序号.

    1 final long availableSeq = consumerBarrier.waitFor(nextSequence);

    ConsumerBarrier返回RingBuffer的最大可访问序号——在上面的例子中是12ConsumerBarrier有一个WaitStrategy方法来决定它如何等待这个序号,我现在不会去描述它的细节,代码的注释里已经概括了每一种WaitStrategy的优点和缺点 。

    接下来怎么做?

    接下来,消费者会一直原地停留,等待更多数据被写入Ring Buffer。并且,一旦数据写入后消费者会收到通知——节点9101112 已写入。现在序号12到了,消费者可以让ConsumerBarrier去拿这些序号节点里的数据了。

    拿到了数据后,消费者(Consumer)会更新自己的标识(cursor)。

    你应该已经感觉得到,这样做是怎样有助于平缓延迟的峰值了——以前需要逐个节点地询问“我可以拿下一个数据吗?现在可以了么?现在呢?”,消费者(Consumer)现在只需要简单的说“当你拿到的数字比我这个要大的时候请告诉我”,函数返回值会告诉它有多少个新的节点可以读取数据了。因为这些新的节点的确已经写入了数据(Ring Buffer本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。这太好了,不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。

    另一个好处是——你可以用多个消费者(Consumer)去读同一个RingBuffer ,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在Disruptor的协调下实现真正的并发数据处理。