2024年3月

一、背景

在Kafka的组成部分(Broker、Consumer、Producer)中,设计理念迥异,每个部分都有自己独特的思考。而把这些部分有机地组织起来,使其成为一个整体的便是「网络传输」。区别于其他消息队列的方式(
RocketMQ处理网络部分直接使用成熟的组件Netty
),Kafka则是直接对java的NIO进行了二次包装,从而实现了高效的传输

然而处理网络相关的工作是非常复杂的,本文我们只聚焦于网络传输的Producer端,而Producer端也只聚焦在消息发送的部分,使用场景带入的方式来分析一下消息发送的环节,Producer是如何将其扔给网络并发送出去的

二、概述

我们首先回想一下Producer消息发送的整体流程

  1. 客户端线程会不断地写入数据,当前线程并不会阻塞,而是马上返回。这个时候消息被Producer放在了缓存内,消息并没有真正发送出去
  2. Producer内部为每个Partition维护了一个RecordBatch的队列,先进先出的模式,统称为RecordAccumulator,数据第一步会先放在这个组件中。放在这个组件中的数据什么时候会真正发送出去呢?这里其实是“大巴车”逻辑(
    大巴车逻辑是指:1、如果某辆大巴车已经坐满人了,这个时候无条件立即发车。2、虽然本辆大巴车没有坐满人,但是车已经在原地等待了1个小时,即便是只有1位乘客,也要立即发车
    ),以下2个条件满足其一即可:
    1. RecordBatch已满,无法写入新数据
    2. RecordBatch虽然还未满,但是已超时
  1. 为了给RecordAccumulator提速,又引申出了MemoryPool的概念,它主要的作用是一次性分配了一块内存池,避免每次RecordBatch新建时,临时开辟内存空间
    1. 为什么开辟内存空间会很慢呢? 单纯分配一个连续的内存空间不会有太多的耗时,这里主要是JDK为ByteBuffer的置0操作,byte[]数组开辟空间耗时也是同理。例如DirectByteBuffer.java的构造函数是这样实现的
      unsafe.setMemory(base, size, (byte) 0);
  1. 以上所有部分均不涉及网络相关操作,真正网络发送/接收的逻辑是放在了Sender.java线程中。Sender线程会不断地从RecordAccumulator中拉取满足条件的消息记录,从而与Broker建联并发送

以上是Producer发送消息的主流程,本文将会聚焦在Sender网络线程及其相关的逻辑

三、Java NIO

因为Kafka的网络实现是对Java NIO的二次封装,因此在真正开始分析之前,我们有必要先对NIO做个简单回顾

3.1、Server端

public void startServer() throws IOException {
    // Selector选择器,NIO中的核心组件,也就是用它来监听所有的网络事件
    selector = Selector.open();
    // 打开服务端的ServerSocketChannel,只有服务端需要打开ServerSocket
    server = ServerSocketChannel.open();
    // 为服务端绑定端口,之后所有的client均连接次端口
    server.bind(new InetSocketAddress(PORT));
    // 这里配置为非阻塞
    server.configureBlocking(false);
    // 主要是将ServerSocketChannel与selector进行绑定
    server.register(selector, SelectionKey.OP_ACCEPT);

    SocketChannel client;
    SelectionKey key;
    Iterator<SelectionKey> keyIterator;

    while (true) {
        // 调用select()方法,如果有新的请求,则马上返回,否则最多阻塞100ms
        int select = selector.select(100);
        if (select == 0) {
            continue;
        }

        // 将所有的事件拿出来,准备迭代
        keyIterator = selector.selectedKeys().iterator();
        while (keyIterator.hasNext()) {
            key = keyIterator.next();
            if (key.isAcceptable()) {        
                // 处理新进来的链接请求
                operateConnectEvent(key);
            }
            if (key.isReadable()) {
                // 处理读取事件的请求
                operateReadEvent(key);
            }
            // 事件处理完后,将其移除
            keyIterator.remove();               
        }
    }
}

想必大家对上面的demo不会太陌生,这就是JDK给我们提供的server端启动端口监听的经典demo,这里需要注意的是:处理读写请求时,尽量开辟线程来异步处理,避免影响selector监听线程

3.2、Client端

public void startClient() throws IOException {
    // 客户端打开自己的Selector
    selector = Selector.open();                           
    // 与Server端开始建联
    socketChannel = SocketChannel.open(new InetSocketAddress(port));
    // 设置非阻塞
    socketChannel.configureBlocking(false);
    // 注册读时间到selector上,这样通过selector就可以监听网络读取的事件了
    SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
    // attach方法并没有实质的含义,一般是将自己业务中的辅助类绑定在key上,方便后续的读取及处理
    selectionKey.attach(null);

    Iterator<SelectionKey> ikeys;
    SelectionKey key;
    SocketChannel client;
    try {
        while (flag) {
            // 调用此方法一直阻塞,直到有channel可用
            selector.select();   
            ikeys = selector.selectedKeys().iterator();
            while (ikeys.hasNext()) {
                key = ikeys.next();
                if (key.isReadable()) {    
                    // 处理读事件,一般开多线程处理
                    handleReadEvent();
                }
                ikeys.remove();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

主要思想也是通过selector监听不同的事件,一旦获取到事件,便开线程异步处理,不阻塞selector,从而达到高效通信的目的

Server端+Client端不足百行的demo代码,会被Kafka如何包装呢?

四、Sender线程概述

我们在具体介绍Sender线程之前,首先对其有个全貌的认知,也就是先整体后细节。而本节就是一个整体认知,虽然很多细节不会展开,但是对网络线程的理解至关重要

4.1、单网络线程

Sender线程是在KafkaProducer对象初始化启动的,启动后便是一个无限循环的调用。run()方法如下

@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    ....  // 此处是一些优雅关机的代码
}

由此我们可以得出结论

  • Sender线程是一个单线程,一个KafkaProducer只会对应一个网络线程
  • Sender伴随Producer的启动而启动,同时也伴随Producer的消亡而消亡

其实这也就解释了当我们启动一个Producer进行压测的时候,通常不能把带宽打满。因为网络线程所做的事儿,无非是将用户生产的数据从JVM中拷贝至网卡,而这个动作又是典型的cpu密集型,单个线程确实无法提高资源利用率

而相比较Broker而言,Broker提供了更为丰富的线程数量配置策略,比如网络线程数的配置
num.network.threads
及IO线程数的配置
num.io.threads
等,为什么Producer的网络线程要配置为单个呢?我个人认为主要是以下原因:

  • Producer客户端的定位;虽然Producer与Broker都存在网络传输的功能,但是Producer终究只是一个客户端,通常客户端所在的机器配置是比不上Broker端的,因此单个网络线程足以满足绝大部分场景的需求
  • 容易横向扩展;我们可以通过简单地启动多个Producer从而实现性能的提升,而这个操作是轻量的
  • 开发的复杂性;即便是在单网络线程的case下,Producer需要处理数据收集、发送、接收、维护元数据等,复杂性已然不低,如果需要单KafkaProducer同时启动多个网络线程的话,势必带来较大的复杂性

4.2、Sender概览

由上节我们知道Sender线程一直在重复调用runOnce()方法,这个方法又做了什么操作呢?

void runOnce() {
    if (transactionManager != null) {......}

    long currentTimeMs = time.milliseconds();
    // 拉取已经准备好的数据
    long pollTimeout = sendProducerData(currentTimeMs);
    // 执行网络数据发送
    client.poll(pollTimeout, currentTimeMs);
}

简单概括就是两件事儿:

  1. 将RecordAccumulator中已经准备好的数据,进行字节粒度的协议编码,最终放入待发送区
  2. 真正执行NIO数据发送、接收响应

五、线程相关类

在NIO中很重要的2个概念是:Selector及SelectionKey,即一个是多路复用器,一个可以简单认为是Channel。Kafka分别对这个概念进行了包装,整体的类图如下:

5.1、NetworkClient

顾名思义,这个类是为客户端而服务的,它是所有客户端访问网络的
总入口
,他只实现了一个接口KafkaClient,也是KafkaClient的唯一实现类。接口有如下方法:

  • boolean isReady(Node node, long now);
  • boolean ready(Node node, long now);
  • long connectionDelay(Node node, long now);
  • long pollDelayMs(Node node, long now);
  • boolean connectionFailed(Node node);
  • AuthenticationException authenticationException(Node node);
  • void send(ClientRequest request, long now);
  • List<ClientResponse> poll(long timeout, long now);
  • void disconnect(String nodeId);
  • void close(String nodeId);
  • Node leastLoadedNode(long now);
  • int inFlightRequestCount();
  • boolean hasInFlightRequests();
  • int inFlightRequestCount(String nodeId);
  • boolean hasInFlightRequests(String nodeId);
  • boolean hasReadyNodes(long now);
  • void wakeup();
  • ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder,long createdTimeMs, boolean expectResponse);
  • ClientRequest newClientRequest(String nodeId,AbstractRequest.Builder<?> requestBuilder,long createdTimeMs,boolean expectResponse,int requestTimeoutMs,RequestCompletionHandler callback);
  • void initiateClose();
  • boolean active();

可以看到大部分都是与网络相关的,而里面比较重要且高频的方法不外乎以下两个,也是本文着重展开介绍的2个方法

  • void send(ClientRequest request, long now);
  • List<ClientResponse> poll(long timeout, long now);

字面含义,即一个发送、一个接受,然而这里封装的这2个方法却是更偏重业务语义

void send(ClientRequest request, long now);

  • 对消息进行协议编码
  • 是将消息放入待发送区,但并不会真正出发网络请求

List<ClientResponse> poll(long timeout, long now);

而poll方法中则包含了真正网络的发送与接收,也就是
只有
poll方法

会调用Java NIO的相关api接口


  • 真正提交网络发送
  • 处理发送成功的请求列表
  • 处理接收到的response
  • connection相关处理
  • .......

后续的分析也主要围绕这两个入口方法展开

NetworkClient作为网络请求的入口类,很多关于底层网络的操作都是交由Selector来实现的,而NetworkClient则更多的将自己的职责放在了网络上层的建设上,诸如维护发送列表、inFlightRequests等业务控制上

List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);

5.2、Selector

注意,与nio包中Selector不同的是,这里的Selector的全路径是
org.apache.kafka.common.network.Selector
,我们看一下它的主要成员变量,也大致能猜到它在网络请求中发挥哪些功能

private final java.nio.channels.Selector nioSelector;
private final Map<String, KafkaChannel> channels;
private final Set<KafkaChannel> explicitlyMutedChannels;
private final List<NetworkSend> completedSends;
private final LinkedHashMap<String, NetworkReceive> completedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
private final Map<String, KafkaChannel> closingChannels;
private Set<SelectionKey> keysWithBufferedRead;
private final Map<String, ChannelState> disconnected;
private final List<String> connected;
private final List<String> failedSends;
private final ChannelBuilder channelBuilder;
.........
  • 它持有nio的java.nio.channels.Selector引用,这样可以直接进行一些底层的网络操作,比如真正与broker建联
  • 所有已建联的channel列表,这样通过nodeId可以快速找到对应的KafkaChannel
  • 保存已完成发送的列表,当NetworkClient需要处理这些请求时,可以直接在这里获取到;注意这个list的实现类就是简单的ArrayList,因为网络线程是单线程,所以不存在并发冲突的问题
  • 同样存储了接收请求列表LinkedHashMap<String, NetworkReceive>,不再展开
  • 以及存储正在关闭的channel、已经关闭的Channel等
  • 。。。。。。

Selector相对比NetworkClient来说,它做了更贴合网络的操作,比如真正与broker建联、维护Channel列表、处理超时时,关闭Channel等等

值得一提的是,Selector对于SocketChannel的配置,我们看一下关于配置的方法
org.apache.kafka.common.network.Selector#configureSocketChannel

private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize)
        throws IOException {
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
}

有这样几个配置项:

  • socketChannel.configureBlocking(false); 配置为非阻塞
  • socket.setKeepAlive(true); 保持连接
  • socket.setSendBufferSize(sendBufferSize); 配置发送缓冲区,默认为128K
  • socket.setReceiveBufferSize(receiveBufferSize); 配置接收缓冲区,默认32K
  • socket.setTcpNoDelay(true); 非Delay

我们注意到,上面几个配置都比较好理解,包括发送的缓冲区比接收缓冲区大了不少,因为Producer是典型的发送多,接收少的场景;而TcpNoDelay属性为什么要设置为true呢? 设置为false不是能提高网络性能吗?

其实在TCP中有个Nagle算法,这个算法的目的是减少网络中小包的数量,通俗点来讲就是将诸多的小包整合为一个大包进行传输,这样做的好处是,提高网络利用率,使得整体发送速率更快;弊端是可能会增加某些包的延迟。那Kafka为什么要禁用这个算法呢?其实本质原因就是Kafka在Producer内部已经完成了消息攒批,这些待发送的包无非以下两种状态:

  • 攒批充足,默认是16K,包足够大,不需要
    Nagle算法的干预
  • 攒批不理想,待发送的数据可能很小;但是即便是数据很少,这些消息也在批次中停留了足够久的时间,这个时候唯一要做的就是尽快把数据发出去,不要再帮我为了优化TCP而增加延迟了,否则超过Producer等待的最大时长,Producer会将本次请求标记超时。其次这种攒批不理想的场景,通常是Producer发送量很小,数据在时间轴上很稀疏,即便是启用了Nagle算法,大概率也不能产生TCP变大包的预期

总结一句话就是Producer的聚批做的已经足够好了,Nagle算法的介入只会带来负优化

5.3、KafkaChannel

上文可知,Selector管理了所有的Channel,而KafkaChannel则是与某个broker连接的具体通道。Selector的各类网络请求也均交给KafkaChannel来执行。

需要说明的是,KafkaChannel其实是通道注册的时候attach在SelectionKey上的,也就是KafkaChannel与SelectionKey是1对1的关系。以下是KafkaChannel所对外暴露的方法

除了网络相关的操作外,KafkaChannel还有一个很重要的特性,就是缓存当天通道的即将要发送的数据,以及接收到的数据

  • private NetworkSend send;
  • private NetworkReceive receive;

当然放在这里的数据都是已经经过编解码完毕的数据

小结:3个类的总代码行数超过了3000行,是Producer发送消息的核心类,我们先对这3个类有个大致的概念,以及它们每个类主要实现的功能点,这样非常有益于对整体的认知。至于Kafka将网络操作的相关操作抽象为这3个类是否合理,就仁者见仁智者见智了,其实这3个类是nio及Kafka自身业务的混合体,它的核心就是为Kafka的客户端网络请求提供服务;笔者认为虽然这样的设计并没有把一些特性或者功能的界限画的很清楚,但是整体运行Kafka的网络功能还是游刃有余的

六、数据准备

单纯地将数据放入聚合器Accumulator还不算是数据准备完成,消息发送需要将其转换为面向字节的网络协议的格式,才是真正具备待发送的前提

6.1、消息编码

发送请求的编码在类org.apache.kafka.common.protocol.SendBuilder中:

private static Send buildSend(
    Message header,
    short headerVersion,
    Message apiMessage,
    short apiVersion
) {
    ObjectSerializationCache serializationCache = new ObjectSerializationCache();

    MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
    header.addSize(messageSize, serializationCache, headerVersion);
    apiMessage.addSize(messageSize, serializationCache, apiVersion);

    SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
    builder.writeInt(messageSize.totalSize());
    header.write(builder, serializationCache, headerVersion);
    apiMessage.write(builder, serializationCache, apiVersion);

    return builder.build();
}

编码分2部分,首先需要计算当前类型编码占用空间的总大小,然后将内容进行逐一填充;这里Kafka将各类消息定义了统一的接口 org.apache.kafka.common.protocol.Message,这样每种类型的消息都写好各自的编解码协议即可,类似于命令模式,对各类编解码进行了解耦,方便后续的扩展与维护。而Message接口的实现类居然有
357
个之多,可见其软件设计的复杂度

如此之多的实现类,我们无法穷举,因此以发送消息类型的Message为demo来阐述一下编码的格式;任何消息类型的网络编码格式均分为header与body两部分,发送消息类型的header与body分别为

  • org.apache.kafka.common.message.RequestHeaderData
  • org.apache.kafka.common.message.ProduceRequestData

6.1.1、header编码

首先看header部分的编码

下面分别对上述字段进行简要说明

  • api key
    • 协议的类型编码,即每种类型的协议编码均不同,例如消息发送-0,消息拉取-1,获取位点-2,获取metadata-3等等,2.8.2版本的kafka的协议类型已经到达了64种,具体可以查看枚举类
      org.apache.kafka.common.message.ApiMessageType
  • api key version
    • 根据调整会不断迭代升级,2.8.2版本已经升级到了9
  • correlation id
    • 相关性id,客户端指定,从0开始累加,服务端返回时,也需要回应同样的编号,这样客户端就可以将一次request跟response联系起来。这里可以看到,相关性id的长度是4个字节,难道不怕越界吗?其实相关性id不需要保证唯一性,当达到了int最大值后,下一次从0继续即可
  • client id length
    • 客户端id其实是一个字符串,比如“producer-1”,因为是可变内容,因此照例使用length+content的方式进行存储
  • client id content
    • 同上,真正存储 client id 的内容
  • tagged field

另外我们还留意到某些字段使用了varint,这个又是什么鬼呢?简单来说就是可变长度的int,还有varlong等类型。当我们想将一个字段设置为int型,而大多数的情况下,它可能只占用了很小的空间,例如0、1、2等,这个时候可变长度就派上了用场,本文不再对此展开,相关的论文读者可参考
http://code.google.com/apis/protocolbuffers/docs/encoding.html

Kafka中varint的实现:

public static void writeUnsignedVarint(int value, ByteBuffer buffer) {
    while ((value & 0xffffff80) != 0L) {
        byte b = (byte) ((value & 0x7f) | 0x80);
        buffer.put(b);
        value >>>= 7;
    }
    buffer.put((byte) value);
}

6.1.2、body编码

消息发送对应的body编码实现类是
org.apache.kafka.common.message.ProduceRequestData

虽然body真正的内容会比header多很多,但是其编码协议并不复杂,大部分字段上图均已说明。值得一提的是,Kafka协议为了压缩协议体积,可谓“无所不用其极”,还有很多很多细节层面的优化,也正是由于这些一点一滴的积累,成就了Kafka在消息队列中行业大佬的地位

6.2、TCP粘包/拆包

这里没有太多需要展开论述的,Kafka处理粘包/拆包的问题,采用的就是经典的length+content的方式;也就是首先写入4个字节的包长度,后面紧跟消息内容;而拆包时,先读取4个字节,继而完整读取后续的消息

6.3、零拷贝?

细心的读者可能会发现,在构造Send类的时候,也就是数据编码的时候
org.apache.kafka.common.protocol.SendBuilder#buildSend
,有这样的代码:

SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
builder.writeInt(messageSize.totalSize());
header.write(builder, serializationCache, headerVersion);
apiMessage.write(builder, serializationCache, apiVersion);

感觉像是所有的协议层均用堆内存的ByteBuffer来构建,但是消息体本身却让它走零拷贝。消息发送也可以零拷贝吗?消息本身不是已经进入到了JVM堆内存中了吗?零拷贝又是如何实现呢

而且我们在类
org.apache.kafka.common.message.ProduceRequestData.PartitionProduceData#addSize
中也发现了操作零拷贝的代码
_size.addZeroCopyBytes(records.sizeInBytes())

但是在真正写入消息记录的时候,却只是将消息记录放入了SendBuilder的成员变量中
org.apache.kafka.common.protocol.SendBuilder#buffers
。究竟是怎么回事儿呢?

原来这里闹了乌龙,SendBuilder也仅是复用了
org.apache.kafka.common.protocol.MessageSizeAccumulator
类的功能,而这个类除了给Producer使用外,还会给消息拉取、同broker内同步流量等使用,在消费的场景确实用到了零拷贝的能力,而Producer却是一定无法执行零拷贝的,因为消息体已经存在于了JVM堆中

6.4、放入待发送区

经过上述一大圈工作,Kafka将编码好的消息封装进入了
org.apache.kafka.common.network.ByteBufferSend
中,接下来就是放入网络待发送区了

其实放入待发送区就是将ByteBufferSend放入KafkaChannel的成员变量send中,
org.apache.kafka.common.network.KafkaChannel#setSend

public void setSend(NetworkSend send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

因为马上就要执行write操作了,因此开始监听channel的OP_WRITE写入事件

有个细节需要注意下,这里会判断
send != null
,这个网络send会有不等于null的时候吗?这里先卖个关子,后文会提及。

另外有同学说,“Producer有两个队列,一个攒批队列,一个网络待发送的队列”,我们通过读源码发现,这个说法肯定是有问题的,Sender网络线程只有一个,也不存在网络待发送队列,而是每个Channel在同一时刻只会发送一个Send

七、执行发送

7.1、发送流程

真正执行发送的逻辑反而变得简单,入口方法为
org.apache.kafka.clients.NetworkClient#poll

最终会调用至KafkaChannel的write()方法

public long write() throws IOException {
    if (send == null)
        return 0;

    midWrite = true;
    return send.writeTo(transportLayer);
}

这里简单提一下接口
org.apache.kafka.common.network.TransportLayer
,这个接口的实现类有2个:PlaintextTransportLayer、SslTransportLayer,也就是Kafka抽象了一个TransportLayer层,通过对TransportLayer层的切换来实现网络加解密的动作,这样把对SSL的操作放在了最底层,即便于维护,又不会对上层的代码产生影响,的确是高

继续往下追的话,我们在PlaintextTransportLayer发现了NIO的jdk调用

public int write(ByteBuffer[] srcs) throws IOException {
    return socketChannel.write(srcs);
}

至此写入动作完结

7.2、只发送一次?

通常我们向网络或者文件发送ByteBuffer时,为了确保ByteBuffer中的内容全部发送完毕,会一直判断ByteBuffer的remaining()已经为空,才停止发送,这样能保证将ByteBuffer中的数据全部发送出去,如下:

while (byteBuffer.remaining() > 0) {
    socketChannel.write(byteBuffer);
}

但在真正执行发送的代码中,我们注意到,调用网络请求发送的代码中,只触发了
一次

public long writeTo(TransferableChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    pending = channel.hasPendingWrites();
    return written;
}

再结合6.4小节中,如果发现send属性不为空,则会抛出异常:

public void setSend(NetworkSend send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

抛出异常后,Producer就会把当前的Channel关闭。那我们能否得出这样一个结论:一旦某个Channel中的数据不能一次性发送完毕,Kafka就会抛出异常,然后关闭当前Channel ?

答案是否定的,因为在数据准备准备阶段,Kafka会判断,如果当前的Channel中有待发送数据,则不会从Accumulator中拉取该Channel的数据,而在发送阶段,会任然将未发送出去的数据再次执行发送

org.apache.kafka.clients.NetworkClient#isReady
的方法的相关调用如下

// org.apache.kafka.clients.NetworkClient#isReady
@Override
public boolean isReady(Node node, long now) {
    return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
}

// org.apache.kafka.clients.NetworkClient#canSendRequest
private boolean canSendRequest(String node, long now) {
    return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
        inFlightRequests.canSendMore(node);
}

// org.apache.kafka.clients.InFlightRequests#canSendMore
public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

// org.apache.kafka.common.network.ByteBufferSend#completed
public boolean completed() {
    return remaining <= 0 && !pending;
}

Kafka为什么要这样设计,一次性将数据发送出去不好吗?一定要这样费尽周章的各种判断吗?

这里正印证了那句老话:细节之处见功力!通常情况下,当我们调用write()接口,向操作系统发送网络数据时,只要网络不太繁忙,一般都是能够一次性将数据发完的,OS也会尽量将数据全部发送出去。而如果一次没有将缓冲区的数据发送完毕,那很有可能印证出当前channel的网络压力非常大,如果采用不发送完毕不罢休的逻辑,那很有可能当前Sender线程要在这里“卡”很久,而Sender我们知道是单线程的,他是会向多个Broker也就是多个Channel发送数据的,我们不能因为一个Broker的网络阻塞,而影响了整个Producer的吞吐。当然未发送的数据也不是不管了,而是等到下一个周期再尝试发送,这样即便是某个Broker网络拥塞了,不会影响其他Broker的吞吐

Kafka在接受网络数据的时候,同样也是采用“只接受一次”的策略

八、处理响应

处理响应的过程与发送数据一样,入口均在
org.apache.kafka.clients.NetworkClient#poll
中,相比较发送而言,处理响应是一个相对轻量的操作,主要经历以下几个步骤:

  1. 接收网络read信号
    1. 这个的触发代码就是JDK的select,
      this.nioSelector.selectedKeys()
      ,只要有read信号就会进来
  1. 读取4字节的header
    1. 有同学会说,4个字节,还有可能读不满吗?的确,因为虽然只读4个字节,但也是通过TCP打包进行传输,很有可能出现粘包的情况,只读取到1、2个字节。如果遇到读不满的情况,Producer会将已读取的数据暂时放入ByteBuffer缓存中,等待下一次的Sender线程的轮循,直到读取满4个字节
  1. 读取消息体X字节
    1. 消息体的读取跟header一致,只不过读取header的时候,明确知道是4个字节,而读取body的时候,是一个动态值,需要读取从header获取
  1. 将消息放入待处理列表
  2. 消息协议解码
    1. 这里正好是消息发送时的逆操作,进行协议解码。需要注意,消息解码分两步,一个是header解码,一个是body解码
    2. 主要注意的是,header解码的时候,会判断response的correlationId与request的是否一致,如果发现匹配不上,将会抛出异常
    3. 解码的入口方法为
      org.apache.kafka.common.requests.AbstractResponse#parseResponse(org.apache.kafka.common.protocol.ApiKeys, java.nio.ByteBuffer, short)
      里面根据请求类型的不同,分别做了对应的处理,此处不再赘述
  1. 执行回调函数
    1. Producer的
      org.apache.kafka.clients.NetworkClient#inFlightRequests
      成员变量存储了所有已经发送给Broker但是还未收到响应的请求列表,这个列表默认大小是5,即最多同时允许5个请求没有收到响应,且里面存储了回调函数
      org.apache.kafka.clients.NetworkClient.InFlightRequest#callback
      ,方便收到响应后,触发后续的业务逻辑,比如归还MemoryPool等

九、总结

至此,一次网络的发送与接收便告一段落,不过需要指明的是,本文仅仅是阐述了消息发送时候,涉及网络相关部分的主流程,还有很多细节并没有展开,比如Selector是如何关闭Channel的、Channel遇到不预期异常后如何触发重连等。另外Producer还有很多请求类型,比如获取meta数据、心跳、sync api version等,不过这些连接类型虽然与消息发送不一样,但是网络流程均是相通的。网络策略是Kafka的基建,对网络部分有个整体的了解,有利于我们更快地学习、吸收Kafka

虽然 talk is cheap, show me the code,但我本人不喜欢在技术的文章中,黏贴大量的代码段,因为要读源码的话,读者直接在github上拉取对应版本代码阅读更方便。不过由于Kafka网络部分相对还是比较复杂的,也掺杂了很多业务处理的逻辑,本文还是黏贴了很多关键部分的代码以及出处的method,所幸代码篇幅都很短,希望不要给阅读带来不便

简单总结一下Producer网络部分的特点

  • 异步
    ;异步处理也是Producer的整体特点,Producer将网络线程与发送线程独立开来,也就为消息攒批提供了底层支持;同时接收到消息响应后,异步调用callback等,设计都非常合理
  • 细节
    ;我理解Kafka的高吞吐并不是某个设计带来的宏利,而是对于诸多细节完美处理而实现的。例如屏蔽
    Nagle算法、一次发送/接收、可变长度编码、使用队列缓存等等
  • 原生JDK NIO
    ;Kafka对NIO的接口进行了二次封装,不过封装的很轻量,基本上均是使用JDK的接口。此外使用header+body的方式解决了TCP粘包/拆包的问题

在当今数字化时代,构建高效、可靠的分布式系统是许多企业和开发团队面临的挑战。微软的 Orleans 框架为解决这些挑战提供了一个强大而简单的解决方案。本文将介绍 Orleans 的核心概念,并通过一个简单的示例代码来演示其用法。

什么是 Orleans?

Orleans 是由微软开发的一个开源分布式应用框架,它基于 Actor 模型,采用了一种称为 "Virtual Actor" 的概念。

在 Orleans 中,应用程序被分解为多个独立的 Actor 实体,每个 Actor 都有自己的状态和行为,能够独立地处理消息和计算。

什么是Actor

Actor 模型是一种并发计算模型,旨在简化并发编程,特别适用于构建分布式系统。

在 Actor 模型中,计算单元被称为 Actor,每个 Actor 都是独立的个体,具有自己的状态、行为和邮箱。Actors 之间通过消息传递进行通信,而不共享内存,从而避免了传统并发编程中常见的锁和共享状态问题。

Orleans 能应用于哪些场景?

Orleans 框架适用于各种不同的应用场景,包括但不限于:

  • 实时数据处理:例如实时分析、实时推荐系统等。
  • 在线游戏:构建大规模多人在线游戏(MMOG)。
  • 物联网(IoT):处理大规模传感器数据和设备状态。
  • 分布式计算:执行复杂的分布式计算任务和任务调度。

Orleans 如何避免了锁的使用

Orleans 使用了一种异步消息传递的方式来避免锁的使用,Grain 之间的通信是异步的,而不是使用传统的同步锁机制,从而避免了死锁和性能下降的问题。

Orleans 中的 Grain 与 Silo

Grain:Grain 是 Orleans 中的基本执行单元,代表了应用程序的业务逻辑和状态。每个 Grain 都有自己的状态和行为,能够独立地处理消息和进行计算。

Silo:Silo 是 Orleans 中的执行节点,负责执行和协调所有的 Grains。Silo 之间通过网络进行通信,构成一个分布式的 Orleans 集群。Grains 在 Silos 中执行,通过 Silos 来实现分布式部署和水平扩展。

示例代码

下面是一个简单的 Orleans 示例代码,演示了如何定义一个简单的 Grain 类并在 Silo 中进行部署:

首先安装Neget包

<PackageReference Include="Microsoft.Orleans.Server" Version="8.0.0" />

public interfaceIHelloGrain : IGrainWithIntegerKey
{
Task
<string>SayHello();
}
public classHelloGrain : Grain, IHelloGrain
{
public Task<string>SayHello()
{
return Task.FromResult("Hello from Orleans 7.0!");
}
}
classProgram
{
static async Task Main(string[] args)
{
var host =Host.CreateDefaultBuilder()
.ConfigureServices((context, services)
=>{
services.AddOrleans(builder
=>{
builder
.UseLocalhostClustering()
.Configure
<ClusterOptions>(options =>{
options.ClusterId
= "dev";
options.ServiceId
= "OrleansExample";
});
});
})
.Build();
awaithost.StartAsync();var client = host.Services.GetRequiredService<IClusterClient>();var grain = client.GetGrain<IHelloGrain>(0);var response = awaitgrain.SayHello();
Console.WriteLine(response);

Console.ReadKey();
awaithost.StopAsync();
}
}

在这个示例中,我们定义了一个名为 IHelloGrain 的接口和一个对应的实现类 HelloGrain,并在主程序中进行了部署和调用。通过这个简单的示例,我们可以看到 Orleans 框架的基本用法以及 Grains 和 Silos 之间的关系。

通过这个示例,读者可以更好地理解 Orleans 框架的核心概念,并在实际应用中尝试构建分布式系统。

Matlab图形属性检查器

和其他语言的绘图不一样的是,Matlab允许我们通过非编程的方式来自定义调整绘图。下面介绍Matlab图形的构成以及几种调整绘图时的常用操作。

图形构成

什么是Figure

当我们使用绘图函数创建图形时,总会弹出一个窗口以显示我们绘制的图形,这整个窗口我们把它叫做Figure或者称为画板(即我们作画需要的载体)。比如像下面的图形就是一个Figure:

尽管在我们进行绘图时,系统为我们给定了一套默认的参数,比如图形的大小,颜色,位置等等,但是这种方式由于缺乏灵活性,不利于我们对图形进行修正比如,若将上面的图形导出则会出现大片的空白区域,后期仍需要进行二次裁剪十分浪费时间,因此我们需要深入了解Figure,以实现我们客制化的需求。

Figure由哪些部分组成

我将Matlab中Figure的最常用组成总结如下:

其中Figure是整个画布,legend是图例显示,axes是我们真正绘图的区域,xlabel,ylabel即x,y轴表示含义,title即图的标题,tick和tick_label为刻度值和在该刻度值上显示的文字。值得注意的是我们在进行绘图时,上面各个部分的位置信息都是参考其上一级的位置信息而言的的。

图形调整

位置调整

位置表示

Matlab中位置通常用向量[left bottom width height]来表示,left表示距离左侧边界的距离,bottom表示距离下边界的距离,width表示图像的宽度,height表示图像的高度。

在表示这些量时有很多单位可以选择,建议选择normalized。

Figure位置调整

在Figure中共有三个位置信息,它们表示的含义大体相近,但亦有所区别。

  • Position 属性是最常用的,用于直接设置图形窗口在屏幕上的位置和大小。
  • OuterPosition 属性则用于获取或设置包括所有装饰元素在内的整个图形窗口的大小和位置。
  • InnerPosition 属性主要用于调整绘图区域的大小和位置,而不考虑窗口的边框、标题栏等装饰元素。
    在实际使用上,我们往往只用设置其中一个就可,实验上来看,Position和OuterPosition似乎没有太大区别。另外Figure的位置信息是参考屏幕左下角的点而言的。

axes位置调整

axes同样有上面上个位置信息,由于Figure是其上一级,因此它的位置信息是参考Figure的左下角而言的。如下图:

其他部分位置调整

对于其他部分的位置而言,大多数都包含了Position信息,此外对于title还有额外的HorizontalAlignment和VertivalAlignment信息(可以设置为left,right和center),当设置为居中时,那么将以居中点来确定到参考点的距离。

坐标轴刻度调整

在我们进行绘图时,有时候我们期望坐标的刻度以日期形式显示,这时候x,y的tick_label就排上用场了。比如设置xtick = [1,2,3,4], xtick_label为['a','b','c','d']那么实际出来的效果就是在刻度为[1,2,3,4]的地方显示a,b,c,d。
如下图所示:

放大指定区域的图形

局部区域放大图的原理很简单,其实就是copy一份一模一样的图像,然后重新设置的新图的xlim和ylim然后对图片放大。
下面展示,设置放大图的全过程。

  • 拷贝一份图像放一起

  • 设置拷贝图像的显示范围(属性检查器->标尺->xlim,ylim)

    这样就基本上完成了图像的放大

  • 将放大图像移动到原图像中,并添加箭头

前言:

在这一部分中,我们将深入讨论动态程序集中模块的概念以及如何构建和管理模块。

1、模块的概念:

模块是动态程序集中的基本单位,它类似于一个独立的代码单元,可以包含类型、方法、字段等成员。

在动态程序集中,模块扮演着组织代码和实现代码复用的关键角色。

它们允许开发人员将相关功能和数据组织在一起,并在需要时进行引用和重用。

一个程序集可以包含一个或多个模块,这种模块化的设计有助于提高代码的可维护性和可扩展性。

通俗的讲人话:

即在设计上:在运行时,一个程序集可以包含多个模块,每个模块允许用不同的语言编写,比如VB模块混合C#模块。

在使用上:在编绎后,一个程序集只能包含一个模块。

下面来看一个组问答题:

2、程序集和模块的关系问答:

既然一个程序集可以定义多个Module,为什么通过反编绎dll,发现所有的dll文件都只有一个module呢?

在使用 C# 或 .NET Framework 动态创建程序集和模块时,无论你创建了多个模块,最终生成的 DLL 文件通常只会包含一个默认模块。

这是因为在 .NET 中,一个程序集(assembly)通常对应一个 DLL 或者 EXE 文件,而每个程序集只包含一个默认的模块。

即使你在代码中使用 DefineDynamicModule 创建了多个模块,最终生成的 DLL 文件也只会包含默认模块的信息。

其他通过 DefineDynamicModule 创建的模块并不会以独立的形式出现在最终的 DLL 文件中。

实际上,在 .NET 中,程序集可以包含多个模块,但这些附加的模块一般不会直接保存在磁盘文件中,而是在运行时动态加载到程序集中。这也是为什么反编译时只能看到一个模块的原因。

总的来说,即使你在代码中动态创建了多个模块,最终生成的 DLL 文件也只会包含一个默认模块。其他动态创建的模块会以其他方式与程序集关联,并不会直接体现在生成的 DLL 文件中。

了解完程序集与模块的对应关系,下面看看如何创建动态模块:

3、创建动态模块:

使用C# Emit 技术可以在运行时动态创建模块。

首先,需要获得 AssemblyBuilder(这个在构建程序集一文中,已经讲解了 .NET 和 .NET Core 下的相关获取用法)。

然后,通过 AssemblyBuilder 的 DefineDynamicModule方法,即可获得 ModuleBuilder(后续章节会通过它,来添加类型、方法、字段等成员到模块中,从而构建出所需的模块结构)。

下面是一个获得 ModuleBuilder 示例代码:

 AssemblyBuilder ab =......
ModuleBuilder mb
= ab.DefineDynamicModule("一个名称");

4、创建静态模块(仅.NET系列支持):

在.NET中,如果要将该模块持久化到 dll 中,则需要在重载中指定 dll 的名称,如:

 AssemblyBuilder assemblyBuilder =AppDomain.CurrentDomain.DefineDynamicAssembly(assemblyName, AssemblyBuilderAccess.RunAndSave);//......
ModuleBuilder moduleBuilder= assemblyBuilder.DefineDynamicModule("一个名称", dllName + ".dll");

//......
assemblyBuilder.Save(dllName
+ ".dll");

正如问答所说,一个程序集在持久化到文件中时,只能包含一个默认模块。

因此,通过指定和程序集相同的名称,来持久化到相同的程序集中。

如果定义多个模块,都指向同一个程序集名称中呢?

如下图,抛重复的文件名异常:

如果定义多个模块,都给予不同的程序集名称呢?

如下图,每个模块单独生成程序集:

因此,在运行时,可以定义多个模块,模块在运行时可以互动,但持久化到文件中,只能有一个模块。

5、模块间的交互

模块之间可能存在依赖关系,一个模块可能需要引用另一个模块中的类型或成员。

在动态程序集中,可以使用 AssemblyBuilder 来创建程序集并将模块进行组合,从而实现模块间的交互和依赖管理。

通常而言,不建议尝试在程序集中定义多个模块,下面给几个教训的示例:

因情节需要,以下内容中会提前出现 TypeBuilder、MethodBuilder 和 IL代码。

在 .NET 中定义多个模块,并尝试进行交互:

public static voidStart()
{
//创建第一个模块 AssemblyName assemblyName = new AssemblyName("MyAssembly");
AssemblyBuilder assemblyBuilder
=AppDomain.CurrentDomain.DefineDynamicAssembly(assemblyName, AssemblyBuilderAccess.Run);
ModuleBuilder moduleBuilder1
= assemblyBuilder.DefineDynamicModule("Module1");//在第一个模块中定义一个类型 TypeBuilder typeBuilder1 = moduleBuilder1.DefineType("MyClass", TypeAttributes.Public);

MethodBuilder methodBuilder1
= typeBuilder1.DefineMethod("MyMethod", MethodAttributes.Public | MethodAttributes.Static, typeof(void), null);
ILGenerator ilGenerator
=methodBuilder1.GetILGenerator();
ilGenerator.EmitWriteLine(
"Call hello from Module1!");
ilGenerator.Emit(OpCodes.Ret);
typeBuilder1.CreateType();
//在第二个模块中引用第一个模块中定义的类型 MethodInfo myMethod = moduleBuilder1.GetType("MyClass").GetMethod("MyMethod");//创建第二个模块 ModuleBuilder moduleBuilder2 = assemblyBuilder.DefineDynamicModule("Module2");
TypeBuilder typeBuilder2
= moduleBuilder2.DefineType("MyClass2", TypeAttributes.Public);
MethodBuilder methodBuilder2
= typeBuilder2.DefineMethod("MyMethod2", MethodAttributes.Public | MethodAttributes.Static, typeof(void), null);var ilGenerator2 =methodBuilder2.GetILGenerator();
ilGenerator2.EmitWriteLine(
"Exe hello from Module2!");
ilGenerator2.EmitCall(OpCodes.Call, myMethod,
null);
ilGenerator2.Emit(OpCodes.Ret);
typeBuilder2.CreateType();
//assemblyBuilder.LoadModule() var myMethod2 = moduleBuilder2.GetType("MyClass2").GetMethod("MyMethod2");//myMethod.Invoke(null, null); myMethod2.Invoke(null, null);

Console.Read();
}

结果如下:

同样的代码,在 .NET Core 中执行,你将得到以下结果:

所以,你懂的,别折腾这个。

总结:

嗯,构建模块,一行代码的事情,愣是让我写成了一篇教程,太难了,下面进行总结。

在这个入门教程的第三部分中,我们学习了如何使用.NET Emit 构建模块(Module)。

通过创建和定义模块,我们可以更好地组织和管理我们的代码。

在这个过程中,我们了解了如何使用 AssemblyBuilder 和 ModuleBuilder 来动态生成模块。

通过学习构建模块的过程,我们可以更深入地理解.NET Emit 的强大功能,并且能够在运行时动态地生成和加载代码。

构建模块是.NET Emit中非常重要的一部分,它为我们提供了灵活性和扩展性,让我们能够更好地应对各种编程需求。

在接下来的学习中,我们将继续探索.NET Emit的各种功能和用法,不断丰富我们的动态代码生成技能,为我们的项目带来更多可能性。

希望这个入门教程能够帮助你更好地理解.NET Emit,并为你的编程之路增添新的技能和知识!