2024年12月

传统的http1.0请求开发,已经满足了我们日常的web开发。
一般请求就像下图这样子,客服端发起一个请求(触发),服务端做出一个响应(动作):

有时会有诸如实时刷新,实时显示的场景,我们往往是客户端定时发起请求,不断的尝试获取最新的数据。
但是每次请求都会创建并释放一个新的连接,这样对于需要频繁请求的场景,性能损耗太大,此外对于实时性响应的场景也很难评估轮询周期。轮询的周期短,很多查询结果其实并没有变化,增加了成本开销。轮询周期长,又不能实时的展示数据,周期值变成了一个经验值,而且不同场景都需要不断的调整。这属实不够友好。
于是http1.1协议对此进行了扩展,允许长连接的存在。今天要介绍的SSE协议,就属于http1.1下的新协议。
SSE全称为 Sever-Sent Event
指服务器端事件发送。当客户端请求成功后,服务端会依次将事件(其实就是响应信息),分多次发送到客户端。客户端只要接收事件(响应信息),做出相应的处理即可。
就像下图的样子:

比如K线增长图,实时热力图,各种增长曲线等等,都可以实时的,由后端主动将事件推送到前端,不再需要前端每次建立一个新的连接来请求。这种方式也称之为长连接。
除了SSE,像websocket 、TCP等都属于长连接的类型。依次连接可以多次交互。
SSE其实最初并不受重视,甚至很多人都不知道这个协议。如果是简单一点的话,通常直接多轮询几遍就解决问题了,如果是复杂一点的话,直接就使用websocket这样的重协议来处理了,功能也相对来说比较强大。但是自从交互大模型问世以后,大模型的流式对话往往能更高效的输出,这种流式输出的用户体验也更好。这种主要是侧重大模型响应的交互模式,(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )反而使得SSE的优势又体现出来了。
下面我们看下如何在springboot中使用sse来开发:
由于springboot的封装,我们使用SSE开发变得异常简单,
核心思路是:
创建一个 SseEmitter 对象,返回给前端
这个SseEmitter类似于一个socket,我们只管向里边塞数据即可,
而前端在收到SseEmitter对象后,则只管从sseEmitter中取数据即可。(注意此处一般采用注册响应方式)

后端代码如下:
pom文件新增依赖:

1         <dependency>
2             <groupId>org.springframework.boot</groupId>
3             <artifactId>spring-boot-starter-web</artifactId>
4         </dependency>

controller类:

1 packagecom.example.demo.learnsse;2 
3 importlombok.extern.slf4j.Slf4j;4 importorg.springframework.http.MediaType;5 importorg.springframework.web.bind.annotation.CrossOrigin;6 importorg.springframework.web.bind.annotation.GetMapping;7 importorg.springframework.web.bind.annotation.RequestParam;8 importorg.springframework.web.bind.annotation.RestController;9 importorg.springframework.web.servlet.mvc.method.annotation.SseEmitter;10 
11 importjava.io.IOException;12 importjava.util.concurrent.TimeUnit;13 
14 /**
15 * @discription16  */
17 @Slf4j18 @RestController19 @CrossOrigin(origins = "*")20 public classSseController {21 
22 
23     @GetMapping(value = "/learn/sseChat" , produces ={MediaType.TEXT_EVENT_STREAM_VALUE})24     public SseEmitter chat(@RequestParam String name) throwsIOException {25         SseEmitter sseEmitter = new SseEmitter(360000L);26         sseEmitter.onCompletion(() -> log.warn("sse complete!!!" +Thread.currentThread().getName()));27         sseEmitter.onError(throwable ->{28             log.warn("sse error  " +Thread.currentThread().getName(), throwable);29 });30         sseEmitter.send("start");31         Runnable r = () ->{32             int i = 1;33             try{34                 while (i <= 10) {35                     sseEmitter.send(Thread.currentThread().getName()+": the next index:" +i);36                     log.warn(Thread.currentThread().getName() + ":" +i);37                     i++;38                     TimeUnit.SECONDS.sleep(3);39 }40 sseEmitter.complete();41             } catch(Exception e) {42                 log.warn("catch a ex", e);43 sseEmitter.completeWithError(e);44 }45 };46         Thread t = newThread(r);47 t.start();48         log.warn("start return sse");49         returnsseEmitter;50 }51 }

我们可以不写前端,直接用浏览器或者命令行访问,

浏览器效果如下:

真实效果是一行行输出的

data:start

data:Thread
-2: the next index:1data:Thread-2: the next index:2data:Thread-2: the next index:3data:Thread-2: the next index:4data:Thread-2: the next index:5data:Thread-2: the next index:6data:Thread-2: the next index:7data:Thread-2: the next index:8data:Thread-2: the next index:9data:Thread-2: the next index:10

日志输出如下:

2024-12-02 11:06:36.267  WARN 2032 --- [nio-8081-exec-4] com.example.demo.learnsse.SseController  : sse complete!!!http-nio-8081-exec-4
2024-12-02 11:06:38.440  WARN 2032 --- [       Thread-2] com.example.demo.learnsse.SseController  : Thread-2:2
2024-12-02 11:06:41.442  WARN 2032 --- [       Thread-2] com.example.demo.learnsse.SseController  : Thread-2:3
2024-12-02 11:06:44.450  WARN 2032 --- [       Thread-2] com.example.demo.learnsse.SseController  : Thread-2:4
2024-12-02 11:06:47.458  WARN 2032 --- [       Thread-2] com.example.demo.learnsse.SseController  : Thread-2:5
2024-12-02 11:06:50.468  WARN 2032 --- [       Thread-2] com.example.demo.learnsse.SseController  : Thread-2:6
2024-12-02 11:06:53.471  WARN 2032 --- [       Thread-2] com.example.demo.learnsse.SseController  : Thread-2:7
2024-12-02 11:06:56.475  WARN 2032 --- [       Thread-2] com.example.demo.learnsse.SseController  : Thread-2:8
2024-12-02 11:06:59.483  WARN 2032 --- [       Thread-2] com.example.demo.learnsse.SseController  : Thread-2:9
2024-12-02 11:07:02.495  WARN 2032 --- [       Thread-2] com.example.demo.learnsse.SseController  : Thread-2:10
2024-12-02 11:07:05.508  WARN 2032 --- [nio-8081-exec-5] com.example.demo.learnsse.SseController  : sse complete!!!http-nio-8081-exec-5

这样一个简单的单次连接,服务器多次推送的示例就写完了。

当然你也可以写一个简短的前端代码,查看效果,注意此时涉及到跨域了,因此我们的java代码要使用注解@CrossOrigin(origins = "*") 来解决跨域,请看controller代码中红色字体

1 <!DOCTYPE html>
2 <html>
3 <head>
4     <title>SSE Example</title>
5 </head>
6 <body>
7     <div id="events"></div>
8     <script>
9         const eventSource = new EventSource('http://127.0.0.1:8081/learn/sseChat?name=xx');10 
11         eventSource.onmessage = function(event) {12             const newElement = document.createElement("div");13             newElement.textContent = "New message: " +event.data;14             document.getElementById("events").appendChild(newElement);15 };16 
17         eventSource.onerror = function(error) {18             console.error("Error:", error);19             const newElement = document.createElement("div");20             newElement.textContent = "error message: " +error;21             document.getElementById("events").appendChild(newElement);22 eventSource.close();23 };24         
25         eventSource.onclose = function(event) {26             const newElement = document.createElement("div");27             newElement.textContent = "close message: " +event.data;28             document.getElementById("events").appendChild(newElement);29 eventSource.close();30 };31     </script>
32 </body>
33 </html>

我们在创建好SSE示例时,一般会设置以下几个回调方法:

onCompletion(Runnable callback):当异步请求完成时,我们会调用此方法注册的回调函数。
onError(Consumer<Throwable> callback) 当异步处理期间发生错误时,会调用该方法设置的回调函数

服务端发现任务结束时,主动知会客户端关闭连接:
complete():表示已经完成推送,通知客户端不再有新的事件发送。
completeWithError(Throwable ex) 表示由于发生了某个异常而结束推送。springmvc将通过异常处理机制传递该异常。
一般在对接大模型时,(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )我们除了完成SSE相关的注册,还会设置与大模型的连接,
一般的思路是这样的:
1、当前端发送请求提问来后端时,
2、我们首先创建一个SseEmitter,作为未来发送的套接字,
3、接着启动一个http连接,来请求大模型,
4、此时我们会使用Reactor-Mono之类的响应式编程框架,来回调处理大模型推送回来的数据。(其中Reactor部分的代码实现,由于篇幅有限,我会在后边的文章中讲解)
5、在Mono的每次回调到大模型推送回来的数据时,我们通过SseEmitter发送给前端
6、将第二步创建好的SseEmitter,返回给前端。
注意3/4/5步都是作为异步回调注册到mono中的。整体的结构图如下:

书接上回,我们今天继续讲解实现对象集合与DataTable的相互转换。

01
、把表格转换为对象集合

该方法是将表格的列名称作为类的属性名,将表格的行数据转为类的对象。从而实现表格转换为对象集合。同时我们约定如果类的属性设置了DescriptionAttribute特性,则特性值和表格列名一一对应,如果没有设置特性则取属性名称和列名一一对应。

同时我们需要约束类只能为结构体或类,而不能是枚举、基础类型、以及集合类型、委托、接口等。

类的类型校验成功后,我们还需要校验表格是否能转换为对象,即判断表格列名和类的属性名称或者Description特性值是否存在一致,如果没有一个表格列名和类的属性能对应上,则报表格列名无法映射至对象属性,无法完成转换异常。

当这些校验成功后,开始循环处理表格行记录,把每一行都转换为一个对象。

我们可以通过反射动态实例化对象,再通过反射对对象的属性动态赋值。因为我们的对象即支持类也支持结构体,因此这里面就会遇到一个技术问题,正常的property.SetValue方法并没有办法给结构体动态赋值。

这是因为结构体是值类型,而property.SetValue方法的参数都是object,因此这里面就涉及到装箱拆箱,因此SetValue是设置了装箱以后的对象,而并不能改变原对象。

而解决办法就是先把结构体赋值给object变量,然后对object变量进行SetValue设置值,最后再把object变量转为结构体。

下面我们一起看看具体实现代码:

//把表格转换为对象集合
//如果设置DescriptionAttribute,则将特性值作为表格的列名称
//否则将属性名作为表格的列名称
public static IEnumerable<T> ToModels<T>(DataTable dataTable)
{
    //T必须是结构体或类,并且不能是集合类型
    AssertTypeValid<T>();
    if (0 == dataTable.Rows.Count)
    {
        return [];
    }
    //获取T所有可写入属性
    var properties = typeof(T).GetProperties().Where(u => u.CanWrite);
    //校验表格是否能转换为对象
    var isCanParse = IsCanMapDataTableToModel(dataTable, properties);
    if (!isCanParse)
    {
        throw new NotSupportedException("The column name of the table cannot be mapped to an object property, and the conversion cannot be completed.");
    }
    var models = new List<T>();
    foreach (DataRow dr in dataTable.Rows)
    {
        //通过反射实例化T
        var model = Activator.CreateInstance<T>();
        //把行数据映射到对象上
        if (typeof(T).IsClass)
        {
            //处理T为类的情况
            MapRowToModel<T>(dr, model, properties);
        }
        else
        {
            //处理T为结构体的情况
            object boxed = model!;
            MapRowToModel<object>(dr, boxed, properties);
            model = (T)boxed;
        }
        models.Add(model);
    }
    return models;
}
//校验表格是否能转换为对象
private static bool IsCanMapDataTableToModel(DataTable dataTable, IEnumerable<PropertyInfo> properties)
{
    var isCanParse = false;
    foreach (var property in properties)
    {
        //根据属性获取列名
        var columnName = GetColumnName(property);
        if (!dataTable.Columns.Contains(columnName))
        {
            continue;
        }
        isCanParse = true;
    }
    return isCanParse;
}
//把行数据映射到对象上
private static void MapRowToModel<T>(DataRow dataRow, T model, IEnumerable<PropertyInfo> properties)
{
    foreach (var property in properties)
    {
        //根据属性获取列名
        var columnName = GetColumnName(property);
        if (!dataRow.Table.Columns.Contains(columnName))
        {
            continue;
        }
        //获取单元格值
        var value = dataRow[columnName];
        if (value != DBNull.Value)
        {
            //给对象属性赋值
            property.SetValue(model, Convert.ChangeType(value, property.PropertyType));
        }
    }
}

我们做个简单的单元测试:

[Fact]
public void ToModels()
{
    //验证正常情况
    var table = TableHelper.Create<Student<double>>();
    var row1 = table.NewRow();
    row1[0] = "Id-11";
    row1[1] = "名称-12";
    row1[2] = 33.13;
    table.Rows.Add(row1);
    var row2 = table.NewRow();
    row2[0] = "Id-21";
    row2[1] = "名称-22";
    row2[2] = 33.23;
    table.Rows.Add(row2);
    var students = TableHelper.ToModels<Student<double>>(table);
    Assert.Equal(2, students.Count());
    Assert.Equal("Id-11", students.ElementAt(0).Id);
    Assert.Equal("名称-12", students.ElementAt(0).Name);
    Assert.Equal(33.13, students.ElementAt(0).Age);
    Assert.Equal("Id-21", students.ElementAt(1).Id);
    Assert.Equal("名称-22", students.ElementAt(1).Name);
    Assert.Equal(33.23, students.ElementAt(1).Age);
}

02
、把对象集合转换为表格

该方法首先会调用根据对象创建表格方法得到一个空白表格,然后通过反射获取对象的所有属性,然后循环处理对象集合,把一个对象的所有属性值一个一个添加行的所有列中,这样就完成了一个对象映射成表的一行记录,直至所有对象转换完成即可得到一个表格。

代码如下:

//把对象集合转为表格
//如果设置DescriptionAttribute,则将特性值作为表格的列名称
//否则将属性名作为表格的列名称
public static DataTable ToDataTable<T>(IEnumerable<T> models, string? tableName = null)
{
    //创建表格
    var dataTable = Create<T>(tableName);
    if (models == null || !models.Any())
    {
        return dataTable;
    }
    //获取所有属性
    var properties = typeof(T).GetProperties().Where(u => u.CanRead);
    foreach (var model in models)
    {
        //创建行
        var dataRow = dataTable.NewRow();
        foreach (var property in properties)
        {
            //根据属性获取列名
            var columnName = GetColumnName(property);
            //填充行数据
            dataRow[columnName] = property.GetValue(model);
        }
        dataTable.Rows.Add(dataRow);
    }
    return dataTable;
}

进行如下单元测试:

[Fact]
public void ToDataTable()
{
    //验证正常情况
    var students = new List<Student<double>>();
    var student1 = new Student<double>
    {
        Id = "Id-11",
        Name = "名称-12",
        Age = 33.13
    };
    students.Add(student1);
    var student2 = new Student<double>
    {
        Id = "Id-21",
        Name = "名称-22",
        Age = 33.23
    };
    students.Add(student2);
    var table = TableHelper.ToDataTable<Student<double>>(students, "学生表");
    Assert.Equal("学生表", table.TableName);
    Assert.Equal(2, table.Rows.Count);
    Assert.Equal("Id-11", table.Rows[0][0]);
    Assert.Equal("名称-12", table.Rows[0][1]);
    Assert.Equal("33.13", table.Rows[0][2].ToString());
    Assert.Equal("Id-21", table.Rows[1][0]);
    Assert.Equal("名称-22", table.Rows[1][1]);
    Assert.Equal("33.23", table.Rows[1][2].ToString());
}

03
、把一维数组作为一列转换为表格

该方法比较简单就是把一个一维数组作为一列数据创建一张表格,同时可以选择是否填写表名和列名。具体代码如下:

//把一维数组作为一列转换为表格
public static DataTable ToDataTableWithColumnArray<TColumn>(TColumn[] array, string? tableName = null, string? columnName = null)
{
    var dataTable = new DataTable(tableName);
    //创建列
    dataTable.Columns.Add(columnName, typeof(TColumn));
    //添加行数据
    foreach (var item in array)
    {
        var dataRow = dataTable.NewRow();
        dataRow[0] = item;
        dataTable.Rows.Add(dataRow);
    }
    return dataTable;
}

单元测试如下:

[Fact]
public void ToDataTableWithColumnArray()
{
    //验证正常情况
    var columns = new string[] { "A", "B" };
    var table = TableHelper.ToDataTableWithColumnArray<string>(columns, "学生表");
    Assert.Equal("学生表", table.TableName);
    Assert.Equal("Column1", table.Columns[0].ColumnName);
    Assert.Equal(2, table.Rows.Count);
    Assert.Equal("A", table.Rows[0][0]);
    Assert.Equal("B", table.Rows[1][0]);
    table = TableHelper.ToDataTableWithColumnArray<string>(columns, "学生表", "列");
    Assert.Equal("列", table.Columns[0].ColumnName);
}

04
、把一维数组作为一行转换为表格

该方法也比较简单就是把一个一维数组作为一行数据创建一张表格,同时可以选择是否填写表名。具体代码如下:

//把一维数组作为一行转换为表格
public static DataTable ToDataTableWithRowArray<TRow>(TRow[] array, string? tableName = null)
{
    var dataTable = new DataTable(tableName);
    //创建列
    for (var i = 0; i < array.Length; i++)
    {
        dataTable.Columns.Add(null, typeof(TRow));
    }
    //添加行数据
    var dataRow = dataTable.NewRow();
    for (var i = 0; i < array.Length; i++)
    {
        dataRow[i] = array[i];
    }
    dataTable.Rows.Add(dataRow);
    return dataTable;
}

05
、行列转置

该方法是指把DataTable中的行和列互换,就是行的数据变成列,列的数据变成行。如下图示例:

这个示例转换,第一个表格中列名并没有作为数据进行转换,因此我们会提供一个可选项参数用来指示要不要把类目作为数据进行转换。

整个方法实现逻辑也很简单,就是以原表格行数为列数创建一个新表格,然后在循环处理原表格列,并把原表格一列数据填充至新表格的一行数据中,直至原表格所有列处理完成则完成行列转置。具体代码如下:

//行列转置
public static DataTable Transpose(DataTable dataTable, bool isColumnNameAsData = true)
{
    var transposed = new DataTable(dataTable.TableName);
    //如果列名作为数据,则需要多加一列
    if (isColumnNameAsData)
    {
        transposed.Columns.Add();
    }
    //转置后,行数即为新的列数
    for (int i = 0; i < dataTable.Rows.Count; i++)
    {
        transposed.Columns.Add();
    }
    //以列为单位,一次处理一列数据
    for (var column = 0; column < dataTable.Columns.Count; column++)
    {
        //创建新行
        var newRow = transposed.NewRow();
        //如果列名作为数据,则先把列名加入第一列
        if (isColumnNameAsData)
        {
            newRow[0] = dataTable.Columns[column].ColumnName;
        }
        //把一列数据转为一行数据
        for (var row = 0; row < dataTable.Rows.Count; row++)
        {
            //如果列名作为数据,则行数据从第二列开始填充
            var rowIndex = isColumnNameAsData ? row + 1 : row;
            newRow[rowIndex] = dataTable.Rows[row][column];
        }
        transposed.Rows.Add(newRow);
    }
    return transposed;
}

下面进行简单的单元测试:

[Fact]
public void Transpose_ColumnNameAsData()
{
    DataTable originalTable = new DataTable("测试");
    originalTable.Columns.Add("A", typeof(string));
    originalTable.Columns.Add("B", typeof(int));
    originalTable.Columns.Add("C", typeof(int));
    originalTable.Rows.Add("D", 1, 2);
    //列名作为数据的情况
    var table = TableHelper.Transpose(originalTable, true);
    Assert.Equal(originalTable.TableName, table.TableName);
    Assert.Equal("Column1", table.Columns[0].ColumnName);
    Assert.Equal("Column2", table.Columns[1].ColumnName);
    Assert.Equal(3, table.Rows.Count);
    Assert.Equal("A", table.Rows[0][0]);
    Assert.Equal("D", table.Rows[0][1]);
    Assert.Equal("B", table.Rows[1][0]);
    Assert.Equal("1", table.Rows[1][1].ToString());
    Assert.Equal("C", table.Rows[2][0]);
    Assert.Equal("2", table.Rows[2][1].ToString());
}


:测试方法代码以及示例源码都已经上传至代码库,有兴趣的可以看看。
https://gitee.com/hugogoos/Ideal

本地缓存带来的挑战

分布式缓存相比于本地缓存,在实现层面需要关注的点有哪些不同。梳理如下:

维度 本地缓存 集中式缓存
缓存量 受限于单机内存大小,存储数据有限 需要提供给分布式系统里面所有节点共同使用,对于大型系统而言,对集中式缓存的容量诉求非常的大,远超单机内存的容量大小。
可靠性 影响有限,只有本进程使用,不会影响其他进程的可靠性。 作为整个系统扛压屏障,系统内所有节点共同依赖的通用服务,一旦集中式缓存出问题,会影响与其对接的所有业务节点,对系统的影响是
致命性
的。
承压性 承载单机节点的压力,请求量有限 承载整个分布式集群所有节点的流量,系统内业务分布式节点部署数量越多、业务体量越大,会导致集中缓存要承载的压力就越大,甚至是上不封顶的。

从上述几个维度的对比可以发现,同样是缓存,但
集中式缓存
所承担的使命是完全不一样的,业务对集中式缓存的
存储容量

可靠性

承压性
等方面的诉求也是天壤之别,不可等同视之。以
Redis
为例:

  • 如何打破redis缓存容量受限于机器单机内存大小的问题?
  • 如何使得redis能够扛住多方过来的请求压力?
  • 如何保证redis不会成为单点故障源?

其实答案很简单,加机器!通过多台机器的叠加使用,达到比单机更优的效果 —— 现在业务系统的集群化部署,也都是采用的这个思路。Redis的分布式之路亦是如此,但相比于常规的业务系统分布式集群化构建更加复杂:

  1. 很多业务实现集群化部署会很简单,因为每个业务进程节点都是
    无状态
    的,只需要部署下然后通过负载均衡的方式对外提供请求应答即可。
  2. Redis作为一个集中式缓存数据库,它是
    有状态
    的,不仅需要将进程分别部署在多个节点上,还需要将数据也分散存储在各个节点上,同时还得保证整个Redis集群对外是一个统一整体。

所以对于一个集中式缓存的分布式能力构建,必须要额外提供一些机制,来保障数据在各个节点上的安全与
一致性
,还需要将分散在各个节点上的数据都组成一个逻辑上的整体。

主从复制简介

主从复制是什么

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master),后者称为从节点(slave);数据的复制是单向的,只能由主节点到从节点,而对于redis来说,
一主两从
是比较常见的搭配。

主从模式按照读写分离的策略来提升整体的请求处理能力:

  1. 主节点(Master)同时对外提供
    读和写
    操作
  2. 从节点(Slave)通过
    replicate
    同步的方式,从主节点复制数据,保持自身数据与主节点一致
  3. 从节点只能对外提供
    读操作

当然,对于读多写少类的操作,为了提升整体读请求的处理能力,可以采用
一主多从
的方式。所有的从节点都从主节点进行数据同步,这样会导致主节点的同步处理压力过大而成为瓶颈。为了解决这个问题,redis还支持了从slave节点分发的能力,也就是从服务器也可以有自己的从服务器, 多个从服务器之间可以构成一个主从链。这样可以分摊主服务器压力。

主从复制的作用

  • 数据备份:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
  • 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复。
  • 读写分离:由主节点提供写服务,由从节点提供读服务,提高Redis服务器的并发量。

主从复制流程

全量复制

在第一次同步时会进行全量复制(但并非只有第一次同步时全量复制,其他情况看后文)

第一次同步时流程:

第一阶段:建立链接、协商同步

从服务器向主服务器发送PSYNC ? -1 命令,主动请求进行完整重同步

psync 命令包含两个参数,分别是主服务器的 runID 和复制进度 offset。

  • runID,每个 Redis 服务器在启动时都会自动生产一个随机的 ID 来唯一标识自己。当从服务器和主服务器第一次同步时,因为不知道主服务器的 run ID,所以将其设置为 "?"。
  • offset,表示复制的进度,(也叫复制偏移量),主要为增量复制服务,这里因为是全量复制,所以使用-1表示。

主服务器收到 psync 命令后,会向从服务器发送FULLRESYNC响应命令并带上两个参数:主服务器的 runID 和主服务器目前的复制进度 offset。从服务器收到响应后,会记录这两个值。

FULLRESYNC
响应命令的意图是采用全量复制的方式,也就是主服务器会把所有的数据都同步给从服务器。

第二阶段:主服务器同步数据给从服务器

接着,主服务器会执行 bgsave 命令来生成 RDB 文件,然后把文件发送给从服务器(数据持久化)。

从服务器收到 RDB 文件后,
会先清空当前的数据,然后载入 RDB 文件
。这是因为从服务器在通过 replicaof 命令开始和主服务器同步前,可能保存了其他数据。为了避免之前数据的影响,从服务器需要先把当前数据库清空。

这里有一点要注意,主服务器生成 RDB 这个过程是不会阻塞主线程的,因为 bgsave 命令是产生了一个子进程来做生成 RDB 文件的工作,是异步工作的,这样 Redis 依然可以正常处理命令。

就像RDB文件生成过程中Redis不停止提供服务一样,从服务器在接收并载入RDB文件的过程中,主服务器仍然可以写入数据,那怎么将这部分数据传给从服务器呢?

第三阶段:主服务器发送新写操作命令给从服务器

为了保证主从服务器的数据一致性,主服务器为每个连接进来的从服务器准备了一个replication buffer缓冲区,这段时间内写入的数据都会被存入这个replication buffer中,从服务器完成 RDB 的载入后,会回复一个确认消息给主服务器。主服务器就将replication buffer中的数据推送过去。

长连接传播

主从服务器在完成第一次同步后,双方之间就会维护一个 TCP 连接,这个TCP连接是长连接

之后就会基于这个
长连接
进行命令传播。通过这种方式来保证第一次同步后的主从服务器的数据一致性。

增量复制

实际上,生成RDB文件是比较耗费资源的,同时,主服务器传输 RDB 文件给从服务器,这个操作会耗费主从服务器大量的网络资源,并对主服务器响应时延产生影响。而对从服务器而言,载入 RDB 文件期间,会阻塞其他命令请求,这也会导致响应效率的降低。并且,当从服务器断开后重新连接,主从数据不一致,在数据少量不一致的情况下,也不需要全量复制。因此,就提供了增量复制

复制偏移量(replication offset)

主服务器和从服务器会分别维护一个复制偏移量。如果主从服务器的复制偏移量相同,则说明二者的数据库状态一致;反之,则说明二者的数据库状态不一致,此时从服务器需要使用增量复制来同步缺失的这一部分数据。

复制积压缓冲区(replication backlog)

主服务器的写命令,除了传给从服务器后,还会写入replication backlog(全局唯一),这是一个固定长度的先进先出(FIFO)队列,默认大小为 1MB。其在内存中是一个环形结构。

  1. 主服务器按照顺时针方向写命令,主服务器最新写入的位置即为上文提到的主服务器的偏移量,这里叫master offset。
  2. 假设从服务器在set key2 2后断开连接,也就是上图中slave offset的位置,当它重连时,再次给主服务器发送psync指令时,会带上自己的offset(注意和全量复制的区别,全量复制时offset设置为-1,此时是从服务器真实的offset值)。
  3. 接着,主服务器发现从服务器的偏移量与自己不一致,需要进行增量复制。此时主服务器会计算出master offset与slave offset之间的指令,并发送给该为从服务器准备的replication buffer中,进而发送给从服务器。
  4. 从服务器进行写入后便又恢复到和主服务器一致的状态。

断开重连并不一定总是增量复制

网络断开后,当从服务器重新连上主服务器时,从服务器会通过 psync 命令将自己的复制偏移量 slave_repl_offset 发送给主服务器,主服务器根据自己的 master_repl_offset 和 slave_repl_offset 之间的差距,然后来决定对从服务器执行哪种同步操作:

  1. 整个replication backlog是个环形结构,也就是说最新的写命令会将最老的写命令覆盖。换句话说,如果从服务器断开时间太久,环形缓冲区被主服务器的写命令覆盖了,那么从服务器连上主服务器后只能通过
    全量复制
    来获取数据了。所以replication backlog配置要尽量大一些,可以降低主从断开后全量复制的概率。
    • 如果判断出从服务器要读取的数据还在 repl_backlog_buffer 缓冲区里,那么主服务器将采用增量同步的方式;
    • 相反,如果判断出从服务器要读取的数据已经不存在 repl_backlog_buffer 缓冲区里,那么主服务器将采用全量同步的方式。
  2. 上文中有提到每个实例有自己的RunID,这个值在服务器启动时自动生成,由 40 个随机的十六进制字符组成。从服务器断开重连时会将之前主服务器的RunID一起发送过去(这里注意和第一次连接的区别,第一次连接时发送的RunID是“?”),主服务器会判断这个RunID是否为自己,如果不是(比如出现脑裂,出现两个主服务器),则会和全量复制时一样返回FULLRESYNC响应命令,告知从服务器需要进行全量复制。

总结

主从服务器第一次同步的时候,就是采用全量复制。

第一次同步完成后,主从服务器都会维护着一个长连接,主服务器在接收到写操作命令后,就会通过这个连接将写命令传播给从服务器,来保证主从服务器的数据一致性。

如果遇到网络断开,就需要进行增量复制(当然不一定是增量复制,具体还需要看replication backlog的大小,以及对应的主服务器RunID)。

面试题专栏

Java面试题专栏
已上线,欢迎访问。

  • 如果你不知道简历怎么写,简历项目不知道怎么包装;
  • 如果简历中有些内容你不知道该不该写上去;
  • 如果有些综合性问题你不知道怎么答;

那么可以私信我,我会尽我所能帮助你。

数据库优化方案

1.对查询进行优化,要
尽量避免全表扫描
,首先应考虑在 where 及 order by 涉及的列上建立索引。

2.应尽量避免在 where 子句中对字段进行 null 值判断,否则将导致引擎放弃使用索引而进行全表扫描,如:

select id from t where num is null

最好不要给数据库留NULL,尽可能的使用 NOT NULL填充数据库.

备注、描述、评论之类的可以设置为 NULL,其他的,最好不要使用NULL。

不要以为 NULL 不需要空间,比如:char(100) 型,在字段建立时,空间就固定了, 不管是否插入值(NULL也包含在内),都是占用 100个字符的空间的,如果是varchar这样的变长字段, null 不占用空间。

可以在num上设置默认值0,确保表中num列没有null值,然后这样查询:

select id from t where num = 0

3.应尽量避免在 where 子句中使用 != 或 <> 操作符,否则将引擎放弃使用索引而进行全表扫描。

4.应尽量避免在 where 子句中使用 or 来连接条件,如果一个字段有索引,一个字段没有索引,将导致引擎放弃使用索引而进行全表扫描,如:

select id from t where num=10 or Name = 'admin'
可以这样查询:

select id from t where num = 10 union allselect id from t where Name = 'admin'

5.in和 not in 也要慎用,否则会导致全表扫描,如:
select id from t where num in(1,2,3)
对于连续的数值,能用 between就不要用 in 了:

select id from t where num between 1 and 3
很多时候用 exists 代替 in 是一个好的选择:

select num from a where num in(select num from b)
用下面的语句替换:

select num from a where exists(select 1 from b where num=a.num)

6.下面的查询也将导致全表扫描:

select id from t where name like ‘%abc%
若要提高效率,可以考虑全文检索。

7.如果在 where 子句中使用参数,也会导致全表扫描。因为SQL只有在运行时才会解析局部变量,但优化程序不能将访问计划的选择推迟到运行时;它必须在编译时进行选择。然 而,如果在编译时建立访问计划,变量的值还是未知的,因而无法作为索引选择的输入项。如下面语句将进行全表扫描:

select id from t where num = @num
可以改为
强制查询使用索引

select id from t with(index(索引名)) where num = @num

应尽量避免在 where子句中对字段进行表达式操作,这将导致引擎放弃使用索引而进行全表扫描。如:

select id from t where num/2 = 100
应改为:

select id from t wherenum = 100*2

9.应尽量避免在where子句中对字段进行函数操作,这将导致引擎放弃使用索引而进行全表扫描。如:

select id from t where substring(name,1,3) = ’abc’       -–name以abc开头的id
select id from t where datediff(day,createdate,’2005-11-30′) = 0    -–‘2005-11-30’    --生成的id

应改为:

select id from t where name like 'abc%'
select id from t where createdate >= '2005-11-30' and createdate < '2005-12-1'

10.不要在 where 子句中的“=”左边进行函数、算术运算或其他表达式运算,否则系统将可能无法正确使用索引。
11.在使用索引字段作为条件时,如果该索引是复合索引,那么必须使用到该索引中的第一个字段作为条件时才能保证系统使用该索引,否则该索引将不会被使用,并且应尽可能的让字段顺序与索引顺序相一致。

12.不要写一些没有意义的查询,如需要生成一个空表结构:

select col1,col2 into #t from t where 1=0
这类代码不会返回任何结果集,但是会消耗系统资源的,应改成这样:

create table #t(…)

13.Update 语句,如果只更改1、2个字段,
不要Update全部字段
,否则频繁调用会引起明显的性能消耗,同时带来大量日志。

14.对于多张大数据量(这里几百条就算大了)的表JOIN,要先分页再JOIN,否则逻辑读会很高,性能很差。

15.select count(*) from table;这样
不带任何条件的count会引起全表扫描
,并且没有任何业务意义,是一定要杜绝的。

16.
索引并不是越多越好
,索引固然可以提高相应的 select 的效率,但同时也降低了 insert 及 update 的效率,因为 insert 或 update 时有可能会重建索引,所以怎样建索引需要慎重考虑,视具体情况而定。一个表的索引数最好不要超过6个,若太多则应考虑一些不常使用到的列上建的索引是否有 必要。

17.
应尽可能的避免更新 clustered 索引数据列
,因为 clustered 索引数据列的顺序就是表记录的物理存储顺序,一旦该列值改变将导致整个表记录的顺序的调整,会耗费相当大的资源。若应用系统需要频繁更新 clustered 索引数据列,那么需要考虑是否应将该索引建为 clustered 索引。

18.
尽量使用数字型字段
,若只含数值信息的字段尽量不要设计为字符型,这会降低查询和连接的性能,并会增加存储开销。这是因为引擎在处理查询和连 接时会逐个比较字符串中每一个字符,而对于数字型而言只需要比较一次就够了。

19.尽可能的使用
varchar/nvarchar代替 char/nchar
,因为首先变长字段存储空间小,可以节省存储空间,其次对于查询来说,在一个相对较小的字段内搜索效率显然要高些。

20.任何地方都不要使用
select * from t
,用具体的字段列表代替“*”,不要返回用不到的任何字段。

21.
尽量使用表变量来代替临时表
。如果表变量包含大量数据,请注意索引非常有限(只有主键索引)。

  1. 避免频繁创建和删除临时表,以减少系统表资源的消耗。
    临时表并不是不可使用,适当地使用它们可以使某些例程更有效,例如,当需要重复引用大型表或常用表中的某个数据集时。但是,对于一次性事件, 最好使用导出表。

23.在新建临时表时,如果一次性插入数据量很大,那么可以使用
select into 代替 create table
,避免造成大量 log ,以提高速度;如果数据量不大,为了缓和系统表的资源,应先create table,然后insert。

24.如果使用到了临时表,在存储过程的最后务必将所有的临时表显式删除,先 truncate table ,然后 drop table ,这样可以避免系统表的较长时间锁定。

25.尽量避免使用游标,因为游标的效率较差,如果游标操作的数据超过1万行,那么就应该考虑改写。

26.使用基于游标的方法或临时表方法之前,应先寻找基于集的解决方案来解决问题,基于集的方法通常更有效。

27.与临时表一样,游标并不是不可使用。对小型数据集使用 FAST_FORWARD 游标通常要优于其他逐行处理方法,尤其是在必须引用几个表才能获得所需的数据时。在结果集中包括“合计”的例程通常要比使用游标执行的速度快。如果开发时 间允许,基于游标的方法和基于集的方法都可以尝试一下,看哪一种方法的效果更好。

28.在所有的存储过程和触发器的开始处设置 SET NOCOUNT ON ,在结束时设置 SET NOCOUNT OFF 。无需在执行存储过程和触发器的每个语句后向客户端发送 DONE_IN_PROC 消息。

29.尽量避免大事务操作,提高系统并发能力。

30.尽量避免向客户端返回大数据量,若数据量过大,应该考虑相应需求是否合理。