2024年8月


这个并不是一个通用性编程问题,只属于在Java领域内专有问题。

要做好心理准备,这是一个复杂类的问题,要解答这个问题,需要梳理清楚两个函数和其它类之间的关系,并且它们之间的关系有点交织。

equals用法

在 Object 类中还包含了 equals() 方法:

public boolean equals(Object obj) {
    return (this == obj);
}

说明:

  • == 用于比较
    变量
    所对应的内存中所存储的
    数值
    是否相同,要比较
    两个基本类型的数据(注意是基本类型)

    两个引用变量
    是否相等,只能用==操作符。

hashCode用法

在 Object 类中还包含了 hashCode() 方法:

public native int hashCode();

请回答,为什么 Object 类需要一个 hashCode() 方法呢?

在 Java 中,hashCode() 方法的主要作用就是为了配合哈希表使用的。

哈希表(Hash Table),也叫散列表,是一种可以通过关键码值(key-value)直接访问的数据结构,它最大的特点就是可以快速实现查找、插入和删除。其中用到的算法叫做哈希,就是把任意长度的输入,变换成固定长度的输出,该输出就是哈希值。像 MD5、SHA1 都用的是哈希算法。

像 Java 中的 HashSet、Hashtable、HashMap 都是基于哈希表的具体实现。其中的 HashMap 就是最典型的代表。

大家想一下,
如果没有哈希表,但又需要这样一个数据结构,它里面存放的数据是不允许重复的,它是怎么实现的?

  • √ 要不使用 equals() 方法进行逐个比较?这种方案当然是可行的。
    但如果数据量特别特别大,采用 equals() 方法进行逐个对比的效率肯定很低很低,总结:能解决,但效率不高。
  • √√ 最好的解决方案就是哈希表。总结:不光能解决,还效率不高。

案例说明:
拿 HashMap 来说吧,当我们要在它里面添加对象时,先调用这个对象的 hashCode() 方法,得到对应的哈希值,然后将哈希值和对象一起放到 HashMap 中。当我们要再添加一个新的对象时:

  1. 获取对象的哈希值;
  2. 和之前已经存在的哈希值进行比较,如果不相等,直接存进去;
  3. 如果有相等的,再调用 equals() 方法进行对象之间的比较,如果相等,不存了;
  4. 如果不等,说明哈希冲突了,增加一个链表,存放新的对象;
  5. 如果链表的长度大于 8,转为红黑树来处理。

就这么一套下来,调用 equals() 方法的频率就大大降低了。也就是说,只要哈希算法足够的高效,把发生哈希冲突的频率降到最低,哈希表的效率就特别的高。

总结

== 用于比较变量所对应的内存中所存储的数值是否相同,要比较两个基本类型的数据(注意是基本类型)或两个 引用变量是否相等,只能用==操作符。

equals 比较的是
值和地址
,如果没有重写equals方法,其作用与==相同;
在String类中,重写了equals方法,比较的是

是否相等;

hashCode用于散列数据结构中的
hash值计算

equals两个对象相等,那hashcode一定相等,hashcode相等,不一定是同一个对象(hash冲突现象);
hashCode 一般与 equals 一起使用,两个对象作「相等」比较时,因判断 hashCode 是判断 equals 的
先决条件
.

为什么一个类中需要两个比较方法

因为重写的 equals() 里一般比较的比较全面比较复杂,这样效率就比较低,而利用hashCode()进行对比,则只要生成一个 hash 值进行比较就可以了,效率很高,那么 hashCode() 既然效率这么高为什么还要 equals() 呢?

  • 因为 hashCode() 并不是完全可靠,有时候不同的对象他们生成的 hashcode 也会一样(hash冲突),所以 hashCode()只能说是大部分时候可靠,并不是绝对可靠。

  • equals() 相等的两个对象他们的 hashCode() 肯定相等,也就是用 equals() 对比是绝对可靠的。

为什么重写 equals 方法时必须同时重写 hashCode 方法?

可以先看看Java这B 给出的一些建议,就是事前就规定好了...

public class Object {

    /**
     * Returns a hash code value for the object. This method is
     * supported for the benefit of hash tables such as those provided by
     * `java.util.HashMap`.
     *
     * The general contract of `hashCode` is:
     *
     * a) Whenever it is invoked on the same object more than once during
     *    an execution of a Java application, the `hashCode` method must
     *    consistently return the same integer, provided no information
     *    used in `equals` comparisons on the object is modified.
     *    This integer need not remain consistent from one execution of an
     *    application to another execution of the same application.
     *
     * b) If two objects are equal according to the `equals(Object)` method,
     *    then calling the `hashCode` method on each of the two objects must
     *    produce the same integer result.
     *
     * c) It is not required that if two objects are unequal according to the
     *    `equals(Object)` method, then calling the `hashCode` method on each of
     *    the two objects must produce distinct integer results.
     *    However, the programmer should be aware that producing distinct integer
     *    results for unequal objects may improve the performance of hash tables.
     */
    @IntrinsicCandidate
    public native int hashCode();

    /**
     * Indicates whether some other object is "equal to" this one.
     *
     * @apiNote
     * It is generally necessary to override the `hashCode` method whenever this
     * method is overridden, so as to maintain the general contract for the `hashCode`
     * method,  which states that equal objects must have equal hash codes.
     */
    public boolean equals(Object obj) {
        return (this == obj);
    }
}

上面介绍了 hashCode 方法注释上列出的三个通用约定,equals 方法的注释上也有这么一句话:「每当重写 equals 方法时,都需要重写 hashCode 方法,这样才没有破坏 hashCode 方法的通用约定,即:两个对象为 Equal 的话(调用 equals 方法为 true), 那么这两个对象分别调用 hashCode 方法也需要返回相同的哈希值」。

所以只重写 equals 方法不重写 hashCode 方法的话,可能会造成两个对象调用 equals 方法为 true,而 hashCode 值不同的情形,这样即可能造成异常的行为。

这个情形是什么?
两个内容相等的Person对象p1和p2的hashCode()不同,是因为在Person类中没有重写hashCode()方法,它们使用的是Object类继承下来的hashCode()方法的默认实现。

在Object类中,hashCode()方法的默认实现是将对象的内存地址值作为哈希码返回。

总结:
就是一个约定而已。也是为了逻辑的自洽。

Reference

Java hashCode方法深入解析
https://www.javabetter.cn/basic-extra-meal/hashcode.html

Java:为什么重写 equals 方法时必须同时重写 hashCode 方法?
https://leileiluoluo.com/posts/always-override-hashcode-when-override-equals.html

原文:https://last9.io/blog/convert-opentelemetry-traces-to-metrics-using-spanconnector/

如果您已经实施了跟踪但缺乏强大的指标功能怎么办? SpanConnector 是一个通过将跟踪数据转换为可操作指标来弥补这一差距的工具。这篇文章详细介绍了 SpanConnector 的工作原理,提供了有关其配置和实现的指南。

OpenTelemetry 的一个常见问题是语言支持跟踪检测(Trace埋点),但指标方面正在进行中或尚不可用。在这种情况下,您可以使用 SpanConnector 将跟踪生成的跨度(Span)转换为指标。

什么是 Connector?

SpanConnector 是 OpenTelemetry Collector 中的一个组件,允许您从跨度(Span)数据中获取指标。当您拥有强大的跟踪功能但您的语言或框架缺乏原生指标支持时,这尤其有用。

将跟踪(Trace)转换为指标可以提供有关系统性能和运行状况的宝贵见解,而无需单独的插桩埋点。这种统一的方法创建了更全面的可观测性视野,并减少了管理两个不同埋点系统的开销。

SpanMetrics 连接器的工作原理

SpanMetrics 相关配置

聚合跨度(Span)数据中的请求(Request)、错误(Error)和持续时间(Duration) (R.E.D) OpenTelemetry 指标。

connectors:
  spanmetrics:
    histogram:
      explicit:
        buckets: [100us, 1ms, 2ms, 6ms, 10ms, 100ms, 250ms]
    dimensions:
      - name: http.method
        default: GET
      - name: http.status_code
      - name: host.name
    exemplars:
      enabled: true
    dimensions_cache_size: 1000
    aggregation_temporality: "AGGREGATION_TEMPORALITY_CUMULATIVE"
    metrics_flush_interval: 15s
    metrics_expiration: 5m
    events:
      enabled: true
      dimensions:
        - name: exception.type
        - name: exception.message
    resource_metrics_key_attributes:
      - service.name
      - telemetry.sdk.language
      - telemetry.sdk.name

了解 SpanMetrics 配置

让我们分解一下此配置的关键部分:

  • Histogram Buckets:
    histogram.explicit.buckets
    字段定义指标的延迟桶。这使您可以查看请求持续时间的分布。
  • Dimensions:这些是 Span 中的属性,将用于为指标创建标签。在此示例中,我们使用
    http.method

    http.status_code

    host.name
  • Exemplars:启用后,您可以将指标链接回特定的跟踪示例,从而为您的指标提供更多上下文。
  • Dimensions Cache:设​​置要存储的“维度组合”的最大数量。它有助于管理内存使用情况。
  • Aggregation Temporality:这决定了指标如何随时间聚合。 “CUMULATIVE” 意味着指标从流程开始就累积。
  • Metrics Flush Interval:从连接器生成指标的频率。
  • Metrics Expiration:这定义了指标在未更新时被丢弃之前在内存中保留的时间。
  • Events:启用后,您可以从跨度事件(例如异常)创建指标。
  • Resource Metrics Key Attributes:与跨度(Span)关联的资源中的这些属性将作为标签添加到所有生成的指标中。

使用 SpanMetrics 连接器的好处

  • 统一的可观察性:将跟踪转换为指标可以让您更全面地了解系统的性能,而无需单独的指标检测。
  • 一致性:确保您的指标与来自同一来源的跟踪完美一致。
  • 减少开销:消除了应用程序代码中双重检测(跟踪和指标)的需要。
  • 灵活性:您可以根据您的需求和跨度属性生成自定义指标。

实施 SpanMetrics 的指南

1. 设置 OpenTelemetry 跟踪:首先,确保您的应用程序已正确检测以进行跟踪。

下面是一个使用 Python 的简单示例:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    ConsoleSpanExporter,
    BatchSpanProcessor,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Set up the tracer provider
trace.set_tracer_provider(TracerProvider())

# Create an OTLP exporter
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)

# Create a BatchSpanProcessor and add the exporter to it
span_processor = BatchSpanProcessor(otlp_exporter)

# Add the span processor to the tracer provider
trace.get_tracer_provider().add_span_processor(span_processor)

# Get a tracer
tracer = trace.get_tracer(__name__)

# Use the tracer to create spans in your code
with tracer.start_as_current_span("main"):
    # Your application code here
    pass

2. 安装和配置 OpenTelemetry Collector

a. 下载 OpenTelemetry 收集器:

curl -OL https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.81.0/otelcol-contrib_0.81.0_linux_amd64.tar.gz
tar xzf otelcol-contrib_0.81.0_linux_amd64.tar.gz

b. 创建名为otel-collector-config.yaml的配置文件

receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:

connectors:
  spanmetrics:
    histogram:
      explicit:
        buckets: [100us, 1ms, 2ms, 6ms, 10ms, 100ms, 250ms]
    dimensions:
      - name: http.method
        default: GET
      - name: http.status_code
      - name: host.name
    exemplars:
      enabled: true
    dimensions_cache_size: 1000
    aggregation_temporality: "AGGREGATION_TEMPORALITY_CUMULATIVE"
    metrics_flush_interval: 15s
    metrics_expiration: 5m
    events:
      enabled: true
      dimensions:
        - name: exception.type
        - name: exception.message
    resource_metrics_key_attributes:
      - service.name
      - telemetry.sdk.language
      - telemetry.sdk.name

exporters:
  prometheus:
    endpoint: "0.0.0.0:8889"
  logging:
    verbosity: detailed

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [logging, spanmetrics]
    metrics:
      receivers: [spanmetrics]
      exporters: [logging, prometheus]

3. 启动 OpenTelemetry 收集器

使用您的配置运行收集器:

./otelcol-contrib --config otel-collector-config.yaml

4. 将跟踪发送到收集器

修改您的应用程序以将跟踪发送到收集器。如果您使用步骤1中的 Python 示例,则您已设置为将跟踪发送到
http://localhost:4317

5. 查看生成的指标

Otel Collector 会在
http://localhost:8889/metrics
公开指标,您可以通过 curl 查看原始指标:

curl http://localhost:8889/metrics

为了获得更用户友好的视图,您可以设置 Prometheus 来抓取这些指标,创建 prometheus.yml 文件:

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'otel-collector'
    static_configs:
      - targets: ['localhost:8889']

启动 Prometheus(假设您已经下载了它):

./prometheus --config.file=prometheus.yml

您现在可以通过
http://localhost:9090
访问 Prometheus UI 来查询和可视化您的指标。

小结

SpanConnector 是 OpenTelemetry 生态系统中的一个强大工具,它弥合了跟踪和指标之间的 gap。通过利用现有跟踪数据生成有意义的指标,您可以增强可观察性策略,而无需额外的埋点开销。这种方法对于过渡到 OpenTelemetry 特别有价值,对于那些指标埋点不太完备的语言,也很有价值。

大家好,我是Edison。

全球电子制造主要集中在中国,面向未来工业4.0、中国制造2025的战略转型升级,互联互通是基础、数据是核心,如何从用户角度来定义设备加工数据的内容完整性、有效性、可扩展性将是工厂通讯连接交换的工作重点。

IPC-CFX是什么?

首先,解释下这两个缩写的意思:IPC是国际电子工业联接协会,CFX是互联工厂数据交换。

IPC-CFX是一个开放式的国际标准,它简化了机器到机器之间的通信。IPC-CFX基于IPC-2591,是全球互联工厂
数据交换标准统一定义
数据语言格式,我们也可以将其理解为是一个协议,它为制造商、设备、硬件及软件供应商节约沟通和再开发成本,CFX标准的应用将简化并标准化机器设备之间的通信。

总结一下,IPC-CFX就是机器设备之间通信的“
统一语言
”,是大家都懂的“普通话”而不是“方言”。

话外音:对于IT开发者在追求的DDD(领域驱动设计),其核心思想也是“统一语言”,统一业务和技术之间的语言,提高沟通效率,进而提高软件质量。

IPC-CFX的适用范围

目前IPC-CFX主要在电子制造行业得到应用,特别是在
SMT(表面组装技术)
行业,越来越多的SMT设备厂商开始加入CFX联盟,比如SMT检测设备知名厂商Koh Young就是其中一员。

IPC-CFX标准不仅适用于
SMT(表面组装技术)
的相关生产,也支持机械装配、定制化、包装和运输等上游环节,甚至电气、机械子部件等上游环节。我们使用基于PC-CFX标准的传输信息来开发一些常见的应用例如设备利用率、生产线效率 和 整体设备效率(OEE)指标等。

IPC-CFX下设备如何通信?

在IPC-CFX的标准下,设备的数据被定义为
制造主题(Topic)

消息结构(Message)
。设备不太需要关注数据发送到哪里,数据来源于哪里,只需要知道在什么时机下发送什么数据,收到什么数据执行什么操作即可。

CFX都定义了哪些标准的Topic呢,如下图所示:

以一个Topic "WorkOrdersScheduled"为例,顾名思义:工单已排程。这个Topic代表会发出一个已经排好执行计划的工单,该工单即将在稍后某个特定时间某条生产线如SMT Line 1开始执行生产,其定义的消息数据体如下所示,还是比较完善的:

{"ScheduledWorkOrders": [
{
"WorkOrderIdentifier": {"WorkOrderId": "WO1122334455","Batch": null},"Scheduler": {"OperatorIdentifier": "BADGE4486","ActorType": "Human","LastName": "Doe","FirstName": "John","LoginName": "john.doe@domain1.com"},"WorkArea": "SMT Line 1","StartTime": "2018-08-02T11:00:00","EndTime": "2018-08-02T15:00:00","ReservedResources": ["L1PRINTER1","L1PLACER1","L1PLACER2","L1OVEN1"],"ReservedTools": [
{
"UniqueIdentifier": "UID23890430","Name": "TorqueWrench_123"}
],
"ReservedOperators": [
{
"OperatorIdentifier": "BADGE489435","ActorType": "Human","LastName": "Smith","FirstName": "Joseph","LoginName": "joseph.smith@abcdrepairs.com"}
],
"ReservedMaterials": [
{
"UniqueIdentifier": "UID23849854385","InternalPartNumber": "PN4452","Quantity": 0.0},
{
"UniqueIdentifier": "UID23849854386","InternalPartNumber": "PN4452","Quantity": 0.0},
{
"UniqueIdentifier": "UID23849854446","InternalPartNumber": "PN3358","Quantity": 0.0}
]
}
]
}

又如 "WorkOrdersUnschedule" 这个Topic,代表SMT Line 1的某个工单即将被取消,其数据格式就简单很多:

{"ScheduledWorkOrderIdentifiers": [
{
"WorkOrderIdentifier": {"WorkOrderId": "WO1122334455","Batch": null},"WorkArea": "SMT Line 1"}
]
}

除此之外,CFX还用一个统一的消息信封来包裹这个消息体,我们可以理解为定义了一个如下所示的统一消息格式:

public classCFXEnvelope
{
public string MessageName {get;set;} //eg. CFX.Produciton.Application.MaterialsApplied public string Version {get;set;} //eg. 1.7 ......public T MessageBody {get;set;} //消息体内容:泛型 }

Anyway,这个对于我们IT工程师是比较好理解的,这就跟我们的系统和系统之间通过消息队列(如Kafka)进行发布订阅模式的异步通信一模一样。不过这里呢,是机器与机器,机器与企业之间的通信。

但是,
IPC-CFX标准下是基于AMQP协议做消息传输的,每台设备都可以看作是一个AMQP端点
,通过发布和订阅实现数据的交互。此外,IPC-CFX
还支持点对点(Point-to-Point)
的消息模式(请求/响应模式)。

我们都知道,Kafka是不支持AMQP协议的,因此,要使用IPC-CFX就不能直接使用Kafka作为Message Broker,而IPC-CFX官方的案例也都是使用RabbitMQ来写的,虽然我觉得在设备数据交换场景Kafka的性能会更好。

如何开发机台程序?

如何让一台台的设备变成符合IPC-CFX标准的AMQP节点呢?常规做法就是在机台侧开发一个程序,这里IPC-CFX组织为我们提供了一个SDK,其实是一个.NET开发包(Nuget安装即可),所以对咱们.NET开发者是十分友好的。

  • 对于.NET 4.6可以使用 CFX.CFXSDK

  • 对于.NET Core及以上可以使用 CFX.CFXSDK.NetStandard

这个SDK提供了以下功能:

  • 将所有CFX消息的用Class/Object表示。

  • 能够将CFX消息对象序列化和反序列化为JSON格式。

  • 能够通过AMQP传输协议将CFX消息发布到一个或多个目的地。

  • 能够通过AMQP传输协议从一个或多个订阅源接收CFX消息。

  • 完全自动化的网络连接故障管理(即使在网络宕机或其他不可靠的情况下保持AMQP连接)。

  • CFX消息“假脱机”。维护由于网络条件错误而无法传输的CFX消息的持久队列。一旦网络服务恢复,消息将自动按原来的顺序传输。

  • 点对点CFX端点命令(请求/响应)支持。

  • 支持AMQP 1.0 SASL认证和TLS加密。

官方SDK文档传送门:
SDK文档

不过,通过学习发现,这个SDK主要还是提供了统一的Topic和Message的数据结构,至于和RabbitMQ的连接,个人感觉不太方便使用,我们完全可以使用其他成熟的RabbitMQ API组件来完成发布和订阅。

接下来,我们来快速实践一下CFX的两种通信方式:发布订阅 和 点对点。

快速开始:搭建一个RabbitMQ

既然IPC-CFX是基于AMQP协议来的,那我们就搭一个RabbitMQ吧。这里我们快速用docker-compose安装一个RabbitMQ。

version: '3'services:
rabbitmq1:
image: rabbitmq:
3.8-management
container_name: rabbit-mq-service
hostname: rabbit-mq-server
ports:
-
"5672:5672"-"15672:15672"restart: always
environment:
- RABBITMQ_DEFAULT_USER=rabbit # user account
- RABBITMQ_DEFAULT_PASS=EdisonTalk2024 # password
volumes:
- rabbitmq_data:/var/lib/rabbitmq

volumes:
rabbitmq_data:
driver: local

然后,通过下面的命令把RabbitMQ Run起来:需要注意的点就是需要手动开启AMQP1.0协议!

docker-compose up -d

#进入RabbitMQ容器
docker exec -it rabbit-mq-service /bin/bash
#开启AMQP1.0协议
rabbitmq-plugins enable rabbitmq_amqp1_0

成功运行起来后,能够成功打开RabbitMQ管理界面:

快速开始:实现基于CFX标准的发布订阅通信

发布者

这里我们通过Visual Studio创建一个控制台应用程序,基于.NET Framework 4.8来实现。

首先,安装Nuget包:

  • CFX.CFXSDK
  • EasyNetQ

其次,完成联接Broker 和 发布Message 的代码:

namespaceAMQP.MachineA
{
/// <summary> ///MachineA: SEWC.SMT.001/// </summary> public classProgram
{
private const string _machineName = "SDC.SMT.001";private const string _amqpBroker = "rabbit-mq-server"; //RabbitMQ-Host private const string _amqpUsername = "rabbit"; //RabbitMQ-User private const string _amqpPassword = "rabbit-password"; //RabbitMQ-Password public static void Main(string[] args)
{
Console.WriteLine($
"Current Machine: {_machineName}");
Console.Write($
"Current Role: Publisher {Environment.NewLine}");var connStr = $"host={_amqpBroker};username={_amqpUsername};password={_amqpPassword}";using (var amqpBus =RabbitHutch.CreateBus(connStr))
{
while (true)
{
Console.WriteLine($
"[Info] Starting to send a message to AMQP broker.");//Build a CFX Message of MaterialsApplied var message = new CFXEnvelope(newMaterialsApplied()
{
TransactionId
=Guid.NewGuid(),
AppliedMaterials
= new List<InstalledMaterial>{newInstalledMaterial()
{
QuantityInstalled
= 1,
QuantityNonInstalled
= 2}
}
});

amqpBus.PubSub.Publish(message);
Console.WriteLine($
"[Info] Finished to send a message to AMQP broker.");
Console.WriteLine(
"-------------------------------------------------------------------");

Thread.Sleep(
1000 * 3);
}
}
}
}
}

Note:
这里只是为了快速演示,实际中账号密码以及Broker地址建议写到配置文件中,并使用AMQPS协议联接,否则你的账号密码会被明文在网络中传输。

订阅者

参考发布者,仍然创建一个控制台应用程序,安装两个NuGet包。

然后,实现消费者逻辑:

namespaceAMQP.MachineB
{
/// <summary> ///MachineB: SEWC.SMT.002/// </summary> public classProgram
{
private const string _machineName = "SDC.SMT.002";private const string _amqpBroker = "rabbit-mq-server"; //RabbitMQ-Host private const string _amqpUsername = "rabbit"; //RabbitMQ-User private const string _amqpPassword = "rabbit-password"; //RabbitMQ-Password public static void Main(string[] args)
{
Console.WriteLine($
"Current Machine: {_machineName}");
Console.WriteLine($
"Current Role: Subscriber {Environment.NewLine}");var connStr = $"host={_amqpBroker};username={_amqpUsername};password={_amqpPassword}";using (var amqpBus =RabbitHutch.CreateBus(connStr))
{
amqpBus.PubSub.Subscribe
<CFXEnvelope>(_machineName, message =>{if (message.MessageBody isMaterialsApplied)
{
Console.WriteLine($
"[Info] Got a message with topic {message.MessageName} :{Environment.NewLine}{message.ToJson(true)}");
Console.WriteLine(
"-------------------------------------------------------");
}
});

Console.WriteLine(
"Press any key to exit.");
Console.ReadLine();
}
}
}
}

最终的Demo效果如下图所示:

两个控制台应用程序模拟两个机台程序,实现了基于AMQP协议和CFX标准格式的异步通信。但是整体来讲,实现异步通信并不是重点,而是两个机台采用了所谓的“
统一语言
”。

快速开始:实现基于CFX标准的点对点通信

基于上面的了解,我们知道基于CFX我们还可以让设备之间实现点对点的通信,也可以不通过Broker转发,而且它仍然是基于AMQP协议的。

在点对点模式下,基于CFX SDK,会自动帮你创建一个基于Socket的通信进程,机台程序之间可以互相应答。

(1)机台A

namespaceP2P.MachineA
{
/// <summary> ///MachineA: SEWC.SMT.001/// </summary> public classProgram
{
private const string _sendCfxHandle = "SDC.SMT.001"; //Sender private const string _receiveCfxHandle = "SDC.SMT.002"; //Receiver private const string _sendRequestUri = "amqp://127.0.0.1:8234"; //Sender private const string _receiveRequestUri = "amqp://127.0.0.1:8235"; //Receiver public static void Main(string[] args)
{
Console.WriteLine($
"Current Machine: {_sendCfxHandle}");
Console.WriteLine($
"Current Uri: {_sendRequestUri}");
OpenRequest();
Console.WriteLine(
"Press Enter Key to start the CFX Sender");
Console.ReadKey();
while (true)
{
SendRequest();
Thread.Sleep(
1000 * 5); //Send message every 5 seconds }
}
#region AMQP Sender private staticAmqpCFXEndpoint _sendRequestEndpoint;private static voidOpenRequest()
{
if (_sendRequestEndpoint != null)
{
_sendRequestEndpoint.Close();
_sendRequestEndpoint
= null;
}

_sendRequestEndpoint
= newAmqpCFXEndpoint();
Console.WriteLine($
"[Debug] SendCFXEndpoint.IsOpen : {_sendRequestEndpoint.IsOpen.ToString()}");
_sendRequestEndpoint.Open(_sendCfxHandle,
newUri(_sendRequestUri));
Console.WriteLine($
"[Debug] SendCFXEndpoint.IsOpen : {_sendRequestEndpoint.IsOpen.ToString()}");

AmqpCFXEndpoint.RequestTimeout
= TimeSpan.FromSeconds(10 * 2);
}
private static voidSendRequest()
{
var message = CFXEnvelope.FromCFXMessage(newMaterialsApplied()
{
TransactionId
=Guid.NewGuid(),
AppliedMaterials
= new List<InstalledMaterial>{newInstalledMaterial()
{
QuantityInstalled
= 1,
QuantityNonInstalled
= 2}
}
});
message.Source
=_sendCfxHandle;
message.Target
=_receiveCfxHandle;
message.TimeStamp
=DateTime.Now;try{
Console.WriteLine($
"[Info] Starting to send a message to Target Machine {_receiveCfxHandle}.");var response =_sendRequestEndpoint.ExecuteRequest(_receiveRequestUri, message);
Console.WriteLine($
"[Info] Target Machine {_receiveCfxHandle} returns : {Environment.NewLine}{response.ToJson(true)}");
}
catch(Exception ex)
{
Console.WriteLine($
"[Error] Exception message: {ex.Message}");
}
finally{
Console.WriteLine(
"-------------------------------------------------------");
}
}
#endregion}
}

Note:
既然是点对点,那发送者就必须要知道接收者的位置。

(2)机台B

namespaceP2P.MachineB
{
/// <summary> ///MachineB: SEWC.SMT.002/// </summary> public classProgram
{
private const string _receiveCfxHandle = "SDC.SMT.002";private const string _receiveRequestUri = "amqp://127.0.0.1:8235";public static void Main(string[] args)
{
Console.WriteLine($
"Current Machine: {_receiveCfxHandle}");
Console.WriteLine($
"Current Uri: {_receiveRequestUri}");
OpenListener();
Console.WriteLine(
"Press Entery Key to end the CFX Listener");

Console.ReadKey();
}
#region AMQP Receiver private staticAmqpCFXEndpoint _receiveRequestEndpoint;private static voidOpenListener()
{
if (_receiveRequestEndpoint != null)
{
_receiveRequestEndpoint.Close();
_receiveRequestEndpoint
= null;
}

_receiveRequestEndpoint
= newAmqpCFXEndpoint();
_receiveRequestEndpoint.OnRequestReceived
-=CFXMessageOnRequestReceived;
_receiveRequestEndpoint.OnRequestReceived
+=CFXMessageOnRequestReceived;

Console.WriteLine($
"[Debug] SendCFXEndpoint.IsOpen: {_receiveRequestEndpoint.IsOpen.ToString()}");
_receiveRequestEndpoint.Open(_receiveCfxHandle,
newUri(_receiveRequestUri));
Console.WriteLine($
"[Debug] SendCFXEndpoint.IsOpen: {_receiveRequestEndpoint.IsOpen.ToString()}");

AmqpCFXEndpoint.RequestTimeout
= TimeSpan.FromSeconds(10 * 2);
}
private staticCFXEnvelope CFXMessageOnRequestReceived(CFXEnvelope message)
{
Console.WriteLine($
"[Info] Got a message from Source Machine {message.Source} :{Environment.NewLine}{message.ToJson(true)}");
Console.WriteLine(
"-------------------------------------------------------");var result = (CFXEnvelope)null;if (message.MessageBody isWhoIsThereRequest)
{
result
= CFXEnvelope.FromCFXMessage(newWhoIsThereResponse()
{
CFXHandle
=_receiveCfxHandle,
RequestNetworkUri
=_receiveRequestUri,
RequestTargetAddress
= "..."});
}
else if (message.MessageBody isMaterialsApplied)
{
result
= CFXEnvelope.FromCFXMessage(newWhoIsThereResponse()
{
CFXHandle
=_receiveCfxHandle,
RequestNetworkUri
=_receiveRequestUri,
RequestTargetAddress
= "..."});
}
else{return null;
}

result.Source
=_receiveCfxHandle;
result.Target
=result.Source;
result.TimeStamp
=DateTime.Now;returnresult;
}
#endregion}
}

点对点Demo效果:

小结

本文我们了解了IPC-CFX标准产生的背景 和 用途,它是机器设备之间通信的“
统一语言
”,是大家都懂的“普通话”而不是“方言”。

首先,IPC-CFX使用AMQP v1.0传输协议实现安全的连接,使用JSON进行数据编码,提供了明确的消息结构和数据内容,确保即插即用。

其次,我们通过两个Demo快速了解了如何实现一个基于CFX标准的机台端应用程序,来实现“统一语言”的设备间通信。

最后,就目前互联网上的资料来看,国内社区对于CFX的应用来看整体都还是不多的,我们也还处于学习阶段,希望未来或许有新的更新分享。

参考资料

IPC CFX 官方文档:Getting Started with SDK

齐开得科技:IPC-CFX在SMT领域的应用

MQTT vs AMQP:物联网通信协议对比

开心一刻

昨天发了一条朋友圈:酒吧有什么好去的,上个月在酒吧当服务员兼职,一位大姐看上了我,说一个月给我 10 万,要我陪她去上海,我没同意

朋友评论道:你没同意,为什么在上海?

我回复到:上个月没同意

嘴真硬

前情回顾

关于
DataX
,官网有很详细的介绍,鄙人不才,也写过几篇文章

异构数据源同步之数据同步 → datax 改造,有点意思

异构数据源同步之数据同步 → datax 再改造,开始触及源码

异构数据源同步之数据同步 → DataX 使用细节

异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密

不了解的小伙伴可以按需去查看,所以了,
DataX
就不做过多介绍了;官方提供了非常多的插件,囊括了绝大部分的数据源,基本可以满足我们日常需要,但数据源种类太多,DataX 插件不可能包含全部,比如
kafka
,DataX 官方是没有提供读写插件的,大家知道为什么吗?你们如果对数据同步了解的比较多的话,一看到 kafka,第一反应往往想到的是
实时同步
,而 DataX 针对的是
离线同步
,所以 DataX 官方没提供 kafka 插件是不是也就能理解了?因为不合适嘛!

但如果客户非要离线同步也支持 kafka

人家要嘛

你能怎么办?直接怼过去:实现不了?

实现不了

所以没得选,那就只能给 DataX 开发一套 kafka 插件了;基于
DataX插件开发宝典
,插件开发起来还是非常简单的

kafkawriter

  1. 编程接口

    自定义
    Kafkawriter
    继承 DataX 的
    Writer
    ,实现 job、task 对应的接口即可

    /**
     * @author 青石路
     */
    public class KafkaWriter extends Writer {
    
        public static class Job extends Writer.Job {
    
            private Configuration conf = null;
    
            @Override
            public List<Configuration> split(int mandatoryNumber) {
                List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
                for (int i = 0; i < mandatoryNumber; i++) {
                    configurations.add(this.conf.clone());
                }
                return configurations;
            }
    
            private void validateParameter() {
                this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
                this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
            }
    
            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                this.validateParameter();
            }
    
    
            @Override
            public void destroy() {
    
            }
        }
    
        public static class Task extends Writer.Task {
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
            private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
    
            private Producer<String, String> producer;
            private Configuration conf;
            private Properties props;
            private String fieldDelimiter;
            private List<String> columns;
            private String writeType;
    
            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                columns = conf.getList(Key.COLUMN, String.class);
                writeType = conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null);
                if (CollUtil.isEmpty(columns)) {
                    throw DataXException.asDataXException(KafkaWriterErrorCode.REQUIRED_VALUE,
                            String.format("您提供配置文件有误,[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN));
                }
    
                props = new Properties();
                props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
                //这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
                props.put(ProducerConfig.ACKS_CONFIG, conf.getUnnecessaryValue(Key.ACK, "0", null));
                props.put(CommonClientConfigs.RETRIES_CONFIG, conf.getUnnecessaryValue(Key.RETRIES, "0", null));
                props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
                props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
    
                Configuration saslConf = conf.getConfiguration(Key.SASL);
                if (ObjUtil.isNotNull(saslConf)) {
                    logger.info("配置启用了SASL认证");
                    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaWriterErrorCode.REQUIRED_VALUE));
                    props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaWriterErrorCode.REQUIRED_VALUE));
                    String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaWriterErrorCode.REQUIRED_VALUE);
                    String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaWriterErrorCode.REQUIRED_VALUE);
                    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
                }
    
                producer = new KafkaProducer<String, String>(props);
            }
    
            @Override
            public void prepare() {
                if (Boolean.parseBoolean(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {
    
                    ListTopicsResult topicsResult = AdminClient.create(props).listTopics();
                    String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
    
                    try {
                        if (!topicsResult.names().get().contains(topic)) {
                            new NewTopic(
                                    topic,
                                    Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                                    Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                            );
                            List<NewTopic> newTopics = new ArrayList<NewTopic>();
                            AdminClient.create(props).createTopics(newTopics);
                        }
                    } catch (Exception e) {
                        throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());
                    }
                }
            }
    
            @Override
            public void startWrite(RecordReceiver lineReceiver) {
                logger.info("start to writer kafka");
                Record record = null;
                while ((record = lineReceiver.getFromReader()) != null) {//说明还在读取数据,或者读取的数据没处理完
                    //获取一行数据,按照指定分隔符 拼成字符串 发送出去
                    if (writeType.equalsIgnoreCase(WriteType.TEXT.name())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToString(record))
                        );
                    } else if (writeType.equalsIgnoreCase(WriteType.JSON.name())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToKafkaJson(record))
                        );
                    }
                    producer.flush();
                }
            }
    
            @Override
            public void destroy() {
                logger.info("producer close");
                if (producer != null) {
                    producer.close();
                }
            }
    
            /**
             * 数据格式化
             *
             * @param record
             * @return
             */
            private String recordToString(Record record) {
                int recordLength = record.getColumnNumber();
                if (0 == recordLength) {
                    return NEWLINE_FLAG;
                }
                Column column;
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < recordLength; i++) {
                    column = record.getColumn(i);
                    sb.append(column.asString()).append(fieldDelimiter);
                }
    
                sb.setLength(sb.length() - 1);
                sb.append(NEWLINE_FLAG);
    
                return sb.toString();
            }
    
            private String recordToKafkaJson(Record record) {
                int recordLength = record.getColumnNumber();
                if (recordLength != columns.size()) {
                    throw DataXException.asDataXException(KafkaWriterErrorCode.ILLEGAL_PARAM,
                            String.format("您提供配置文件有误,列数不匹配[record columns=%d, writer columns=%d]", recordLength, columns.size()));
                }
                List<KafkaColumn> kafkaColumns = new ArrayList<>();
                for (int i = 0; i < recordLength; i++) {
                    KafkaColumn column = new KafkaColumn(record.getColumn(i), columns.get(i));
                    kafkaColumns.add(column);
                }
                return JSONUtil.toJsonStr(kafkaColumns);
            }
        }
    }
    

    DataX 框架按照如下的顺序执行 Job 和 Task 的接口


    job_task 接口执行顺序

    重点看 Task 的接口实现


    • init:读取配置项,然后创建 Producer 实例

    • prepare:判断 Topic 是否存在,不存在则创建

    • startWrite:通过 RecordReceiver 从 Channel 获取 Record,然后写入 Topic

      支持两种写入格式:
      text

      json
      ,细节请看下文中的
      kafkawriter.md

    • destroy:关闭 Producer 实例


    实现不难,相信大家都能看懂

  2. 插件定义


    resources
    下新增
    plugin.json

    {
        "name": "kafkawriter",
        "class": "com.qsl.datax.plugin.writer.kafkawriter.KafkaWriter",
        "description": "write data to kafka",
        "developer": "qsl"
    }
    

    强调下
    class
    ,是
    KafkaWriter
    的全限定类名,如果你们没有完全拷贝我的,那么要改成你们自己的

  3. 配置文件


    resources
    下新增
    plugin_job_template.json

    {
        "name": "kafkawriter",
        "parameter": {
            "bootstrapServers": "",
            "topic": "",
            "ack": "all",
            "batchSize": 1000,
            "retries": 0,
            "fieldDelimiter": ",",
            "writeType": "json",
            "column": [
                "const_id",
                "const_field",
                "const_field_value"
            ],
            "sasl": {
                "securityProtocol": "SASL_PLAINTEXT",
                "mechanism": "PLAIN",
                "username": "",
                "password": ""
            }
        }
    }
    

    配置项说明:
    kafkawriter.md

  4. 打包发布

    可以参考官方的
    assembly
    配置,利用 assembly 来打包

至此,
kafkawriter
就算完成了

kafkareader

  1. 编程接口

    自定义
    Kafkareader
    继承 DataX 的
    Reader
    ,实现 job、task 对应的接口即可

    /**
     * @author 青石路
     */
    public class KafkaReader extends Reader {
    
        public static class Job extends Reader.Job {
    
            private Configuration originalConfig = null;
    
            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
                this.validateParameter();
            }
    
            @Override
            public void destroy() {
    
            }
    
            @Override
            public List<Configuration> split(int adviceNumber) {
                List<Configuration> configurations = new ArrayList<>(adviceNumber);
                for (int i=0; i<adviceNumber; i++) {
                    configurations.add(this.originalConfig.clone());
                }
                return configurations;
            }
    
            private void validateParameter() {
                this.originalConfig.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);
                this.originalConfig.getNecessaryValue(Key.TOPIC, KafkaReaderErrorCode.REQUIRED_VALUE);
            }
        }
    
        public static class Task extends Reader.Task {
    
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
    
            private Consumer<String, String> consumer;
            private String topic;
            private Configuration conf;
            private int maxPollRecords;
            private String fieldDelimiter;
            private String readType;
            private List<Column.Type> columnTypes;
    
            @Override
            public void destroy() {
                logger.info("consumer close");
                if (Objects.nonNull(consumer)) {
                    consumer.close();
                }
            }
    
            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                this.topic = conf.getString(Key.TOPIC);
                this.maxPollRecords = conf.getInt(Key.MAX_POLL_RECORDS, 500);
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                readType = conf.getUnnecessaryValue(Key.READ_TYPE, ReadType.JSON.name(), null);
                if (!ReadType.JSON.name().equalsIgnoreCase(readType)
                        && !ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                            String.format("您提供配置文件有误,不支持的readType[%s]", readType));
                }
                if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                    List<String> columnTypeList = conf.getList(Key.COLUMN_TYPE, String.class);
                    if (CollUtil.isEmpty(columnTypeList)) {
                        throw DataXException.asDataXException(KafkaReaderErrorCode.REQUIRED_VALUE,
                                String.format("您提供配置文件有误,readType是JSON时[%s]是必填参数,不允许为空或者留白 .", Key.COLUMN_TYPE));
                    }
                    convertColumnType(columnTypeList);
                }
                Properties props = new Properties();
                props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, conf.getString(Key.BOOTSTRAP_SERVERS));
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, conf.getUnnecessaryValue(Key.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer", null));
                props.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getNecessaryValue(Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
                Configuration saslConf = conf.getConfiguration(Key.SASL);
                if (ObjUtil.isNotNull(saslConf)) {
                    logger.info("配置启用了SASL认证");
                    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslConf.getNecessaryValue(Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
                    props.put(SaslConfigs.SASL_MECHANISM, saslConf.getNecessaryValue(Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));
                    String userName = saslConf.getNecessaryValue(Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
                    String password = saslConf.getNecessaryValue(Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
                    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", userName, password));
                }
                consumer = new KafkaConsumer<>(props);
            }
    
            @Override
            public void startRead(RecordSender recordSender) {
                consumer.subscribe(CollUtil.newArrayList(topic));
                int pollTimeoutMs = conf.getInt(Key.POLL_TIMEOUT_MS, 1000);
                int retries = conf.getInt(Key.RETRIES, 5);
                if (retries < 0) {
                    logger.info("joinGroupSuccessRetries 配置有误[{}], 重置成默认值[5]", retries);
                    retries = 5;
                }
                /**
                 * consumer 每次都是新创建,第一次poll时会重新加入消费者组,加入过程会进行Rebalance,而 Rebalance 会导致同一 Group 内的所有消费者都不能工作
                 * 所以 poll 拉取的过程中,即使topic中有数据也不一定能拉到,因为 consumer 正在加入消费者组中
                 * kafka-clients 没有对应的API、事件机制来知道 consumer 成功加入消费者组的确切时间
                 * 故增加重试
                 */
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                int i = 0;
                if (CollUtil.isEmpty(records)) {
                    for (; i < retries; i++) {
                        records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                        logger.info("第 {} 次重试,获取消息记录数[{}]", i + 1, records.count());
                        if (!CollUtil.isEmpty(records)) {
                            break;
                        }
                    }
                }
                if (i >= retries) {
                    logger.info("重试 {} 次后,仍未获取到消息,请确认是否有数据、配置是否正确", retries);
                    return;
                }
                transferRecord(recordSender, records);
                do {
                    records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
                    transferRecord(recordSender, records);
                } while (!CollUtil.isEmpty(records) && records.count() >= maxPollRecords);
            }
    
            private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {
                if (CollUtil.isEmpty(records)) {
                    return;
                }
                for (ConsumerRecord<String, String> record : records) {
                    Record sendRecord = recordSender.createRecord();
                    String msgValue = record.value();
                    if (ReadType.JSON.name().equalsIgnoreCase(readType)) {
                        transportJsonToRecord(sendRecord, msgValue);
                    } else if (ReadType.TEXT.name().equalsIgnoreCase(readType)) {
                        // readType = text,全当字符串类型处理
                        String[] columnValues = msgValue.split(fieldDelimiter);
                        for (String columnValue : columnValues) {
                            sendRecord.addColumn(new StringColumn(columnValue));
                        }
                    }
                    recordSender.sendToWriter(sendRecord);
                }
                consumer.commitAsync();
            }
    
            private void convertColumnType(List<String> columnTypeList) {
                columnTypes = new ArrayList<>();
                for (String columnType : columnTypeList) {
                    switch (columnType.toUpperCase()) {
                        case "STRING":
                            columnTypes.add(Column.Type.STRING);
                            break;
                        case "LONG":
                            columnTypes.add(Column.Type.LONG);
                            break;
                        case "DOUBLE":
                            columnTypes.add(Column.Type.DOUBLE);
                        case "DATE":
                            columnTypes.add(Column.Type.DATE);
                            break;
                        case "BOOLEAN":
                            columnTypes.add(Column.Type.BOOL);
                            break;
                        case "BYTES":
                            columnTypes.add(Column.Type.BYTES);
                            break;
                        default:
                            throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                    String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnType));
                    }
                }
            }
    
            private void transportJsonToRecord(Record sendRecord, String msgValue) {
                List<KafkaColumn> kafkaColumns = JSONUtil.toList(msgValue, KafkaColumn.class);
                if (columnTypes.size() != kafkaColumns.size()) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                            String.format("您提供的配置文件有误,readType是JSON时[%s列数=%d]与[json列数=%d]的数量不匹配", Key.COLUMN_TYPE, columnTypes.size(), kafkaColumns.size()));
                }
                for (int i=0; i<columnTypes.size(); i++) {
                    KafkaColumn kafkaColumn = kafkaColumns.get(i);
                    switch (columnTypes.get(i)) {
                        case STRING:
                            sendRecord.setColumn(i, new StringColumn(kafkaColumn.getColumnValue()));
                            break;
                        case LONG:
                            sendRecord.setColumn(i, new LongColumn(kafkaColumn.getColumnValue()));
                            break;
                        case DOUBLE:
                            sendRecord.setColumn(i, new DoubleColumn(kafkaColumn.getColumnValue()));
                            break;
                        case DATE:
                            // 暂只支持时间戳
                            sendRecord.setColumn(i, new DateColumn(Long.parseLong(kafkaColumn.getColumnValue())));
                            break;
                        case BOOL:
                            sendRecord.setColumn(i, new BoolColumn(kafkaColumn.getColumnValue()));
                            break;
                        case BYTES:
                            sendRecord.setColumn(i, new BytesColumn(kafkaColumn.getColumnValue().getBytes(StandardCharsets.UTF_8)));
                            break;
                        default:
                            throw DataXException.asDataXException(KafkaReaderErrorCode.ILLEGAL_PARAM,
                                    String.format("您提供的配置文件有误,datax不支持数据类型[%s]", columnTypes.get(i)));
                    }
                }
            }
        }
    }
    

    重点看 Task 的接口实现


    • init:读取配置项,然后创建 Consumer 实例

    • startWrite:从 Topic 拉取数据,通过 RecordSender 写入到 Channel 中

      这里有几个细节需要注意下


      1. Consumer 每次都是新创建的,拉取数据的时候,如果消费者还未加入到指定的消费者组中,那么它会先加入到消费者组中,加入过程会进行 Rebalance,而 Rebalance 会导致同一消费者组内的所有消费者都不能工作,此时即使 Topic 中有可拉取的消息,也拉取不到消息,所以引入了重试机制来尽量保证那一次同步任务拉取的时候,消费者能正常拉取消息
      2. 一旦 Consumer 拉取到消息,则会循环拉取消息,如果某一次的拉取数据量小于最大拉取量(maxPollRecords),说明 Topic 中的消息已经被拉取完了,那么循环终止;这与常规使用(Consumer 会一直主动拉取或被动接收)是有差别的
      3. 支持两种读取格式:
        text

        json
        ,细节请看下文的配置文件说明
      4. 为了保证写入 Channel 数据的完整,需要配置列的数据类型(DataX 的数据类型)
    • destroy:

      关闭 Consumer 实例

  2. 插件定义


    resources
    下新增
    plugin.json

    {
        "name": "kafkareader",
        "class": "com.qsl.datax.plugin.reader.kafkareader.KafkaReader",
        "description": "read data from kafka",
        "developer": "qsl"
    }
    

    class

    KafkaReader
    的全限定类名

  3. 配置文件


    resources
    下新增
    plugin_job_template.json

    {
        "name": "kafkareader",
        "parameter": {
            "bootstrapServers": "",
            "topic": "test-kafka",
            "groupId": "test1",
            "writeType": "json",
            "pollTimeoutMs": 2000,
            "columnType": [
                "LONG",
                "STRING",
                "STRING"
            ],
            "sasl": {
                "securityProtocol": "SASL_PLAINTEXT",
                "mechanism": "PLAIN",
                "username": "",
                "password": "2"
            }
        }
    }
    

    配置项说明:
    kafkareader.md

  4. 打包发布

    可以参考官方的
    assembly
    配置,利用 assembly 来打包

至此,
kafkareader
也完成了

总结

  1. 完整代码:
    qsl-datax
  2. kafkareader 重试机制只能降低拉取不到数据的概率,并不能杜绝;另外,如果上游一直往 Topic 中发消息,kafkareader 每次拉取的数据量都等于最大拉取量,那么同步任务会一直进行而不会停止,这还是离线同步吗?
  3. 离线同步,不推荐走 kafka,因为用 kafka 走实时同步更香

大家好,我是码农先森。

谈到这个话题有些朋友心中不免会有疑惑,为什么是 Swoole 而不是其他呢?因为 Swoole 是基于 C/C++ 语言开发的高性能异步通信扩展,覆盖的特性足够的多,有利于 PHP 程序员接触更全面的技术知识点。大多数的朋友踏入到 PHP 的大门都是因其简单的语法及其弱类型的特性,还有各种集成环境安装包、简单易用的框架,随随便便就可以快速的搞出一个系统,这就让 PHP 在我们心中落下的「简单易用」的印象。

这种印象就注定了 PHP 程序员在编写代码的过程中,怎么简单怎么来,随便的很,不瞒你说我还见过用中文来命名变量的,这简直颠覆了我的编程认知,好歹你用拼音也比中文强吧。不过为了良好的编程习惯,最好还是用英文单词,毕竟我们也要与国际接触嘛,自己的代码放到 GitHub 上也要让国际友人看的懂吧。话又说回来,很多时候 PHP 靓仔们写的代码,过一段时间后自己都看不懂了,更别提其他人了。我经常听到有人到处吐槽,前人的代码写的和屎一样有又长又臭惨不忍睹,然后你还要憋着内伤在这坨屎上绣上一朵花,这场景不堪回首哈哈。

差点写跑偏了,还是揪回我们这次的主题吧。分水岭这个词大家都挺熟悉的,但是在这里就是把 PHP 程序员分成了两拨人了,一拨是一直在以 PHP-FPM 同步编程模式下编程的人,另一拨是以 Swoole 异步编程模式为代表的编程人。我们在 PHP-FPM 模式下编程时,集成环境一启动,就把 PHP-FPM 进程管理器、Nginx 服务、MySQL 服务全部都给搞起来了,不需要关心其中的细节,如果想要增加新项目就在 Nginx 的 vhost 目录,配置一个本地域名重启一下 Nginx 就完事了,要是遇到项目总是报错迟迟无法访问,就反复重启集成环境,运气好点的重启一两次就莫名的好了,倒霉的就凉凉了,又要操蛋的重装集成环境了,更有甚者直接重装操作系统,一顿操作猛如虎。还有在这种模式下,我们也不需要关心内存的使用情况,变量想定义多少搞多少,外部资源随意加载,从数据库查询数据全部都是
select *
的骚操作,管它用不用的上全部都搞出来再说,如果造成接口访问时间过长,就全部怼到 Redis 缓存中去,美其名曰这是高级的性能优化方案。要是懂得点异步技术的靓仔,还会在比如发送短信等的场景下,使用一下 Redis 消息队列,异步给用户发送消息而不阻塞同步接口,从而使系统的并发性能得到提高。

长期在 PHP-FPM 模式下编程的人,几乎不用懂操作系统、网络协议等基础知识,协议上不管怎么用都只有 HTTP 协议。然而在 Swoole 异步编程模式下,不懂这些基础知识那可就行不通了,除了 HTTP 协议还有 WebSocket 协议、MQTT 协议、TCP 协议、UDP 协议,甚至还可以自定义网络协议,如果你对基础协议知识不了解,那刚开始学习 Swoole 估计就要夭折。其次 Swoole 中的协程、通道、多进程编程,就涉及到了操作系统的多路复用、IO调度策略、进程间通信、进程管理等知识,这些统统都是计算机的底层知识,是在 PHP-FPM 编程模式下接触不到的。还有在 Swoole 中可以针对数据库连接打造数据库连接池,高效的复用数据库连接资源,不用每次都重新连接数据库,避免资源的浪费。正是这些基础知识把大多数 PHP 程序员困在了低水平重复的技术区域里,因此要学好 Swoole 需要先学习计算机底层知识,然后再反复的利用 Swoole 进行实践,两者相辅相成便可好好打磨自己的技术。

可以说在 PHP 领域 Swoole 就是这些基础知识的具体体现,深入理解了 Swoole 并且能应用好,就能让自己的技术水平上一个新的台阶,超越大多数的 PHP 程序员。同时掌握了这些基础知识横向学习其他的语言也会易如反掌,原因是大多数语言的本质都是相通的,了解了其内在本质基本上就能做到一通百通。在这个持续内卷的时代,是时候让自己炼就一些不可或缺的内功了,不要让自己每天都疲于奔命在学习层出不穷新框架的路上,而是要掌握事物的根本规律以不变应万变,打造自己的知识壁垒墙,争取在内卷这场风暴中苟活下来。本次分享的内容到这里结束了,希望对大家能有所启发。

感谢阅读,个人观点仅供参考,欢迎在评论区发表不同观点。


欢迎关注、分享、点赞、收藏、在看,我是微信公众号「码农先森」作者。