2024年2月

本文分享自华为云社区《
基于OpenTelemetry实现Java微服务调用链跟踪
》,作者: 可以交个朋友。

一 背景

随着业务的发展,所有的系统都会走向微服务化体系,微服务进行拆分后,服务的依赖关系变得复杂,如果出现了错误和异常,定位的过程将会变得复杂,一个请求可能需要调用很多个服务,所以微服务架构中,分布式链路跟踪的实现至关重要,去跟进一个请求到底有哪些服务参与,参与的顺序又是怎样的,从而达到每个请求的步骤清晰可见。如何快速查询整个请求链路上的信息并呈现出来是解决排查问题复杂度的根本方法。

image.png

二 简介

Java 是世界上最流行的编程语言之一,很多大小项目都是通过Java进行微服务的开发来实现。
本篇博客将以springboot微服务为例,通过使用opentelemetry-java SDK 进行自动埋点以代码无侵入的方式实现微服务的分布式跟踪能力。

三 实践演示

demo项目一共3个service:foo-svc、bar-svc 、loo-svc 。
项目源码可前往:
https://github.com/HFfleming/springboot-trace-demo/tree/autoconfigure

访问效果如下:

image.png

3.1 前提条件

已创建k8s 集群,可使用CCE集群平台作为基础环境。

k8s 集群中已安装opentelemetry-collector组件

k8s 集群中已安装jaeger作为分布式跟踪数据展示的平台

3.2 集成opentelemetry-java-instrumentation

OpenTelemetry 提供了 Java agent(opentelemetry-java-instrumentation)。当附加到应用程序中时,它会修改各种流行库和框架的字节码以捕获遥测数据。可以以多种格式导出遥测数据。还可以通过命令行参数或环境变量配置代理和导出器。最终结果是无需更改代码即可从 Java 应用程序收集遥测数据。

下载otel-java jar包并添加到容器镜像中

前往官方仓库
https://github.com/open-telemetry/opentelemetry-java-instrumentation
下载opentelemetry-javaagent.jar

通过环境变量配置java agent和otlp导出器

  • 通过环境变量的形式配置java agent:
    ENV JAVA_TOOL_OPTIONS="-javaagent:/usr/app/opentelemetry-javaagent.jar"

  • 服务名称:
    ENV OTEL_SERVICE_NAME="foo-svc"

  • 使用otlp协议的导出器:
    ENV OTEL_TRACES_EXPORTER="otlp"

  • 关闭java agent的指标 otlp导出器:
    ENV OTEL_METRICS_EXPORTER="none"

  • 关闭java agent的日志 otlp导出器:
    ENV OTEL_LOGS_EXPORTER="none"

  • 指定OTLP导出器的端点,跨ns的场景下建议写上otel的ns:
    ENV OTEL_EXPORTER_OTLP_ENDPOINT="http://opentelemetry-collector.tracing.svc.cluster.local:4317"

    除了环境变量的形式,也可通过jvm参数形式进行agent和导出器的配置。
    详细配置或者欲开启更多导出信息可参考:
    https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure

容器镜像制作

建议重新制作镜像,将opentelemetry-javaagent等otlp基础配置打包在镜像中。后续如果有变动,也可通过deployment中env字段进行修改覆盖。

镜像Dockerfile文件可参照如下:

#基于官方的Maven镜像
FROM maven:
3.8.7-openjdk-18-slim AS build

#将本地代码复制到Docker容器中的
/usr/src/app 目录下
COPY src
/usr/src/foo-app/src
COPY pom.xml
/usr/src/foo-app
COPY opentelemetry
-javaagent.jar /usr/src/foo-app

# 在容器的
/usr/src//foo-app 目录下,运行mvn clean package 命令,构建项目 RUN mvn -f /usr/src/foo-app/pom.xml clean package

# 使用官方的openjdk 镜像作为基础镜像
FROM openjdk:
19-jdk-slim

# 将打包生成的jar文件复制到容器中
COPY
--from=build /usr/src/foo-app/target/*.jar /usr/app/foo-app.jar
COPY --from=build /usr/src/foo-app/opentelemetry-javaagent.jar /usr/app/opentelemetry-javaagent.jar

# 声明服务运行在8080端口
EXPOSE 8080

# 通过环境变量的形式配置java agent并且通过环境变量传递配置属性
ENV JAVA_TOOL_OPTIONS="-javaagent:/usr/app/opentelemetry-javaagent.jar"
# 服务的k8s servicename
ENV OTEL_SERVICE_NAME="foo-svc"
# 使用的是otlp协议的导出器
ENV OTEL_TRACES_EXPORTER="otlp"
# 关闭java agent的指标 otlp导出器
ENV OTEL_METRICS_EXPORTER="none"
## 关闭java agent的日志 otlp导出器
ENV OTEL_LOGS_EXPORTER="none"
# 指定OTLP导出器的端点,跨ns的场景下建议写上otel的ns
ENV OTEL_EXPORTER_OTLP_ENDPOINT="
http://opentelemetry-collector.tracing.svc.cluster.local:4317"

#指定docker容器的启动命令
ENTRYPOINT ["java", "-jar","/usr/app/foo-app.jar"]

镜像推送至镜像仓库,可使用SWR容器镜像仓库

docker build 构建完镜像后,然后docker push 至镜像仓库。
本人demo项目镜像如下,可供读者调试使用:
foo-svc:
swr.cn-north-4.myhuaweicloud.com/k8s-solution/foo-svc:v2
bar-svc:
swr.cn-north-4.myhuaweicloud.com/k8s-solution/bar-svc:v2
loo-svc:
swr.cn-north-4.myhuaweicloud.com/k8s-solution/loo-svc:v2

在k8s集群中部署demo项目

image.png

其中loadgenerator服务是个客户端工具用于访问demo项目。访问信息如下:

image.png

相比之前没有集成opentelemetry-javaagent.jar ,访问信息请求头多了traceparent信息,很明显该信息就是分布式跟踪所需要使用到的

3.3 OpenTelemetry配置遥测数据的接受和导出

在上述环境变量中,通过otlp-grpc协议进行java微服务遥测数据导出的。所以在opentelemetry-collector中的receivers接收器配置中需要配置otlp grpc规则进行数据的接受:

receivers:
otlp:
protocols:
grpc:
endpoint: ${env:MY_POD_IP}:
4317

otel接受到数据后,需要将数据处理后进行导出。导出后端可以是jaeger,通过jaeger进行分布式跟踪数据的展示,需要需要在opentelemery-collector中配置exporter导出器

exporters:
debug: {} #导出器配置log,可记录导出行为
otlp:
endpoint: jaeger
-collector.hu.svc.cluster.local:4317 #jaeger的otlp-grpc端口
tls:
insecure:
true

需要注意此处导出器后端jaeger使用的协议为otlp,如果jaeger-collector service未配置该端口,则会导出失败,建议检查jaeger相关service的配置。

- name: otlp-grpc
port:
4317protocol: TCP
targetPort:
4317

通过pipeline启用otel采集器中配置的各种组件。上面配置了接收器和导出器,如果不在pipeline中声明,则不会启用该组件。启用方式参考:

service: # 用于根据接收器、处理器、导出器和扩展部分中的配置来配置收集器中启用的组件
extensions:
-health_check
pipelines:
traces:
exporters:
-debug-otlp
processors:
-memory_limiter-batch
receivers:
- otlp

完整配置参考如下,这些配置以configmap形式挂载到otel-collector容器中使用。

apiVersion: v1
kind: ConfigMap
metadata:
name: opentelemetry
-collectornamespace: tracing
data:
relay:
|exporters:
debug: {}
otlp:
endpoint: jaeger
-collector.hu.svc.cluster.local:4317tls:
insecure:
trueextensions:
health_check:
endpoint: ${env:MY_POD_IP}:
13133processors:
batch: {}
memory_limiter:
check_interval: 5s
limit_percentage:
80spike_limit_percentage:25receivers:
otlp:
protocols:
grpc:
endpoint: ${env:MY_POD_IP}:
4317service:
extensions:
-health_check
pipelines:
traces:
exporters:
-debug-otlp
processors:
-memory_limiter-batch
receivers:
-otlp

telemetry:
metrics:
address: ${env:MY_POD_IP}:
8888

配置更新后,重启otel-collector容器。查看otel容器日志可以看到otel已经以配置的规则进行工作。

image.png

3.4 Jaeger查看调用链跟踪数据

访问jaeger UI,UI端口为16686。可以看到jaeger已经接收到trace信息,目前已有4条trace,每条trace均有8个span信息。

image.png

查看详细span信息,不仅可以看到服务级别的调用,还能看到方法级别的调用,以及方法级别的耗时。

image.png

点击关注,第一时间了解华为云新鲜技术~

ServerCnxnFactory

用于接收客户端连接、管理客户端session、处理客户端请求。

ServerCnxn抽象类

代表一个客户端连接对象:

  • 从网络读写数据
  • 数据编解码
  • 将请求转发给上层组件或者从上层组件接收响应
  • 管理连接状态,比如:enableRecv、sessionTimeout、stale、invalid等
  • 保存当前的packetsReceived、packetsSent、lastCxid、lastZxid等
  • 继承了Watcher接口,也可以作为监听器

两个实现类:

  • NIOServerCnxn - 基于NIO
  • NettyServerCnxn - 基于Netty

NIOServerCnxnFactory

基于NIO的非阻塞、多线程的ServerCnxnFactory实现类,多线程之间通过queue通信:

  • 1个accept线程,用来接收客户端连接,交给selector线程处理
  • 1-N个selector线程,每个线程会select 1/N个连接,多个selector线程的原因是,由于有大量连接,select()可能会成为性能瓶颈
  • 0-M个socket IO worker线程,做socket读写,如果配置为0则selector线程来做IO
  • 1个清理线程,用于关闭空闲连接

线程数量分配示例:32核的机器,1accept线程,1个清理线程,4个selector线程,64个worker线程。

configure方法

  • 不支持ssl

  • 创建ConnectionExpirerThread线程

  • 根据CPU核数确定各种线程的数量

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    
    // 64
    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    
  • 创建SelectorThread线程

  • 创建ServerSocketChannel、启动监听、设置非阻塞

  • 创建AcceptThread线程

start方法

启动acceptThread、selectorThreads、workerPool、expirerThread线程。

acceptThread线程

1个accept线程,用来接收客户端连接,交给selector线程处理:

  1. select查找acceptable的key

  2. doAccept接受连接

    if (key.isAcceptable()) {
        if (!doAccept()) {
            pauseAccept(10);
        }
    }
    
  3. 给sc(SocketChannel)设置非阻塞、验证远程IP连接数不超过maxClientCnxns(60)、获取SelectorThread开始select读写事件

    // Round-robin assign this connection to a selector thread
    if (!selectorIterator.hasNext()) {
        selectorIterator = selectorThreads.iterator();
    }
    SelectorThread selectorThread = selectorIterator.next();
    // 使用队列缓存SocketChannel
    if (!selectorThread.addAcceptedConnection(sc)) {
        throw new IOException("Unable to add connection to selector queue");
    }
    

selectorThread线程

run方法select读写事件、接受客户连接、为key注册"感兴趣"的事件:

  • run方法

    public void run() {
        try {
            while (!stopped) {
                try {
                    select(); // select读写事件
                    processAcceptedConnections(); // 接受客户连接
                    processInterestOpsUpdateRequests();
                } catch (RuntimeException e) {
                } catch (Exception e) {
                }
            }
        }
        // ...
    }
    
  • 接受客户连接会注册OP_READ、创建NIOServerCnxn、绑定到key上面

    private void processAcceptedConnections() {
        SocketChannel accepted;
        while (!stopped && (accepted = acceptedQueue.poll()) != null) {
            SelectionKey key = null;
            try {
                key = accepted.register(selector, SelectionKey.OP_READ);
                NIOServerCnxn cnxn = createConnection(accepted, key, this);
                key.attach(cnxn); // 绑定到key上
                addCnxn(cnxn); // 维护连接层会话
            } catch (IOException e) {
                //  略
            }
        }
    }
    
  • select到读写事件会交给handleIO方法处理

    private void handleIO(SelectionKey key) {
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
    
        // Stop selecting this key while processing on its connection
        cnxn.disableSelectable();
        key.interestOps(0); // 重置感兴趣的事件,IO处理完成之后会重新注册读写事件
        touchCnxn(cnxn); // 维护连接层会话,刷新过期时间
        workerPool.schedule(workRequest); // workRequest.doWork方法做异步读写
    }
    
  • 为key注册"感兴趣"的事件

    private void processInterestOpsUpdateRequests() {
        SelectionKey key;
        while (!stopped && (key = updateQueue.poll()) != null) {
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                key.interestOps(cnxn.getInterestOps());
            }
        }
    }
    

workRequest.doWork方法

workRequest是IOWorkRequest类型对象,doWork会read数据并传递给上层组件:

public void doWork() throws InterruptedException {
    // 略

    if (key.isReadable() || key.isWritable()) {
        cnxn.doIO(key); // 在workerPool线程上执行

        // 略
        touchCnxn(cnxn); // 维护连接层会话,刷新过期时间
    }

    // 略
}

数据包使用 len body 方式传输,read的过程不介绍了,cnxn在read到完整的数据之后会调用readConnectRequest或readRequest方法将数据传递给上层组件:

// 应用层建立连接
private void readConnectRequest() throws IOException, ClientCnxnLimitException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    // ConnectRequest封装:
    // protocolVersion(0), lastZxidSeen(0), timeOut(3s), sessionId(0), passwd(16位byte), readOnly(F)
    ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
    zkServer.processConnectRequest(this, request);
    initialized = true;
}

protected void readRequest() throws IOException {
    RequestHeader h = new RequestHeader();
    // 请求头,封装客户端xid和type由客户端传递过来
    ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h);
    // 转ByteBufferRequestRecord对象,封装请求字节流
    // readRecord将字节流反序列化为指定的Record实现类对象
    RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice());
    zkServer.processPacket(this, h, request);
}

NettyServerCnxnFactory

基于Netty的ServerCnxnFactory实现。

CnxnChannelHandler类

核心的网络层处理器,此处记录重要代码:

class CnxnChannelHandler extends ChannelDuplexHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final Channel channel = ctx.channel();
        // 略
        // 创建NettyServerCnxn绑定到channel
        NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
        ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);

        // 略
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            try {
                NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                if (cnxn == null) {
                    LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                } else {
                    // 读取请求数据
                    cnxn.processMessage((ByteBuf) msg);
                }
            } catch (Exception ex) {
                throw ex;
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

cnxn读取请求数据

void processMessage(ByteBuf buf) {
    if (throttled.get()) {
        // 略
    } else {
        if (queuedBuffer != null) {
            appendToQueuedBuffer(buf.retainedDuplicate());
            processQueuedBuffer();
        } else {
            receiveMessage(buf); // 解码逻辑在此方法中
            // Have to check !closingChannel, because an error in
            // receiveMessage() could have led to close() being called.
            if (!closingChannel && buf.isReadable()) {
                if (queuedBuffer == null) {
                    queuedBuffer = channel.alloc().compositeBuffer();
                }
                appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
            }
        }
    }
}

read到完整的数据之后会将数据传递给上层组件:

if (initialized) {
    RequestHeader h = new RequestHeader();
    ByteBufferInputStream.byteBuffer2Record(bb, h);
    RequestRecord request = RequestRecord.fromBytes(bb.slice());
    zks.processPacket(this, h, request);
} else {
    // 应用层建立连接
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
    ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
    zks.processConnectRequest(this, request);
    initialized = true;
}

ZooKeeperServer处理方法

processConnectRequest方法处理连接请求

ZooKeeperServer的processConnectRequest方法用来处理连接请求:

public void processConnectRequest(
        ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {

    long sessionId = request.getSessionId(); // 默认0
    int tokensNeeded = 1;
    // 略

    // ro验证
    if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client";
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
    }
    // 客户端zxid验证
    if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
                     + cnxn.getRemoteSocketAddress()
                     + " as it has seen zxid 0x"
                     + Long.toHexString(request.getLastZxidSeen())
                     + " our last zxid is 0x"
                     + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                     + " client must try another server";
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }
    int sessionTimeout = request.getTimeOut(); // 客户端默认30000
    byte[] passwd = request.getPasswd();
    int minSessionTimeout = getMinSessionTimeout(); // 默认tickTime * 2
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout(); // 默认tickTime * 20
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout); // 设置超时时长

    // We don't want to receive any packets until we are sure that the session is setup
    cnxn.disableRecv();

    if (sessionId == 0) {
        // 创建新session
        long id = createSession(cnxn, passwd, sessionTimeout);
    } else {
        validateSession(cnxn, sessionId);
        // 杀掉旧的session和连接
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        // add新session
        cnxn.setSessionId(sessionId);
        // 返回connect响应
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
    }
}

// 重点看一下创建新session
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    if (passwd == null) {
        passwd = new byte[0];
    }
    long sessionId = sessionTracker.createSession(timeout); // 创建Session返回sessionId
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd); // passwd会赋值给request.passwd
    CreateSessionTxn txn = new CreateSessionTxn(timeout);
    cnxn.setSessionId(sessionId);
    // 给业务层处理器提交createSession请求
    // RequestRecord.fromRecord(txn)返回SimpleRequestRecord对象,封装Record对象
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
    submitRequest(si);
    return sessionId;
}

processPacket方法处理业务请求

ZooKeeperServer的processPacket方法用来处理业务请求:

public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {

    cnxn.incrOutstandingAndCheckThrottle(h);

    if (h.getType() == OpCode.auth) {
        // 略
        return;
    } else if (h.getType() == OpCode.sasl) {
        // 略
    } else {
        if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
            return;
        } else {
            Request si = new Request(
                cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
            int length = request.limit();
            // 大请求验证
            if (isLargeRequest(length)) { // 默认返回false
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // 提交给业务层处理器
            submitRequest(si);
        }
    }
}

submitRequest流程

  1. 先把request提交给requestThrottler组件
  2. requestThrottler是一个限流(默认不启用)组件,内部使用队列缓存request,异步线程消费队列,将request提交给业务处理器
  3. 直到submitRequest方法,业务处理才离开workerPool线程
if (request != null) {
    if (request.isStale()) {
        ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
    }
    final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
    ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
    // 默认不限流
    if (shouldThrottleOp(request, elapsedTime)) {
      request.setIsThrottled(true);
      ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
    }
    // 提交
    zks.submitRequestNow(request);
}

submitRequestNow方法将请求提交给业务层处理器:

public void submitRequestNow(Request si) {
    // 略
    try {
        touch(si.cnxn); // 刷新session过期时间
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            setLocalSessionFlag(si);
            firstProcessor.processRequest(si); // 提交给业务层处理器
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            // Update request accounting/throttling limits
            requestFinished(si);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        requestFinished(si);
    } catch (RequestProcessorException e) {
        requestFinished(si);
    }
}

Leader客户端业务层处理器链

在之前的文章已经介绍,leader使用LeaderZooKeeperServer作为服务实现类。

本章节介绍"leader处理客户端请求"的流程。

处理器链

// 构建处理器链
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor =
        new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(
        toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}
  • FinalRequestProcessor - 处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾

  • ToBeAppliedRequestProcessor - 维护toBeApplied列表,之后必须是FinalRequestProcessor且processRequest必须同步处理

  • CommitProcessor - 等待commit完成之后调用下游RequestProcessor处理器

  • ProposalRequestProcessor - 发起proposal并将Request转发给内部的SyncRequestProcessor和AckRequestProcessor

    public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        // 内部有维护一个SyncRequestProcessor和AckRequestProcessor
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    
        forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
                FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
    }
    
  • PrepRequestProcessor - 通常位于RequestProcessor链开头,为更新请求关联的事务做设置

  • LeaderRequestProcessor - 负责执行本地会话升级,只有直接提交给leader的Request才能通过这个处理器

LeaderRequestProcessor

public void processRequest(Request request) throws RequestProcessorException {
    // 略

    // 默认不支持localSession
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
        // 略
    } catch (IOException ie) {
        // 略
    }
    // 此处upgradeRequest==null
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }
    // 调用下游processor
    nextProcessor.processRequest(request);
}

PrepRequestProcessor

事务设置:

  • 使用队列缓存request
  • 消费线程从队列拉request设置事务

run方法

public void run() {
    try {
        while (true) {
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
            Request request = submittedRequests.take();
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                .add(Time.currentElapsedTime() - request.prepQueueStartTime);
            // 略
            if (Request.requestOfDeath == request) {
                break;
            }

            request.prepStartTime = Time.currentElapsedTime();
            pRequest(request);
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
}

protected void pRequest(Request request) throws RequestProcessorException {
    request.setHdr(null);
    request.setTxn(null);

    if (!request.isThrottled()) {
      pRequestHelper(request);
    }

    request.zxid = zks.getZxid(); // zxid
    long timeFinishedPrepare = Time.currentElapsedTime();
    ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
    nextProcessor.processRequest(request); // 调用下游processor
    ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
}

pRequestHelper方法

private void pRequestHelper(Request request) {
    try {
        switch (request.type) {
        case OpCode.createContainer:
        case OpCode.create:
        case OpCode.create2:
            // 创建节点请求封装path、data、acl、flag
            CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
            // zks.getNextZxid()获取递增zxid
            pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
            break;
        case OpCode.createTTL:
            // 创建ttl请求封装path、data、acl、flag、ttl
            CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
            break;
        case OpCode.deleteContainer:
            // 封装path
            DeleteContainerRequest deleteContainerRequest =
                request.readRequestRecord(DeleteContainerRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
            break;
        case OpCode.delete:
            // 删除节点请求封装path、version
            DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
            break;
        case OpCode.setData:
            // 设置节点数据请求封装path、data、version
            SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
            break;
        case OpCode.reconfig:
            // reconfig请求封装joiningServers、leavingServers、newMembers、curConfigId
            ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
            break;
        case OpCode.setACL:
            // 设置acl请求封装path、acl、version
            SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
            break;
        case OpCode.check:
            // check请求封装path、version
            CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
            break;
        case OpCode.multi:
            // 遍历 逐个pRequest2Txn
            // pRequest2Txn(op.getType(), zxid, request, subrequest)
            // 封装MultiTxn
            break;

        // create/close session don't require request record
        case OpCode.createSession:
        case OpCode.closeSession:
            if (!request.isLocalSession()) { // 非本地会话
                pRequest2Txn(request.type, zks.getNextZxid(), request, null);
            }
            break;

        // All the rest don't need to create a Txn - just verify session
        case OpCode.sync:
        // sync,exists,getData,getACL,getChildren,getAllChildrenNumber,getChildren2,ping
        // setWatches,setWatches2,checkWatches,removeWatches,getEphemerals,multiRead,addWatch
        case OpCode.whoAmI:
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            break;
        default:
            break;
        }
    } catch (KeeperException e) {
        // 略
    } catch (Exception e) {
        // 略
    }
}

pRequest2Txn方法流程

代码量大,仅对重点的业务类型做简单分析。

该方法首先会为request设置TxnHeader信息:

if (request.getHdr() == null) {
    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
}

TxnHeader封装事务请求头:

public class TxnHeader implements Record {
  private long clientId; // 会话ID
  private int cxid; // 客户端xid
  private long zxid; // 服务端xid
  private long time;
  private int type; // 操作类型
}

pRequest2Txn - create相关操作

create/create2/createTTL/createContainer操作:

  1. 从flags创建createMode并验证ttl和ephemeral

  2. 验证acl

    zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
    
  3. 生成顺序节点path

    int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) {
        // 形如/users/admin0000000001
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);
    
  4. request.setTxn

    int newCversion = parentRecord.stat.getCversion() + 1;
    if (type == OpCode.createContainer) {
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
    } else if (type == OpCode.createTTL) {
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
    } else {
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
    }
    
  5. 获取ephemeralOwner

    TxnHeader hdr = request.getHdr();
    long ephemeralOwner = 0;
    if (createMode.isContainer()) {
        ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
    } else if (createMode.isTTL()) {
        ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
    } else if (createMode.isEphemeral()) {
        ephemeralOwner = request.sessionId;
    }
    
  6. addChangeRecord

    StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    parentRecord.stat.setPzxid(request.getHdr().getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord); // 维护outstandingChanges集
    ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
    nodeRecord.data = data;
    nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord); // 维护outstandingChanges集
    

pRequest2Txn - delete操作

  1. 验证acl和version

    checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
    
    // stat.version与delete.version需要一致
    
  2. 验证没有子节点,有子节点无法删除

  3. 创建DeleteTxn

    request.setTxn(new DeleteTxn(path));
    
  4. addChangeRecord

    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount--;
    parentRecord.stat.setPzxid(request.getHdr().getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord); // 维护outstandingChanges集
    
    nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
    nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord); // 维护outstandingChanges集
    

pRequest2Txn - setData操作

  1. 验证acl、获取newVersion

  2. 创建SetDataTxn

    request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
    
  3. addChangeRecord

    nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
    nodeRecord.stat.setVersion(newVersion);
    nodeRecord.stat.setMtime(request.getHdr().getTime());
    nodeRecord.stat.setMzxid(zxid);
    nodeRecord.data = setDataRequest.getData();
    nodeRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord);
    

pRequest2Txn - setACL操作

  1. 验证acl、获取newVersion

  2. 创建SetACLTxn

    request.setTxn(new SetACLTxn(path, listACL, newVersion));
    
  3. addChangeRecord

pRequest2Txn - createSession操作

CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
request.setTxn(createSessionTxn);
// only add the global session tracker but not to ZKDb
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
zks.setOwner(request.sessionId, request.getOwner());

pRequest2Txn - closeSession操作

long startTime = Time.currentElapsedTime();
synchronized (zks.outstandingChanges) {
    // 获取所有临时节点
    Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
    for (ChangeRecord c : zks.outstandingChanges) {
        if (c.stat == null) {
            // Doing a delete
            es.remove(c.path);
        } else if (c.stat.getEphemeralOwner() == request.sessionId) {
            es.add(c.path);
        }
    }
    for (String path2Delete : es) {
        if (digestEnabled) {
            parentPath = getParentPathAndValidate(path2Delete);
            parentRecord = getRecordForPath(parentPath);
            parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
            parentRecord.stat.setPzxid(request.getHdr().getZxid());
            parentRecord.precalculatedDigest = precalculateDigest(
                    DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
            addChangeRecord(parentRecord);
        }
        nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null);
        nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path2Delete);
        addChangeRecord(nodeRecord);
    }
    if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
        request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
    }
    zks.sessionTracker.setSessionClosing(request.sessionId);
}

ProposalRequestProcessor

processRequest方法

public void processRequest(Request request) throws RequestProcessorException {
    if (request instanceof LearnerSyncRequest) { // 处理sync命令,后续补充sync命令分析
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        if (shouldForwardToNextProcessor(request)) {
            nextProcessor.processRequest(request); // 调用下游processor(CommitProcessor)
        }
        if (request.getHdr() != null) { // 事务消息需要发proposal、写磁盘
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request); // 给follower发proposal
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // 该对象的nextProcessor是AckRequestProcessor
            syncProcessor.processRequest(request);
        }
    }
}

发proposal

发起一个proposal并发给所有成员:

public Proposal propose(Request request) throws XidRolloverException {
    // zxid的低32位满了,强制重新选举,生成新一轮epoch和zxid
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg =
            "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }

    // 序列化
    byte[] data = request.getSerializeData();
    proposalStats.setLastBufferSize(data.length);
    // 封装数据包
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

    // 封装Proposal对象
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;

    synchronized (this) {
        p.addQuorumVerifier(self.getQuorumVerifier());

        if (request.getHdr().getType() == OpCode.reconfig) {
            // 此处会把lastSeenQuorumVerifier写入zoo.cfg.dynamic.next文件
            self.setLastSeenQuorumVerifier(request.qv, true);
        }

        if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
            p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        lastProposed = p.packet.getZxid();
        // 缓存到outstandingProposals中,processAck时会根据quorum状态确定是否提交
        outstandingProposals.put(lastProposed, p);
        // 给follower发数据
        sendPacket(pp);
    }
    ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
    return p;
}

syncProcessor.processRequest方法

processRequest方法将request放入queuedRequests队列,异步线程消费做业务处理:

  1. 从queuedRequests拉request

  2. 写txnlog

    zks.getZKDatabase().append(si);
    
  3. 滚动txnlog文件、生成snapshot文件

    // 默认当logCount超过了50000或logSize超过2GB时触发
    if (shouldSnapshot()) {
        resetSnapshotStats();
        // 滚动txnlog文件
        zks.getZKDatabase().rollLog();
        // 生成snapshot文件
        if (!snapThreadMutex.tryAcquire()) {
            LOG.warn("Too busy to snap, skipping");
        } else {
            // 异步线程生成snapshot文件
            new ZooKeeperThread("Snapshot Thread") {
                public void run() {
                    try {
                        zks.takeSnapshot();
                    } catch (Exception e) {
                    } finally {
                        snapThreadMutex.release();
                    }
                }
            }.start();
        }
    }
    
  4. 之后会把request传递给nextProcessor(AckRequestProcessor对象)

AckRequestProcessor

public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null) {
        request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
        leader.processAck(self.getMyId(), request.zxid, null);
    }
}

processAck

Keep a count of acks that are received by the leader for a particular proposal.

public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {

    // 略

    if ((zxid & 0xffffffffL) == 0) {
        // We no longer process NEWLEADER ack with this method. However,
        // the learner sends an ack back to the leader after it gets
        // UPTODATE, so we just ignore the message.
        return;
    }

    if (outstandingProposals.size() == 0) {
        return;
    }
    // 说明zxid的数据已经提交
    if (lastCommitted >= zxid) {
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        return;
    }

    // 略

    p.addAck(sid); // 添加ack

    boolean hasCommitted = tryToCommit(p, zxid, followerAddr);

    // reconfig类型命令的特殊处理,略
}

tryToCommit方法会判断quorum状态,即超过半数ack,如果到了quorum状态:

  1. 从outstandingProposals集移除

  2. 加入到toBeApplied集

  3. 给follower发COMMIT

    public void commit(long zxid) {
        synchronized (this) {
            lastCommitted = zxid;
        }
        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
        sendPacket(qp); // 发给follower
    }
    
  4. 提交

    zk.commitProcessor.commit(p.request);
    
    // 会进入commitProcessor的committedRequests队列
    

CommitProcessor

processRequest方法

本地写磁盘之后即调用此方法:

  1. 把request提交到queuedRequests队列
  2. 写请求提交到queuedWriteRequests队列
public void processRequest(Request request) {
    request.commitProcQueueStartTime = Time.currentElapsedTime();
    queuedRequests.add(request); // 所有请求
    // If the request will block, add it to the queue of blocking requests
    if (needCommit(request)) { // 写请求
        queuedWriteRequests.add(request);
        numWriteQueuedRequests.incrementAndGet();
    } else {
        numReadQueuedRequests.incrementAndGet();
    }
    wakeup();
}

commit方法

follower对proposal到了quorum状态后,会使用这个方法提交事务,然后会将事务写到ZKDatabase中。

public void commit(Request request) {
    request.commitRecvTime = Time.currentElapsedTime();
    ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
    committedRequests.add(request); // 进committedRequests队列
    wakeup();
}

run方法

对比queuedRequests、queuedWriteRequests、committedRequests这几个队列,将提交成功的请求或读请求转发给下游的ToBeAppliedRequestProcessor处理器。

ToBeAppliedRequestProcessor

维护toBeApplied列表:清理已提交成功的request数据。

FinalRequestProcessor

处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾。

  1. 执行事务
  2. 区分OpCode执行对应逻辑,发回响应
  3. 使用cnxn把响应返回给客户端

执行事务

if (!request.isThrottled()) {
  rc = applyRequest(request);
}

// ProcessTxnResult rc = zks.processTxn(request);

createSession操作

zks.finishSessionInit(request.cnxn, true);

closeSession操作

给客户端发closeConn数据包:

cnxn.sendCloseSession();

create相关操作

create、create2、createTTL、createContainer操作,创建对应的Response对象。

delete相关操作

略。

setData操作

返回SetDataResponse响应。

setACL操作

返回SetACLResponse响应。

getData操作

private Record handleGetDataRequest(
        Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
    GetDataRequest getDataRequest = (GetDataRequest) request;
    String path = getDataRequest.getPath();
    DataNode n = zks.getZKDatabase().getNode(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    // 检查权限
    zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
    Stat stat = new Stat();
    // 查询数据、addWatcher
    byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
    return new GetDataResponse(b, stat);
}

setWatches相关操作

setWatches、setWatches2操作:

SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
        setWatches.getDataWatches(),
        setWatches.getExistWatches(),
        setWatches.getChildWatches(),
        setWatches.getPersistentWatches(),
        setWatches.getPersistentRecursiveWatches(),
        cnxn);

addWatch操作

AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());

removeWatches操作

RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);

getChildren相关操作

getChildren、getChildren2操作:

GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);

zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n),
             ZooDefs.Perms.READ, request.authInfo, path, null);
List<String> children = zks.getZKDatabase()
                           .getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);

Follower处理Leader数据

处理器链

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
        finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();

    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}

处理器链:

FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

commitProcessor和syncProcessor处理leader的proposal和commit请求。

processPacket方法

在"zookeeper源码(08)leader、follower和observer"中已经介绍,Follower使用processPacket方法处理来自leader的数据包:

protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL: // 提案
        ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
        TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
        TxnHeader hdr = logEntry.getHeader();
        Record txn = logEntry.getTxn();
        TxnDigest digest = logEntry.getDigest();
        // 略
        lastQueued = hdr.getZxid();

        // 略

        // 记录log数据
        // 使用syncProcessor持久化log数据,之后给leader发ack
        fzk.logRequest(hdr, txn, digest);
        // 略
        if (om != null) {
            // 略
        }
        break;
    case Leader.COMMIT: // 提交
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        fzk.commit(qp.getZxid()); // 使用commitProcessor提交log
        if (om != null) {
            // 略
        }
        break;
    case Leader.COMMITANDACTIVATE: // Similar to COMMIT, only for a reconfig operation
        // get the new configuration from the request
        Request request = fzk.pendingTxns.element();
        SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));

        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        final long zxid = qp.getZxid();
        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
        // commit (writes the new config to ZK tree (/zookeeper/config)
        fzk.commit(zxid);

        // 略
        break;
    case Leader.UPTODATE:
        // leader告知follower已处于最新状态,可以开始响应客户端
        // 正常情况下不应该再出现该类型请求
        break;
    case Leader.REVALIDATE:
        if (om == null || !om.revalidateLearnerSession(qp)) {
            revalidate(qp);
        }
        break;
    case Leader.SYNC:
        fzk.sync(); // sync命令
        break;
    default:
        LOG.warn("Unknown packet type");
        break;
    }
}

处理PROPOSAL

syncProcessor.processRequest方法

processRequest方法将request放入queuedRequests队列,异步线程消费做业务处理。

在本地持久化之后,调用下游处理器(SendAckRequestProcessor对象)。

SendAckRequestProcessor

public void processRequest(Request si) {
    if (si.type != OpCode.sync) {
        // 确认zxid已持久化
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
        try {
            si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
            learner.writePacket(qp, false);
        } catch (IOException e) {
            // learner.sock.close();
        }
    }
}

public void flush() throws IOException {
    try {
        learner.writePacket(null, true);
    } catch (IOException e) {
        // learner.sock.close();
    }
}

处理COMMIT

提交给commitProcessor处理器,该处理器会继续向下游(FinalRequestProcessor)传递。

FinalRequestProcessor

上文已经介绍,此处省略。

Observer处理Leader数据

处理器链

protected void setupRequestProcessors() {
    // We might consider changing the processor behaviour of
    // Observers to, for example, remove the disk sync requirements.
    // Currently, they behave almost exactly the same as followers.
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
        finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();

    if (syncRequestProcessorEnabled) {
        syncProcessor = new SyncRequestProcessor(this, null);
        syncProcessor.start();
    }
}

processPacket方法

protected void processPacket(QuorumPacket qp) throws Exception {
    TxnLogEntry logEntry;
    TxnHeader hdr;
    TxnDigest digest;
    Record txn;
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        LOG.warn("Ignoring proposal");
        break;
    case Leader.COMMIT:
        LOG.warn("Ignoring commit");
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        ((ObserverZooKeeperServer) zk).sync();
        break;
    case Leader.INFORM:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        logEntry = SerializeUtils.deserializeTxn(qp.getData());
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
        request.setTxnDigest(digest);
        ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
        obs.commitRequest(request);
        break;
    case Leader.INFORMANDACTIVATE: // 处理reconfig请求
        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();

        byte[] remainingdata = new byte[buffer.remaining()];
        buffer.get(remainingdata);
        logEntry = SerializeUtils.deserializeTxn(remainingdata);
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));

        request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.setTxnDigest(digest);
        obs = (ObserverZooKeeperServer) zk;

        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);

        obs.commitRequest(request);

        if (majorChange) {
            throw new Exception("changes proposed in reconfig");
        }
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}

网关:

一:apisix

doc:https://apisix.apache.org/zh/docs/apisix/getting-started/README/

github:https://github.com/apache/apisix

二:Kong

github:https://github.com/Kong/kong

三:Ocelot

github:https://github.com/ThreeMammals/Ocelot

四:janus

github:https://github.com/motiv-labs/janus

前置条件:docker,yaml

microservice.yaml


version: "3.8"networks:
caseor_bridge:
driver: bridge
ipam:
config:
- subnet: 172.0.10.0/24services:

mysql:
container_name: mysql
image: mysql
privileged:
truecommand:--character-set-server=utf8mb4 --collation-server=utf8mb4_general_ci --max_connections=2000 --max_allowed_packet=64M
environment:
- TZ=Asia/Shanghai- MYSQL_ROOT_PASSWORD=123456volumes:- ./mysql:/var/lib/mysql
ports:
- "3306:3306"healthcheck:
test: [
"CMD", "mysqladmin" ,"ping", "-h", "localhost"]
interval: 5s
timeout: 10s
retries:
10networks:
caseor_bridge:
ipv4_address:
172.0.10.3redis:
image: redis
container_name:
"redis"ports:- "6379:6379"volumes:- ./redis/data:/data- ./redis/conf:/usr/local/etc/redis
networks:
caseor_bridge:
ipv4_address:
172.0.10.4nacos1:
container_name: nacos1
hostname: nacos1
image: nacos
/nacos-server
environment:
- MODE=cluster- PREFER_HOST_MODE=hostname- NACOS_SERVERS=nacos1:8848 nacos2:8848 nacos3:8848 - SPRING_DATASOURCE_PLATFORM=mysql- MYSQL_SERVICE_HOST=172.0.10.3 - MYSQL_SERVICE_PORT=3306 - MYSQL_SERVICE_USER=root- MYSQL_SERVICE_PASSWORD=123456 - MYSQL_SERVICE_DB_NAME=nacos- MYSQL_SERVICE_DB_PARAM=characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true - JVM_XMS=128m- JVM_XMX=128m- JVM_XMN=128m
volumes:
- ./nacos/cluster-logs/nacos1:/home/nacos/logs- ./nacos/init.d:/home/nacos/init.d
ports:
- 8850:8848 - 7850:7848 - 9870:9848 - 9852:9849depends_on:-mysql
networks:
caseor_bridge:
ipv4_address:
172.0.10.5nacos2:
container_name: nacos2
hostname: nacos2
image: nacos
/nacos-server
environment:
- MODE=cluster- PREFER_HOST_MODE=hostname- NACOS_SERVERS=nacos1:8848 nacos2:8848 nacos3:8848 - SPRING_DATASOURCE_PLATFORM=mysql- MYSQL_SERVICE_HOST=172.0.10.3 - MYSQL_SERVICE_PORT=3306 - MYSQL_SERVICE_USER=root- MYSQL_SERVICE_PASSWORD=123456 - MYSQL_SERVICE_DB_NAME=nacos- MYSQL_SERVICE_DB_PARAM=characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true - JVM_XMS=128m- JVM_XMX=128m- JVM_XMN=128m
volumes:
- ./nacos/cluster-logs/nacos2:/home/nacos/logs- ./nacos/init.d:/home/nacos/init.d
ports:
- 8849:8848 - 7849:7848 - 9869:9848 - 9851:9849depends_on:-mysql
networks:
caseor_bridge:
ipv4_address:
172.0.10.6nacos3:
container_name: nacos3
hostname: nacos3
image: nacos
/nacos-server
environment:
- MODE=cluster- PREFER_HOST_MODE=hostname- NACOS_SERVERS=nacos1:8848 nacos2:8848 nacos3:8848 - SPRING_DATASOURCE_PLATFORM=mysql- MYSQL_SERVICE_HOST=172.0.10.3 - MYSQL_SERVICE_PORT=3306 - MYSQL_SERVICE_USER=root- MYSQL_SERVICE_PASSWORD=123456 - MYSQL_SERVICE_DB_NAME=nacos- MYSQL_SERVICE_DB_PARAM=characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true - JVM_XMS=128m- JVM_XMX=128m- JVM_XMN=128m
volumes:
- ./nacos/cluster-logs/nacos3:/home/nacos/logs- ./nacos/init.d:/home/nacos/init.d
ports:
- 8848:8848 - 7848:7848 - 9848:9848 - 9849:9849depends_on:-mysql
networks:
caseor_bridge:
ipv4_address:
172.0.10.7etcd:
container_name: etcd
hostname: etcd
image: bitnami
/etcd
volumes:
- ./etcd/data:/bitnami/etcd
environment:
ETCD_ENABLE_V2:
"true"ALLOW_NONE_AUTHENTICATION:"yes"ETCD_ADVERTISE_CLIENT_URLS:"http://etcd:2379" #https://github.com/apache/apisix-dashboard/issues/2756 需要更换为host域名不能使用0.0.0.0 ETCD_LISTEN_CLIENT_URLS: "http://0.0.0.0:2379"ports:- "2379:2379/tcp"networks:
caseor_bridge:
ipv4_address:
172.0.10.8apisix:
container_name: apisix
hostname: apisix
image: apache
/apisix
volumes:
- ./apisix/log:/usr/local/apisix/logs- ./apisix/conf/config.yaml:/usr/local/apisix/conf/config.yaml:ro
depends_on:
-etcd
ports:
- "9088:9088/tcp" - "9180:9180/tcp" - "127.0.0.1:9090:9090/tcp"networks:
caseor_bridge:
ipv4_address:
172.0.10.9apisix-dashboard:
container_name: apisix
-dashboard
image: apache
/apisix-dashboard
depends_on:
-etcd
ports:
- "9188:9188"volumes:- ./apisix/conf/dashboard.yaml:/usr/local/apisix-dashboard/conf/conf.yaml
networks:
caseor_bridge:
ipv4_address:
172.0.10.10rabbitmq01:
image: rabbitmq
container_name: rabbitmq01
hostname: rabbitmq01
environment:
- TZ=Asia/Shanghai- RABBITMQ_DEFAULT_USER=root #自定义登录账号- RABBITMQ_DEFAULT_PASS=123456#自定义登录密码- RABBITMQ_ERLANG_COOKIE='secret_cookie'ports:- "15672:15672" - "5672:5672"volumes:- ./rabbitmq/mq1/data:/var/lib/rabbitmq- ./rabbitmq/mq1/conf:/etc/rabbitmq
command: bash
-c "sleep 10; rabbitmq-server;"networks:
caseor_bridge:
ipv4_address:
172.0.10.11rabbitmq02:
image: rabbitmq
container_name: rabbitmq02
hostname: rabbitmq02
environment:
- TZ=Asia/Shanghai- RABBITMQ_DEFAULT_USER=root #自定义登录账号- RABBITMQ_DEFAULT_PASS=123456#自定义登录密码- RABBITMQ_ERLANG_COOKIE='secret_cookie'ports:- "15673:15672" - "5673:5672"depends_on:-rabbitmq01
volumes:
- ./rabbitmq/mq2/data:/var/lib/rabbitmq- ./rabbitmq/mq2/conf:/etc/rabbitmq
command: bash
-c "sleep 10; rabbitmq-server;"networks:
caseor_bridge:
ipv4_address:
172.0.10.12rabbitmq03:
image: rabbitmq
container_name: rabbitmq03
hostname: rabbitmq03
environment:
- TZ=Asia/Shanghai- RABBITMQ_DEFAULT_USER=root #自定义登录账号- RABBITMQ_DEFAULT_PASS=123456#自定义登录密码- RABBITMQ_ERLANG_COOKIE='secret_cookie'ports:- "15674:15672" - "5674:5672"depends_on:-rabbitmq01
volumes:
- ./rabbitmq/mq3/data:/var/lib/rabbitmq- ./rabbitmq/mq3/conf:/etc/rabbitmq
command: bash
-c "sleep 10; rabbitmq-server;"networks:
caseor_bridge:
ipv4_address:
172.0.10.13# 开启web管理
# rabbitmq
-plugins enable rabbitmq_management

# # 加入rabbitmq集群

# # rabbit1
# rabbitmqctl stop_app
# rabbitmqctl reset
# rabbitmqctl start_app

# # rabbit2
# rabbitmqctl stop_app
# rabbitmqctl reset
# rabbitmqctl join_cluster
--ram rabbit@rabbit1
# rabbitmqctl start_app

# # rabbit3
# rabbitmqctl stop_app
# rabbitmqctl reset
# rabbitmqctl join_cluster
--ram rabbit@rabbit1
# rabbitmqctl start_app

View Code

目前使用docker启动apisix,需要依赖etcd,

etcd:
container_name: etcd
hostname: etcd
image: bitnami
/etcd
volumes:
- ./etcd/data:/bitnami/etcd
environment:
ETCD_ENABLE_V2:
"true"ALLOW_NONE_AUTHENTICATION:"yes"ETCD_ADVERTISE_CLIENT_URLS:"http://etcd:2379" #https://github.com/apache/apisix-dashboard/issues/2756 需要更换为host域名不能使用0.0.0.0 ETCD_LISTEN_CLIENT_URLS: "http://0.0.0.0:2379"ports:- "2379:2379/tcp"networks:
caseor_bridge:
ipv4_address:
172.0.10.8apisix:
container_name: apisix
hostname: apisix
image: apache
/apisix
volumes:
- ./apisix/log:/usr/local/apisix/logs- ./apisix/conf/config.yaml:/usr/local/apisix/conf/config.yaml:ro
depends_on:
-etcd
ports:
- "9088:9088/tcp" - "9180:9180/tcp" - "127.0.0.1:9090:9090/tcp"networks:
caseor_bridge:
ipv4_address:
172.0.10.9apisix-dashboard:
container_name: apisix
-dashboard
image: apache
/apisix-dashboard
depends_on:
-etcd
ports:
- "9188:9188"volumes:- ./apisix/conf/dashboard.yaml:/usr/local/apisix-dashboard/conf/conf.yaml
networks:
caseor_bridge:
ipv4_address:
172.0.10.10

使用apisix dashboard的时候 会出现一个bug,需要在启动etcd的时候设置ETCD_ADVERTISE_CLIENT_URLS为host域名 而不能使用0.0.0.0

apisix的config yaml


apisix:
node_listen:
- port: 9088enable_ipv6:trueenable_control:truecontrol:
ip:
"[::]"port:9090discovery:
nacos:
host:
- "http://172.0.10.7:8848"deployment:
role: traditional
role_traditional:
config_provider: etcd
admin:
admin_listen:
port:
9180allow_admin:- 0.0.0.0/0admin_key:- name: "admin"key: b848941cd4e1003f2f961a7786ecf75f
role: admin
- name: "viewer"key: dd3bc5bde63f272f554b91336bfcfcb3
role: viewer
etcd:
host:
- http://etcd:2379 prefix: /apisix
timeout:
30#plugin_attr:
# prometheus:
# export_addr:
# ip:
"0.0.0.0"# port:9091#END

View Code

dashboard的config yaml


conf:
listen:
host:
0.0.0.0# `manager api` listening ip or host name
port:
9188# `manager api` listening port
allow_list: # If we don
't set any IP list, then any IP access is allowed by default. - 0.0.0.0/0etcd:
endpoints: # supports defining multiple etcd host addresses
foran etcd cluster- "http://etcd:2379"# yamllint disable rule:comments-indentation
# etcd basic auth info
# username:
"root" # ignore etcd username ifnot enable etcd auth
# password:
"123456" # ignore etcd password ifnot enable etcd auth
mtls:
key_file:
"" # Path of your self-signed client side key
cert_file:
"" # Path of your self-signed client side cert
ca_file:
"" # Path of your self-signed ca cert, the CA is used to sign callers'certificates # prefix: /apisix # apisix config's prefix in etcd, /apisix by default log:
error_log:
level: warn # supports levels, lower to higher: debug, info, warn, error, panic, fatal
file_path:
logs
/error.log # supports relative path, absolute path, standard output
# such
as: logs/error.log, /tmp/logs/error.log, /dev/stdout, /dev/stderr
access_log:
file_path:
logs
/access.log # supports relative path, absolute path, standard output
# such
as: logs/access.log, /tmp/logs/access.log, /dev/stdout, /dev/stderr
# log example:
2020-12-09T16:38:09.039+0800 INFO filter/logging.go:46 /apisix/admin/routes/r1 {"status": 401, "host": "127.0.0.1:9000", "query": "asdfsafd=adf&a=a", "requestId": "3d50ecb8-758c-46d1-af5b-cd9d1c820156", "latency": 0, "remoteIP": "127.0.0.1", "method": "PUT", "errs": []}
authentication:
secret:
secret # secret
forjwt token generation.
# NOTE: Highly recommended to modify
thisvalue to protect `manager api`.
#
if it's default value, when `manager api` start, it will generate a random string to replace it. expire_time: 3600 # jwt token expire time, insecond
users: # yamllint enable rule:comments
-indentation- username: admin # username and password forlogin `manager api`
password: admin
-username: user
password: user

plugins: # plugin list (sorted
inalphabetical order)- api-breaker- authz-keycloak- basic-auth- batch-requests- consumer-restriction-cors
#
- dubbo-proxy-echo
#
- error-log-logger
#
- example-plugin- fault-injection- grpc-transcode- hmac-auth- http-logger- ip-restriction- jwt-auth- kafka-logger- key-auth- limit-conn- limit-count- limit-req
#
- log-rotate
#
- node-status- openid-connect-prometheus- proxy-cache- proxy-mirror- proxy-rewrite-redirect- referer-restriction- request-id- request-validation- response-rewrite- serverless-post-function- serverless-pre-function
#
-skywalking- sls-logger-syslog- tcp-logger- udp-logger- uri-blocker- wolf-rbac-zipkin- server-info- traffic-split

View Code

在文件夹下启动

docker-compose -f microservice.yaml up

本地打开 http://localhost:9188 使用admin ,admin 登录

配置路由信息

这里使用的是nacos作为服务发现,具体查看nacos配置

启动以8083端口的服务

dotnet run --urls=http://*:8083

打开浏览器调试下接口

接下来使用网关请求

再启动以8084端口的服务

nacos中出现了两个实例

继续使用网关请求

在log中的access.log 可以查看到网关请求到不同端口的服务

几个注意点:

1.etcd的ETCD_ADVERTISE_CLIENT_URLS 需要更换为host域名不能使用0.0.0.0

2.apisix的nacos配置在 config.yaml中的discovery

尽量使用host名称

简介

Kettle(也称为 Pentaho Data Integration)是一款开源的 ETL(Extract, Transform, Load)工具,由 Pentaho 开发。ETL 是指从一个数据源(通常是数据库)中提取数据,进行转换,然后加载到目标系统中。Kettle 为数据集成和数据仓库开发提供了强大的工具和功能。

环境

kettle 版本:
7.1

数据库:
sql sever 2022

系统环境:
windows

kettle 连接 sql sever

  1. 打开 Spoon(Kettle 的图形化界面工具): 启动 Kettle 中的 Spoon 工具。

  2. 创建数据库连接: 在 Spoon 中,选择“View” > “Database Connections”以打开数据库连接视图。右键单击空白区域,选择“Create Connection”。

  3. 选择数据库类型: 在弹出的对话框中,选择数据库类型为“Microsoft SQL Server”。

  4. 填写连接信息: 输入以下连接信息:

  • Connection Name: 连接的名称,可以随意取。
  • Database Hostname: SQL Server 数据库的主机名或 IP 地址。
  • Database Port: SQL Server 数据库的端口,默认为 1433。
  • Database Name: 要连接的数据库名称。
  • Username: 数据库的用户名。
  • Password: 数据库用户的密码。
  1. 测试连接: 点击“Test”按钮,确保连接测试成功。
  2. 保存连接: 如果测试成功,点击“OK”按钮保存你的数据库连接。

遇到的问题

  1. 驱动问题
错误连接数据库 [xxx] : org.pentaho.di.core.exception.KettleDatabaseException:
Error occurred while trying to connect to the database

Driver class 'org.gjt.mm.mysql.Driver' could not be found, make sure the 'MySQL' driver (jar file) is installed.
org.gjt.mm.mysql.Driver

org.pentaho.di.core.exception.KettleDatabaseException:
Error occurred while trying to connect to the database
    ...

下载连接 :
JDBC Driver for SQL Server
https://learn.microsoft.com/zh-cn/sql/connect/jdbc/release-notes-for-the-jdbc-driver?view=sql-server-ver16

解决办法:
根据 jdk 版本进行选择。将下载的驱动文件拷贝到 kettle 安装目录的 data-integration/lib 下,重启进行测试

  1. SQL Server 提示不能通过 1433 端口登录

解决办法:

  • 打开 cmd 用 telnet localhost 1433 进行测试,如果无法连接表示 1433 端口还未打开
  • 开始菜单找到

    ,点击运行
  • 对 MSSQLSERVER 的协议(本机数据库的实例名)、客户端协议启用 TCP/IP,再右键到 TCP/IP 协议,将 IP 和端口进行设置并且启用

  • 设置完成后,将 sql sever 服务进行重启

偶然在网上清华大学电子系科协软件部2023暑期培训的内容中发现了这个东西,后面随着了解发现以后学习有关项目时会用到,便写个随笔记录一下这次学习的经历。作为一种序列化协议,与使用文本方式存储的xml、json不同,protobuf使用的是二进制格式进行存储,有利于在类似分布式LInux性能分析监控的项目中构建出整个项目的数据结构。

[零] 序列化与Protobuf

实际传输中,我们会面临各种问题,例如:

  • 要传输的数据量很大,但其实有效的数据却不多 例如,传输下面这样一个数组:

    // 传递一个长整型数组
    long long arr[5] = {1, 2, 3, 4, 1000000000000}
    
  • 要传输的的数据类型非常复杂,难以传递:

    // 传递一个结构体
     struct Bar	{
     	int integer;
     	std::string str;
     	float flt[100];
     };
    

那么我们如何正确而高效地进行这种传递呢?在发送端,我们需要使用一定的规则,将对象转换为一串字节数组,这就是序列化;在接收端,我们再以同样的规则将字节数组还原,这就是反序列化。

我们平时常见的文本序列化协议有 XML和JSON,这两种序列化协议在进行AI语料人工标注时很常见,可读性很好。但我们这里讲的protobuf却是一种
可读性为零
的协议——它使用二进制格式来进行数据的转储。

Google Protocol Buffer(简称 Protobuf)是一种轻便高效的结构化数据存储格式,平台无关、语言无关、可扩展,可用于
通讯协议

数据存储
等领域。

下面来看一个表格,来对比这三种序列化协议的差异。这里就不对XML和JSON做详细介绍了,建议先去学习一下。

XML JSON Protobuf
数据存储 文本 文本 二进制
序列化存储消耗 较大 小(XML的
1/3~1/10
序列化/反序列化速度 快(XML的
20-100
倍)
数据类型 支持广泛的数据类型 支持基本的数据类型 需要通过message定义来指定数据类型
跨平台支持 支持 支持 支持

再来看一个小例子。我们需要传输一个结构体类型的数据,结构体如下:

struct Student {
	int id;
	std::string name;
}

使用XML序列化:

 <student>
   <id>101</id>
   <name>hello</name>
 </student>

使用json序列化:

 {
  "id": 101,
  "name": "hello"
 }

使用Protobuf二进制序列化:

 08 65 12 06 48 65 6C 6C 6F 77

为什么要用 protobuf ? Generated by GPT4.0.

1. 效率和性能:
Protobuf是一种高效的二进制序列化格式,相比于其他文本格式(如JSON和XML),它具有更小的数据体积和更快的序列化/反序列化速度。这使得Protobuf在网络通信和数据存储方面表现出色,特别适合传输大量数据或需要高性能的场景。

2. 跨语言支持:
Protobuf支持多种编程语言,包括C++、Java、Python等。通过定义通用的消息格式和服务接口,不同编程语言的应用程序可以相互通信和交换数据,实现跨平台和跨语言的互操作性。

3. 数据版本控制:
Protobuf支持在数据结构发生变化时进行向前和向后兼容的数据版本控制。通过定义消息的字段编号而非字段名称,可以避免在数据结构演化时出现命名冲突或解析错误。这使得在应用程序升级和数据迁移时更加灵活和可靠。

4. 紧凑的数据格式:
Protobuf使用二进制编码,将数据紧凑地表示为字节序列。相比于文本格式,二进制编码占用更少的存储空间,减少了网络传输的带宽消耗,并提高了数据传输的效率。

5. 自动生成代码:
Protobuf使用定义数据结构的.proto文件,可以自动生成与编程语言相关的代码,包括消息类、序列化和反序列化方法等。这简化了开发过程,减少了手动编写和维护序列化代码的工作量。

6. 可扩展性:
Protobuf支持向已定义的消息结构中添加新字段,而不会破坏已有的解析逻辑。这种可扩展性使得应用程序可以逐步演化和升级,而无需对整个数据结构进行全面修改。

GPT给我们介绍的优点会在后面我们对“如何使用 protobuf ”进行详细学习体现。


[一] Protobuf 安装

官方 C++ && CMake 版本安装文档——
C++ && CMake Protobuf Installation

进行学习时用的是C++,跟着上手搓一搓。注意Protobuf需要使用CMake进行编译安装,所以需要对CMake有一定的了解。

本机使用环境如下:

  • Ubuntu 20.04.6 LTS
  • cmake version 3.16.3
  • git version 2.25.1
  • 内核版本信息:Linux version 5.15.0-94-generic (buildd@lcy02-amd64-118)
  • GNU编译工具:gcc (Ubuntu 9.4.0-1ubuntu1~20.04.2) 9.4.0, GNU ld (GNU Binutils for Ubuntu) 2.34

进行安装前,需要检查是否具备:
CMake
,
Git
, 以及
Abseil
库。(在这里我进行了Abseil的拉取源码自行安装,按照官方文档傻瓜式操作就行,比较简单。)

首先进行 protobuf 源码的获取:,要注意通过GitHub拉取源码时,要使用第三行的 git 命令进行子模块和 configure 文件生成检查。

git clone https://github.com/protocolbuffers/protobuf.git
cd protobuf
git submodule update --init --recursive

然后使用cmake进行构建。我这里没有完全按照官方文档那样直接在源码的根目录进行构建,而是采用了比较常见的“out of source”构建方法,即在源码根目录新建一个build目录用来存放构建文件。注意,protobuf使用了C++14及以上的语言标准,使用CMake编译时可能需要进行手动设定:

mkdir build && cd build
cmake .. -DCMAKE_CXX_STANDARD=14
# 注意线程数量与自己的机器线程数适配,不然编译时会爆内存
cmake --build . --parallel 4

(过程中碰到了virtual box虚拟机硬盘扩容的问题,搞了好久。。最后直接用GParted的GUI来搞定了)

接下来是进行测试:

 ctest --verbose

测试完成后就可以进行安装了:

 sudo cmake --install .

大功告成!!以上操作会将protoc可执行文件以及与 protobuf 相关的头文件、库安装至本机,在终端输入protoc,若输出提示信息则表示安装成功。


[二] 如何使用 Protobuf

官方英文学习文档:
Protocol Buffers Documentation

我们接下来将围绕一个“地址簿”的应用例子。每个在地址簿上的人物都有名字、ID、电子邮箱和手机号码四个属性。

那么我们怎样去将这些结构化的数据进行序列化和反序列化呢?直接采用原始的raw二进制数据传输?太过fragile且扩展性太低;采用点对点定制的编码string传输?这种一次性的方法往往只在简单的数据传输中有效;采用大名鼎鼎的 XML ?空间消耗太大且 XML DOM树太过复杂了……

所以我们使用
protobuf

protobuf
是为传输数据服务的,它为我们提供了用来定义消息格式的语言工具,我们可以使用
protobuf
语言的语法来编写一个
.proto
文件,并围绕这个文件展开代码的编写和数据的传输。在这里我们学习C++方面的使用,分为三个步骤逐步介绍。

2.1 编写.proto文件

我们需要先从一个
.proto
文件开始。我们为每一个想要进行序列化的结构化数据都创建一个
message
(其实
message
也是一种类似
struct
的结构体语法格式),并在
message
里面声明每一个field的名字和类型。

我们从例子入手去学习如何编写一个这样的文件。下面是地址簿应用的
.proto
文件示例:

// [START declaration]
syntax = "proto3";
package tutorial;
import "google/protobuf/timestamp.proto";
// [END declaration]

// [START messages]
message Person {
  string name = 1;
  int32 id = 2;  // Unique ID number for this person.
  string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    string number = 1;
    PhoneType type = 2;
  }

  repeated PhoneNumber phones = 4;
  // 引入在另一个.proto定义的消息类型
  google.protobuf.Timestamp last_updated = 5; 
}

// Our address book file is just one of these.
message AddressBook {
  repeated Person people = 1; // repeated类型字段(数组)
}
// [END messages]

2.1.1 语法

protobuf
有两个主要版本,分别为
proto2

proto3
,两套语法不完全兼容。我们可以使用
syntax
关键字指 定
protobuf
遵循的语法标准,如例子中使用的就是 proto3.

我们在这只记录一些简单但必要的proto3语法,详细还得查官方文档,这里只是做一个简单的备忘录的作用。proto2 的例子可以看这位博主的博文:
Protobuf学习 - 入门
,但我也会在下面列出的东东简略提到一下两个版本的差异。

  • syntax
    关键字必须为第一行非空非注释的行,用于指定protobuf版本,如果不指定则后面编译时会默认你为 proto2 。
  • package
    关键字为消息类型提供了命名空间的分隔,避免命名冲突。
    在这个例子中,所有的消息类型都属于名为 tutorial 的命名空间。
  • import
    关键字用来引入外部的
    .proto
    文件。(只能import当前目录及子目录?)
  • message
    是一个类似
    struct
    的关键字,用来定义程序要传递的结构化消息类型,每一个字段都有自己的数据类型和字段名。
  • 定义字段时,必须对字段赋值
    标识号
    (即每个数据字段后的
    = 1
    ,
    = 2
    ……),并且有以下限制:
    • 标识号范围为 1 到 536,870,911 (0x1至0x3FFFFFFF);
    • 每个标识号必须独一无二;
    • 19000 到 19999 的标识号是预留值,一旦使用编译时就会报warning;
    • 一旦定义好的消息类型开始使用,标识号就不能再更改,因为标识号 “it identifies the field in the
      message wire format
      .”
    • 为频繁访问的字段使用 1 - 15 的标识号,以节省编码空间消耗。
  • enum
    关键字定义枚举类型。每个枚举定义都必须包含一个映射到 0 的常量作为枚举的默认值,但后续值不再自动递增,每个值需要显式指定。
    如例子中从 MOBILE 开始。
  • 简单数据类型

    bool
    ,
    int32
    ,
    float
    ,
    double
    , 和
    string
    . 除此之外,proto语法支持
    嵌套
    ,即用自己定义的
    message
    来作为数据类型。
    如上面例子中, Person 消息类型中嵌套了 PhoneNumber 消息类型,而 AddressBook 消息类型中又嵌套了 Person 消息类型。
    • 数据类型与各个语言中的类型对应见文档:
      Scalar Value Types
    • 字段的默认值在proto3中不能手动指定,只能由系统根据字段类型决定(通常为零值或空值),同样见上面给出的文档链接。
  • 前缀标签(字段规则)
    :proto3取消了proto2的
    required
    规则,只剩两种:singular(单数,相当于proto2的optional)和 repeated(重复)。
    • optional
      :有点像正则表达式中的
      ?
      ,表明该字段可以有0个或1个值,若不设置则为默认值,且编码时不会被编进去。
    • repeated
      :表明该字段可以重复任意多次(包括0次),即数组,顺序有序。
      如例子中的 phones 数组。
  • 注释
    :采用 C/C++ 注释格式。

2.1.2 默认命名规则

  • proto2中,默认情况下,字段、消息和枚举值的命名采用驼峰命名法(如
    myField

    MyMessage

    MY_ENUM
    )。
  • proto3中,默认情况下,字段、消息和枚举值的命名采用下划线命名法(如
    my_field

    My_Message

    MY_ENUM
    )。

2.1.3 高级语法

Any

Any 类型是一种特殊的消息类型,它允许在没有
.proto
定义的情况下,可以将任意类型的数据包装成 Any 消息,并将其嵌入到其他消息类型中,
这样可以将不同类型的数据存储在同一个字段中

Any 消息类型的定义如下:

message Any {
  string type_url = 1;
  bytes value = 2;
}
  • type_url
    :用于存储被包装数据的类型信息,
    唯一
    地标识了被封装的消息的类型。它是一个表示数据类型的
    URL字符串
    ,通常遵循 "
    type.googleapis.com/_packagename_._messagename_
    " 的格式,例如 "com.example.myapp.MyMessage"(即消息类型的全限定名,前面加上一个包名或域名的前缀)。通过类型URL,接收方可以识别出如何解析和处理被包装的数据。
  • value
    :用于存储被包装的数据。它是一个
    字节数组
    ,可以存储任意类型的数据,例如序列化的消息或其他二进制数据。

我们来看一个 Any 消息类型使用的例子。假设我们现在有一个电子商务平台,需要存储用户的订单信息,但每个订单的详细信息结构可能因不同商家自定义而不同。这时候我们可以使用 Any 消息类型来存储订单的详细信息。

首先定义一个通用的订单信息类型:

syntax = "proto3";
// 要使用 Any 消息类型,需要先import对应的any.proto
import "google/protobuf/any.proto";

message Order {
  string order_id = 1;
  google.protobuf.Any details = 2;
}

接下来,我们定义两个具体的订单详细信息类型:
ProductOrder

ServiceOrder
。它们使用不同的消息类型来表示不同的订单信息:

// product.proto
message ProductOrder {
  string product_id = 1;
  int32 quantity = 2;
  // 其他与产品订单相关的字段
}

// service.proto
message ServiceOrder {
  string service_id = 1;
  // 其他与服务订单相关的字段
}

后面经过一系列的程序运行,我们可以得到一条这样的
Order
订单信息:

Order {
  order_id: "000001"
  details: Any {
    type_url: "type.googleapis.com/Product.ProductOrder"
    value: <可解析为ProductOrder类型的二进制数据>
  }
}

在这个例子中,我们将产品订单的详细信息序列化为字节数组,并将其赋值给 Any 消息类型的
value
字段。同时,我们指定了类型URL为 "
type.googleapis.com/Product.ProductOrder
" ,以便接收方能够正确解析和处理这个订单的详细信息。

Oneof

oneof
类型就像 C/C++ 中的
Union
, 它包含的多个字段共享一段内存。protobuf 提供了
case()

WhichOneof()
两个API,用以检查
oneof
类型中哪个字段被赋值了。

message SampleMessage {
  oneof test_oneof {
    string name = 4;
    SubMessage sub_message = 9;
  }
}

对于两个proto版本之间的差异,proto2 支持
oneof
语法,用于指定一组互斥的字段,只能设置其中一个字段的值;而 proto3 仍然支持
oneof
语法,但是在proto3中,
oneof
字段可以为空,也就是可以没有任何字段被设置。

有一些需要注意的特点:

  • oneof
    类型里面可以嵌套
    除了
    map

    repeated
    的所有数据类型。如果你有着在
    oneof
    中加入
    repeated
    类型的需求,则可以用一个包含
    repeated
    类型的
    message
    来代替。

  • 注意最后一次赋值会像 Union 那样覆盖之前的赋值(清空 oneof 中其它的字段)。

  • oneof
    类型不能通过
    repeated
    修饰。

  • 在使用 C++ 进行编码时,特别注意内存管理问题:在给
    oneof
    中的字段赋值时,可能会导致旧值被覆盖,并且如果没有适当地释放内存,可能会导致内存泄漏或非法内存访问。

Map

map
字段可以定义关联映射类型,即键值对类型。其定义语法如下:

map<key_type, value_type> map_field = N;

其中
key_type
可以为任意整型或string类型(注意枚举类型并不归属在内),
value_type
可以为任何除了
map
的数据类型。

如果你想定义一个以string类型为键,value为
Project
消息类型的
map
映射,则如下:

map<string, Project> projects = 3;

很简单吧!
map
也有一些特点:

  • map
    类型不能通过
    repeated
    修饰。
  • 如果为
    map
    字段提供键但没有值,在序列化该字段时,其行为取决于编程语言。在C++、Java、Kotlin和Python中,将序列化该类型的默认值,而在其他语言中,则不会序列化任何内容。

不支持 map 类型的 protobuf 实现版本 可以这样手动实现对 map 的支持:

message MapFieldEntry {
  key_type key = 1;
  value_type value = 2;
}
repeated MapFieldEntry map_field = N;

除此之外,protobuf 中的高级语法还有很多,在这不做展开,可以去翻阅官方文档。

2.2 编译protobuf定义

在上一个步骤中,我们已经写好了一个
.proto
文件,接下来要做的就是根据这个
.proto
去生成一系列用于读写地址簿数据的类。

在这里要使用 protobuf 的编译器:
protoc

如果本机环境中没有该编译器,在
这里
下载,按照 README 操作。

protoc
运行时,若无指定路径,则当前工作路径即为其默认路径;最简单的格式如下:

protoc -I=$SRC_DIR --cpp_out=$DST_DIR $SRC_DIR/xxx.proto

这条命令运行后,
protoc
会编译生成两个文件:
xxx.pb.h

xxx.pb.cc

2.3 使用 C++ protobuf API 读写消息

经过
protoc
编译后,我们就可以使用生成的类以及protobuf提供的API来进行愉快的程序编写了。

2.3.1 生成的类与 API

我们先来看生成的类要怎么用。
protoc
采用了面向对象的思想,把转化的 C++ 类的声明和实现放到生成的两个文件中,这两个文件是很大的,硬读的话肯定不太行。下面是一些
protoc
在编译过程中的行为要点,简要分析了这些类和成员函数是个怎样的情况。

  • 每个
    message
    都对应生成了一个类,每个字段都是类的成员变量;

  • 每个字段都有自己的 accessors,如对于
    .proto
    例子中
    Person
    消息类型的
    id
    ,
    email
    , 和
    phones
    字段,生成的成员函数如下:

    // id
    inline bool has_id() const;
    inline void clear_id();
    inline int32_t id() const;
    inline void set_id(int32_t value);
    
    // email
    inline bool has_email() const;
    inline void clear_email();
    inline const ::std::string& email() const;
    inline void set_email(const ::std::string& value);
    inline void set_email(const char* value);
    inline ::std::string* mutable_email();
    
    // phones
    inline int phones_size() const;
    inline void clear_phones();
    inline const ::google::protobuf::RepeatedPtrField< ::tutorial::Person_PhoneNumber >& phones() const;
    inline ::google::protobuf::RepeatedPtrField< ::tutorial::Person_PhoneNumber >* mutable_phones();
    inline const ::tutorial::Person_PhoneNumber& phones(int index) const;
    inline ::tutorial::Person_PhoneNumber* mutable_phones(int index);
    inline ::tutorial::Person_PhoneNumber* add_phones();
    

    可以看到有
    has_field

    set_field

    clear_field
    这些成员函数,并且对于不同数据类型的字段,成员函数也会有增加/减少。如
    string
    类型的字段会有一个
    mutable_field
    的方法,用于直接获取指向字段存储字符串的指针。

  • repeated
    类型的字段没有
    set_field
    方法。它可以利用
    field_size
    方法来检查当前元素个数;利用元素下标获取/修改特定元素;利用
    add_field
    方法添加新的元素,
    该方法会创建一个未经设值的类型成员,并返回它的指针

  • .proto
    中定义的枚举类型前加上外层的
    message
    名作为命名空间,如例子中的枚举类型生成为
    Person::PhoneType
    ,值为
    Person::MOBILE

    Person::HOME

    Person::WORK

  • 对于嵌套在
    message
    里面的 子
    message
    ,如例子中的
    PhoneNumber
    ,实际在代码文件中它是与类
    Person
    分开定义的,类名为
    Person_PhoneNumber
    (C++没有嵌套类定义,这里也没有用继承什么的),只不过
    Person
    定义域里面使用了它的别名:

    using PhoneNumber = Person_PhoneNumber;	// 使得看起来就像一个 nested class
    

对于整个
message
的数据,也有相应的成员函数来对其进行检查/操作。这些函数与 I/O 函数 共同构建起了 父类
Message
的接口。如
Person
类中:

  • bool IsInitialized() const;
    : 检查是否所有字段都已经赋值;

  • string DebugString() const;
    : 字面意义,返回可读性高的
    message
    字符串,用于debug;

  • void CopyFrom(const Person& from);
    : 就是复制赋值函数,覆盖现有的数据。

  • void Clear();
    : 全部字段值归零。

    更多信息见文档:
    complete API documentation for
    Message

最后当然是类中使用 protobuf binary 格式进行 message 读写的成员函数:

  • bool SerializeToString(string* output) const;
    : 将 message 序列化到一个string中,注意string存储的是序列化后的二进制数据,而不是文本。

  • bool ParseFromString(const string& data);
    : 解析函数,功能与上面函数相反。

  • bool SerializeToOstream(ostream* output) const;
    : 序列化 message 数据后直接输出到指定的 ostream。

  • bool ParseFromIstream(istream* input);
    : 以指定的 istream 作为二进制数据输入,进行反序列化解析。

    除此提供的更多序列化/反序列化函数,如与字节流配对的
    SerializeToArray

    ParseFromArray
    ,详细见
    文档

2.3.2 写入 message

我们现在的第一个需求是能够将个人信息写入到地址簿中,这个过程包括信息输入、序列化、写入地址簿数据存储文件。

这里是官方的代码:
add_person.cc

基本数据操作上面API讲得也差不多了,看一下代码里怎样运用即可。这里还有几点值得注意的:

  • 善用 宏
    GOOGLE_PROTOBUF_VERIFY_VERSION
    ,来检查兼容性问题;

    int main(int argc, char* argv[]) {
      // Verify that the version of the library that we linked against is
      // compatible with the version of the headers we compiled against.
      GOOGLE_PROTOBUF_VERIFY_VERSION;
    
  • 打开 fstream 时可以见到打开的方式为
    ios::in | ios::binary
    ,反序列化解析时是通过
    ParseFromIstream()
    直接将文件数据解析到
    Address
    类中;如下:

        // Read the existing address book.
        fstream input(argv[1], ios::in | ios::binary);
        if (!input) {
          cout << argv[1] << ": File not found.  Creating a new file." << endl;
        } else if (!address_book.ParseFromIstream(&input)) {
          cerr << "Failed to parse address book." << endl;
          return -1;
        }
    


    Address
    类写回文件中同理,不过输出的 fstream 打开方式为
    ios::out | ios::trunc | ios::binary

  • 最后使用
    ShutdownProtobufLibrary()
    来结束程序,不是很必要但是一个良好的习惯(特别对于C++):

      // Optional:  Delete all global objects allocated by libprotobuf.
      google::protobuf::ShutdownProtobufLibrary();
    

2.3.3 读取 message

我们的第二个需求就是将地址簿中的所有人信息列举出来。

这里是官方的代码:
list_people.cc

代码中可以看到对
repeated
类型数据的访问,确实是用下标来确认具体位置:

void ListPeople(const tutorial::AddressBook& address_book) {
	// select the person by index
    for (int i = 0; i < address_book.people_size(); i++) {
        const tutorial::Person& person = address_book.people(i);

    // ...

    // select the phone number by index
	for (int j = 0; j < person.phones_size(); j++) {
      const tutorial::Person::PhoneNumber& phone_number = person.phones(j);

      switch (phone_number.type()) {
		// ...
      }
      cout << phone_number.number() << endl;
    }
    if (person.has_last_updated()) {
      cout << "  Updated: " << TimeUtil::ToString(person.last_updated()) << endl;
    }
  }
}

其余注意地方基本和写入 message 时一样。

2.3.4 编译生成整个程序

现在我们有了
.proto
生成的
.h

.cc
类文件,还有了两个源程序代码文件,接下来要做的就是将它们编译链接了。

如果我们直接进行 g++ 编译:

g++ add_person.cc address.pb.cc

报大错!正确编译命令应该要加上包含的头文件路径以及需要链接的库:

g++ --std=c++14 main.cc xxx.pb.cc -I $INCLUDE_PATH -L $LIB_PATH

这里有很重要的点:

  1. C++ 版本必须在
    cpp14 及以上
    ,这一点在安装 protobuf 也很明确了;
  2. 对于需要包含的头文件位置和需要链接的库文件,一个个去尝试属实麻烦。
    用 pkg-config 帮忙查找!!

不妨看看官方给出的 Makefile 文件中是怎么做的:

c++ -std=c++14 add_person.cc addressbook.pb.cc -o add_person_cpp `pkg-config --cflags --libs protobuf`

它使用
pkg-config
将要链接的东西都链接进来了。(注意这个不是引号,而是 "
`
" 号)

写入程序运行与存储的文件内容展示:

输出程序运行与结果展示:


[三] 本篇结语

OK!洋洋洒洒写了很多,但都是一些自己入门学习 protobuf 的心得,学习这些知识时看官方文档真的很必要! :)

下一篇再打算学习一下为什么 protobuf 这么好,它里面到底有什么样的编码原理,不能成为只会调 API 的家伙哈哈……以及还有 gRPC 这种东西要学习呢……

参考资料:

(全文完)