2024年9月

TCP/IP协议

简介

首先TCP/IP协议不只是表示TCP协议和IP协议两种协议,而是一个协议簇。协议簇是什么并不难理解,就是字面意思,一个由多个协议组合而成的集合体,其中最有代表性的就是TCP和IP这两个协议,除了这两个还有我们熟知的FTP、UDP等协议。当然我们下面主要介绍的还是这两位主角TCP和IP协议。

网络分层

网络分层是我们网络传输的一个框架,每个层级之间相互合作达到了我们相对迅速、安全的网络传输,我们这里介绍一下ISO组织推出的OSI分层模型。

  • 物理层:该层定义了物理传输的接口介质等,用于物理节点之间的传输

  • 数据链路层:这一层是交换机的工作层级,用来将从物理层接收到的数据进行MAC地址的封装与解封


    MAC地址就是我们网络的物理地址,是网卡层级的地址,每个网卡都有自己的MAC地址

  • 网络层:这一层是路由器的工作层级,该层用来通过IP协议进行封装或解封数据包

  • 传输层:该层定义了我们传输数据用的协议以及端口号,将我们的数据包进行分段和传输,到目的地地址之后再进行重组,TCP和UDP协议就是在这层发挥作用的

  • 会话层:用来通过传输层建立的传输通路进行发起或者接收会话请求

  • 表示层:用来将接受的数据转化成我们用户能够识别的东西,反之就是将我们能识别的东西转化为计算机数据

  • 应用层:该层主要为用户提供终端网络服务,比如我们进行的网页浏览或者邮件传输等,该层是HTTP、FTP等协议发挥作用的地方

IP协议

IP指网际互连协议,Internet Protocol的缩写,从我们的网络层级上来看,属于网络层,作用是将我们的数据通过IP协议封装成数据包的形式或者将数据包拆解提供给数据链路层帮助数据传输。

现如今我们的网络环境是非常复杂的,网络结构也是百花齐放,每个厂家产的网络设备、系统等可能就会有协议上的差别,因此IP协议强调了适应性、简洁性和可操作性,并且在可靠性上做出了一定的让步,IP不保证可靠性,所传送分组有可能出现丢失、重复、延迟或乱序等问题。

IP分片

IP包在进行传输的过程中,可能会经历不同的物理网络,受限于网络数据帧的最大传输单元,当IP数据包过大时就会将其分组拆分为多个满足传输单元条件的片段,传输完毕到达目标主机的时候再进行重组。

IP传输主要就是解决两个问题,一个是分解重组,一个是传输,具体细节大家可以再深层次研究研究,此处不过多赘述。

TCP协议

TCP是“Transmission Control Protocol”的简称,翻译过来的意思是“传输控制协议”。我们通过上面的介绍可以看出,如果我们直接拿着IP数据包去进行传输显然是非常不可靠的,稍微地一个网络动荡就会导致我接收不到数据包,并且发送方还不知道。那咋办呢?

TCP提供的端到端的字节流传输,他为我们的数据创建了一个通道,将数据包以流的形式在他创建的通道中进行传输,使得端与端之间的数据传输变得可靠起来。那么他是如何建立与终止这个通道的呢?

三次握手

如图所示为三次握手的大致流程,三次报文传输,三次状态更新,少一步都会导致连接失败。三次握手完毕之后就可以进行数据传输了。

我们不剖析SYN与ACK的原理,我们看一下括号里的内容,不难看出,每次数据传输其实是基于收到的数据进行一个约定好的处理然后再发送,就相当于我们对诗,你一句(白日依山尽)我一句(黄河入海流),只有按照约定好的规则去返回我们传输的内容才会正常建立连接,所以这种连接方式确实是可靠的。

四次挥手

如图为四次挥手的大致流程,四次报文传输,两端分别都发送和接收了一次ACK与FIN报文。FIN报文的意思就是终止传输的请求报文,当客户端申请终止传输时,现向服务端发送一个终止确认,意思就是数据传输完毕,请求终止连接,然后服务端收到之后会发送一个ACK意思是剩余数据传输中,待传输结束之后发送FIN意思是服务端数据发送完毕申请终止连接,最终客户端回应一个ACK响应给服务端,说明该连接已关闭。

滑动窗口

TCP滑动窗口是用来控制网络传输时的流量的一种协议,该协议支持发送方在停止并等待确认前发送多个数据分组,来提高网络吞吐量、数据传输效率。

大概意思就是,发送方要发送数据,接收方给发送方一个窗口大小,然后发送方通过接收方给的窗口大小,对自己的数据流进行分组,按照接收方指定的大小一段一段的发送,这样做的好处就是接收方可以通过控制窗口大小的方式来管理发送方发送数据的大小,从而避免了发送方直接将大量数据发送进来造成的网络拥堵。

发送方发送数据时可以不必发送一批将窗口填满的数据包,发送方在慢启动模式下可以从一个数据包开始,一个一个递增直到接收方窗口大小的数据包量。

超时重传

超时重传机制是TCP可靠性的重要一环,此机制可以将接收方超过一定时间没收到但是发送方已发送的数据进行重发,保证了接收方能更有机会接收到数据。

TCP协议要求发送方每发送一个报文段的时候就启动一个定时器并且等待回应消息,如果超时就会对该段报文重组并重传。此机制最难的实现点就是重传超时时间(RTO)的确认,过大过小都会导致问题出现,太大会导致等待确认时间过长影响吞吐量,而太小会导致频繁重传浪费网络资源。所以说RTO的动态确认是很困难的,此处不过多赘述(其实是实力不够硬T_T)。

总结

TCP/IP协议簇是一个非常巧妙的协议集合,他们分布在不同的网络层级帮助我们约定、对接、传输各种报文、数据包等形式组成的数据。也有不同场景的不同应对协议,比如TCP与UDP的取舍,各有各的好处与使用场景。本篇文章仅仅通过我主观的理解去介绍了IP与TCP协议的大致情况。更深层次的东西如果有精力的话我们可以一起继续往深了挖。

感谢各位大佬的光临,本文重在介绍,若有不对的地方还请麻烦各位大佬指教,感谢。

《数据资产管理核心技术与应用》是清华大学出版社出版的一本图书,全书共分10章,第1章主要让读者认识数据资产,了解数据资产相关的基础概念,以及数据资产的发展情况。第2~8章主要介绍大数据时代数据资产管理所涉及的核心技术,内容包括元数据的采集与存储、数据血缘、数据质量、数据监控与告警、数据服务、数据权限与安全、数据资产管理架构等。第9~10章主要从实战的角度介绍数据资产管理技术的应用实践,包括如何对元数据进行管理以发挥出数据资产的更大潜力,以及如何对数据进行建模以挖掘出数据中更大的价值。

图书介绍:
数据资产管理核心技术与应用

今天主要是给大家分享一下第四章的内容:

第四章的标题为数据质量的技术实现

内容思维导图如下:

本文是接着

《数据资产管理核心技术与应用》读书笔记-第四章:数据质量的技术实现(二)

继续往下介绍

4、 常见的开源数据质量管理平台

4.1、   Apache Griffin

Apache Griffin 是一个开源的大数据质量管理系统,底层是基于Hadoop和Spark实现的,支持批处理和流处理模式两种数据质量检测方式,官方网址为:
https://griffin.apache.org/
,如下图所示,是Apache Griffin 官方地址
https://griffin.apache.org/docs/quickstart-cn.html
中提供的架构图。

Apache Griffin 的源代码github地址为https://github.com/apache/griffin    《数据资产管理核心技术与应用》是清华大学出版社出版的一本图书,作者为张永清等著

从架构图中可以看到

  • Apache Griffin 在做数据质量检测时,是基于Spark 实现的,以Spark任务的形式对定义的待采集数据质量的数据源进数据采集。
  • 在架构图中,Define主要用于数据质量的维度定义,也就是我们说的数据质量规则的定义。
  • Measure负责数据质量任务的执行以及生成数据质量的结果数据。《数据资产管理核心技术与应用》是清华大学出版社出版的一本图书,作者为张永清等著
  • Analyze主要负责结果数据的存储以及呈现。

如下图所示,Apache Griffin 的架构图刚好是可以对应到我们前面的数据质量采集流程的。

另外Apache Griffin   也是支持容器化部署的,相关部署介绍请参考:
https://github.com/apache/griffin/blob/master/griffin-doc/docker/griffin-docker-guide.md

Apache Griffin   的主要技术栈和开发语言包括

  • 后端:Java和Scala,其API服务主要是由Java 语言开发,基于Http协议和GRPC协议做数据通信。其任务的执行主要是基于Scala语言开发,用于Spark任务的提交、运行等。
  • 前端:TypeScript、Html、Css

其核心技术架构如下图所示。《数据资产管理核心技术与应用》是清华大学出版社出版的一本图书,作者为张永清等著

从图中可以看到其核心技术是通过SpringBoot+Spark来实现的。

4.2、  Qualitis

Qualitis是一个支持多种异构数据源的数据质量监测平台,其设计初衷是用于解决业务系统运行、数据中心建设及数据治理过程中的遇到的各种数据质量问题。

如下图所示,是Qualitis官方地址https://github.com/WeBankFinTech/Qualitis/blob/master/docs/zh_CN/ch1/%E6%9E%B6%E6%9E%84%E8%AE%BE%E8%AE%A1%E6%96%87%E6%A1%A3.md#21-%E6%80%BB%E4%BD%93%E6%9E%B6%E6%9E%84%E8%AE%BE%E8%AE%A1中提供的架构图。

从架构图中可以看到也是包含了质量规则配置、质量任务管理和质量数据采集、质量数据存储和分析等这些核心模块。

在Qualitis官方网址中也提供了总体模块设计图,其模块设计图也是刚好可以对应到我们前面的数据质量采集流程,如下图所示。《数据资产管理核心技术与应用》是清华大学出版社出版的一本图书,作者为张永清等著

可以看到数据质量采集的流程其实不管在哪个开源的数据质量平台中,都几乎是一样的,都需要包括

  • 质量规则的配置和管理:主要是配置规则和维护规则。
  • 定时job定时去执行质量规则抓取原始的数据质量数据。《数据资产管理核心技术与应用》是清华大学出版社出版的一本图书,作者为张永清等著
  • 质量的数据处理和分析:对抓取到的原始质量数据进行处理,然后通过质量数据的分析来优化质量规则的配置,形成一个闭环的链路,如下图所示

老项目大多都有对JDBC进行了封装,可以直接执行SQL的工具类,在做项目升级改造的时候(这里仅指整合mybatis),要么全部调整成dao-xml的形式(会有改动代码多的问题,而且看代码时需要xml和java来回切换),要么维持原逻辑不改动(跟mybatis基本无关,同样难以用到mybatis的配置)

这里实现个可以让工具使用到mybatis的xml和dao骚气操作,可以保持工具类原有用法

这里仅展示查询部分逻辑,增删改类似的写法,写法中sql和作为字符串写在java代码中,不习惯可以不往下看了

1、根据mybatis写法写dao类和xml类,同时需要一个查询返回的数据集类即可

如果需要转为具体dto类,写转换逻辑即可

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.*.utility.SQLMapper">

    <select id="execSQL" resultType="com.*.utility.Grid" parameterType="java.util.Map">
        ${sql}
    </select>

    <update id="execUpdateSQL" >
        ${sql}
    </update>

</mapper>


@Mapper
public interface SQLMapper {

/**
* 核心方法是这个,直接用肯定不方便,因为要把sql和需要的参数都放到 map里面
* 在调用的写法上应该把sql和sql执行需要的参数作为两个入参传入
*/
@Deprecated
Grid execSQL(Map<String,Object> params);

/**
* 无需参数的查询
*/
default Grid execSQL(String sql){
return execSQL(Collections.singletonMap("sql",sql));
}

/**
* 对sql中仅需一个参数的查询,
* @param bindVariable 需要的参数,仅可为String、int、double、date等基本的类型
*/
default Grid execSQLBindVariable(String sql, Object bindVariable){
return execSQL(new SingletonBindVariables(sql,bindVariable));
}

/**
* 将参数放入Map中进行查询,如果入参是dto类型的传入,推荐使用ObjectBingVariables类进行包装下
*/
default Grid execSQLBindVariables(String sql, Map<String,Object> bindVariables){
bindVariables.put("sql",sql);
return execSQL(bindVariables);
}

/**
* 参数支持Lambda写法
*/
default Grid execSQL(String sql, Function<String,Object> param){
return execSQLBindVariables(sql, FunctionBindVariables.from(sql,param));
}
}

public class ObjectBingVariables extends HashMap<String,Object>{privateObject objectValue;private Map<String,Object> cache = new HashMap<>();publicObjectBingVariables(Object objectValue){
Objects.requireNonNull(objectValue,
"传入得查询参数不能为null!");this.objectValue =objectValue;
init(objectValue);
}
private voidinit(Object dto){try{
Method[] methods
=dto.getClass().getMethods();for(Method method : methods) {if (method.getName().startsWith("get") && method.getParameterCount() == 0) {
Object value
=method.invoke(dto);
String key
= method.getName().substring(3);this.put(key.toUpperCase(),value);
}
}
}
catch(Exception ex){throw newRuntimeException(ex);
}
}

@Override
publicObject put(String key, Object value) {returnsuper.put(key.toUpperCase(), value);
}

@Override
public Object get(Object key) {return super.get(String.valueOf(key).toUpperCase());
}
}
public class FunctionBindVariables extends HashMap<String,Object>{privateString sql;private Function<String,Object>function;public FunctionBindVariables(String sql, Function<String,Object>function){this.sql =sql;this.function =function;
}
public static Map<String,Object> from(String sql, Function<String,Object>function){return newFunctionBindVariables(sql,function);
}

@Override
public Object get(Object key) {return "sql".equals(key) ? this.sql : function.apply((String)key);
}
}
public class SingletonBindVariables extends HashMap<String,Object>{publicSingletonBindVariables(String sql, Object param){
put(
"sql",sql);
put(
"param",param);
}
public static Map<String,Object> from(String sql, Object param){return newSingletonBindVariables(sql,param);
}


@Override
public Object get(Object key) {return "sql".equals(key) ? super.get("sql") : super.get("param");
}

}

到这里,查询就仅需要一个通用的查询结果集Grid对象

import org.apache.ibatis.type.JdbcType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.
*;
import java.util.function.Function;
public classGrid {private static final Logger log = LoggerFactory.getLogger(Grid.class);privateJdbcType[] jdbcTypes;private int MaxCol = 0;private int MaxRow = 0;private int MaxNumber = 0;private List<String> data = newArrayList();public Grid(intmaxCol) {this.MaxCol =maxCol;
}
protected voidaddText(String text) {this.data.add(text);int size = this.data.size();if (size > this.MaxCol) {this.MaxNumber = size - this.MaxCol;if (this.MaxNumber % this.MaxCol == 0) {this.MaxRow = this.MaxNumber / this.MaxCol;
}
else{this.MaxRow = this.MaxNumber / this.MaxCol + 1;
}

}
}
public <T> T getText(int row, int col, Function<String, T>function) {return function.apply(this.getText(row, col));
}
public String getText(introw, String ignoreCaseRowName) {return getText(row,ignoreCaseRowName,false);
}
/**
* 根据行数和列名匹配对应得数据
* @param row 列数
* @param ignoreCaseRowName 忽略大小写得 列名
* @param IgnoreUnmatchedColumn 忽略掉未匹配得列,当根据列名未找到数据时生效,true时如果列名不存在会返回null值,false时则抛出异常
*/ public String getText(introw, String ignoreCaseRowName, boolean IgnoreUnmatchedColumn) {int colIndex = -1;for(int i=0;i<this.MaxCol;i++){if(this.data.get(i).equalsIgnoreCase(ignoreCaseRowName)){
colIndex
= i+1;break;
}
}
if(colIndex== -1 &&IgnoreUnmatchedColumn)return null;if(colIndex == -1)throw new RuntimeException("未找到符合["+ignoreCaseRowName+"]的列");returngetText(row,colIndex);
}
public String getText(int row, intcol) {int Number = (row - 1) * this.MaxCol + col - 1;if (Number <= this.MaxNumber) {return (String)this.data.get(Number + this.MaxCol);
}
else{
log.error(
"指定的位置在结果集中没有数据");return null;
}
}
public void replaceText(int row, intcol, String text) {int Number = (row - 1) * this.MaxCol + col - 1;if (Number <= this.MaxNumber) {this.data.set(Number, text);
}
else{
log.error(
"指定的位置在结果集中没有数据");
}
}
public intgetMaxCol() {return this.MaxCol;
}
public intgetMaxRow() {return this.MaxRow;
}
publicString[] getColNames(){
String[] colNames
= newString[MaxCol];for(int i=0;i<colNames.length;i++){
colNames[i]
= this.data.get(i);
}
returncolNames;
}
public String getColName(intindex) {if (index > 0 && index <= this.MaxCol) {return (String)this.data.get(index - 1);
}
else{
log.error(
"指定的位置在结果集中没有数据");return null;
}
}
public boolean setColName(intindex, String columnName) {if (index > 0 && index <= this.MaxCol) {this.data.set(index - 1, columnName);return true;
}
else{return false;
}
}
public String[] getRowData(introw) {if (row > 0 && row <= this.MaxRow) {
String[] result
= new String[this.MaxCol];for(int i = 0; i < this.MaxCol; ++i) {int index = this.MaxCol * row +i;
result[i]
= (String)this.data.get(index);
}
returnresult;
}
else{return new String[0];
}
}
public Map<String,String> getRowMap(introw){
Map
<String,String> data = new IgnoreCaseHashMap<>();
String[] colNames
=getColNames();for(int i=0;i<colNames.length;i++){
data.put(colNames[i],getText(row,i
+1));
}
returndata;
}
public String[] getColData(intcol) {if (col > 0 && col <= this.MaxCol) {
String[] result
= new String[this.MaxRow];for(int i = 0; i < this.MaxRow; ++i) {int index = this.MaxRow * (i + 1) +col;
result[i]
= (String)this.data.get(index);
}
returnresult;
}
else{return new String[0];
}
}
public voidsetJdbcTypes(JdbcType[] jdbcTypes) {this.jdbcTypes =jdbcTypes;
}
public JdbcType getJdbcType(intcol) {return this.jdbcTypes[col - 1];
}
publicString toString() {
StringBuilder builder
= new StringBuilder("Grid{[");for(int i = 0; i < this.data.size(); ++i) {if (i != 0 && i % this.MaxCol == 0) {
builder.append(
"],[");
}
else if (i != 0) {
builder.append(
",");
}

builder.append((String)
this.data.get(i));
}

builder.append(
"]}");returnbuilder.toString();
}



}

通过mybatis插件让查询结果转为该对象

import com.sinosoft.mybatis.typehandler.DateTypeHandler;
import com.sinosoft.mybatis.typehandler.DoubleTypeHandler;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.executor.resultset.ResultSetHandler;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.MappedTypes;
import org.apache.ibatis.type.TypeHandler;
import org.apache.ibatis.type.TypeHandlerRegistry;
import org.springframework.stereotype.Component;

import java.sql.
*;
import java.util.Collections;
import java.util.List;

@Component
@Intercepts({
@Signature(type
= ResultSetHandler.class, method="handleResultSets", args={Statement.class}),
@Signature(type
= ResultSetHandler.class, method="handleCursorResultSets", args={Statement.class}),
@Signature(type
= ResultSetHandler.class, method="handleOutputParameters", args={CallableStatement.class})
})
public classGridResultSetHandler implements ResultSetHandler {privateTypeHandlerRegistry registry;publicGridResultSetHandler(){
registry
= newTypeHandlerRegistry();
registry.register(String.
class,JdbcType.TIMESTAMP, newDateTypeHandler());
registry.register(String.
class,JdbcType.DATE, newDateTypeHandler());
registry.register(String.
class,JdbcType.NUMERIC, newDoubleTypeHandler());
registry.register(String.
class,JdbcType.DOUBLE, newDoubleTypeHandler());
}
public List<Grid>handleResultSets(Statement statement) throws SQLException {
ResultSet resultSet
=statement.getResultSet();try{
ResultSetMetaData metaData
=resultSet.getMetaData();int columnCount =metaData.getColumnCount();
Grid grid
= newGrid(columnCount);
JdbcType[] jdbcTypes
= newJdbcType[columnCount];inti;for(i = 1; i <= columnCount; ++i) {
grid.addText(metaData.getColumnName(i));
jdbcTypes[i
- 1] =JdbcType.forCode(metaData.getColumnType(i));
}

grid.setJdbcTypes(jdbcTypes);

label61:
while(true) {if(resultSet.next()) {
i
= 1;while(true) {if (i >columnCount) {continuelabel61;
}
int columnType =metaData.getColumnType(i);
TypeHandler
<String> typeHandler = this.registry.getTypeHandler(String.class, JdbcType.forCode(columnType));
grid.addText((String)typeHandler.getResult(resultSet, i));
++i;
}
}

List
<Grid> matrices =Collections.singletonList(grid);
List var8
=matrices;returnvar8;
}
}
finally{
resultSet.close();
}
}
public Cursor<Grid>handleCursorResultSets(Statement statement) throws SQLException {throw new UnsupportedOperationException("Unsupported");
}
public voidhandleOutputParameters(CallableStatement callableStatement) throws SQLException {throw new UnsupportedOperationException("Unsupported");
}
}

调用示例代码如下:

@Autowired
private SQLMapper cSQLMapper;

public void test(){


cSQLMapper.execSQLBindVariable("select * from Code where codeType=#{codeType}","sex");

Map<String,Object> bind = new HashMap<>();
bind.put("codeType","sex");
bind.put("code","1");
cSQLMapper.execSQLBindVariables("select * from Code where codeType=#{codeType} and code=#{code}",bind);

LDCodePo tLDCodePo = new LDCodePo();
tLDCodePo.setCodeType("sex");
tLDCodePo.setCode("1");
cSQLMapper.execSQLBindVariables("select * from Code where codeType=#{codetype} and code=#{code}",
new ObjectBingVariables(tLDCodePo));
}

简单的查询可以使用这个,复杂的虽然也是可以通过sql字符串拼接去实现,但对于需要使用foreach标签等的,更好的还是使用dao-xml的形式

对于查询结果集需要转为具体对象的,可以对Grid做适配支持等,

热点随笔:

·
写在临近40岁的年龄
(
薰衣草的旋律
)
·
40岁大龄失业程序猿,未来该何去何从
(
牛初九
)
·
博客园20年纪念T恤上架:艰难的时光,燃烧的希望
(
博客园团队
)
·
一场 Kafka CRC 异常引发的血案
(
昔久
)
·
利用分布式锁在ASP.NET Core中实现防抖
(
yi念之间
)
·
.NET 8.0 前后分离快速开发框架
(
小码编匠
)
·
.NET 8.0 文档管理系统网盘功能的实现
(
小码编匠
)
·
省钱的开源项目「GitHub 热点速览」
(
削微寒
)
·
架构师备考的一些思考
(
kiba518
)
·
1000多天我开发了一个免费的跨浏览器的书签同步、阅读排版、任意网页标注插件
(
_Wang
)
·
C#/.NET/.NET Core技术前沿周刊 | 第 3 期(2024年8.26-8.31)
(
追逐时光者
)
·
金九银十来了,你的简历写好了么?
(
程序员晓凡
)

热点新闻:

·
Runway“跑路”了?! 全部开源模型删除,网友:名字果然没骗人!
·
真菌和计算机组合机器人问世
·
全球最大,马斯克4个月建成10万张H100超算集群!xAI算力超越OpenAI,奥特曼怕了
·
2GB内存就能跑ChatGPT!这个国产「小钢炮」,要让华为OV们的AI体验突破瓶颈
·
18.98万元!比亚迪“最美轿跑SUV”焕新,但风头还是被它抢了!
·
网传“重庆互联网一哥”拖欠薪资数月,猪八戒网内部回应
·
谷歌DeepMind祭出蛋白质设计新AI,有望攻克癌症!蛋白亲和力暴增300倍
·
苹果华为发布会再度撞档:华为“Z 形态”折叠屏引强烈关注
·
iPhone 16 系列不支持微信?不可能,绝对不可能
·
“稚晖君”停更这一年:想赢下人形机器人,要像大公司那样去创业
·
韩国N号房再现!大量女性被AI换脸在微博求助,涉及500所学校,超过22万人参与
·
四大名著IP化,改好了成《黑神话》,改不好是《金玉良缘》

前言

给大家推荐一个轻量级的、支持插件的综合网络通信库:TouchSocket。

TouchSocket 的基础通信功能包括 TCP、UDP、SSL、RPC 和 HTTP。其中,HTTP 服务器支持 WebSocket、静态网页、XML-RPC、WebAPI 和 JSON-RPC 等扩展插件。

此外,TouchSocket 还支持自定义协议的 TouchRPC,具备 SSL 加密、异步调用、权限管理、错误状态返回、服务回调和分布式调用等功能。

在空载函数执行时,10 万次调用仅需 3.8 秒;在不返回状态时,仅需 0.9 秒。

项目介绍

TouchSocket(Pro)是基于 .NET 的程序集系列,可用于对应 .NET 版本的 C#、F# 和 VB.NET 项目。无论你的项目是控制台、WinForms、WPF 还是 ASP.NET Core,它都全面支持。

TouchSocket(Pro)长期支持 .NET 4.5、.NET 4.8.1、.NET Standard 2.0 三个平台,并将 .NET 6.0 作为最新稳定版支持平台,同时支持.NET 7.0 作为最新发布平台。

项目环境

.NET 4.5:保证了在.Net Framework上的最低支持版本。基本上支持全系99%的功能。

.NET 4.8.1:这是在net45的依赖基础之上,额外添加了一些微软官方库,以支持达到net6.0一样的功能(例如:Span、ValueTuple、ValueTask等)。

.NET Standard 2.0:这是保证了在一些通用平台上的最低依赖。基本上支持全系99%的功能。

.NET 6.0:这是目前最新的稳定版发布平台,它在最低依赖的前提下,还保证了全部功能。

.NET 7.0:这是目前最新的发布平台,我们会在该平台上开发最能尝鲜的功能(例如:AOT等)。

支持框架的框架Console、WPF、Winform、Blazor Server、Xamarin、MAUI、Avalonia、Mono、Unity 3D(除WebGL)、其他(即所有C#系)

项目功能

1、功能导图

2、项目文档

项目特点

1、传统 IOCP 与 TouchSocket 的 IOCP 模式

TouchSocket 的 IOCP 模式与传统 IOCP 不同。以微软官方示例为例,传统 IOCP 使用 MemoryBuffer 开辟一块内存,均分后给每个会话分配一个区域接收数据,收到数据后再复制并处理。而 TouchSocket 则在每次接收前从内存池中获取一个可用内存块直接用于接收,收到数据后直接处理该内存块,从而避免了复制操作。虽然这只是细节上的设计差异,但在传输 10 万次 64KB 数据时,性能提升了 10 倍。

2、数据处理适配器

TouchSocket 在设计时借鉴了其他产品的优秀理念,数据处理适配器就是其中之一。与其他产品不同的是,TouchSocket 的适配器功能更加强大、易用且灵活。它不仅可以提前解析数据包,还可以解析数据对象,并且可以随时替换,立即生效。例如,可以使用固定包头对数据进行预处理,从而解决数据分包和粘包问题;也可以直接解析 HTTP 数据协议和 WebSocket 数据协议等。

3、兼容性与适配

TouchSocket 提供多种框架模型,能够完全兼容基于 TCP 和 UDP 协议的所有协议。例如,TcpService 和 TcpClient 的基础功能与 Socket 相同,但增强了框架的坚固性和并发性,通过事件形式抛出连接和接收数据,使用户能够更加友好地使用。

项目使用

1、Nuget安装

搜索框输入TouchSocket,然后在搜索结果中选择,点击安装,具体如下图所示

2、TcpService

TcpService是Tcp系服务器基类,它不参与实际的数据交互,只是配置、激活、管理、注销、重建SocketClient类实例。

而SocketClient是当
TcpClient(客户端)
成功连接服务器以后,由服务器新建的一个实例类,后续的所有通信,也都是通过该实例完成的。

var service = newTcpService();
service.Connecting
= (client, e) => { return EasyTask.CompletedTask; };//有客户端正在连接 service.Connected = (client, e) => { return EasyTask.CompletedTask; };//有客户端成功连接 service.Disconnecting = (client, e) => { return EasyTask.CompletedTask; };//有客户端正在断开连接,只有当主动断开时才有效。 service.Disconnected = (client, e) => { return EasyTask.CompletedTask; };//有客户端断开连接 service.Received = (client, e) =>{//从客户端收到信息 var mes = Encoding.UTF8.GetString(e.ByteBlock.Buffer, 0, e.ByteBlock.Len);//注意:数据长度是byteBlock.Len client.Logger.Info($"已从{client.Id}接收到信息:{mes}");

client.Send(mes);
//将收到的信息直接返回给发送方 //client.Send("id",mes);//将收到的信息返回给特定ID的客户端 //将收到的信息返回给在线的所有客户端。//注意:这里只是一个建议思路,实际上群发应该使用生产者消费者模式设计//var ids = service.GetIds();//foreach (var clientId in ids)//{//if (clientId != client.Id)//不给自己发//{//service.Send(clientId, mes);//}//} returnEasyTask.CompletedTask;
};

service.Setup(
new TouchSocketConfig()//载入配置 .SetListenIPHosts("tcp://127.0.0.1:7789", 7790)//同时监听两个地址 .ConfigureContainer(a =>//容器的配置顺序应该在最前面 {
a.AddConsoleLogger();
//添加一个控制台日志注入(注意:在maui中控制台日志不可用) })
.ConfigurePlugins(a
=>{//a.Add();//此处可以添加插件 }));

service.Start();
//启动

3、TcpClient

TcpClient是Tcp系客户端基类,他直接参与tcp的连接、发送、接收、处理、断开等,他的业务与服务器的SocketClient是一一对应的。
var tcpClient = newTcpClient();
tcpClient.Connecting
= (client, e) => { return EasyTask.CompletedTask; };//即将连接到服务器,此时已经创建socket,但是还未建立tcp tcpClient.Connected = (client, e) => {return EasyTask.CompletedTask; };//成功连接到服务器 tcpClient.Disconnecting = (client, e) => {return EasyTask.CompletedTask; };//即将从服务器断开连接。此处仅主动断开才有效。 tcpClient.Disconnected = (client, e) => {return EasyTask.CompletedTask; };//从服务器断开连接,当连接不成功时不会触发。 tcpClient.Received = (client, e) =>{//从服务器收到信息。但是一般byteBlock和requestInfo会根据适配器呈现不同的值。 var mes = Encoding.UTF8.GetString(e.ByteBlock.Buffer, 0, e.ByteBlock.Len);
tcpClient.Logger.Info($
"客户端接收到信息:{mes}");returnEasyTask.CompletedTask;
};
//载入配置 tcpClient.Setup(newTouchSocketConfig()
.SetRemoteIPHost(
"127.0.0.1:7789")
.ConfigureContainer(a
=>{
a.AddConsoleLogger();
//添加一个日志注入 }));

tcpClient.Connect();
//调用连接,当连接不成功时,会抛出异常。 //Result result = tcpClient.TryConnect();//或者可以调用TryConnect//if (result.IsSuccess())//{ //}
tcpClient.Logger.Info(
"客户端成功连接");

tcpClient.Send(
"RRQM");

4、TcpClient 断线重连

在Config的插件配置中,使用重连插件即可。

.ConfigurePlugins(a=>{
a.UseReconnection(
5, true, 1000);//如需永远尝试连接,tryCount设置为-1即可。 });

项目案例

1、工程师软件工具箱

使用TouchRpc开发的一个内部工程师软件工具箱 。

2、远程监控、控制项目

项目地址

GitHub:
https://github.com/RRQM/TouchSocket

Gitee:
https://gitee.com/RRQM_Home/TouchSocket

文档地址:
https://touchsocket.net/

最后

如果你觉得这篇文章对你有帮助,不妨点个赞支持一下!你的支持是我继续分享知识的动力。如果有任何疑问或需要进一步的帮助,欢迎随时留言。

也可以加入微信公众号
[DotNet技术匠]
社区,与其他热爱技术的同行一起交流心得,共同成长!
优秀是一种习惯,欢迎大家留言学习!