2023年3月

作者:京东科技 葛星宇

1.前言

本文除特殊说明外,所指的都是fate 1.9版本。

fate资料存在着多处版本功能与发布的文档不匹配的情况,各个模块都有独立的文档,功能又有关联,坑比较多,首先要理清楚各概念、模块之间的关系。

2.网络互联架构

1. 概念解释:

RollSite是一个grpc通信组件,是eggroll引擎中的一个模块,相当于我们的grpc通信网关。

Exchange是RollSite中的一个功能,用于维护各方网关地址,并转发消息。参考
《FATE exchange部署指南》

2. 对比解读:

l 网状架构相当于我们的一体化版本模式,但没有dop平台来维护网关,每方需要在配置文件里维护其他参与方的网关地址。

l 星型架构的好处是只在Exchange方维护所有参与方的网关地址,前提是需要信任Exchange,并且流量全部都需要从Exchange方中转,相当于我们的中心化版本。但不支持证书。

3. Exchange配置

在Exchange上配置路由表:

在各party方配置默认路由指向exchange,不需要再配置每个party的地址。

3.总体架构

FATE支持eggroll和spark两种计算引擎,搭配不同的通信组件,共五种组合,不同的通信模块不能兼容。

参考:
《不同类型FATE的架构介绍》

区别:

l RabbitMQ是一个简单易上手的MQ

l Pulsar相比RabbitMQ,可以支持更大规模的集群化部署,也支持exchange模式的网络结构。

l Slim FATE相比其他模式,最大化减少集群所需的组件,可以使用在小规模联邦学习计算,IOT设备等情况。

3.1.基于EggRoll引擎的架构

Eggroll是FATE原生支持的计算存储引擎,包括以下三个组件:

l rollsite负责数据传输,以前的版本里叫 Proxy+Federation

l nodemanager负责存储和计算

l clustermanager负责管理nodemanager

3.2.基于spark+hdfs+rabbitMQ的架构

3.3. 基于spark+hdfs+Pulsar的架构

3.4. spark_local (Slim FATE)

支持rabbitMQ替换pulsar

4. 组件源码

所有的fate项目都在这个叫FederateAI社区的URL下:
https://github.com/FederatedAI

主项目:
FATE
是一个汇总的文档和超链集合,
学习入口

在线文档

关联项目:

•KubeFATE docker和k8s的部署

•AnsibleFATE 相当于我们的图形化部署版的底层脚本
学习入口

•FATE-Flow 联合学习任务流水线管理模块,注册、管理和调度中心。

•EggRoll 第一代fate的计算引擎

•FATE-Board 联合学习过程可视化模块,目前只能查看一些记录

•FATE-Serving 在线联合预测,
学习入口

•FATE-Cloud 联邦学习云服务,类似于我们的dop平台,管理功能。

•FedVision 联邦学习支持的可视化对象检测平台

•FATE-Builder fate编译工具

•FedLCM 新增的项目:创建 FATE 联邦并部署FATE实例。目前仅支持部署以Spark和Pulsar作为基础引擎,并使用Exchange实现互相连接的

5. FATE-Flow

FATE Flow是调度系统,根据用户提交的作业DSL,调度算法组件执行。

官网文档

服务能力:

· 数据接入

· 任务组件注册中心

· 联合作业&任务调度

· 多方资源协调

· 数据流动追踪

· 作业实时监测

· 联合模型注册中心

· 多方合作权限管理

· 系统高可用

· CLI、REST API、Python API

5.1. 流程架构

旧版,图比较立体

· DSL Parser:是调度的核心,通过 DSL parser 可以拿到上下游关系、依赖等。

· Job Scheduler:是 DAG 层面的调度,把 DAG 作为一个 Job,把 DAG 里面的节点 run 起来,就称为一个 task。

· Federated Task Scheduler:最小调度粒度就是 task,需要调度多方运行同一个组件但参数算法不同的 task,结束后,继续调度下一个组件,这里就会涉及到协同调度的问题。

· Job Controller:联邦任务控制器

· Executor:联邦任务执行节点,支持不同的 Operator 容器,现在支持 Python 和 Script 的 Operator。Executor,在我们目前的应用中拉起 FederatedML 定义的一些组件,如 data io 数据输入输出,特征选择等模块,每次调起一个组件去 run,然后,这些组件会调用基础架构的 API,如 Storage 和 Federation Service ( API 的抽象 ) ,再经过 Proxy 就可以和对端的 FATE-Flow 进行协同调度。

· Tracking Manager:任务输入输出的实时追踪,包括每个 task 输出的 data 和 model。

· Model Manager:联邦模型管理器

5.2. api service

DataAccess 数据上传,下载,历史记录,参考
示例

Job 提交(并运行),停止,查询,更新,配置,列表,task查询

Tracking

Pipeline

Model

Table

客户端命令行实际上是对api的包装调用,可以参考其
示例

Python调用api
示例

5.3. 算法模块

Federatedml
模块包括许多常见机器学习算法联邦化实现。所有模块均采用去耦的模块化方法开发,以增强模块的可扩展性。具体来说,我们提供:

1.联邦统计: 包括隐私交集计算,并集计算,皮尔逊系数, PSI等

2.联邦特征工程:包括联邦采样,联邦特征分箱,联邦特征选择等。

3.联邦机器学习算法:包括横向和纵向的联邦LR, GBDT, DNN,迁移学习等

4.模型评估:提供对二分类,多分类,回归评估,聚类评估,联邦和单边对比评估

5.安全协议:提供了多种安全协议,以进行更安全的多方交互计算。

Figure 1: Federated Machine Learning Framework

可开发在fate框架下运行的算法:
指南

6. FATE-Serving

6.1. 功能架构

6.2. 部署逻辑架构

Adatptor:默认的情况使用系统自带的MockAdatptor,仅返回固定数据用于简单测试,实际生产环境中需要使用者需要自行开发并对接自己的业务系统。(这部分可以看看能不能对接咱们自己的在线预测系统。)

l 支持使用rollsite/nginx/fateflow作为多方任务协调通信代理

l rollsite支持fate on eggroll的场景,仅支持grpc协议,支持P2P组网及星型组网模式

l nginx支持所有引擎场景,支持http与grpc协议,默认为http,支持P2P组网及星型组网模式

l fateflow支持所有引擎场景,支持http与grpc协议,默认为http,仅支持P2P组网模式,也即只支持互相配置对端fateflow地址

6.3. 部署实例图

6.4. 工作时序图

6.5. 模型推送流程

蓝色为guest集群,灰色代表host集群

1. 通过fate flow建模 2. 分别部署guest方 Fate-serving 与host方Fate-serving

3. 分别配置好guest方Fate-flow与guest方Fate-serving、host方Fate-flow 与host方Fate-serving。

4. Fate-flow推送模型

5. Fate-flow将模型绑定serviceId

6. 以上操作完成后,可以在serving-admin页面上查看模型相关信息(此步操作非必需)。

7. 可以在serving-admin页面上测试调用(此步操作非必需)。

6.6. 搭配nginx代理

https://fate-serving.readthedocs.io/en/develop/example/nginx/

FATE-Serving 之间的交互可以通过nginx反向代理转发grpc请求,以下几种场景配置如下:

· 场景一:双方不配置TLS,通过nginx四层代理转发

· 场景二:双方配置TLS,通过nginx四层代理转发,双方分别进行证书校验

· 场景三:数据使用方配置Client端证书,Nginx配置Server端证书,Host不配置证书,通过nginx七层代理转发,由Client端和nginx进行证书校验

7. FATE Cloud

FATE Cloud由负责联邦站点管理的云管理端Cloud Manager和站点客户端管理端FATE Manager组成,提供了联邦站点的注册与管理、集群自动化部署与升级、集群监控、集群权限控制等核心功能。

联邦云管理端(Cloud Manager)

联邦云管理端即联邦数据网络的管理中心,负责统一运营和管理FATE Manager及各站点,监控站点的服务与联邦合作建模,执行联邦各权限控制,保证联邦数据合作网络的正常运作;

联邦站点管理端(FATE Manager)

联邦站点管理端,负责管理和维护各自的联邦站点,为站点提供加入联邦组织、执行站点服务的自动化部署与升级,监控站点的联邦合作与集群服务,并管理站点用户角色与应用权限;

产品手册

8. 部署测试

共有4类部署方式,单机的安装模式是只提供了单机的安装文档,也可以研究怎么扩展成集群模式。

| | 单机(不推荐生产用) | 集群(生产推荐) | | 非容器 | AllinOne | ansible | | 容器 | docker compose | k8s |

部署时会要求配置机器对应的角色,只能选host,guest和Exchange,其中host和guest并没有区别,实际运行联邦时还是在job的配置中去配置哪一方是guest,哪一方是host,任务只能在guest方提交。

8.1. AllinOne

所有的组件都部署在一台机器上,比较适合开发调试,参考
链接

8.2. ansible

尝试用ansible部署时遇到了python相关的错误,指导文档也缺少详细的步骤,没有相关错误的说明。

8.3. k8s

手上没有k8s环境,暂未测试。

参考文档:
《KubeFATE 部署FATE支持引擎介绍》

8.4. docker compose

容器部署尝试用docker compose方式部署了一对,比较顺利,参考了2篇官方文章,前边的准备步骤和安装过程参考
此文
,“验证部署”及之后的步骤参考
《Docker Compose 部署 FATE》

不同点如下:

8.4.1. 准备阶段

下载镜像较慢,如果大批量部署,可以搭建内网镜像服务。

| Role | party-id | OS | IP | | | host | 20001 | Centos7.6 | 11.50.52.81 | 8C64G | | guest | 20002 | Centos7.6 | 11.50.52.62 | 8C64G | | 部署机 | | Centos7.6 | 11.50.52.40 | |

以上内容替代文档中对应的部分内容。

一开始我只部署了一台host,本来打算这2台做一个集群,后来发现文档里没提这种方式,只好先按文档实验一次,于是又部署了guest,这样在guest的配置里已经写好了host的地址,于是手动将配置更新到了host的/data/projects/fate/confs-20001/confs/eggroll/conf/route_table.json

发现不需要重启容器后续步骤也没报错,说明可以动态修改路由信息。

8.4.2. hetero_lr测试

进入容器的时候,容器名包含的平台id需要修改成实际的。

json格式定义
说明文档

fateflow/examples/lr/test_hetero_lr_job_conf.json 中不同点,

修改对应的平台id

 "initiator": {
 "role": "guest",
 "party_id": 20002
 },
 "role": {
 "guest": [
 20002
 ],
 "host": [
 20001
 ],
 "arbiter": [
 20001
 ]
 },
 

按文档写资源不够运行不了,需要修改如下

"job_parameters": {

    "common": {

      "task_parallelism": 1,

      "computing_partitions": 1,

      "task_cores": 1

    }

  },

不要修改fateflow/examples/lr/test_hetero_lr_job_dsl.json文件,文档中的配置是旧版本的,修改了就不能执行了,里面的DataIO组件已废弃。

运行测试后可以通过board查看,成功的id:202211031508511267810

http://11.50.52.62:8080/#/history

http://11.50.52.81:8080/#/history

8.4.3. 模型部署

# flow model deploy --model-id arbiter-20001#guest-20002#host-20001#model --model-version 202211031508511267810

输出了产生的model_version是202211031811059832400

1. 修改加载模型的配置

# cat > fateflow/examples/model/publish_load_model.json <<EOF
{
  "initiator": {
    "party_id": "20002",
    "role": "guest"
  },
  "role": {
    "guest": [
      "20002"
    ],
    "host": [
      "20001"
    ],
    "arbiter": [
      "20001"
    ]
  },
  "job_parameters": {
    "model_id": "arbiter-20001#guest-20002#host-20001#model",
    "model_version": "202211031811059832400"
  }
}
EOF

2. 修改绑定模型的配置

# cat > fateflow/examples/model/bind_model_service.json <<EOF
{
    "service_id": "test",
    "initiator": {
        "party_id": "20002",
        "role": "guest"
    },
    "role": {
        "guest": ["20002"],
        "host": ["20001"],
        "arbiter": ["20001"]
    },
    "job_parameters": {
        "work_mode": 1,
        "model_id": "arbiter-20001#guest-20002#host-20001#model",
        "model_version": "202211031811059832400"
    }
}
EOF

3. 在线测试

发送以下信息到"GUEST"方的推理服务"{SERVING_SERVICE_IP}:8059/federation/v1/inference"

# curl -X POST -H 'Content-Type: application/json' -i 'http://11.50.52.62:8059/federation/v1/inference' --data '{
  "head": {
    "serviceId": "test"
  },
  "body": {
    "featureData": {
        "x0": 1.88669,
        "x1": -1.359293,
        "x2": 2.303601,
        "x3": 2.00137,
        "x4": 1.307686
    },
    "sendToRemoteFeatureData": {
        "phone_num": "122222222"
    }
  }
}'

9.在Jupyther中构建任务

Jupyter Notebook是web界面IDE。已集成在fate-client容器中。

10. 总结

本文旨在从宏观的角度分析FATE的源码分布、总体架构、主要功能及核心流程,尚有许多细节和功能未深入研究,欢迎大家留言,互相学习。

事情经过

近期做一个项目投标演示(POC)环境支持,需要集成Nacos服务端。考虑到现有项目中已经有了Nacos相关依赖,那还不简单?新建个服务端,配置几下重启不就搞定了吗?然而事情远没有想得这么简单。同样的代码在我本地IDE里运行就能注册成功,在演示环境 Tomcat+War 部署就不行了。

经过远程Debug代码,发现Nacos客户端的线程都有启动,却没有注册成功。

思路

想到可能与Tomcat部署模式有关系,就去查了
官方issue

StackOverFlow

The event is published as part of Spring Boot starting the embedded Tomcat instance. If you're deploying to an external container, there's no embedded container to start and, therefore, no event is published. –
Andy Wilkinson

大致是说只有当 Spring Boot 启动内嵌 Tomcat 成功后,才会发布
WebServerInitializedEvent
事件。而Nacos客户端在等这个事件出现才会向服务端注册自己。又因部署在外部Tomcat中就不会初始化内嵌Tomcat,也就没触发这个事件。

所以解决方法就是将Nacos等事件的部分代码调用下,让他们启动注册。

Nacos的自动注册类是
NacosAutoServiceRegistration
,它继承Spring Cloud的
AbstractAutoServiceRegistration
,在
AbstractAutoServiceRegistration
等的
bind(WebServerInitializedEvent)
方法监听事件,设置端口号并启动注册。这里边
this.port
是从事件中获取的,需要我们自行获取。

设置port的位置可见,是从
org.springframework.cloud.client.serviceregistry.Registration
中取到的,给它设置一下就可以了。

解决办法

我写了一个完整的配置类放到了该
ISSUE
下边,这里直接贴在下边。

import java.lang.management.ManagementFactory;
import java.util.Set;

import javax.annotation.PostConstruct;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.Query;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import com.alibaba.cloud.nacos.registry.NacosAutoServiceRegistration;
import com.alibaba.cloud.nacos.registry.NacosRegistration;

@Configuration
public class NacosWarDeployConfig {
    private static final Logger logger = LoggerFactory.getLogger(NacosWarDeployConfig.class);

    @Autowired
    private Environment env;
    @Autowired
    private NacosRegistration registration;
    @Autowired
    private NacosAutoServiceRegistration nacosAutoServiceRegistration;

    @PostConstruct
    public void nacosServerRegister() {
        if (registration != null) {
            registration.setPort(getTomcatPort());
            nacosAutoServiceRegistration.start();
        }
    }

    public int getTomcatPort() {
        try {
            return getProvideTomcatPort();
        } catch (Exception e) {
            logger.warn("obtain provide tomcat port failed, fallback to embeded tomcat port.");
        }
        return getEmbeddedTomcatPort();
    }

    private int getProvideTomcatPort() throws MalformedObjectNameException, NullPointerException {
        MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> objectNames = beanServer.queryNames(new ObjectName("*:type=Connector,*"),
                Query.match(Query.attr("protocol"), Query.value("HTTP/1.1")));
        String port = objectNames.iterator().next().getKeyProperty("port");
        return Integer.valueOf(port);
    }

    private int getEmbeddedTomcatPort() {
        return env.getProperty("server.port", Integer.class, 8080);
    }

}

经过我这一波操作问题终于解决了。我是Hellxz,不在进坑就在爬坑的路上。

前置步骤和录屏是一样的,见我的上一篇文章

https://www.cnblogs.com/billin/p/17219558.html

bool
obs_output_actual_start
(
obs_output_t
*output)在上文的这个函数中,如果是启用推流直播,函数指针会转到这里

//碧麟精简批注版
//rtmp推流开始
static bool rtmp_stream_start(void *data)
{
	struct rtmp_stream *stream = data;

	os_atomic_set_bool(&stream->connecting, true);
	
    //开启推流线程
    return pthread_create(&stream->connect_thread, NULL, connect_thread,
			      stream) == 0;
}

推流使用的是obs-outputs插件

上面函数就是新开一个线程,来执行

这个函数static void *connect_thread(void *data),线程传入参数是stream

先看一下rtmp_stream结构

//rtmp stream结构
struct rtmp_stream {
	//obs_output
    obs_output_t *output;
    
    //packet信息
	struct circlebuf packets;
	bool sent_headers;

	bool got_first_video;
	int64_t start_dts_offset;

	volatile bool connecting;
    //连接线程地址
	pthread_t connect_thread;
    //发送线程地址
	pthread_t send_thread;

	os_sem_t *send_sem;
    
    //推流地址,key
    //比如b站,推流地址是rtmp://live-push.bilivideo.com/live-bvc/
	struct dstr path, key;
    //用户名,密码
	struct dstr username, password;
    //编码器名字:
    //我用的是FMLE/3.0 (compatible; FMSc/1.0)
	struct dstr encoder_name;
	struct dstr bind_ip;


	int64_t last_dts_usec;

	uint64_t total_bytes_sent;
	int dropped_frames;

	pthread_mutex_t dbr_mutex;
	struct circlebuf dbr_frames;
	size_t dbr_data_size;

	long audio_bitrate;
	long dbr_est_bitrate;
	long dbr_orig_bitrate;
	long dbr_prev_bitrate;
	long dbr_cur_bitrate;
	long dbr_inc_bitrate;
	bool dbr_enabled;
    
    // RTMP结构对象
	RTMP rtmp;
    
	pthread_t socket_thread;
	uint8_t *write_buf;
	size_t write_buf_len;
	size_t write_buf_size;
	pthread_mutex_t write_buf_mutex;
	
};

rtmp_stream结构里保存了推流的所有关键信息,包括推流地址,key,流的编码器等参数

还保存了几个关键的指针,用于多线程中调度。有连接线程connect_thread,也有发送线程pthread_t send_thread;

//rtmp连接线程
static void *connect_thread(void *data)
{
	struct rtmp_stream *stream = data;
	int ret;
    //设置线程名
	os_set_thread_name("rtmp-stream: connect_thread");
    
    //初始化
	if (!silently_reconnecting(stream)) {
		if (!init_connect(stream)) {
			obs_output_signal_stop(stream->output,
					       OBS_OUTPUT_BAD_PATH);
			os_atomic_set_bool(&stream->silent_reconnect, false);
			return NULL;
		}
	} else {
		struct encoder_packet packet;
		peek_next_packet(stream, &packet);
		stream->start_dts_offset = get_ms_time(&packet, packet.dts);
	}
    
    //连接
	ret = try_connect(stream);

	if (ret != OBS_OUTPUT_SUCCESS) {
		obs_output_signal_stop(stream->output, ret);
		info("Connection to %s failed: %d", stream->path.array, ret);
	}

	if (!stopping(stream))
		pthread_detach(stream->connect_thread);

	os_atomic_set_bool(&stream->silent_reconnect, false);
	os_atomic_set_bool(&stream->connecting, false);
	return NULL;
}

上面的方法是在单独的线程中执行的,主要是做了下面几件事:

1 设置线程名为“rtmp-stream :connect_thread”

2 初始化 init_connect

3 通过调用try_connect(stream)执行实际连接逻辑

//rtmp connect
static int try_connect(struct rtmp_stream *stream)
{
	info("Connecting to RTMP URL %s...", stream->path.array);

    //rtmp初始化
	RTMP_Init(&stream->rtmp);
    
    //设置URL
	if (!RTMP_SetupURL(&stream->rtmp, stream->path.array))
		return OBS_OUTPUT_BAD_PATH;

	RTMP_EnableWrite(&stream->rtmp);
    
    //设置流编码格式
	dstr_copy(&stream->encoder_name, "FMLE/3.0 (compatible; FMSc/1.0)");
    
    //设置用户名密码
	set_rtmp_dstr(&stream->rtmp.Link.pubUser, &stream->username);
	set_rtmp_dstr(&stream->rtmp.Link.pubPasswd, &stream->password);
	set_rtmp_dstr(&stream->rtmp.Link.flashVer, &stream->encoder_name);
	stream->rtmp.Link.swfUrl = stream->rtmp.Link.tcUrl;
	stream->rtmp.Link.customConnectEncode = add_connect_data;

	if (dstr_is_empty(&stream->bind_ip) ||
	    dstr_cmp(&stream->bind_ip, "default") == 0) {
		memset(&stream->rtmp.m_bindIP, 0,
		       sizeof(stream->rtmp.m_bindIP));
	} else {
		bool success = netif_str_to_addr(&stream->rtmp.m_bindIP.addr,
						 &stream->rtmp.m_bindIP.addrLen,
						 stream->bind_ip.array);
		if (success) {
			int len = stream->rtmp.m_bindIP.addrLen;
			bool ipv6 = len == sizeof(struct sockaddr_in6);
			info("Binding to IPv%d", ipv6 ? 6 : 4);
		}
	}

	RTMP_AddStream(&stream->rtmp, stream->key.array);

	stream->rtmp.m_outChunkSize = 4096;
	stream->rtmp.m_bSendChunkSizeInfo = true;
	stream->rtmp.m_bUseNagle = true;

    //连接
	if (!RTMP_Connect(&stream->rtmp, NULL)) {
		set_output_error(stream);
		return OBS_OUTPUT_CONNECT_FAILED;
	}

	if (!RTMP_ConnectStream(&stream->rtmp, 0))
		return OBS_OUTPUT_INVALID_STREAM;

	info("Connection to %s successful", stream->path.array);
    
    //到这里说明连接成功,开始初始化send逻辑,准备推流
	return init_send(stream);
}

当连接成功,开始调用init_send函数,初始化send,准备推流

需要注意,send也是在单独的线

程处理的,因为传视频比较大,如果不单开线程,肯定会造成阻塞。

//初始化rtmp send逻辑
static int init_send(struct rtmp_stream *stream)
{
	int ret;
	obs_output_t *context = stream->output;

    //创建send线程,执行send_thread方法,参数是stream
	ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);

	if (stream->new_socket_loop) {
		int one = 1;
#ifdef _WIN32
		if (ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
			stream->rtmp.last_error_code = WSAGetLastError();
#else
		if (ioctl(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
			stream->rtmp.last_error_code = errno;
#endif
			warn("Failed to set non-blocking socket");
			return OBS_OUTPUT_ERROR;
		}

		os_event_reset(stream->send_thread_signaled_exit);

		info("New socket loop enabled by user");
		if (stream->low_latency_mode)
			info("Low latency mode enabled by user");

		if (stream->write_buf)
			bfree(stream->write_buf);

		int total_bitrate = 0;

		obs_encoder_t *vencoder = obs_output_get_video_encoder(context);
		if (vencoder) {
			obs_data_t *params = obs_encoder_get_settings(vencoder);
			if (params) {
				int bitrate =
					obs_data_get_int(params, "bitrate");
				if (!bitrate) {
					warn("Video encoder didn't return a "
					     "valid bitrate, new network "
					     "code may function poorly. "
					     "Low latency mode disabled.");
					stream->low_latency_mode = false;
					bitrate = 10000;
				}
				total_bitrate += bitrate;
				obs_data_release(params);
			}
		}

		obs_encoder_t *aencoder =
			obs_output_get_audio_encoder(context, 0);
		if (aencoder) {
			obs_data_t *params = obs_encoder_get_settings(aencoder);
			if (params) {
				int bitrate =
					obs_data_get_int(params, "bitrate");
				if (!bitrate)
					bitrate = 160;
				total_bitrate += bitrate;
				obs_data_release(params);
			}
		}

		// to bytes/sec
		int ideal_buffer_size = total_bitrate * 128;

		if (ideal_buffer_size < 131072)
			ideal_buffer_size = 131072;

		stream->write_buf_size = ideal_buffer_size;
		stream->write_buf = bmalloc(ideal_buffer_size);

#ifdef _WIN32
		ret = pthread_create(&stream->socket_thread, NULL,
				     socket_thread_windows, stream);
#else
		warn("New socket loop not supported on this platform");
		return OBS_OUTPUT_ERROR;
#endif

		if (ret != 0) {
			RTMP_Close(&stream->rtmp);
			warn("Failed to create socket thread");
			return OBS_OUTPUT_ERROR;
		}

		stream->socket_thread_active = true;
		stream->rtmp.m_bCustomSend = true;
		stream->rtmp.m_customSendFunc = socket_queue_data;
		stream->rtmp.m_customSendParam = stream;
	}

	os_atomic_set_bool(&stream->active, true);

	if (!send_meta_data(stream)) {
		warn("Disconnected while attempting to send metadata");
		set_output_error(stream);
		return OBS_OUTPUT_DISCONNECTED;
	}

	obs_encoder_t *aencoder = obs_output_get_audio_encoder(context, 1);
	if (aencoder && !send_additional_meta_data(stream)) {
		warn("Disconnected while attempting to send additional "
		     "metadata");
		return OBS_OUTPUT_DISCONNECTED;
	}

	if (obs_output_get_audio_encoder(context, 2) != NULL) {
		warn("Additional audio streams not supported");
		return OBS_OUTPUT_DISCONNECTED;
	}

	if (!silently_reconnecting(stream))
		obs_output_begin_data_capture(stream->output, 0);

	return OBS_OUTPUT_SUCCESS;
}

核心推流线程,在单独的线程里完成推流逻辑

//推流核心线程
static void *send_thread(void *data)
{
	struct rtmp_stream *stream = data;
    
    //设置线程名
	os_set_thread_name("rtmp-stream: send_thread");

    
    //设定buffersize
#if defined(_WIN32)
	// Despite MSDN claiming otherwise, send buffer auto tuning on
	// Windows 7 doesn't seem to work very well.
	if (get_win_ver_int() == 0x601) {
		DWORD cur_sendbuf_size;
		DWORD desired_sendbuf_size = 524288;
		socklen_t int_size = sizeof(int);

		if (!getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
				SO_SNDBUF, (char *)&cur_sendbuf_size,
				&int_size) &&
		    cur_sendbuf_size < desired_sendbuf_size) {

			setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
				   SO_SNDBUF, (char *)&desired_sendbuf_size,
				   sizeof(desired_sendbuf_size));
		}
	}

	log_sndbuf_size(stream);
#endif
    
    //推流主循环
	while (os_sem_wait(stream->send_sem) == 0) {
		struct encoder_packet packet;
		struct dbr_frame dbr_frame;

		if (stopping(stream) && stream->stop_ts == 0) {
			break;
		}

		if (!get_next_packet(stream, &packet))
			continue;

		if (stopping(stream)) {
			if (can_shutdown_stream(stream, &packet)) {
				obs_encoder_packet_release(&packet);
				break;
			}
		}

		if (!stream->sent_headers) {
			if (!send_headers(stream)) {
				os_atomic_set_bool(&stream->disconnected, true);
				break;
			}
		}

		/* silent reconnect signal received from server, reconnect on
		 * next keyframe */
		if (silently_reconnecting(stream) &&
		    packet.type == OBS_ENCODER_VIDEO && packet.keyframe) {
			reinsert_packet_at_front(stream, &packet);
			break;
		}

		if (stream->dbr_enabled) {
			dbr_frame.send_beg = os_gettime_ns();
			dbr_frame.size = packet.size;
		}

		if (send_packet(stream, &packet, false, packet.track_idx) < 0) {
			os_atomic_set_bool(&stream->disconnected, true);
			break;
		}

		if (stream->dbr_enabled) {
			dbr_frame.send_end = os_gettime_ns();

			pthread_mutex_lock(&stream->dbr_mutex);
			dbr_add_frame(stream, &dbr_frame);
			pthread_mutex_unlock(&stream->dbr_mutex);
		}
	}

	bool encode_error = os_atomic_load_bool(&stream->encode_error);

	if (disconnected(stream)) {
		info("Disconnected from %s", stream->path.array);
	} else if (encode_error) {
		info("Encoder error, disconnecting");
	} else if (silently_reconnecting(stream)) {
		info("Silent reconnect signal received from server");
	} else {
		info("User stopped the stream");
	}

#if defined(_WIN32)
	log_sndbuf_size(stream);
#endif

	if (stream->new_socket_loop) {
		os_event_signal(stream->send_thread_signaled_exit);
		os_event_signal(stream->buffer_has_data_event);
		pthread_join(stream->socket_thread, NULL);
		stream->socket_thread_active = false;
		stream->rtmp.m_bCustomSend = false;
	}

	set_output_error(stream);

	if (silently_reconnecting(stream)) {
		/* manually close the socket to prevent librtmp from sending
		 * unpublish / deletestream messages when we call RTMP_Close,
		 * since we want to re-use this stream when we reconnect */
		RTMPSockBuf_Close(&stream->rtmp.m_sb);
		stream->rtmp.m_sb.sb_socket = -1;
	}

	RTMP_Close(&stream->rtmp);

	/* reset bitrate on stop */
	if (stream->dbr_enabled) {
		if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) {
			stream->dbr_cur_bitrate = stream->dbr_orig_bitrate;
			dbr_set_bitrate(stream);
		}
	}

	if (!stopping(stream)) {
		pthread_detach(stream->send_thread);
		if (!silently_reconnecting(stream))
			obs_output_signal_stop(stream->output,
					       OBS_OUTPUT_DISCONNECTED);
	} else if (encode_error) {
		obs_output_signal_stop(stream->output, OBS_OUTPUT_ENCODE_ERROR);
	} else {
		obs_output_end_data_capture(stream->output);
	}

	if (!silently_reconnecting(stream)) {
		free_packets(stream);
		os_event_reset(stream->stop_event);
		os_atomic_set_bool(&stream->active, false);
	}

	stream->sent_headers = false;

	/* reset bitrate on stop */
	if (stream->dbr_enabled) {
		if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) {
			stream->dbr_cur_bitrate = stream->dbr_orig_bitrate;
			dbr_set_bitrate(stream);
		}
	}

	if (silently_reconnecting(stream)) {
		rtmp_stream_start(stream);
	}

	return NULL;
}

更多技术交流、求职机会,欢迎关注
字节跳动数据平台微信公众号,回复【1】进入官方交流群

BitSail 是字节跳动自研的数据集成产品,支持多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下全域数据集成解决方案。本系列聚焦 BitSail Connector 开发模块,为大家带来详细全面的开发方法与场景示例,本篇将主要介绍 Source 接口部分。

持续关注,BitSail Connector 开发详解将分为四篇呈现。

  • BitSail Connector 开发详解系列一:Source

  • BitSail Connector 开发详解系列二:SourceSplitCoordinator

  • BitSail Connector 开发详解系列三:SourceReader

  • BitSail Connector 开发详解系列四:Sink、Writer

Source Connector

本文将主要介绍 Source 接口部分:

  • Source: 参与数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,不参与作业真正的执行。

  • SourceSplit: 数据读取分片,大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的 Split 并行处理。

  • State:作业状态快照,当开启 checkpoint 之后,会保存当前执行状态。

Source

数据读取组件的生命周期管理,主要负责和框架的交互,构架作业,它不参与作业真正的执行。

以 RocketMQSource 为例:Source 方法需要实现 Source 和 ParallelismComputable 接口。

Source 接口

public interface Source<T, SplitT extends SourceSplit, StateT extends Serializable>
    extendsSerializable, TypeInfoConverterFactory {/*** Run in client side for source initialize;*/
  void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throwsIOException;/*** Indicate the Source type.*/Boundedness getSourceBoundedness();/*** Create Source Reader.*/SourceReader<T, SplitT>createReader(SourceReader.Context readerContext);/*** Create split coordinator.*/SourceSplitCoordinator<SplitT, StateT> createSplitCoordinator(SourceSplitCoordinator.Context<SplitT, StateT>coordinatorContext);/*** Get Split serializer for the framework,{@linkSplitT}should implement from {@linkSerializable}*/
  default BinarySerializer<SplitT>getSplitSerializer() {return new SimpleBinarySerializer<>();
}
/*** Get State serializer for the framework, {@linkStateT}should implement from {@linkSerializable}*/ default BinarySerializer<StateT>getSplitCoordinatorCheckpointSerializer() {return new SimpleBinarySerializer<>();
}
/*** Create type info converter for the source, default value {@linkBitSailTypeInfoConverter}*/ defaultTypeInfoConverter createTypeInfoConverter() {return newBitSailTypeInfoConverter();
}
/*** Get Source' name.*/String getReaderName();
}

configure 方法

主要去做一些客户端的配置的分发和提取,可以操作运行时环境 ExecutionEnviron 的配置和 readerConfiguration 的配置。

示例
@Overridepublic voidconfigure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) {this.readerConfiguration =readerConfiguration;this.commonConfiguration =execution.getCommonConfiguration();
}

getSourceBoundedness 方法

设置作业的处理方式,是采用流式处理方法、批式处理方法,或者是流批一体的处理方式,在流批一体的场景中,我们需要根据作业的不同类型设置不同的处理方式。

具体对应关系如下:

流批一体场景示例
@OverridepublicBoundedness getSourceBoundedness() {return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ?Boundedness.BOUNDEDNESS :
Boundedness.UNBOUNDEDNESS;
}
流批一体场景示例
@OverridepublicBoundedness getSourceBoundedness() {return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ?Boundedness.BOUNDEDNESS :
Boundedness.UNBOUNDEDNESS;
}

createTypeInfoConverter 方法

用于指定 Source 连接器的类型转换器;我们知道大多数的外部数据系统都存在着自己的类型定义,它们的定义与 BitSail 的类型定义不会完全一致;为了简化类型定义的转换,我们支持了通过配置文件来映射两者之间的关系,进而来简化配置文件的开发。

在行为上表现为对任务描述 Json 文件中
reader
部分的
columns
的解析,对于
columns
中不同字段的 type 会根据上面描述文件从
ClickhouseReaderOptions.
COLUMNS
字段中解析到
readerContext.getTypeInfos()
中。

实现
  • BitSailTypeInfoConverter

    默认的
    TypeInfoConverter
    ,直接对
    ReaderOptions.
    COLUMNS
    字段进行字符串的直接解析,
    COLUMNS
    字段中是什么类型,
    TypeInfoConverter
    中就是什么类型。

  • FileMappingTypeInfoConverter

    会在 BitSail 类型系统转换时去绑定
    {readername}-type-converter.yaml
    文件,做数据库字段类型和 BitSail 类型的映射。
    ReaderOptions.
    COLUMNS
    字段在通过这个映射文件转换后才会映射到
    TypeInfoConverter
    中。

示例

FileMappingTypeInfoConverter

通过 JDBC 方式连接的数据库,包括 MySql、Oracle、SqlServer、Kudu、ClickHouse 等。这里数据源的特点是以
java.sql.ResultSet
的接口形式返回获取的数据,对于这类数据库,我们往往将
TypeInfoConverter
对象设计为
FileMappingTypeInfoConverter
,这个对象会在 BitSail 类型系统转换时去绑定
{readername}-type-converter.yaml
文件,做数据库字段类型和 BitSail 类型的映射。

@OverridepublicTypeInfoConverter createTypeInfoConverter() {return newFileMappingTypeInfoConverter(getReaderName());
}

对于
{readername}-type-converter.yaml
文件的解析,以
clickhouse-type-converter.yaml
为例。

# Clickhouse Type to BitSail Type
engine.type.to.bitsail.type.converter:
-source.type: int32
target.type:
int -source.type: float64
target.type:
double -source.type: string
target.type: string
-source.type: date
target.type: date.date
- source.type: nulltarget.type:void# BitSail Type to Clickhouse Type
bitsail.type.to.engine.type.converter:
- source.type: inttarget.type: int32- source.type: doubletarget.type: float64-source.type: date.date
target.type: date
-source.type: string
target.type: string

这个文件起到的作用是进行 job 描述 json 文件中
reader
部分的
columns
的解析,对于
columns
中不同字段的 type 会根据上面描述文件从
ClickhouseReaderOptions.
COLUMNS
字段中解析到
readerContext.getTypeInfos()
中。

"reader": {"class": "com.bytedance.bitsail.connector.clickhouse.source.ClickhouseSource","jdbc_url": "jdbc:clickhouse://localhost:8123","db_name": "default","table_name": "test_ch_table","split_field": "id","split_config": "{\"name\": \"id\", \"lower_bound\": 0, \"upper_bound\": \"10000\", \"split_num\": 3}","sql_filter": "( id % 2 == 0 )","columns": [
{
"name": "id","type": "int64"},
{
"name": "int_type","type": "int32"},
{
"name": "double_type","type": "float64"},
{
"name": "string_type","type": "string"},
{
"name": "p_date","type": "date"}
]
},

这种方式不仅仅适用于数据库,也适用于所有需要在类型转换中需要引擎侧和 BitSail 侧进行类型映射的场景。

BitSailTypeInfoConverter

通常采用默认的方式进行类型转换,直接对
ReaderOptions.
COLUMNS
字段进行字符串的直接解析。

@OverridepublicTypeInfoConverter createTypeInfoConverter() {return newBitSailTypeInfoConverter();
}

以 Hadoop 为例:

"reader": {"class": "com.bytedance.bitsail.connector.hadoop.source.HadoopSource","path_list": "hdfs://127.0.0.1:9000/test_namespace/source/test.json","content_type":"json","reader_parallelism_num": 1,"columns": [
{
"name":"id","type": "int"},
{
"name": "string_type","type": "string"},
{
"name": "map_string_string","type": "map<string,string>"},
{
"name": "array_string","type": "list<string>"}
]
}

createSourceReader 方法

书写具体的数据读取逻辑,负责数据读取的组件,在接收到 Split 后会对其进行数据读取,然后将数据传输给下一个算子。

具体传入构造 SourceReader 的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在 createJobGraph 的时候出错。

示例
public SourceReader<Row, RocketMQSplit>createReader(SourceReader.Context readerContext) {return newRocketMQSourceReader(
readerConfiguration,
readerContext,
getSourceBoundedness());
}

createSplitCoordinator 方法

书写具体的数据分片、分片分配逻辑,SplitCoordinator 承担了去创建、管理 Split 的角色。

具体传入构造 SplitCoordinator 的参数按需求决定,但是一定要保证所有参数可以序列化。如果不可序列化,将会在 createJobGraph 的时候出错。

示例
public SourceSplitCoordinator<RocketMQSplit, RocketMQState>createSplitCoordinator(SourceSplitCoordinator
.Context
<RocketMQSplit, RocketMQState>coordinatorContext) {return newRocketMQSourceSplitCoordinator(
coordinatorContext,
readerConfiguration,
getSourceBoundedness());
}

ParallelismComputable 接口

public interface ParallelismComputable extendsSerializable {/*** give a parallelism advice for reader/writer based on configurations and upstream parallelism advice
*
*
@paramcommonConf common configuration
*
@paramselfConf reader/writer configuration
*
@paramupstreamAdvice parallelism advice from upstream (when an operator has no upstream in DAG, its upstream is
* global parallelism)
*
@returnparallelism advice for the reader/writer*/ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf,
BitSailConfiguration selfConf,
ParallelismAdvice upstreamAdvice)
throwsException;
}

getParallelismAdvice 方法

用于指定下游 reader 的并行数目。一般有以下的方式:

可以选择
selfConf.get(ClickhouseReaderOptions.
READER_PARALLELISM_NUM
)
来指定并行度。

也可以自定义自己的并行度划分逻辑。

示例

比如在 RocketMQ 中,我们可以定义每 1 个 reader 可以处理至多 4 个队列
DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD
= 4

通过这种自定义的方式获取对应的并行度。

publicParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConfiguration,
BitSailConfiguration rocketmqConfiguration,
ParallelismAdvice upstreamAdvice)
throwsException {
String cluster
=rocketmqConfiguration.get(RocketMQSourceOptions.CLUSTER);
String topic
=rocketmqConfiguration.get(RocketMQSourceOptions.TOPIC);
String consumerGroup
=rocketmqConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
DefaultLitePullConsumer consumer
=RocketMQUtils.prepareRocketMQConsumer(rocketmqConfiguration, String.format(SOURCE_INSTANCE_NAME_TEMPLATE,
cluster,
topic,
consumerGroup,
UUID.randomUUID()
));
try{
consumer.start();
Collection
<MessageQueue> messageQueues =consumer.fetchMessageQueues(topic);int adviceParallelism = Math.max(CollectionUtils.size(messageQueues) / DEFAULT_ROCKETMQ_PARALLELISM_THRESHOLD, 1);returnParallelismAdvice.builder()
.adviceParallelism(adviceParallelism)
.enforceDownStreamChain(
true)
.build();
}
finally{
consumer.shutdown();
}
}
}

SourceSplit

数据源的数据分片格式,需要我们实现 SourceSplit 接口。

SourceSplit 接口

要求我们实现一个实现一个获取 splitId 的方法。

public interface SourceSplit extendsSerializable {
String uniqSplitId();
}

对于具体切片的格式,开发者可以按照自己的需求进行自定义。

示例

JDBC 类存储

一般会通过主键,来对数据进行最大、最小值的划分;对于无主键类则通常会将其认定为一个 split,不再进行拆分,所以 split 中的参数包括主键的最大最小值,以及一个布尔类型的
readTable
,如果无主键类或是不进行主键的切分则整张表会视为一个 split,此时
readTable

true
,如果按主键最大最小值进行切分,则设置为
false

以 ClickhouseSourceSplit 为例:

@Setterpublic class ClickhouseSourceSplit implementsSourceSplit {public static final String SOURCE_SPLIT_PREFIX = "clickhouse_source_split_";private static final String BETWEEN_CLAUSE = "( `%s` BETWEEN ? AND ? )";private finalString splitId;/*** Read whole table or range [lower, upper]*/
  private booleanreadTable;privateLong lower;privateLong upper;public ClickhouseSourceSplit(intsplitId) {this.splitId = SOURCE_SPLIT_PREFIX +splitId;
}

@Override
publicString uniqSplitId() {returnsplitId;
}
public voiddecorateStatement(PreparedStatement statement) {try{if(readTable) {
lower
=Long.MIN_VALUE;
upper
=Long.MAX_VALUE;
}
statement.setObject(
1, lower);
statement.setObject(
2, upper);
}
catch(SQLException e) {throw BitSailException.asBitSailException(CommonErrorCode.RUNTIME_ERROR, "Failed to decorate statement with split " + this, e.getCause());
}
}
public staticString getRangeClause(String splitField) {return StringUtils.isEmpty(splitField) ? null: String.format(BETWEEN_CLAUSE, splitField);
}

@Override
publicString toString() {returnString.format("{\"split_id\":\"%s\", \"lower\":%s, \"upper\":%s, \"readTable\":%s}",
splitId, lower, upper, readTable);
}
}

消息队列

一般按照消息队列中 topic 注册的 partitions 的数量进行 split 的划分,切片中主要应包含消费的起点和终点以及消费的队列。

以 RocketMQSplit 为例:

@Builder
@Getter
public class RocketMQSplit implementsSourceSplit {privateMessageQueue messageQueue;

@Setter
private longstartOffset;private longendOffset;privateString splitId;

@Override
publicString uniqSplitId() {returnsplitId;
}

@Override
publicString toString() {return "RocketMQSplit{" + "messageQueue=" + messageQueue + ", startOffset=" + startOffset + ", endOffset=" + endOffset + '}';
}
}

文件系统

一般会按照文件作为最小粒度进行划分,同时有些格式也支持将单个文件拆分为多个子 Splits。文件系统 split 中需要包装所需的文件切片。

以 FtpSourceSplit 为例:

public class FtpSourceSplit implementsSourceSplit {public static final String FTP_SOURCE_SPLIT_PREFIX = "ftp_source_split_";private finalString splitId;

@Setter
privateString path;
@Setter
private longfileSize;public FtpSourceSplit(intsplitId) {this.splitId = FTP_SOURCE_SPLIT_PREFIX +splitId;
}

@Override
publicString uniqSplitId() {returnsplitId;
}

@Override
public booleanequals(Object obj) {return (obj instanceof FtpSourceSplit) &&(splitId.equals(((FtpSourceSplit) obj).splitId));
}

}

特别的,在 Hadoop 文件系统中,我们也可以利用对
org.apache.hadoop.mapred.InputSplit
类的包装来自定义我们的 Split。

public class HadoopSourceSplit implementsSourceSplit {private static final long serialVersionUID = 1L;private final Class<? extends InputSplit>splitType;private transientInputSplit hadoopInputSplit;private byte[] hadoopInputSplitByteArray;publicHadoopSourceSplit(InputSplit inputSplit) {if (inputSplit == null) {throw new NullPointerException("Hadoop input split must not be null");
}
this.splitType =inputSplit.getClass();this.hadoopInputSplit =inputSplit;
}
publicInputSplit getHadoopInputSplit() {return this.hadoopInputSplit;
}
public voidinitInputSplit(JobConf jobConf) {if (this.hadoopInputSplit != null) {return;
}

checkNotNull(hadoopInputSplitByteArray);
try{this.hadoopInputSplit =(InputSplit) WritableFactories.newInstance(splitType);if (this.hadoopInputSplit instanceofConfigurable) {
((Configurable)
this.hadoopInputSplit).setConf(jobConf);
}
else if (this.hadoopInputSplit instanceofJobConfigurable) {
((JobConfigurable)
this.hadoopInputSplit).configure(jobConf);
}
if (hadoopInputSplitByteArray != null) {try (ObjectInputStream objectInputStream = new ObjectInputStream(newByteArrayInputStream(hadoopInputSplitByteArray))) {this.hadoopInputSplit.readFields(objectInputStream);
}
this.hadoopInputSplitByteArray = null;
}
}
catch(Exception e) {throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
}
}
private void writeObject(ObjectOutputStream out) throwsIOException {if (hadoopInputSplit != null) {try(
ByteArrayOutputStream byteArrayOutputStream
= newByteArrayOutputStream();
ObjectOutputStream objectOutputStream
= newObjectOutputStream(byteArrayOutputStream)
) {
this.hadoopInputSplit.write(objectOutputStream);
objectOutputStream.flush();
this.hadoopInputSplitByteArray =byteArrayOutputStream.toByteArray();
}
}
out.defaultWriteObject();
}

@Override
publicString uniqSplitId() {returnhadoopInputSplit.toString();
}
}

State

在需要做 checkpoint 的场景下,通常我们会通过 Map 来保留当前的执行状态

流批一体场景

在流批一体场景中,我们需要保存状态以便从异常中断的流式作业恢复

以 RocketMQState 为例:

public class RocketMQState implementsSerializable {private final Map<MessageQueue, String>assignedWithSplitIds;public RocketMQState(Map<MessageQueue, String>assignedWithSplitIds) {this.assignedWithSplitIds =assignedWithSplitIds;
}
public Map<MessageQueue, String>getAssignedWithSplits() {returnassignedWithSplitIds;
}
}

批式场景

对于批式场景,我们可以使用
EmptyState
不存储状态,如果需要状态存储,和流批一体场景采用相似的设计方案。

public class EmptyState implementsSerializable {public staticEmptyState fromBytes() {return newEmptyState();
}
}

大家好,我是三友~~

今天来跟大家聊一聊Java、Spring、Dubbo三者SPI机制的原理和区别。

其实我之前写过一篇类似的文章,但是这篇文章主要是剖析dubbo的SPI机制的源码,中间只是简单地介绍了一下Java、Spring的SPI机制,并没有进行深入,所以本篇就来深入聊一聊这三者的原理和区别。


什么是SPI

SPI全称为Service Provider Interface,是一种动态替换发现的机制,一种解耦非常优秀的思想,SPI可以很灵活的让接口和实现分离,让api提供者只提供接口,第三方来实现,然后可以使用配置文件的方式来实现替换或者扩展,在框架中比较常见,提高框架的可扩展性。

简单来说SPI是一种非常优秀的设计思想,它的核心就是解耦、方便扩展。


Java SPI机制--ServiceLoader

ServiceLoader是Java提供的一种简单的SPI机制的实现,Java的SPI实现约定了以下两件事:

  • 文件必须放在 META-INF/services/ 目录底下
  • 文件名必须为接口的全限定名,内容为接口实现的全限定名

这样就能够通过ServiceLoader加载到文件中接口的实现。


来个demo

第一步,需要一个接口以及他的实现类

public interface LoadBalance {
}

public class RandomLoadBalance implements LoadBalance{
}

第二步,在
META-INF/services/
目录创建一个文件名LoadBalance全限定名的文件,文件内容为RandomLoadBalance的全限定名

测试类:

public class ServiceLoaderDemo {

    public static void main(String[] args) {
        ServiceLoader<LoadBalance> loadBalanceServiceLoader = ServiceLoader.load(LoadBalance.class);
        Iterator<LoadBalance> iterator = loadBalanceServiceLoader.iterator();
        while (iterator.hasNext()) {
            LoadBalance loadBalance = iterator.next();
            System.out.println("获取到负载均衡策略:" + loadBalance);
        }
    }

}

测试结果:

此时就成功获取到了实现。

在实际的框架设计中,上面这段测试代码其实是框架作者写到框架内部的,而对于框架的使用者来说,要想自定义LoadBalance实现,嵌入到框架,仅仅只需要写接口的实现和spi文件即可。


实现原理

如下是ServiceLoader中一段核心代码

首先获取一个fullName,其实就是
META-INF/services/接口的全限定名

然后通过ClassLoader获取到资源,其实就是接口的全限定名文件对应的资源,然后交给
parse
方法解析资源

parse
方法其实就是通过IO流读取文件的内容,这样就可以获取到接口的实现的全限定名

再后面其实就是通过反射实例化对象,这里就不展示了。

所以其实不难发现ServiceLoader实现原理比较简单,总结起来就是通过IO流读取
META-INF/services/接口的全限定名
文件的内容,然后反射实例化对象。


优缺点

由于Java的SPI机制实现的比较简单,所以他也有一些缺点。

第一点就是浪费资源,虽然例子中只有一个实现类,但是实际情况下可能会有很多实现类,而Java的SPI会一股脑全进行实例化,但是这些实现了不一定都用得着,所以就会白白浪费资源。

第二点就是无法对区分具体的实现,也就是这么多实现类,到底该用哪个实现呢?如果要判断具体使用哪个,只能依靠接口本身的设计,比如接口可以设计为一个策略接口,又或者接口可以设计带有优先级的,但是不论怎样设计,框架作者都得写代码进行判断。

所以总得来说就是ServiceLoader无法做到按需加载或者按需获取某个具体的实现。


使用场景

虽然说ServiceLoader可能有些缺点,但是还是有使用场景的,比如说:

  • 不需要选择具体的实现,每个被加载的实现都需要被用到
  • 虽然需要选择具体的实现,但是可以通过对接口的设计来解决


Spring SPI机制--SpringFactoriesLoader

Spring我们都不陌生,他也提供了一种SPI的实现SpringFactoriesLoader。

Spring的SPI机制的约定如下:

  • 配置文件必须在 META-INF/ 目录下,文件名必须为spring.factories
  • 文件内容为键值对,一个键可以有多个值,只需要用逗号分割就行,同时键值都需要是类的全限定名,键和值可以没有任何类与类之间的关系,当然也可以有实现的关系。

所以也可以看出,Spring的SPI机制跟Java的不论是文件名还是内容约定都不一样。


来个demo


META-INF/
目录下创建spring.factories文件,LoadBalance为键,RandomLoadBalance为值

测试:

public class SpringFactoriesLoaderDemo {

    public static void main(String[] args) {
        List<LoadBalance> loadBalances = SpringFactoriesLoader.loadFactories(LoadBalance.classMyEnableAutoConfiguration.class.getClassLoader());
        for (LoadBalance loadBalance : loadBalances) {
            System.out.println("获取到LoadBalance对象:" + loadBalance);
        }
    }

}

运行结果:

成功获取到了实现对象。


核心原理

如下是SpringFactoriesLoader中一段核心代码

其实从这可以看出,跟Java实现的差不多,只不过读的是
META-INF/
目录下spring.factories文件内容,然后解析出来键值对。


使用场景

Spring的SPI机制在内部使用的非常多,尤其在SpringBoot中大量使用,SpringBoot启动过程中很多扩展点都是通过SPI机制来实现的,这里我举两个例子


1、自动装配

在SpringBoot3.0之前的版本,自动装配是通过SpringFactoriesLoader来加载的。

但是SpringBoot3.0之后不再使用SpringFactoriesLoader,而是Spring重新从
META-INF/spring/
目录下的
org.springframework.boot.autoconfigure.AutoConfiguration.imports
文件中读取了。

至于如何读取的,其实猜也能猜到就跟上面SPI机制读取的方式大概差不多,就是文件路径和名称不一样。


2、PropertySourceLoader的加载

PropertySourceLoader是用来解析application配置文件的,它是一个接口

SpringBoot默认提供了 PropertiesPropertySourceLoader 和 YamlPropertySourceLoader两个实现,就是对应properties和yaml文件格式的解析。

SpringBoot在加载PropertySourceLoader时就用了SPI机制


与Java SPI机制对比

首先Spring的SPI机制对Java的SPI机制对进行了一些简化,Java的SPI每个接口都需要对应的文件,而Spring的SPI机制只需要一个spring.factories文件。

其次是内容,Java的SPI机制文件内容必须为接口的实现类,而Spring的SPI并不要求键值对必须有什么关系,更加灵活。

第三点就是Spring的SPI机制提供了获取类限定名的方法
loadFactoryNames
,而Java的SPI机制是没有的。通过这个方法获取到类限定名之后就可以将这些类注入到Spring容器中,用Spring容器加载这些Bean,而不仅仅是通过反射。

但是Spring的SPI也同样没有实现获取指定某个指定实现类的功能,所以要想能够找到具体的某个实现类,还得依靠具体接口的设计。

所以不知道你有没有发现,PropertySourceLoader它其实就是一个策略接口,注释也有说,所以当你的配置文件是properties格式的时候,他可以找到解析properties格式的PropertiesPropertySourceLoader对象来解析配置文件。


Dubbo SPI机制--ExtensionLoader

ExtensionLoader是dubbo的SPI机制的实现类。每一个接口都会有一个自己的ExtensionLoader实例对象,这点跟Java的SPI机制是一样的。

同样地,Dubbo的SPI机制也做了以下几点约定:

  • 接口必须要加@SPI注解
  • 配置文件可以放在 META-INF/services/ META-INF/dubbo/internal/ META-INF/dubbo/ META-INF/dubbo/external/ 这四个目录底下,文件名也是接口的全限定名
  • 内容为键值对,键为短名称(可以理解为spring中Bean的名称),值为实现类的全限定名


先来个demo

首先在LoadBalance接口上@SPI注解

@SPI
public interface LoadBalance {

}

然后,修改一下Java的SPI机制测试时配置文件内容,改为键值对,因为Dubbo的SPI机制也可以从
META-INF/services/
目录下读取文件,所以这里就没重写文件

random=com.sanyou.spi.demo.RandomLoadBalance

测试类:

public class ExtensionLoaderDemo {

    public static void main(String[] args) {
        ExtensionLoader<LoadBalance> extensionLoader = ExtensionLoader.getExtensionLoader(LoadBalance.class);
        LoadBalance loadBalance = extensionLoader.getExtension("random");
        System.out.println("获取到random键对应的实现类对象:" + loadBalance);
    }

}

通过ExtensionLoader的
getExtension
方法,传入短名称,这样就可以精确地找到短名称对的实现类。

所以从这可以看出Dubbo的SPI机制解决了前面提到的无法获取指定实现类的问题。

测试结果:

dubbo的SPI机制除了解决了无法获取指定实现类的问题,还提供了很多额外的功能,这些功能在dubbo内部用的非常多,接下来就来详细讲讲。


dubbo核心机制


1、自适应机制

自适应,自适应扩展类的含义是说,基于参数,在运行时动态选择到具体的目标类,然后执行。

每个接口有且只能有一个自适应类,通过ExtensionLoader的
getAdaptiveExtension
方法就可以获取到这个类的对象,这个对象可以根据运行时具体的参数找到目标实现类对象,然后调用目标对象的方法。

举个例子,假设上面的LoadBalance有个自适应对象,那么获取到这个自适应对象之后,如果在运行期间传入了
random
这个key,那么这个自适应对象就会找到
random
这个key对应的实现类,调用那个实现类的方法,如果动态传入了其它的key,就路由到其它的实现类。

自适应类有两种方式产生,第一种就是自己指定,在接口的实现类上加@Adaptive注解,那么这个这个实现类就是自适应实现类。

@Adaptive
public class RandomLoadBalance implements LoadBalance{
}

除了自己代码指定,还有一种就是dubbo会根据一些条件帮你动态生成一个自适应类,生成过程比较复杂,这里就不展开了。

自适应机制在Dubbo中用的非常多,而且很多都是自动生成的,如果你不知道Dubbo的自适应机制,你在读源码的时候可能都不知道为什么代码可以走到那里。。


2、IOC和AOP

一提到IOC和AOP,立马想到的都是Spring,但是IOC和AOP并不是Spring特有的概念,Dubbo也实现IOC和AOP的功能,但是是一个轻量级的。

2.1、依赖注入

Dubbo依赖注入是通过setter注入的方式,注入的对象默认就是上面提到的自适应的对象,在Spring环境下可以注入Spring Bean。

public class RoundRobinLoadBalance implements LoadBalance {

    private LoadBalance loadBalance;
    
    public void setLoadBalance(LoadBalance loadBalance) {
        this.loadBalance = loadBalance;
    }

}

如上代码,RoundRobinLoadBalance中有一个setLoadBalance方法,参数LoadBalance,在创建RoundRobinLoadBalance的时候,在非Spring环境底下,Dubbo就会找到LoadBalance自适应对象然后通过反射注入。

这种方式在Dubbo中也很常见,比如如下的一个场景

RegistryProtocol中会注入一个Protocol,其实这个注入的Protocol就是一个自适应对象。

2.2、接口回调

Dubbo也提供了一些类似于Spring的一些接口的回调功能,比如说,如果你的类实现了Lifecycle接口,那么创建或者销毁的时候就会回调以下几个方法

在dubbo3.x的某个版本之后,dubbo提供了更多接口回调,比如说ExtensionPostProcessor、ExtensionAccessorAware,命名跟Spring的非常相似,作用也差不多。

2.3、自动包装

自动包装其实就是aop的功能实现,对目标对象进行代理,并且这个aop功能在默认情况下就是开启的。

在Dubbo中SPI接口的实现中,有一种特殊的类,被称为Wrapper类,这个类的作用就是来实现AOP的。

判断Wrapper类的唯一标准就是这个类中必须要有这么一个构造参数,这个构造方法的参数只有一个,并且参数类型就是接口的类型,如下代码:

public class RoundRobinLoadBalance implements LoadBalance {

    private final LoadBalance loadBalance;

    public RoundRobinLoadBalance(LoadBalance loadBalance) {
        this.loadBalance = loadBalance;
    }

}

此时RoundRobinLoadBalance就是一个Wrapper类。

当通过
random
获取RandomLoadBalance目标对象时,那么默认情况下就会对RandomLoadBalance进行包装,真正获取到的其实是RoundRobinLoadBalance对象,RoundRobinLoadBalance内部引用的对象是RandomLoadBalance。

测试一下

在配置文件中加入

roundrobin=com.sanyou.spi.demo.RoundRobinLoadBalance

测试结果

从结果可以看出,虽然指定了
random
,但是实际获取到的是RoundRobinLoadBalance,而RoundRobinLoadBalance内部引用了RandomLoadBalance。

如果有很多的包装类,那么就会形成一个责任链条,一个套一个。

所以dubbo的aop跟spring的aop实现是不一样的,spring的aop底层是基于动态代理来的,而dubbo的aop其实算是静态代理,dubbo会帮你自动组装这个代理,形成一条责任链。

到这其实我们已经知道,dubbo的spi接口的实现类已经有两种类型了:

  • 自适应类
  • Wrapper类

除了这两种类型,其实还有一种,叫做默认类,就是
@SPI
注解的值对应的实现类,比如

@SPI("random")
public interface LoadBalance {

}

此时
random
这个key对应的实现类就是默认实现,通过
getDefaultExtension
这个方法就可以获取到默认实现对象。


3、自动激活

所谓的自动激活,就是根据你的入参,动态地选择一批实现类返回给你。

自动激活的实现类上需要加上
Activate
注解,这里就又学习了一种实现类的分类。

@Activate
public interface RandomLoadBalance {

}

此时RandomLoadBalance就属于可以被自动激活的类。

获取自动激活类的方法是
getActivateExtension
,所以根据这个方法的入参,可以动态选择一批实现类。

自动激活这个机制在Dubbo一个核心的使用场景就是Filter过滤器链中。

Filter是dubbo中的一个扩展点,可以在请求发起前或者是响应获取之后就行拦截,作用有点像Spring MVC中的HandlerInterceptor。

Filter的一些实现类
Filter的一些实现类

如上Filter有很多实现,所以为了能够区分Filter的实现是作用于provider的还是consumer端,所以就可以用自动激活的机制来根据入参来动态选择一批Filter实现。

比如说ConsumerContextFilter这个Filter就作用于Consumer端。

ConsumerContextFilter
ConsumerContextFilter

最后,这里并没有对dubbo的SPI机制进行源码分析,感兴趣的同学可以看一下
面试常问的dubbo的spi机制到底是什么?(上)

面试常问的dubbo的spi机制到底是什么?(下)
两篇文章。


总结

通过以上分析可以看出,实现SPI机制的核心原理就是通过IO流读取指定文件的内容,然后解析,最后加入一些自己的特性。

最后总的来说,Java的SPI实现的比较简单,并没有什么其它功能;Spring得益于自身的ioc和aop的功能,所以也没有实现太复杂的SPI机制,仅仅是对Java做了一点简化和优化;但是dubbo的SPI机制为了满足自身框架的使用要求,实现的功能就比较多,不仅将ioc和aop的功能集成到SPI机制中,还提供注入自适应和自动激活等功能。

最后的最后,安利一下大鹏的《保你平安》这部电影,周末去看了,从整体来看我感觉还可以,电影中融入了一些喜剧、动作、恐怖等元素,虽然有些地方强行煽情,但是大鹏想要表达的整体思想还是很值得一看滴。


往期热门文章推荐

如何去阅读源码,我总结了18条心法

如何实现延迟任务,我总结了11种方法

如何写出漂亮代码,我总结了45个小技巧

三万字盘点Spring/Boot的那些常用扩展点

两万字盘点那些被玩烂了的设计模式

扒一扒Bean注入到Spring的那些姿势

扫码或者搜索关注公众号
三友的java日记
,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习,回复 面试 即可获得一套面试真题。