2024年8月

@


前言

请各大网友尊重本人原创知识分享,谨记本人博客: 南国以南i


提示:以下是本篇文章正文内容,下面案例可供参考

简介

使用Java实现
栈(Stack)

队列(Queue)
的操作是很常见的任务。栈和队列是两种不同的数据结构,它们分别具有特定的操作和行为。

栈是一种
后进先出(LIFO, Last In First Out)
的数据结构。它只允许在栈顶进行添加(push)或删除(pop)元素的操作。
类似于羽毛球球桶一样最开始放进去的球需要最后拿出来

Java实现栈的示例代码:

public static void query() {
        Queue<Integer> queue = new LinkedList<>();
        // 入队
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);

        // 查看队首元素
        System.out.println("队首元素: " + queue.peek()); // 不移除队首元素

        // 出队
        while (!queue.isEmpty()) {
            System.out.println("出队元素: " + queue.poll());
        }
    }


    public static void stack() {
        //1、创建栈:使用Stack类(尽管Stack是遗留类,更推荐使用Deque接口的实现如ArrayDeque)或Deque接口(及其实现类如ArrayDeque)来实现栈。
        //Stack<Integer> stack = new Stack<Integer>();
        Deque<Integer> stack = new ArrayDeque<>();

        //2、入栈将元素添加到栈顶
        stack.push(1);
        stack.push(2);
        stack.push(3);

        //3、出栈(Pop):从栈顶移除元素,并返回被移除的元素。Stack类提供了pop()方法用于出栈操作
        int element = stack.pop(); // 返回并移除栈顶元素
        System.out.println(element); // 输出:3


        // 4、访问栈顶元素(Peek):获取栈顶元素,但不对栈进行修改。Stack类提供了peek()方法用于访问栈顶元素
        int outElement = stack.peek(); // 返回栈顶元素但不移除
        System.out.println("栈顶元素: " + outElement); // 输出:3

        // 5、循环出栈
        while (!stack.isEmpty()) {
            System.out.println("出栈元素: " + stack.pop());
        }
        
        /*输出:
        栈顶元素: 3
        出栈元素: 3
        出栈元素: 2
        出栈元素: 1*/
    }

栈的主要应用场景包括:

  1. 函数调用栈:
    在编程语言中,函数调用是通过栈来实现的。当函数被调用时,它的局部变量、参数和返回地址等信息会被压入栈中。当函数执行完毕时,这些信息会从栈中弹出,控制权返回给调用者。

  2. 浏览器的前进后退:
    浏览器的历史记录通常使用栈来管理。当我们浏览网页时,每个访问的页面都会被压入栈中;当我们点击“后退”时,页面会从栈中弹出,返回到上一个页面。

  3. 括号匹配:
    在解析或编译代码时,检查圆括号、花括号等是否匹配是一个常见问题。栈可以用来解决这个问题,每当遇到一个左括号时,就将其压入栈中;每遇到一个右括号时,就检查栈顶元素是否与之匹配,如果匹配则弹出栈顶元素,否则报错。

  4. 撤销操作:
    在许多编辑器或图形界面中,用户可以通过撤销操作回到之前的状态。撤销操作通常使用栈来实现,每次操作都会被压入栈中,当用户执行撤销操作时,栈顶的操作会被弹出并应用于当前状态。

队列

队列是一种
先进先出(FIFO, First In First Out)
的数据结构。它只允许在队尾添加元素(enqueue)和在队首删除元素(dequeue)
类似于排队的过程

Java实现队列的示例代码:

public static void queue() {
        // 1、创建队列:我们可以使用Java的集合类LinkedList来实现队列的操作。
        Queue<Integer> queue = new LinkedList<>();
        // 2、入队(Enqueue):将元素添加到队尾。LinkedList类提供了offer()方法用于入队操作。
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);

        //3、出队(Dequeue):从队头移除元素,并返回被移除的元素。LinkedList类提供了poll()方法用于出队操作。
        int element = queue.poll(); // 返回并移除队头元素
        System.out.println(element); // 输出:1

        // 4、访问队头元素(Peek):获取队头元素,但不对队列进行修改。LinkedList类提供了peek()方法用于访问队头元素。
        System.out.println("队首元素: " + queue.peek()); // 不移除队首元素

        // 5、循环出队
        while (!queue.isEmpty()) {
            System.out.println("出队元素: " + queue.poll());
        }

        /*输出:
        队首元素: 1
        出队元素: 1
        出队元素: 2
        出队元素: 3*/
    }

LinkedList中的add方法和offer方法的区别

add方法:‌
add方法属于Collection接口
,‌它试图将指定的元素添加到列表中。‌如果添加成功,‌它返回true;‌如果因为某些原因(‌如容量限制)‌添加失败,‌它会抛出IllegalStateException异常。‌在LinkedList中,‌当队列为空时,‌使用add方法可能会因为违反容量限制而报错。‌此外,‌当将LinkedList视为列表使用时,‌通常采用add方法来压入对象。‌

offer方法:‌
offer方法属于Deque接口(‌双端队列)‌
,‌它试图将指定的元素插入到队列中。‌如果插入成功,‌它返回true;‌如果因为空间限制无法添加元素,‌则返回false。‌与add方法不同,‌offer方法不会抛出异常,‌而是通过返回值来指示操作是否成功。‌在有容量限制的队列中,‌
offer方法优于add方法,‌因为它通过返回false来处理空间不足的情况,‌而不是抛出异常,‌这种方式更加高效
。‌

总结来说,‌add方法和offer方法的主要区别在于它们的返回值和异常处理方式。‌add方法可能会因为违反容量限制而抛出异常,‌而offer方法则通过返回值来指示操作是否成功,‌避免了异常处理开销。‌

队列主要应用场景:

  1. 任务调度:
    在多任务处理系统中,任务通常被存储在一个队列中。系统按照任务进入队列的顺序来执行它们,实现了公平的调度。

  2. 消息传递:
    在进程间通信或网络编程中,消息通常被存储在一个队列中。发送者将消息发送到队列的尾部,接收者从队列的头部取出消息进行处理。

  3. 页面请求处理:
    在Web服务器中,多个用户请求可能同时到达。服务器可以将这些请求存储在一个队列中,然后按照请求到达的顺序进行处理。

  4. 广度优先搜索(BFS):
    在图的遍历算法中,广度优先搜索使用队列来存储待访问的节点。算法开始时,将起始节点加入队列。然后,算法循环进行,每次从队列中取出一个节点进行访问,并将其未被访问的邻接节点加入队列。

总结

了解
栈和队列
的应用场景有助于我们根据实际需求选择合适的数据结构,从而更高效地解决问题。


我是
南国以南i
记录点滴每天成长一点点,学习是永无止境的!转载请附原文链接!!!

参考:
https://overstarry.vip/posts/apisix如何添加自定义插件/

首先,我们需要向自定义的插件user-remote-auth添加到apisix中,对这块不清楚的同学,可以参考我这篇文章:【
apisix~lua插件开发与插件注册
】,添加成功之后,通过
curl http://apisix-admin.apisix:9180/apisix/admin/plugins/user-remote-auth -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1'
命令是可以查询到具体插件信息的。

将自定义插件添加到dashboard平台

  1. 从apisix-admin中获取插件的schema.json文件
  2. 将自定义插件的schema添加到这个文件中
  3. 更新apisix-dashboard文件,将这个schema.json文件进行挂载

从apisix-dashboard.json文件

  • 进入dashboard这个容器,将/usr/local/apisix-dashboard/conf/schema.json文件下载

将自定义插件的schema添加到这个文件中

  • 通过
    curl http://apisix-admin.apisix:9180/apisix/admin/plugins/user-remote-auth -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1'
    获取到插件schema信息,或者查看插件的lua脚本的schema小节
  • 添加自定义插件进行添加,如图

更新apisix-dashboard文件,将这个schema.json文件进行挂载

添加k8s configmap

更新k8s apisix-dashboard服务

在dashboard中查看新的插件

作者:来自 vivo 互联网服务器团队- Li Wanghong

本文首先介绍了 Disruptor 高性能内存队列的基本概念、使用 Demo、高性能原理及源码分析,最后通过两个例子介绍了 Disruptor 在i主题业务中的应用。

一、i主题及 Disruptor 简介

i主题是 vivo 旗下的一款主题商店 app,用户可以通过下载主题、壁纸、字体等,实现对手机界面风格的一键更换和自定义。

Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能的内存队列(用于系统内部线程间传递消息,不同于 RocketMQ、Kafka 这种分布式消息队列),基于 Disruptor 开发的系统单线程能支撑每秒600万订单。目前,包括 Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了 Disruptor 以获取高性能。在 vivo 内部它也有不少应用,比如自定义监控中使用 Disruptor 队列来暂存通过监控 SDK 上报的监控数据,i主题中也使用它来统计本地内存指标数据。

接下来从 Disruptor 和 JDK 内置队列的对比、Disruptor 核心概念、Disruptor 使用Demo、Disruptor 核心源码、Disruptor 高性能原理、Disruptor 在 i主题业务中的应用几个角度来介绍 Disruptor。

二、和 JDK 中内置的队列对比

下面来看下 JDK 中内置的队列和 Disruptor 的对比。队列的底层实现一般分为三种:数组、链表和堆,其中堆一般是为了实现带有优先级特性的队列,暂不考虑。另外,像 ConcurrentLinkedQueue 、LinkedTransferQueue 属于无界队列,在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。这样 JDK 中剩下可选的线程安全的队列还有ArrayBlockingQueue 和 LinkedBlockingQueue。

由于 LinkedBlockingQueue 是基于链表实现的,由于链表存储的数据在内存里不连续,对于高速缓存并不友好,而且 LinkedBlockingQueue 是加锁的,性能较差。ArrayBlockingQueue 有同样的问题,它也需要加锁,另外,ArrayBlockingQueue 存在伪共享问题,也会导致性能变差。而今天要介绍的 Disruptor 是基于数组的有界无锁队列,符合空间局部性原理,可以很好的利用 CPU 的高速缓存,同时它避免了伪共享,大大提升了性能。

图片

三、Disruptor 核心概念

如下图,从数据流转的角度先对 Disruptor 有一个直观的概念。Disruptor 支持单(多)生产者、单(多)消费者模式。消费时支持广播消费(HandlerA 会消费处理所有消息,HandlerB 也会消费处理所有消息)、集群消费(HandlerA 和 HandlerB 各消费部分消息),HandlerA 和HandlerB 消费完成后会把消息交给 HandlerC 继续处理。

图片

下面结合 Disruptor 官方的架构图介绍下 Disruptor 的核心概念:

  • RingBuffer:前文说 Disruptor 是一个高性能内存内存队列,而 RingBuffer 就是该内存队列的数据结构,它是一个环形数组,是承载数据的载体。

  • Producer:Disruptor 是典型的生产者消费者模型。因此生产者是 Disruptor 编程模型中的核心组成,可以是单生产者,也可以多生产者。

  • Event:具体的数据实体,生产者生产 Event ,存入 RingBuffer,消费者从 RingBuffer 中消费它进行逻辑处理。

  • Event Handler:开发者需要实现 EventHandler 接口定义消费者处理逻辑。

  • Wait Strategy:等待策略,定义了当消费者无法从 RingBuffer 获取数据时,如何等待。

  • Event Processor:事件循环处理器,EventProcessor 继承了 Runnable 接口,它的子类实现了 run 方法,内部有一个 while 循环,不断尝试从 RingBuffer 中获取数据,交给 EventHandler 去处理。

  • Sequence:RingBuffer 是一个数组,Sequence (序号)就是用来标记生产者数据生产到哪了,消费者数据消费到哪了。

  • Sequencer:分为单生产者和多生产者两种实现,生产者发布数据时需要先申请下可用序号,Sequencer 就是用来协调申请序号的。

  • Sequence Barrier:见下文分析。

图片

四、Disruptor 使用 Demo

4.1 定义 Event

Event 是具体的数据实体,生产者生产 Event ,存入 RingBuffer,消费者从 RingBuffer 中消费它进行逻辑处理。Event 就是一个普通的 Java 对象,无需实现 Disruptor 内定义的接口。

public class OrderEvent {
    private long value;
 
    public long getValue() {
        return value;
    }
 
    public void setValue(long value) {
        this.value = value;
    }
}

4.2 定义 EventFactory

用于创建 Event 对象。

public class OrderEventFactory implements EventFactory<OrderEvent> {
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}

4.3 定义生产者

可以看到,生成者主要是持有 RingBuffer 对象进行数据的发布。这里有几个点需要注意:

  • RingBuffer 内部维护了一个 Object 数组(也就是真正存储数据的容器),在 RingBuffer 初始化时该 Object 数组就已经使用 EventFactory 初始化了一些空 Event,后续就不需要在运行时来创建了,提高性能。因此这里通过 RingBuffer 获取指定序号得到的是一个空对象,需要对它进行赋值后,才能进行发布。

  • 这里通过 RingBuffer 的 next 方法获取可用序号,如果 RingBuffer 空间不足会阻塞。

  • 通过 next 方法获取序号后,需要确保接下来使用 publish 方法发布数据。

public class OrderEventProducer {
 
    private RingBuffer<OrderEvent> ringBuffer;
     
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
     
    public void sendData(ByteBuffer data) {
        // 1、在生产者发送消息的时候, 首先需要从我们的ringBuffer里面获取一个可用的序号
        long sequence = ringBuffer.next();
        try {
            //2、注意此时获取的OrderEvent对象是一个没有被赋值的空对象
            OrderEvent event = ringBuffer.get(sequence);
            //3、进行实际的赋值处理
            event.setValue(data.getLong(0));           
        } finally {
            //4、 提交发布操作
            ringBuffer.publish(sequence);          
        }
    }
}

4.4 定义消费者

消费者可以实现 EventHandler 接口,定义自己的处理逻辑。

public class OrderEventHandler implements EventHandler<OrderEvent> {
 
    public void onEvent(OrderEvent event,
                        long sequence,
                        boolean endOfBatch) throws Exception {
        System.out.println("消费者: " + event.getValue());
    }
}

4.5 主流程

  • 首先初始化一个 Disruptor 对象,Disruptor 有多个重载的构造函数。支持传入 EventFactory 、ringBufferSize (需要是2的幂次方)、executor(用于执行EventHandler 的事件处理逻辑,一个 EventHandler 对应一个线程,一个线程只服务于一个 EventHandler )、生产者模式(支持单生产者、多生产者)、阻塞等待策略。在创建 Disruptor 对象时,内部会创建好指定 size 的 RingBuffer 对象。

  • 定义 Disruptor 对象之后,可以通过该对象添加消费者 EventHandler。

  • 启动 Disruptor,会将第2步添加的 EventHandler 消费者封装成 EventProcessor(实现了 Runnable 接口),提交到构建 Disruptor 时指定的 executor 对象中。由于 EventProcessor 的 run 方法是一个 while 循环,不断尝试从RingBuffer 中获取数据。因此可以说一个 EventHandler 对应一个线程,一个线程只服务于一个EventHandler。

  • 拿到 Disruptor 持有的 RingBuffer,然后就可以创建生产者,通过该RingBuffer就可以发布生产数据了,然后 EventProcessor 中启动的任务就可以消费到数据,交给 EventHandler 去处理了。

public static void main(String[] args) {
    OrderEventFactory orderEventFactory = new OrderEventFactory();
    int ringBufferSize = 4;
    ExecutorService executor = Executors.newFixedThreadPool(1);
 
    /**
     * 1. 实例化disruptor对象
       1) eventFactory: 消息(event)工厂对象
       2) ringBufferSize: 容器的长度
       3) executor:
       4) ProducerType: 单生产者还是多生产者
       5) waitStrategy: 等待策略
     */
    Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
                                                        ringBufferSize,
                                                        executor,
                                                        ProducerType.SINGLE,
                                                        new BlockingWaitStrategy());
 
    // 2. 添加消费者的监听
    disruptor.handleEventsWith(new OrderEventHandler());
 
    // 3. 启动disruptor
    disruptor.start();
 
    // 4. 获取实际存储数据的容器: RingBuffer
    RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
 
    OrderEventProducer producer = new OrderEventProducer(ringBuffer);
 
    ByteBuffer bb = ByteBuffer.allocate(8);
 
    for (long i = 0; i < 5; i++) {
        bb.putLong(0, i);
        producer.sendData(bb);
    }
 
    disruptor.shutdown();
    executor.shutdown();
}

五、Disruptor 源码分析

本文分析时以单(多)生产者、单消费者为例进行分析。

5.1 创建 Disruptor

首先是通过传入的参数创建 RingBuffer,将创建好的 RingBuffer 与传入的 executor 交给 Disruptor 对象持有。

public Disruptor(
    final EventFactory<T> eventFactory,
    final int ringBufferSize,
    final Executor executor,
    final ProducerType producerType,
    final WaitStrategy waitStrategy){
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
         executor);
}

接下来分析 RingBuffer 的创建过程,分为单生产者与多生产者。

public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy){
        switch (producerType){
            case SINGLE:
                // 单生产者
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                // 多生产者
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
}

不论是单生产者还是多生产者,最终都会创建一个 RingBuffer 对象,只是传给 RingBuffer 的 Sequencer 对象不同。可以看到,RingBuffer 内部最终创建了一个Object 数组来存储 Event 数据。这里有几点需要注意:

  • RingBuffer 是用数组实现的,在创建该数组后紧接着调用 fill 方法调用 EventFactory 工厂方法为数组中的元素进行初始化,后续在使用这些元素时,直接通过下标获取并给对应的属性赋值,这样就避免了 Event 对象的反复创建,避免频繁 GC。

  • RingBuffe 的数组中的元素是在初始化时一次性全部创建的,所以这些元素的内存地址大概率是连续的。消费者在消费时,是遵循空间局部性原理的。消费完第一个Event 时,很快就会消费第二个 Event,而在消费第一个 Event 时,CPU 会把内存中的第一个 Event 的后面的 Event 也加载进 Cache 中,这样当消费第二个 Event时,它已经在 CPU Cache 中了,所以就不需要从内存中加载了,这样可以大大提升性能。

public static <E> RingBuffer<E> createSingleProducer(
    EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy){
     
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize,
                                                                    waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}

RingBufferFields(
        EventFactory<E> eventFactory,
        Sequencer sequencer){
        // 省略部分代码...
         
        // 额外创建2个填充空间的大小, 首尾填充, 避免数组的有效载荷和其它成员加载到同一缓存行
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
}
 
private void fill(EventFactory<E> eventFactory){
    for (int i = 0; i < bufferSize; i++){
        // BUFFER_PAD + i为真正的数组索引
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

5.2 添加消费者

添加消费者的核心代码如下所示,核心就是为将一个 EventHandler 封装成 BatchEventProcessor,

然后添加到 consumerRepository 中,后续启动 Disruptor 时,会遍历 consumerRepository 中的所有 BatchEventProcessor(实现了 Runnable 接口),将 BatchEventProcessor 任务提交到线程池中。

public final EventHandlerGroup<T> handleEventsWith(
                                    final EventHandler<? super T>... handlers){
    // 通过disruptor对象直接调用handleEventsWith方法时传的是空的Sequence数组
    return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers) {
 
    // 收集添加的消费者的序号
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    // 本批次消费由于添加在同一个节点之后, 因此共享该屏障
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
 
    // 为每个EventHandler创建一个BatchEventProcessor
    for (int i = 0, eventHandlersLength = eventHandlers.length;
                    i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
 
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
 
        if (exceptionHandler != null){
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }
 
        // 添加到消费者信息仓库中
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }
 
    // 更新网关序列(生产者只需要关注所有的末端消费者节点的序列)
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
 
    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

创建完 Disruptor 对象之后,可以通过 Disruptor 对象添加 EventHandler,这里有一需要注意:通过 Disruptor 对象直接调用 handleEventsWith 方法时传的是空的 Sequence 数组,这是什么意思?可以看到 createEventProcessors 方法接收该空 Sequence 数组的字段名是 barrierSequences,翻译成中文就是栅栏序号。怎么理解这个字段?

比如通过如下代码给 Disruptor 添加了两个handler,记为 handlerA 和 handlerB,这种是串行消费,对于一个 Event,handlerA 消费完后才能轮到 handlerB 去消费。对于 handlerA 来说,它没有前置消费者(生成者生产到哪里,消费者就可以消费到哪里),因此它的 barrierSequences 是一个空数组。而对于 handlerB 来说,它的前置消费者是 handlerA,因此它的 barrierSequences 就是A的消费进度,也就是说 handlerB 的消费进度是要小于 handlerA 的消费进度的。

图片

disruptor.handleEventsWith(handlerA).handleEventsWith(handlerB);

图片

5.3 启动 Disruptor

Disruptor的启动逻辑比较简洁,就是遍历consumerRepository 中收集的 EventProcessor(实现了Runnable接口),将它提交到创建 Disruptor 时指定的executor 中,EventProcessor 的 run 方法会启动一个while 循环,不断尝试从 RingBuffer 中获取数据进行消费。

disruptor.start();

public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }
 
    return ringBuffer;
}
 
public void start(final Executor executor) {
    executor.execute(eventprocessor);
}

5.4 发布数据

在分析 Disruptor 的发布数据的源码前,先来回顾下发布数据的整体流程。

  • 调用 next 方法获取可用序号,该方法可能会阻塞。

  • 通过上一步获得的序号从 RingBuffer 中获取对应的 Event,因为 RingBuffer 中所有的 Event 在初始化时已经创建好了,这里获取的只是空对象。

  • 因此接下来需要对该空对象进行业务赋值。

  • 调用 next 方法需要在 finally 方法中进行最终的发布,标记该序号数据已实际生产完成。

public void sendData(ByteBuffer data) {
    long sequence = ringBuffer.next();
    try {
        OrderEvent event = ringBuffer.get(sequence);
        event.setValue(data.getLong(0));           
    } finally {
        ringBuffer.publish(sequence);          
    }
}

5.4.1 获取序号

next 方法默认申请一个序号。nextValue 表示已分配的序号,nextSequence 表示在此基础上再申请n个序号(此处n为1),cachedValue 表示缓存的消费者的最小消费进度。

假设有一个 size 为8的 RingBuffer,当前下标为6的数据已经发布好(nextValue为6),消费者一直未开启消费(cachedValue 和 cachedGatingSequence 为-1),此时生产者想继续发布数据,调用 next() 方法申请获取序号为7的位置(nextSequence为7),计算得到的 wrapPoint 为7-8=-1,此时 wrapPoint 等于 cachedGatingSequence,可以继续发布数据,如左图。最后将 nextValue 赋值为7,表示序号7的位置已经被生产者占用了。

接着生产者继续调用 next() 方法申请序号为0的数据,此时 nextValue为7,nextSequence 为8,wrapPoint 等于0,由于消费者迟迟未消费(cachedGatingSequence为-1),此时 wrapPoint 大于了 cachedGatingSequence,因此 next 方法的if判断成立,会调用 LockSupport.parkNanos 阻塞等待消费者进行消费。其中 getMinimumSequence 方法是获取多个消费者的最小消费进度。

图片

public long next() {
    return next(1);
}

public long next(int n) {
 
    /**
     * 已分配的序号的缓存(已分配到这里), 初始-1. 可以看该方法的返回值nextSequence,
     * 接下来生产者就会该往该位置写数据, 它赋值给了nextValue, 所以下一次调用next方
     * 法时, nextValue位置就是表示已经生产好了数据, 接下来要申请nextSequece的数据
     */
    long nextValue = this.nextValue;
 
    // 本次申请分配的序号
    long nextSequence = nextValue + n;
 
    // 构成环路的点:环形缓冲区可能追尾的点 = 等于本次申请的序号-环形缓冲区大小
    // 如果该序号大于最慢消费者的进度, 那么表示追尾了, 需要等待
    long wrapPoint = nextSequence - bufferSize;
 
    // 上次缓存的最小网关序号(消费最慢的消费者的进度)
    long cachedGatingSequence = this.cachedValue;
 
    // wrapPoint > cachedGatingSequence 表示生产者追上消费者产生环路(追尾), 即缓冲区已满,
    // 此时需要获取消费者们最新的进度, 以确定是否队列满
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        // 插入StoreLoad内存屏障/栅栏, 保证可见性。
        // 因为publish使用的是set()/putOrderedLong, 并不保证其他消费者能及时看见发布的数据
        // 当我再次申请更多的空间时, 必须保证消费者能消费发布的数据
        cursor.setVolatile(nextValue);
 
        long minSequence;
        // minSequence是多个消费者的最小序号, 要等所有消费者消费完了才能继续生产
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences,
                                                                  nextValue))) {
            LockSupport.parkNanos(1L);
        }
 
        // 缓存生产者们最新的消费进度
        this.cachedValue = minSequence;
    }
 
    // 这里只写了缓存, 并未写volatile变量, 因为只是预分配了空间但是并未被发布数据,
    // 不需要让其他消费者感知到。消费者只会感知到真正被发布的序号
    this.nextValue = nextSequence;
 
    return nextSequence;
}

5.4.2 根据序号获取 Event

直接通过 Unsafe 工具类获取指定序号的 Event 对象,此时获取的是空对象,因此接下来需要对该 Event 对象进行业务赋值,赋值完成后调用 publish 方法进行最终的数据发布。

OrderEvent event = ringBuffer.get(sequence);

public E get(long sequence) {
    return elementAt(sequence);
}

protected final E elementAt(long sequence) {
    return (E) UNSAFE.getObject(entries,
                                REF_ARRAY_BASE +
                                ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

5.4.3 发布数据

生产者获取到可用序号后,首先对该序号处的空 Event 对象进行业务赋值,接着调用 RingBuffer 的 publish 方法发布数据,RingBuffer 会委托给其持有的 sequencer(单生产者和多生产者对应不同的 sequencer)对象进行真正发布。单生产者的发布逻辑比较简单,更新下 cursor 进度(cursor 表示生产者的生产进度,该位置已实际发布数据,而 next 方法中的 nextSequence 表示生产者申请的最大序号,可能还未实际发布数据),接着唤醒等待的消费者。

waitStrategy 有不同的实现,因此唤醒逻辑也不尽相同,如采用 BusySpinWaitStrategy 策略时,消费者获取不到数据时自旋等待,然后继续判断是否有新数据可以消费了,因此 BusySpinWaitStrategy 策略的 signalAllWhenBlocking 就是一个空实现,啥也不做。

ringBuffer.publish(sequence);

public void publish(long sequence) {
    sequencer.publish(sequence);
}

public void publish(long sequence) {
    // 更新生产者进度
    cursor.set(sequence);
    // 唤醒等待的消费者
    waitStrategy.signalAllWhenBlocking();
}

5.4.4 消费数据

前面提到,Disruptor 启动时,会将封装 EventHandler 的EventProcessor(此处以 BatchEventProcessor 为例)提交到线程池中运行,BatchEventProcessor 的 run 方法会调用 processEvents 方法不断尝试从 RingBuffer 中获取数据进行消费,下面分析下 processEvents 的逻辑(代码做了精简)。它会开启一个 while 循环,调用 sequenceBarrier.waitFor 方法获取最大可用的序号,比如获取序号一节所提的,生产者持续生产,消费者一直未消费,此时生产者已经将整个 RingBuffer 数据都生产满了,生产者无法再继续生产,生产者此时会阻塞。假设这时候消费者开始消费,因此 nextSequence 为0,而 availableSequence 为7,此时消费者可以批量消费,将这8条已生产者的数据全部消费完,消费完成后更新下消费进度。更新消费进度后,生产者通过 Util.getMinimumSequence 方法就可以感知到最新的消费进度,从而不再阻塞,继续发布数据了。

private void processEvents() {
    T event = null;
 
    // sequence记录消费者的消费进度, 初始为-1
    long nextSequence = sequence.get() + 1L;
 
    // 死循环,因此不会让出线程,需要独立的线程(每一个EventProcessor都需要独立的线程)
    while (true) {
        // 通过屏障获取到的最大可用序号
        final long availableSequence = sequenceBarrier.waitFor(nextSequence);
 
        // 批量消费
        while (nextSequence <= availableSequence) {
            event = dataProvider.get(nextSequence);
            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
 
        // 更新消费进度(批量消费, 每次消费只更新一次Sequence, 减少性能消耗)
        sequence.set(availableSequence);
    }
}

下面分析下 SequenceBarrier 的 waitFor 方法。首先它会调用 waitStrategy 的 waitFor 方法获取最大可用序号,以 BusySpinWaitStrategy 策略为例,它的 waitFor 方法的三个参数的含义分别是:

  • sequence:消费者期望获得的序号,也就是当前消费者已消费的进度+1

  • cursor:当前生产者的生成进度

  • dependentSequence:消费者依赖的前置消费者的消费进度。该字段是在添加 EventHandler,创建BatchEventProcessor 时创建的。如果当前消费者没有前置依赖的消费者,那么它只需要关心生产者的进度,生产者生产到哪里,它就可以消费到哪里,因此 dependentSequence 就是 cursor。而如果当前消费者有前置依赖的消费者,那么dependentSequence就是FixedSequenceGroup(dependentSequences)。

因为 dependentSequence 分为两种情况,所以 waitFor 的逻辑也可以分为两种情况讨论:

  • 当前消费者无前置消费者:假设 cursor 为6,也就是序号为6的数据已经发布了数据,此时传入的sequence为6,则waitFor方法可以直接返回availableSequence(6),可以正常消费。序号为6的数据消费完成后,消费者继续调用 waitFor 获取数据,传入的 sequence为7,而此时 availableSequence 还是未6,因此消费者需要自旋等待。当生产者继续发布数据后,因为 dependentSequence 持有的就是生产者的生成进度,因此消费者可以感知到,继续消费。

  • 当前消费者有前置消费者:假设 cursor 为6,当前消费者C有两个前置依赖的消费者A(消费进度为5)、B(消费进度为4),那么此时 availableSequence(FixedSequenceGroup实例,它的 get 方法是获取A、B的最小值,也就是4)为4。如果当前消费者C期望消费下标为4的数据,则可以正常消费,但是消费下标为5的数据就不行了,它需要等待它的前置消费者B消费完进度为5的数据后才能继续消费。

在 waitStrategy 的 waitFor 方法返回,得到最大可用的序号 availableSequence 后,最后需要再调用下 sequencer 的 getHighestPublishedSequence 获取真正可用的最大序号,这和生产者模型有关系,如果是单生产者,因为数据是连续发布的,直接返回传入的 availableSequence。而如果是多生产者,因为多生产者是有多个线程在生产数据,发布的数据是不连续的,因此需要通过 过getHighestPublishedSequence 方法获取已发布的且连续的最大序号,因为获取序号进行消费时需要是顺序的,不能跳跃。

public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
    /**
     * sequence: 消费者期望获取的序号
     * cursorSequence: 生产者的序号
     * dependentSequence: 消费者需要依赖的序号
     */
    long availableSequence = waitStrategy.waitFor(sequence,
                                                  cursorSequence,
                                                  dependentSequence, this);
 
    if (availableSequence < sequence) {
        return availableSequence;
    }
 
    // 目标sequence已经发布了, 这里获取真正的最大序号(和生产者模型有关)
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

public long waitFor(
    final long sequence, Sequence cursor, final Sequence dependentSequence,
    final SequenceBarrier barrier) throws AlertException, InterruptedException {
    long availableSequence;
 
    // 确保该序号已经被我前面的消费者消费(协调与其他消费者的关系)
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
        // 自旋等待
        ThreadHints.onSpinWait();
    }
 
    return availableSequence;
}

六、Disruptor 高性能原理分析

6.1 空间预分配

前文分析源码时介绍到,RingBuffer 内部维护了一个 Object 数组(也就是真正存储数据的容器),在 RingBuffer 初始化时该 Object 数组就已经使用EventFactory 初始化了一些空 Event,后续就不需要在运行时来创建了,避免频繁GC。

另外,RingBuffe 的数组中的元素是在初始化时一次性全部创建的,所以这些元素的内存地址大概率是连续的。消费者在消费时,是遵循空间局部性原理的。消费完第一个Event 时,很快就会消费第二个 Event,而在消费第一个 Event 时,CPU 会把内存中的第一个 Event 的后面的 Event 也加载进 Cache 中,这样当消费第二个 Event 时,它已经在 CPU Cache 中了,所以就不需要从内存中加载了,这样也可以大大提升性能。

6.2 避免伪共享

6.2.1 一个伪共享的例子

如下代码所示,定义了一个 Pointer 类,它有2个 long 类型的成员变量x、y,然后在 main 方法中其中2个线程分别对同一个 Pointer 对象的x和y自增 100000000 次,最后统计下方法耗时,在我本机电脑上测试多次,平均约为3600ms。

public class Pointer {
 
    volatile long x;
 
    volatile long y;
 
    @Override
    public String toString() {
        return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]")
                .add("x=" + x)
                .add("y=" + y)
                .toString();
    }
}

public static void main(String[] args) throws InterruptedException {
    Pointer pointer = new Pointer();
 
    int num = 100000000;
    long start = System.currentTimeMillis();
 
    Thread t1 = new Thread(() -> {
        for(int i = 0; i < num; i++){
            pointer.x++;
        }
    });
 
    Thread t2 = new Thread(() -> {
        for(int i = 0; i < num; i++){
            pointer.y++;
        }
    });
 
    t1.start();
    t2.start();
    t1.join();
    t2.join();
 
    System.out.println(System.currentTimeMillis() - start);
    System.out.println(pointer);
}

接着将 Pointer 类修改如下:在变量x和y之间插入7个 long 类型的变量,仅此而已,接着继续通过上述的 main 方法统计耗时,平均约为500ms。可以看到,修改前的耗时是修改后(避免了伪共享)的7倍多。那么什么是伪共享,为什么避免了伪共享能有这么大的性能提升呢?

public class Pointer {
 
    volatile long x;
 
    long p1, p2, p3, p4, p5, p6, p7;
 
    volatile long y;
 
    @Override
    public String toString() {
        return new StringJoiner(", ", Pointer.class.getSimpleName() + "[", "]")
                .add("x=" + x)
                .add("y=" + y)
                .toString();
    }
}

6.2.2 避免伪共享为什么可以提升性能

内存的访问速度是远远慢于 CPU 的,为了高效利用 CPU,在 CPU 和内存之间加了缓存,称为 CPU Cache。为了提高性能,需要更多地从 CPU Cache 里获取数据,而不是从内存中获取数据。CPU Cache 加载内存里的数据,是以缓存行(通常为64字节)为单位加载的。Java 的 long 类型是8字节,因此一个缓存行可以存放8个 long 类型的变量。

但是,这种加载带来了一个坏处,如上述例子所示,假设有一个 long 类型的变量x,另外还有一个 long 类型的变量y紧挨着它,那么当加载x时也会加载y。如果此时 CPU Core1 的线程在对x进行修改,另一个 CPU Core2 的线程却在对y进行读取。当前者修改x时,会把x和y同时加载到 CPU Core1 对应的 CPU Cache 中,更新完后x和其它所有包含x的缓存行都将失效。而当 CPU Core2 的线程读取y时,发现这个缓存行已经失效了,需要从主内存中重新加载。

这就是伪共享,x和y不相干,但是却因为x的更新导致需要重新从主内存读取,拖慢了程序性能。解决办法之一就是如上述示例中所做,在x和y之间填充7个 long 类型的变量,保证x和y不会被加载到同一个缓存行中去。Java8 中也增加了新的注解@Contended(JVM加上启动参数-XX:-RestrictContended 才会生效),也可以避免伪共享。

图片

6.2.3 Disruptor 中使用伪共享的场景

Disruptor 中使用 Sequence 类的 value 字段来表示生产/消费进度,可以看到在该字段前后各填充了7个 long 类型的变量,来避免伪共享。另外,向 RingBuffer 内部的数组、

SingleProducerSequencer 等也使用了该技术。

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}
 
class Value extends LhsPadding {
    protected volatile long value;
}
 
class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

6.3 无锁

生产者生产数据时,需要入队。消费者消费数据时,需要出队。入队时,不能覆盖没有消费的元素。出队时,不能读取没有写入的元素。因此,Disruptor 中需要维护一个入队索引(生产者数据生产到哪里,对应 AbstractSequencer 中的 cursor )和一个出队索引(所有消费者中消费进度最小的序号)。

Disruptor 中最复杂的是入队操作,下面以多生产者(MultiProducerSequencer)的 next(n) 方法(申请n个序号)为例分析下 Disruptor 是如何实现无锁操作的。代码如下所示,判断下是否有足够的序号(空余位置),如果没有,就让出 CPU 使用权,然后重新判断。如果有,则使用 CAS 设置 cursor(入队索引)。

public long next(int n) {
    do {
        // cursor类似于入队索引, 指的是上次生产到这里
        current = cursor.get();
        // 目标是再生产n个
        next = current + n;
 
        // 前文分析过, 用于判断消费者是否已经追上生产进度, 生产者能否申请到n个序号
        long wrapPoint = next - bufferSize;
        // 获取缓存的上一次的消费进度
        long cachedGatingSequence = gatingSequenceCache.get();
 
        // 第一步:空间不足就继续等待
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            // 重新计算下所有消费者里的最小消费进度
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
 
            // 依然没有足够的空间, 让出CPU使用权
            if (wrapPoint > gatingSequence) {
                LockSupport.parkNanos(1);
                continue;
            }
 
            // 更新下最新的最小的消费进度
            gatingSequenceCache.set(gatingSequence);
        }
        // 第二步:看见空间足够时尝试CAS竞争空间
        else if (cursor.compareAndSet(current, next)) {
            break;
        }
    } while (true);
 
    return next;
}

6.4 支持批量消费定义 Event

这个比较好理解,在前文分析消费数据的逻辑时介绍了,消费者会获取下最大可用的序号,然后批量消费这些消息。

七、Disruptor 在i主题业务中的使用

很多开源项目都使用了 Disruptor,比如日志框架 Log4j2 使用它来实现异步日志。HBase、Storm 等项目中也使用了到了 Disruptor。vivo 的 i主题业务也使用了 Disruptor,下面简单介绍下它的2个使用场景。

7.1 监控数据上报

业务监控系统对于企业来说非常重要,可以帮助企业及时发现和解决问题,可以方便的检测业务指标数据,改进业务决策,从而保证业务的可持续发展。i主题使用 Disruptor(多生产者单消费者)来暂存待上报的业务指标数据,然后有定时任务不断提取数据上报到监控平台,如下图所示。

图片

7.2 本地缓存 key 统计分析

i主题业务中大量使用了本地缓存,为了统计本地缓存中key 的个数(去重)以及每种缓存模式 key 的数量,考虑使用 Disruptor 来暂存并消费处理数据。因为业务代码里很多地方涉及到本地缓存的访问,也就是说,生产者是多线程的。考虑到消费处理比较简单,而如果使用多线程消费的话又涉及到加锁同步,因此消费者采用单线程模式。

整体流程如下图所示,首先在缓存访问工具类中增加缓存访问统计上报的调用,缓存访问数据进入到 RingBuffer 后,单线程消费者使用 HyperLogLog 来去重统计不同 key的个数,使用正则匹配来统计每种模式key的数量。然后有异步任务定时获取统计结果,进行展示。

需要注意的是,因为 RingBuffer 队列大小是固定的,如果生产者生产过快而消费者消费不过来,如果使用 next 方法申请序号,如果剩余空间不够会导致生产者阻塞,因此建议使用 tryPublishEvent 方法去发布数据,它内部是使用 tryNext 方法申请序号,该方法如果申请不到可用序号会抛出异常,这样生产者感知到了就可以做兼容处理,而不是阻塞等待。

图片

八、使用建议

  • Disruptor 是基于生产者消费者模式,如果生产快消费慢,就会导致生产者无法写入数据。因此,不建议在 Disruptor 消费线程中处理耗时较长的业务。

  • 一个 EventHandler 对应一个线程,一个线程只服务于一个 EventHandler。Disruptor 需要为每一个EventHandler(EventProcessor) 创建一个线程。因此在创建 Disruptor 时不推荐传入指定的线程池,而是由 Disruptor 自身根据 EventHandler 数量去创建对应的线程。

  • 生产者调用 next 方法申请序号时,如果获取不到可用序号会阻塞,这一点需要注意。推荐使用 tryPublishEvent 方法,生产者在申请不到可用序号时会立即返回,不会阻塞业务线程。

  • 如果使用 next 方法申请可用序号,需要确保在 finally 方法中调用 publish 真正发布数据。

  • 合理设置等待策略。消费者在获取不到数据时会根据设置的等待策略进行等待,BlockingWaitStrategry 是最低效的策略,但其对 CPU消耗最小。YieldingWaitStrategy 有着较低的延迟、较高的吞吐量,以及较高 CPU 占用率。当 CPU 数量足够时,可以使用该策略。

九、总结

本文首先通过对比 JDK 中内置的线程安全的队列和Disruptor 的特点,引入了高性能无锁内存队列 Disruptor。接着介绍了 Disruptor 的核心概念和基本使用,使读者对 Disruptor 建立起初步的认识。接着从源码和原理角度介绍了 Disruptor 的核心实现以及高性能原理(空间预分配、避免伪共享、无锁、支持批量消费)。其次,结合i主题业务介绍了 Disruptor 在实践中的应用。最后,基于上述原理分析及应用实战,总结了一些 Disruptor 最佳实践策略。

参考文章:

https://time.geekbang.org/column/article/132477

https://lmax-exchange.github.io/disruptor/

相关文章

数据库系列:MySQL慢查询分析和性能优化
数据库系列:MySQL索引优化总结(综合版)
数据库系列:高并发下的数据字段变更
数据库系列:覆盖索引和规避回表
数据库系列:数据库高可用及无损扩容
数据库系列:使用高区分度索引列提升性能
数据库系列:前缀索引和索引长度的取舍
数据库系列:MySQL引擎MyISAM和InnoDB的比较
数据库系列:InnoDB下实现高并发控制
数据库系列:事务的4种隔离级别
数据库系列:RR和RC下,快照读的区别
数据库系列:MySQL InnoDB锁机制介绍
数据库系列:MySQL不同操作分别用什么锁?
数据库系列:业内主流MySQL数据中间件梳理
数据库系列:巨量数据表的分页性能问题
数据库系列: 主流分库分表中间件介绍(图文总结)

★ 针对常见的互联网业务场景进行数据库架构设计总结

1 先导

我们的系统会随着业务的膨胀而越发的复杂,数据基数也会越变得越来越大。
如何
在数据变为巨量数据之后,依然保持强有力的稳定性和高效的性能,是系统架构的一个重要的目标
,下面我们按照数据架构的演进来阐述变化过程。

2 演进

2.1 单体

一般是产品设计之初,或者试运行阶段。

image

最常见的架构设计如上:

  1. Service:服务层,对调用者提供优质的Rset或者RPC接口,同时能获取数据库数据
  2. DB:数据层,一般是单库,负责数据存储

2.2 冷热分离架构

历史数据访问频率比较低,可以放在独立的表或者库中。
比如你的去年之前的朋友圈,当有需要的时候才回去翻阅。
但是最近一周内的朋友圈信息(无论是你的还是你朋友发的图文,都是热数据),为了性能,为了你的活跃数据的表更轻便一点,必然要做隔离。

image
架构如上:

  1. Service:服务层,对调用者提供良好的Rset或者RPC接口,同时能获取数据库数据
  2. DB-Cool:冷数据库,一般存放历史比较久远的数据,数据量庞大,能够忍受加载效率慢
  3. DB-Hot:热数据库,一般存近期活跃数据,数据量不会很庞大,加载效率高

2.3 读写分离架构

多读少些的场景已经形成了,大量的读操作影响到写数据的性能和稳定性了
,为了解决【数据库读写高并发】 的问题而设计。
当然可引入缓存、消息队列等中间件支撑,咱们这边不提。

image

如上,这是最常见的1主N从,读写分离的数据库模式:

  1. Service:服务层,对调用者提供良好的Rset或者RPC接口,同时能获取数据库数据
  2. DB-M(master):主库,提供数据库写服务
  3. DB-R(replicate):从库,提供数据库读服务

它有如下特点:

  1. 1主n从,线性提升读的性能,当然n不是越多越好,也会有复制压力
  2. 主从之间通过binlog来进行写操作的replay,实现数据复制
  3. 主从之前的数据结构和数据结果完全一直,即使会有同步延迟,最终还是要一致的
  4. 注意写依然是单点,那是因为互联网业务中,高频写的热数据占比一般都是很低的

2.4 数据分片架构

典型sharding(水平拆分)方案。
水平拆分又分为库内分表和分库分表,来解决单表中数据量增长出现的压力,这些数据库中的表结构完全相同,我们这边按照分库分表来介绍。
image
架构如上:

  1. Service:服务层,对调用者提供良好的Rset或者RPC接口,同时能获取数据库数据
  2. DB-0、DB-2:数据按照分片进行还分,这边的例子每1KW存一个分片

水平分区的数据可以按照这5种策略隔离:

2.4.1 Hash(哈希)

这种策略是通过对表的一个或多个列的Hash Key进行计算,最后通过这个Hash码不同数值对应的数据区域进行分区。例如我们可以建立一个对表的日期的年份进行分区的策略,这样每个年份都会被聚集在一个区间。

1 PARTITION BY HASH(YEAR(createtime))
2 PARTITIONS 10

image

2.4.2 Range(范围)

这种策略是将数据划分不同范围。例如我们可以将一个千万级别的表通过id划分成4个分区,每个分区大约250W的数据,超过750W后的数据统一放在第4个分区。

PARTITION BY RANGE(id) (
PARTITIONP0 VALUES LESS THAN(2500001),
PARTITIONP1 VALUES LESS THAN(5000001),
PARTITIONp2 VALUES LESS THAN(7500001),
PARTITIONp3 VALUES LESS THAN MAXVALUE
)

image

还有
Key
(键值) 、
List
(预定义列表)、
Composite
(复合模式) ,请翻开我的这两篇文章:

MySQL全面瓦解28:分库分表


MySQL全面瓦解29:分库分表之Partition功能详解

2.4.3 解决什么问题?

  1. 数据库/数据表瘦身,提升读写性能
  2. 缩小故障影响范围(故障爆炸半径)

3 总结

  1. 产品发展初期,使用单库模式
  2. 部分数据写固化,读频率降低,使用冷热隔离架构
  3. 业务量扩大,多读少些占比重,使用读写分离的主从架构
  4. 数据基量膨胀,让检索不堪重负,使用分片(分库分表)架构
  5. 呀,不写了,去跑步咯

大家好,我是码农先森。

在早些年前客户端想要实时获取到最新消息,都是使用定时长轮询的方式,不断的从服务器上获取数据,这种粗暴的骚操作实属不雅。不过现如今我也还见有人还在一些场景下使用,比如在 PC 端扫描二维码,然后使用长轮询的方式从服务端获取最新的扫码信息,来判断用户是否已经扫码完成,诸如这种场景还有不少。其实大家都知道长轮询的方式不好,那为什么还有人使用呢?

我想最直接的原因就是「开发起来简单明了」,人性决定了人类都是趋易避难的高级物种,那个容易上手就用那个。但是我想表达的是除了长轮询的方式外,WebSocket 技术其实也不难,只不过对于从来没有接触过长连接的人来说,刚开始上手时会有一些思维上的障碍。这次我分享的内容是基于 WebSocket 技术的消息推送中心,看起来很高大上,其实也就是通过一些小的例子来演示,从服务端推送数据到客户端的这个过程,接下来的例子简单明了容易上手,我们赶紧开始吧。

话不多说,开整!我们先来看一下整体的项目目录结构,内容主要分为 PHP 和 Go 两部分。

[manongsen@root php_to_go]$ tree -L 2
.
├── go_websocket
│   ├── app
│   │   ├── controller
│   │   |	|── message.go
│   │   │   └── websocket.go
│   │   └── route.go
│   ├── go.mod
│   ├── go.sum
│   └── main.go
└── php_websocket
│   ├── app
│   │   ├── controller
│   │   |	|── Push.php
│   │   │   └── Worker.php
│   ├── composer.json
│   ├── composer.lock
│   ├── config
│   │   |── worker_server.php
│   │   └── worker.php
│   ├── route
│   │   └── app.php
│   ├── think
│   ├── vendor
│   └── .env

ThinkPHP

使用 composer 创建基于 ThinkPHP 框架的 php_websocket 项目。

## 当前目录
[manongsen@root ~]$ pwd
/home/manongsen/workspace/php_to_go/php_websocket

## 安装 ThinkPHP 框架
[manongsen@root php_websocket]$ composer create-project topthink/think php_websocket
[manongsen@root php_websocket]$ cp .example.env .env

## 安装 Composer 依赖包
[manongsen@root php_websocket]$ composer require topthink/think-worker
[manongsen@root php_websocket]$ composer require predis/predis

使用
php think make:controller Worker
命令创建
Worker.php
控制器。这个控制器中主要实现了
onWorkerStart
这个方法,首先添加了一个 Timer 异步定时器,然后从 Redis 队列中读取消息,最后将消息推送到客户端,这个定时器会每间隔一秒钟调度一次。

// ./php_to_go/php_websocket/app/controller/Worker.php
<?php
declare (strict_types = 1);

namespace app\controller;

use think\Request;
use think\worker\Server;
use Workerman\Lib\Timer;
use think\facade\Cache;
use think\facade\Env;

class Worker extends Server
{
    protected $socket = 'websocket://0.0.0.0:2345';
    protected static $connections = [];

    public function onWorkerStart($worker) {
        // 添加一个异步定时器任务
        Timer::add(1, function () use ($worker) {
            // 从消息中心队列中读取消息
            $redis = Cache::store('redis')->handler();
            $content = $redis->rpop(Env::get("MESSAGE_CENTER_KEY"));

            // 发送消息到客户端
            foreach ($worker->connections as $connection) {
                if (!empty($content)) {
                    $connection->send("PHP语言消息中心: " . $content);
                }
            }
        });
    }

    public function onWorkerReload($worker) {
    }

    public function onConnect($connection) {
    }

	public function onMessage($connection, $data){
	}

    public function onClose($connection) {
    }

    public function onError($connection, $code, $msg) {
    }
}

使用
php think make:controller Push
命令创建
Push.php
控制器。这个控制器的主要作用是接收外部的消息内容,然后推送到 Redis 消息队列中,这里提供的是 API 接口,这个接口可以在外部的后台系统调用。

// ./php_to_go/php_websocket/app/controller/Push.php
<?php

namespace app\controller;

use app\BaseController;
use think\facade\Cache;
use think\facade\Env;

class Push extends BaseController
{
    public function msg()
    {
        // 接收 GET 参数
        $params = $this->request->param();
        if (empty($params["content"])) {
            return json(["code" => -1, "msg" => "内容不能为空"]);
        }
        $content = $params["content"];

        // 推送消息到消息中心队列
        $redis = Cache::store('redis')->handler();
        $redis->lpush(Env::get("MESSAGE_CENTER_KEY"), $content);

        return json(["code" => 0, "msg" => "success"]);
    }
}

先运行
php think worker
启动 HTTP 服务,再运行
php think worker:server
启动 WebSocket 服务,最后来测试一波。

Gin

通过 go mod 初始化 go_websocket 项目。

## 当前目录
[manongsen@root ~]$ pwd
/home/manongsen/workspace/php_to_go/go_websocket

## 初始化项目
[manongsen@root go_websocket]$ go mod init go_websocket

## 安装第三方依赖库
[manongsen@root go_websocket]$ go get github.com/gin-gonic/gin
[manongsen@root go_websocket]$ go get github.com/gorilla/websocket

在 go_websocket 项目中创建 websocket 控制器。这个控制器会将客户端连接存储到指定的 Map 数据结构中,其次还提供了
WaitMessage
等待消息的方法,如果从
MsgQueue
通道中读取到了消息,则把消息推送给所有的客户端。

// ./php_to_go/go_websocket/app/controller/websocket.php
package controller

import (
	"fmt"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
)

// 定义一个消息传输通道
var MsgQueue = make(chan string, 10)

// 定义一个存储客户端连接的 Map
var Clients = make(map[*websocket.Conn]bool)

// 将 HTTP 协议升级至 WebSocket 协议
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true // 允许所有来源
	},
}

// 将客户端连接存储到 Map
func HandleConnection(c *gin.Context) {
	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		fmt.Printf("客户端连接协议升级失败: %v\n", err)
		return
	}
	Clients[conn] = true
}

// 等待消息中
func WaitMessage() {
	go func() {
		for {
			select {
			case msg, ok := <-MsgQueue:
				if ok {
					for client := range Clients {
						err := client.WriteMessage(websocket.TextMessage, []byte("Go语言消息中心: "+string(msg)))
						if err != nil {
							fmt.Printf("消息推送失败: %v\n", err)
						}
					}
				}
			default:
				// 避免忙等
				time.Sleep(500 * time.Millisecond)
			}
		}
	}()
}

在 go_websocket 项目中创建 message 控制器。这个控制器的主要作用是接收外部的消息内容,然后推送到 MsgQueue 通道中,这里提供的是 API 接口,这个接口可以在外部的后台系统调用。这里和 PHP 中有一点不同的是,在 Go 中无需引入像 Redis 一样的第三方组件,而是利用自身的 Channel 特性即可实现消息的传递。

// ./php_to_go/go_websocket/app/controller/message.php
package controller

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

func PushMsg(c *gin.Context) {
	// 接收 GET 参数
	content := c.Query("content")
	if len(content) == 0 {
		c.JSON(http.StatusOK, gin.H{
			"msg":  "内容不能为空",
			"code": -1,
		})
		return
	}

	// 往通道推送消息
	MsgQueue <- content

	c.JSON(http.StatusOK, gin.H{
		"msg":  "ok",
		"code": 0,
	})
}

运行
go run main.go
启动服务,然后进行消息推送测试。

通过这两个简单的例子,我相信大家已经对 WebSocket 技术已经有所了解吧。从例子中也可以看出来,其实在 PHP 和 Go 中实现上有所区别,PHP 中需要启动两个服务,一个是 HTTP 服务,一个是 WebSocket 服务,而且两者服务直接都是单独的进程,不能相互通信,需要额外借助第三方中间件 Redis 来实现数据的传输。反观 Go 中直接一个服务涵盖了 HTTP 服务和 WebSocket 服务,共享一个进程的数据资源,通过使用 Channel 通道传递消息。

此外,在 PHP 中需要使用 Timer 异步定时器来读取 Redis 消息队列中的数据,不能用 for 循环或者 Redis 的阻塞队列,因为它会阻塞整个进程的执行。而在 Go 中直接开启一个协程,在协程中等待通道中的消息即可,会一直阻塞到消息的到来,而且它不会阻塞整个进程的执行,由此可见在这个例子中 Go 相较于 PHP 的优势显著。最后可能有些从来没有使用过 WebSocket 技术的朋友,可能看完这篇文章之后也依然会云里雾里,所以建议这些朋友可以自己亲自实践一下文中的案例,实践过后我相信你会别有一番技术体验。如果有想要获取完整案例代码的朋友,可以在公众号内回复「2463」即可,希望对大家能有所帮助。

感谢大家阅读,个人观点仅供参考,欢迎在评论区发表不同观点。

欢迎关注、分享、点赞、收藏、在看,我是微信公众号「码农先森」作者。