2024年10月

今天我们将开始第二个数据类型-链表的学习,同样我们还是用最原始的方式,自己申请内存管理内存来实现一个链表。

01
、01、定义

什么是链表?链表在物理存储结构上表现为非顺序性和非连续性,因此链表的数据元素物理存储位置是随机的,动态分配的;而在逻辑结构上表现为线性结构的特点,即元素一个连着一个元素串起来像一条线 。

节点
:其中链表元素又叫节点,一个节点主要包含数据域和指针域,其中数据域主要存放数据元素,而指针域主要存放下一个节点存储位置地址。

头指针
:一个表示链表第一个节点位置的普通指针,并且永远指向第一个节点位置,方便后面使用链表。

头节点
:通常表示链表的第一个节点,并且节点内数据域为空,因此也叫空节点,其作用主要用于解决一些特殊问题,因此也可以省略。

首元节点
:由于头节点数据域为空,因此链表的第一个数据域不为空的节点叫首元节点,只是一个名称,并没有什么实际意义。

02
、02、分类

链表有两种分类方法,其一可以分为静态链表和动态链表,其二可以分为单向链表、双向链表以及循环链表。

单链表只有一个方向,每个节点包含数据域和指向下一个节点的指针域。

双向链表有两个方向,即每个节点包含数据域以及同时指向上一个节点和下一个节点的指针域。

循环链表指链表首尾相连,即最后一个节点的指针域指向第一个节点。循环链表也分单向循环链表和双向循环链表,原理都一样。

03
、03、实现

下面我们一起使用最原始的方式,自己申请内存空间,自己维护,完成链表的实现。

1、ADT定义

我们首先来定义链表的ADT(单链表)。

ADT LinkedList{

数据对象:D 是一个非空的元素集合,D = {a1, a2, ..., an},其中 ai 表示一个元素即节点,一个节点存储着数据和指向下一个节点的指针。

数据关系:D中的节点通过指针进行连接,每一个节点都包含一个指向下一个节点的指针。

基本操作:[

Init(n) :初始化一个空链表,即声明一个头指针,如有必要也可以声明一个头节点。

Length:返回链表长度。

HeadNode:返回头节点。

Find(v):返回数据域v对应的节点。

Update(n,v):更新n节点的数据域。

InsertAfter(n,v):在n节点后面添加数据域为v的新节点。

Remove(n):移除n节点。

Destroy():销毁链表。

]

}

定义好链表ADT,下面我们就可以开始自己实现一个数据域为string类型的链表。

2、定义类

首先我们需要定义节点,其中包含两个字段一个是存放数据、一个是存放指针,代码如下。

public struct MyselfLinkedListNode
{
    //数据域
    public string Data { get; set; }
    //指针域
    public IntPtr Next { get; set; }
}

然后再定义链表实现类MyselfLinkedList,用来实现链表的相关操作。

因为我们直接管理内存,所以需要一个维护内存的指针字段;

因为我们直接获取链表长度,所以需要一个存储链表长度字段;

因此我们的MyselfLinkedList类初步是这样的:

public sealed class MyselfLinkedList : IDisposable
{
    //申请内存起始位置指针
    private IntPtr _head;
    //链表长度
    private int _length;
}

3、初始化Init

初始化结构主要做几件事。

a.分配内存空间;

b.什么头指针;

c.创建头节点;

d.维护链表长度属性;

具体实现代码如下:

//初始化链表,声明头指针,并创建头节点
public MyselfLinkedListNode Init()
{
    //计算节点的大小
    var size = Marshal.SizeOf(typeof(MyselfLinkedListNode));
    //分配指定字节数的内存空间
    _head = Marshal.AllocHGlobal(size);
    //创建头节点
    var node = new MyselfLinkedListNode
    {
        Data = null,
        Next = IntPtr.Zero
    };
    //将节点实例写入分配的内存
    Marshal.StructureToPtr(node, _head, false);
    //链表长度加1
    _length++;
    //返回头节点
    return node;
}

4、获取链表长度 Length

这个比较简单直接把链表长度私有字段返回即可。

//链表长度
public int Length
{
    get
    {
        return _length;
    }
}

5、获取头节点 HeadNode

获取头节点主要是为了方便数据处理,可以通过头指针直接读取内存地址获取。具体代码如下:

//头节点
public MyselfLinkedListNode? HeadNode
{
    get
    {
        if (_head == IntPtr.Zero)
        {
            return null;
        }
        return GetNode(_head);
    }
}
//获取节点
private MyselfLinkedListNode GetNode(IntPtr pointer)
{
    // 从分配的内存读取实例
    return Marshal.PtrToStructure<MyselfLinkedListNode>(pointer);
}

同样我们也可以定义一个尾节点属性,可以方便使用,原理都差不多,这里就不赘述了。

6、在指定节点后插入节点 InsertAfter

通过前面对链表结构的了解,要想再两个节点之间加入一个新节点,只需要把两者之间的线剪断,即前一个节点的指针域需要重新指向新节点,并且新节点的指针域要指向后一个节点,其他保持不变,如下图:

业务逻辑清楚了,我们再来梳理代码逻辑,要想实现这个功能我们大致需要一下几步:

a.获取指定节点的指针;

b.创建一个新的节点;

c.重新调整指定节点及新节点指针域;

d.把指定节点和新节点指针调整后数据更新到内存中;

e.更新链表长度属性;

具体实现如下:

//在指定节点后插入新节点
public MyselfLinkedListNode InsertAfter(MyselfLinkedListNode node, string value)
{
    //获指定取节点对应指针
    var pointer = GetPointer(node);
    //如果指针不为空才处理
    if (pointer != IntPtr.Zero)
    {
        //以新值创建一个节点
        var (newPointer, newNode) = CreateNode(value);
        //把新节点的下一个节点指针指向指定节点的下一个节点
        newNode.Next = node.Next;
        //把指定节点的下一个节点指针指向新节点
        node.Next = newPointer;
        //更新修改后的节点
        Marshal.StructureToPtr(newNode, newPointer, false);
        Marshal.StructureToPtr(node, pointer, false);
        //链表长度加1
        _length++;
        return newNode;
    }
    return default;
}
//获取节点对应指针
private IntPtr GetPointer(MyselfLinkedListNode node)
{
    //从头指针开始查找
    var currentPointer = _head;
    //如果当前指针为空则停止查找
    while (currentPointer != IntPtr.Zero)
    {
        //获取当前指针对应的节点
        var currentNode = GetNode(currentPointer);
        //如果当前节点数据域和指针域与要查找的节点相同则返回当前节点指针
        if (currentNode.Data == node.Data && currentNode.Next == node.Next)
        {
            return currentPointer;
        }
        //否则查找下一个节点
        currentPointer = currentNode.Next;
    }
    return IntPtr.Zero;
}
//创建节点
private (IntPtr Pointer, MyselfLinkedListNode Node) CreateNode(string value)
{
    //计算大小
    var size = Marshal.SizeOf(typeof(MyselfLinkedListNode));
    //分配指定字节数的内存空间
    var pointer = Marshal.AllocHGlobal(size);
    //创建实例并设置值
    var node = new MyselfLinkedListNode
    {
        Data = value,
        Next = IntPtr.Zero
    };
    //将实例写入分配的内存
    Marshal.StructureToPtr(node, pointer, false);
    //返回节点指针和节点
    return (pointer, node);
}

这里只实现了一个在指定节点后插入节点,我们还可以实现在指定节点前插入,在首元节点前插入,在尾节点后添加,都是可以的,感兴趣的可以自己实现试试。

7、根据数据域查找节点 Find

在链表中对查找是不友好的,因为查找一个值,需要从链表头一个一个往后查找,实现逻辑到不复杂,具体实现代码如下:

//根据数据查找节点
public MyselfLinkedListNode Find(string value)
{
    //从头指针开始查找
    var pointer = _head;
    //如果当前指针为空则停止查找
    while (pointer != IntPtr.Zero)
    {
        //获取当前指针对应的节点
        var node = GetNode(pointer);
        //如果当前节点数据域和要查找值相同则返回当前节点
        if (node.Data == value)
        {
            return node;
        }
        //否则查找下一个节点
        pointer = node.Next;
    }
    return default;
}

8、更新指定节点数据域 Update

这个方法逻辑也比较简单,只需要找到节点指针,然后把节点更新,最后把更新后的数据写入内存即可。

//更新节点数据
public void Update(MyselfLinkedListNode node, string value)
{
    //获取节点对应指针
    var pointer = GetPointer(node);
    //当指针不为空,则更新节点数据
    if (pointer != IntPtr.Zero)
    {
        //修改数据
        node.Data = value;
        //将数据写入分配的内存,完成数据更新
        Marshal.StructureToPtr(node, pointer, false);
    }
}

9、移除指定节点 Remove

如果要想移除一个节点,则需要把指定节点与前后节点的连接删除,然后把前后两个节点建立起连接,同时需要手动释放被删除节点内存。如下图。

具体代码实现如下:

//移除节点
public void Remove(MyselfLinkedListNode node)
{
    //从头指针开始查找
    var currentPointer = _head;
    //获取当前节点
    var currentNode = GetNode(_head);
    //查找节点对应的指针
    var pointer = GetPointer(node);
    while (true)
    {
        if (currentNode.Next == IntPtr.Zero)
        {
            //指针为空则返回
            return;
        }
        else if (currentNode.Next == pointer)
        {
            //把要删除节点的上一个节点对应的下一个节点指向要删除节点的下一个节点
            currentNode.Next = node.Next;
            //手动释放被删除节点对应的内存
            Marshal.FreeHGlobal(pointer);
            //更新要删除节点的上一个节点
            Marshal.StructureToPtr(currentNode, currentPointer, false);
            //链表长度减1
            _length--;
            break;
        }
        else
        {
            //查找下一个节点
            currentPointer = currentNode.Next;
            currentNode = GetNode(currentPointer);
        }
    }
}

10、销毁链表 Destroy

销毁链表主要是使用因为是我们自己手动管理内存,用完后要及时清理,放在内存泄漏等意外情况出现。代码也很简单,循环把每个节点内存释放即可,如下代码:

//销毁链表
public void Destroy()
{
    var pointer = _head;
    while (pointer != IntPtr.Zero)
    {
        var value = GetNode(pointer);
        Marshal.FreeHGlobal(pointer);
        _length--;
        pointer = value.Next;
    }
    _head = IntPtr.Zero;
}

11、释放内存 Dispose

因为我们实现了IDisposable接口,所有需要实现Dispose方法,只需要在Dispose方法中调用上面销毁链表Destroy方法即可。


:测试方法代码以及示例源码都已经上传至代码库,有兴趣的可以看看。
https://gitee.com/hugogoos/Planner


ServiceMesh系列

1 Istio部署

1.1 连接测试机

进入测试机服务器...

1.2 安装Istio

1.2.1 通过官方网站下载Istio

 # 下载最新版本的Istio
$ curl -L https://istio.io/downloadIstio | sh -

# 或者下载指定版本:
$ curl -L https://istio.io/downloadIstio | ISTIO_VERSION=1.8.6 TARGET_ARCH=x86_64 sh -

1.2.2 检查安装目录

如果安装的是1.8.6的版本,我们就可以直接进入到这个目录下,并查看该目录下的文件信息

[CCE~]$ cd /
[CCE~]$ cd /home/work/istio-1.8.6
[CCE~]$ ls -l

total 48
drwxr-x---  2 work work  4096 Nov  4 15:13 bin   # istioctl 客户端二进制文件
-rw-r--r--  1 work work 11348 Nov  4 15:13 LICENSE
drwxr-xr-x  5 work work  4096 Nov  4 15:13 manifests
-rw-r-----  1 work work   767 Nov  4 15:13 manifest.yaml
-rw-r--r--  1 work work  4183 Nov  4 15:13 productpage
-rw-r--r--  1 work work  5866 Nov  4 15:13 README.md
drwxr-xr-x 19 work work  4096 Nov 16 10:49 samples  # 示例应用程序
drwxr-x---  3 work work  4096 Nov  4 15:20 tools  

1.2.3 环境变量配置

使用export生成环境变量PATH,将 istioctl 客户端加入搜索路径

$ export PATH=$PWD/bin:$PATH

1.2.4 安装Istio

使用 istioctl 安装 Istio,可以看出,它不仅安装了Istio核心主程序,还包含了以下几个核心组件:

  • Istiod # 核心控制面
  • Egress gateways # 出流量路由
  • Ingress gateways # 入流量路由
[CCE~]$ istioctl install --set profile=demo -y

✔ Istio core installed
✔ Istiod installed
✔ Egress gateways installed
✔ Ingress gateways installed
✔ Installation complete

1.2.5 检查是否安装成功

默认的namespace 和 pod 应该已经正常创建(安装istio的时候,会默认创建 istio-system 命名空间),这边确认下:

[root@k8s-master ~]# kubectl get ns |grep istio
istio-system      Active   82m
[root@k8s-master ~]# kubectl get pods -n istio-system
NAME                                    READY   STATUS    RESTARTS   AGE
istio-egressgateway-xxxxxxxxxx-xxxxx    1/1     Running   0          2m33s
istio-ingressgateway-xxxxxxxxxx-xxxxx   1/1     Running   0          2m33s
istiod--xxxxxxxxxx-xxxxx                1/1     Running   0          3m7s

1.2.6 安装Istio自带示例BookInfo

因为我们要测试图书示例系统,所以我们建设一个恰当的命名空间名称:
istio-booking-demo

[CCE~]$ kubectl create namespace istio-booking-demo  # Create a namespace
[CCE~]$ kubectl get namespace

NAME                 STATUS   AGE
default              Active   23d
icp                  Active   14d
ingress-nginx        Active   23d
istio-booking-demo   Active   16s
istio-system         Active   22d
kube-node-lease      Active   23d
kube-public          Active   23d
kube-system          Active   23d
local-path-storage   Active   23d

1.2.7 设置Pod自动注入SideCar

给命名空间加上标签,指示Istio在部署应用的时候,自动注入Evnoy SideCard代理。

[CCE~]$ kubectl label namespace istio-booking-demo istio-injection=enabled
namespace/istio-booking-demo labeled

★ 说明:这个代表该命名空间(istio-booking-demo)下部署的Pod中都会自动注入Envoy数据面,即使没有配置任何策略,流量依旧会被数据面Envoy拦截并透传给服务。

1.2.8 设置别名,简化操作

可以对某些命名空间下的操作设置别名,避免每一次都输入太长

[CCE~]$ alias kb='kubectl -n istio-booking-demo -o wide'  # 设置别名
[CCE~]$ alias kb  # 查看别名
alias kb='kubectl -n istio-booking-demo -o wide'

1.3 部署实例应用

1.3.1 检查示例目录

查看下Istio 自带的BookInfo示例的地址,可以看到有好几个目录,如下:

[CCE~ kube]$ cd /
[CCE~ /]$ cd /home/work/istio-1.8.6/samples/bookinfo/
[CCE~ bookinfo]$ ls -l
total 32
-rwxr-xr-x 1 work work 4029 Nov  4 15:13 build_push_update_images.sh
drwxr-xr-x 2 work work 4096 Nov 16 10:11 networking
drwxr-xr-x 3 work work 4096 Nov  4 15:13 platform
drwxr-xr-x 2 work work 4096 Nov 16 15:04 policy
-rw-r--r-- 1 work work 1306 Nov  4 15:13 README.md
drwxr-xr-x 8 work work 4096 Nov 16 15:06 src
-rw-r--r-- 1 work work 6329 Nov  4 15:13 swagger.yaml

1.3.2 部署

部署Bookinfo 示例应用,应用bookinfo的yaml配置:

$ kb apply -f platform/kube/bookinfo.yaml

service/details created
serviceaccount/bookinfo-details created
deployment.apps/details-v1 created
service/ratings created
serviceaccount/bookinfo-ratings created
deployment.apps/ratings-v1 created
service/reviews created
serviceaccount/bookinfo-reviews created
deployment.apps/reviews-v1 created
deployment.apps/reviews-v2 created
deployment.apps/reviews-v3 created
service/productpage created
serviceaccount/bookinfo-productpage created
deployment.apps/productpage-v1 created

1.3.3 检查部署结果

应用部署之后,检查 Services 和 Pods 的部署情况,就可以发现,Pod准备就绪时,Istio的 边车Envoy 会一起打包部署进去。
确保所有的 Pod 达到此状态: 就绪状态(READY)的值为 2/2 、状态(STATUS)的值为 Running 。

[CCE~ bookinfo]$ kb get services
NAME          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE   SELECTOR
details       ClusterIP   10.11.12.111   <none>        9080/TCP   28d   app=details
productpage   ClusterIP   10.11.12.112   <none>        9080/TCP   28d   app=productpage
ratings       ClusterIP   10.11.12.113   <none>        9080/TCP   28d   app=ratings
reviews       ClusterIP   10.11.12.114    <none>        9080/TCP   28d   app=reviews
[CCE~ bookinfo]$ kb get pods
NAME                              READY   STATUS    RESTARTS   AGE   IP             NODE    NOMINATED NODE   READINESS GATES
details-v1-xxxxxxxxxx-xxxxx        2/2     Running   0          28d   10.233.67.8    CCE.01   <none>           <none>
productpage-v1-xxxxxxxxxx-xxxxx   2/2     Running   0          28d   10.233.67.10   CCE.01   <none>           <none>
ratings-v1-xxxxxxxxxx-xxxxx        2/2     Running   0          22d   10.233.67.11   CCE.01   <none>           <none>
reviews-v1-xxxxxxxxxx-xxxxx       2/2     Running   0          28d   10.233.68.4    CCE.02   <none>           <none>
reviews-v2-xxxxxxxxxx-xxxxx       2/2     Running   0          22d   10.233.68.6    CCE.02   <none>           <none>
reviews-v3-xxxxxxxxxx-xxxxx         2/2     Running   0          28d   10.233.67.9    CCE.02   <none>           <none>

1.3.4 验证安装是否成功

验证下是否安装成功,看下是否某个页面可以被读取到

[CCE~ bookinfo]$ kubectl -n istio-booking-demo exec "$(kubectl -n istio-booking-demo get pod -l app=ratings -o jsonpath='{.items[0].metadata.name}')" -c ratings -- curl -s productpage:9080/productpage | grep -o "<title>.*</title>"
<title>Simple Bookstore App</title>

1.3.5 配置流量Inbound

关联Istio网关,并确保配置文件正常,应用 bookinfo-gateway的yaml配置:

$ kubectl apply -f networking/bookinfo-gateway.yaml
gateway.networking.istio.io/bookinfo-gateway created
virtualservice.networking.istio.io/bookinfo created
$ istioctl analyze
✔ No validation issues found when analyzing namespace: default.

1.3.6 绑定NodePort

获取ingressgateway的详细信息,会发现 EXTERNAL-IP 为none,就直接改yaml,绑定nodePort的端口,映射为8601。验证访问

[CCE~]$ kb get svc istio-ingressgateway -n istio-system
NAME                   TYPE       CLUSTER-IP      EXTERNAL-IP   PORT(S)                                                                     AGE   SELECTOR
istio-ingressgateway   NodePort   10.233.32.219   <none>        15021:24534/TCP,80:8601/TCP,443:31159/TCP,31400:23074/TCP,15443:13324/TCP   28d   app=istio-ingressgateway,istio=ingressgateway
#  ========================================================

[CCE ~]$ kubectl edit svc istio-ingressgateway -n istio-system   # 编辑完输入 :wq 退出
spec:
  clusterIP: 10.233.32.219
  externalTrafficPolicy: Cluster
  ports:
  - name: status-port
    nodePort: 24534
    port: 15021
    protocol: TCP
    targetPort: 15021
  - name: http2
    nodePort: 8601
    port: 80
    protocol: TCP
    targetPort: 8080
  - name: https
    nodePort: 31159
    port: 443
    protocol: TCP
    targetPort: 8443
  - name: tcp
    nodePort: 23074
    port: 31400
    protocol: TCP
    targetPort: 31400
  - name: tls
    nodePort: 13324
    port: 15443
    protocol: TCP
    targetPort: 15443
  selector:
    app: istio-ingressgateway
    istio: ingressgateway
  sessionAffinity: None
  type: NodePort

1.4 安装仪表盘

应用服务安装完成之后,需要安装很多相关仪表板进行可视化管理。包括 kiali监控 、k8s 仪表盘、Grafana(BI报表)、Jaeger Trace系统

1.4.1 Kiali 仪表盘安装

[CCE~ samples]$ cd samples/addons
[CCE~ addons]$ ls -l # 包含4个可视化系统:grafana、jaeger、kiali、prometheus
total 304
drwxr-xr-x 2 work work   4096 Nov  4 15:13 extras
-rw-r--r-- 1 work work 240054 Nov  4 15:13 grafana.yaml
-rw-r--r-- 1 work work   2317 Nov  4 15:13 jaeger.yaml
-rw-r--r-- 1 work work  35080 Nov  4 15:13 kiali.yaml
-rw-r--r-- 1 work work  13250 Nov  4 15:13 prometheus.yaml
-rw-r--r-- 1 work work   5186 Nov  4 15:13 README.md

# 安装Kiali和其他插件,等待部署完成!
$ kubectl apply -f samples/addons
$ kubectl rollout status deployment/kiali -n istio-system
Waiting for deployment "kiali" rollout to finish: 0 of 1 updated replicas are available...
deployment "kiali" successfully rolled out


# 访问仪表盘,ctl+c 退出当前读取
[CCE~ addons]$ istioctl dashboard kiali
http://localhost:20001/kiali
[CCE~ addons]$ istioctl dashboard grafana
http://localhost:3000
[CCE~ addons]$ istioctl dashboard jaeger
http://localhost:16686
[CCE~ addons]$ istioctl dashboard prometheus
http://localhost:9090
# 实际上的外部访问地址跟 1.3.6 的做法一致
# 先查一下整个集群的所有namespace的情况
$ kb get service --all-namespaces
# 查一下具体的服务信息
$ kb get service --all-namespaces | grep grafana
# 进行nodePart端口绑定
$ kubectl edit svc grafana -n istio-system  # nodePort的端口绑定 8663
$ kubectl edit svc kiali -n istio-system  # nodePort的端口绑定 8661
$ kubectl edit svc prometheus -n istio-system # nodePort的端口绑定 8664
$ kubectl edit svc tracing -n istio-system # nodePort的端口绑定 8665,jaeger

备注:官方也提供了部署的步骤,可以参考下 搭建步骤

2 Istio能力介绍

2.1 简要介绍

Istio具备丰富的流量治理能力,可以参考官方提供的丰富的测试案例,包含但不限于请求路由配置、故障注入、流量转移、TCP 流量转移、请求超时、熔断、流量镜像、地域负载均衡、Ingress+Egress

2.2 请求路由配置

2.2.1 请求路由调度

请求路由中有一种普遍的需求:就是把让不同的用户群体看到的信息不一样,比如VIP用户和普通用户看到的内容不一样,折扣价也不一样;又比如登录用户和未登录用户看到的信息也不一样。

2.2.1 路由初始化

先应用 virtual-service-all-v1,会把所有请求流量都指向版本1的服务(即Virtual Service版本都指向v1),
virtual-service-all-v1.yaml

应用yaml

[CCE ~]$ cd /
[CCE /]$ cd /home/work/istio-1.8.6/samples/bookinfo/
[CCE bookinfo]$ kubectl -n istio-booking-demo apply -f networking/virtual-service-all-v1.yaml
virtualservice.networking.istio.io/productpage unchanged
virtualservice.networking.istio.io/reviews unchanged
virtualservice.networking.istio.io/ratings unchanged
virtualservice.networking.istio.io/details unchanged

路由规则

[CCE bookinfo]$ kb get virtualservices -o yaml # 查看所有的virtualservices
[CCE bookinfo]$ kb get destinationrules -o yaml # 查看所有的destinationrules
[CCE bookinfo]$ kb get virtualservices reviews -o yaml # 查看reviews的virtualservices
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata: # 数据不重要,这边屏蔽掉
spec:
  hosts:
  - reviews
  http:
  - route:
    - destination: # 指向 reviews 服务的v1版本
        host: reviews
        subset: v1
[CCE bookinfo]$ kb get virtualservices ratings -o yaml # 查看ratings的virtualservices
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata: # 数据不重要,这边屏蔽掉
spec:
  hosts:
  - ratings
  http:
  - route:
    - destination:  # 指向 ratings 服务的v1版本
        host: ratings
        subset: v1

打开 界面,你会发现无论怎么刷新页面,都不会显示星级,那是因为reviews:v1版本不会访问星级评分服务。
image

2.2.2 基于用户身份的路由

做一下调整,让Jason用户的流量转发到 reviews:v2,具体做法是请求Header中 end-user 有具体的人员名称Jason。
virtual-service-reviews-test-v2.yaml

[CCE bookinfo]$ kubectl -n istio-booking-demo apply -f networking/virtual-service-reviews-test-v2.yaml
virtualservice.networking.istio.io/reviews configured
[CCE bookinfo]$ kb get virtualservices reviews -o yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata: # 数据不重要,这边屏蔽掉
spec:
  hosts:
  - reviews
  http:
  - match:
    - headers: # header中带end-user并且值为jason用户的流量走v2版本,其他走v1版本
        end-user:
          exact: jason
    route:
    - destination:
        host: reviews
        subset: v2
  - route:
    - destination:
        host: reviews
        subset: v1

对比一下2.2.1,你会发现差别,这边的reviews服务的在特定的jason用户下会走到v2版本中,效果如下,可以看到ratings服务的星星评价了。

  • productpage → reviews:v2 → ratings (针对 jason 用户)
  • productpage → reviews:v1 (其他用户)

image

image

2.2.3 清除应用程序 Virtual Service

如果不需要规则可以直接删除,删除完,所有的路由规则都不存在了,只剩各个Pod之间的SideCar拦截。

[CCE bookinfo]$ kubectl -n istio-booking-demo delete -f networking/virtual-service-all-v1.yaml

3 总结

本文介绍了Istio的部署和简单的路由实现,后续的章节将探索更多精彩的内容。

  • 复杂路由调度
  • 故障注入
  • 流量转移
  • TCP 流量转移
  • 请求超时
  • 熔断
  • 流量镜像
  • 地域负载均衡
  • Ingress+Egress
  • ServiceEntry


一、写在开头

本文的主题是和大家一起探讨学习:“在浏览器中输入URL开始后,计算机所做的几件事”,这个问题是好几年前自己面试的时候,面试官考问过的,当时准备十分不充分,回答的一塌糊涂,今天拿出来再整理学习一遍,一同进步!

其实这个问题本身倒是不难,但它巧妙的是可以将我们所学过的网络编程知识给串联起来,面试官仅仅通过一个问题就可以考察出我们对于这部分知识的掌握程度。那么我们今天也以此为题展开网络编程的正式学习啦!


二、访问网页的底层运行机制

image

如上图所示,当我们在浏览器的搜索框中输入了“www.baidu.com”并回车后,浏览器就跳转到了百度的首页中,这个过程计算机做了哪些操作呢?

1. 在浏览器中输入指定的URL地址;
2. 浏览器通过DNS协议(域名解析协议),获取域名所对应IP地址;
3. 浏览器根据获取到的IP+端口,向目标服务器端发动一个TCP连接请求;
4. 经过三次握手后TCP连接成功,浏览器会在该TCP连接上发送一个HTTP请求报文;
5. 服务器处理HTTP请求后,反馈响应报文给浏览器;
6. 浏览器收到 HTTP 响应报文后,解析响应体中的 HTML 代码,渲染网页的结构和样式,同时根据 HTML 中的其他资源的 URL(如图片、CSS、JS 等),再次发起 HTTP 请求,获取这些资源的内容,直到网页完全加载显示。
7. 浏览器不与服务器交互时,会通过四次挥手关闭TCP连接。



三、解析底层

在第二章中我们将浏览器的操作分为了7个小点,接下来,我们就来分别解释一下其中所用到的技术,由于单篇文章篇幅不易过长,其中的知识点都是粗讲,达到贯穿理解即可,后续会针对每一个知识点,如TCP/UDP,HTTP,DNS等进行单独分析。


3.1 URL

在我们浏览器输入的一个可以访问我们想要的内容,这个输入就叫做
URL
,英文为:Uniform Resource Locators,释义:统一资源定位器。它可以标识网络中唯一的资源,并给出定位它的路径。与之相关的还有一个
URI
(Uniform Resource Identifier):统一资源标志符,它可以唯一标识一个资源。

URL是一种具体的URI,它不仅唯一标识资源,还提供定位地址,URI比喻成我们的身份证号的话,那URL就是我们具体的家庭住址。

URL的结构:

image

在这里插入图片描述
  • http:// :超文本传输协议,URL的前缀,属于应用层协议,通常有HTTP和HTTPS,文件传输URL中前缀为ftp等。
  • www.example.com : 域名,这里也可以为IP地址,它们之间具有一一映射关系,只不过域名更通俗易记。
  • 80 : 端口,如果指明了访问网址的端口的话,端口会紧跟在域名后面,并用一个冒号隔开,当然有些时候,域名已配置好对应的默认访问地址的话,这里就不会存在端口号。
  • /path/to/myfile.html : 资源路径,从域名(端口)后以/开始到?前结束的这一段路径,作为访问具体资源的一个地址,从第一个/开始,表示从服务器上根目录开始进行索引到的文件路径,上图中要访问的文件就是服务器根目录下/path/to/myfile.html。
  • key1=value&key2=value2 :参数,在http发送get请求时,参数会包含在URL中,与路径以?分割开始,key=value的形式出现,多参情况下,用&分割,有些请求的参数是放在body中的,比如post。
  • # SomewhereInTheDocument : 锚点,顾名思义,是在要访问的页面上的一个锚。要访问的页面大部分都多于一页,如果指定了锚点,那么在客户端显示该网页是就会定位到锚点处,相当于一个小书签。值得一提的是,在 URL 中,锚点以#开头,并且不会作为请求的一部分发送给服务端。


3.2 DNS

我们在上面提到了域名与IP地址映射关系,这里其决定作用的就是DNS(Domain Name System) 域名系统,具体流程如下:

  1. 本地缓冲查询 :当我们在浏览器中输入一个域名后,会先到浏览器缓存中查询,是否已经存在该域名解析结果,如果存在则返回对应IP地址,否则进行下一步;
  2. 本地DNS服务器查询 :本地缓存中没有后,浏览器会向本地配置的DNS服务器发送递归查询请求,如果本地DNS服务器也没有命中,则继续下一步;
  3. 根DNS服务器查询 :如果本地DNS服务器也没有相应的解析结果,它会向根DNS服务器发送迭代查询请求。根DNS服务器负责管理顶级域名服务器的IP地址,它会根据顶级域名(例如.com)的信息返回对应的顶级域名服务器的IP地址;
  4. 顶级域名查询 :本地DNS服务器向顶级域名服务器发送查询请求,并根据顶级域名的信息返回下一级域名服务器的IP地址。这个过程会一层一层地向下查询,直到找到负责解析的权威域名服务器;
  5. 权威域名查询 :本地DNS服务器向权威域名服务器发送查询请求,并获取域名对应的IP地址。本地DNS服务器会将解析结果缓存起来,并返回给浏览器;
  6. 返回并缓存 :浏览器收到本地DNS服务器返回的IP地址后,会将其存储在本地缓存中,并发起与该IP地址相关的网络请求。


3.3 TCP

在3.2部分,通过DNS解析,拿到了目标主机的IP地址后,浏览器就可以向目标服务器发送TCP连接请求,TCP协议是传输层协议,可以在建立了安全连接的基础上,控制数据传输,保证可靠性,并且支持双向通信,像HTTP,HTTPS都是建立在TCP协议之上的。(TCP连接最经典的就是3次握手!)

注意:TCP/IP协议是绑定来看的,建议TCP协议时,需要发送数据,发送数据就需要网络层的IP协议,这个协议一种分组交换的协议,不保证可靠传输,负责将数据包从源主机路由到目标主机。


3.4 HTTP

建立了TCP连接后,浏览器就可以向目标服务器发送HTTP请求报文啦,当然,有些网站配置了安全增强的HTTPS协议,之间的区别,我们后面会单独来聊,包括HTTP1.0,HTTP1.1等内容。


3.5 服务器处理并返回响应

服务器在接收到HTTP报文后,根据对应的接口、参数、cookies生成一个HTML响应,并返回给浏览器,浏览器收到 HTTP 响应报文后,解析响应体中的 HTML 代码,渲染网页的结构和样式,同时根据 HTML 中的其他资源的 URL(如图片、CSS、JS 等),再次发起 HTTP 请求,获取这些资源的内容,直到网页完全加载显示。


四、总结

以上就是整个网络请求所涉及到的TCP/IP四层模型的实战内容,它也是我们网络编程学习中的重中之重,至于其中的每一层的细节,我们后面单个精讲!

1、背景

最近在开发的过程中遇到这么一个问题,当产生某种类型的工单后,需要实时通知到另外的系统,由另外的系统进行数据的研判操作。
由于某种原因
, 像向消息队列中推送工单消息、或直接调用另外系统的接口、或者部署
Cannal
等都不可行,因此此处使用
mysql-binlog-connector-java
这个库来完成数据库
binlog
的监听,从而通知到另外的系统。

2、mysql-binlog-connector-java简介

mysql-binlog-connector-java
是一个Java库,通过它可以实现
mysql binlog
日志的监听和解析操作。它提供了一系列可靠的方法,使开发者通过监听数据库的binlog日志,来
实时
获取数据库的变更信息,比如:数据的
插入

更新

删除
等操作。

github地址
https://github.com/osheroff/mysql-binlog-connector-java

3、准备工作

1、验证数据库是否开启binlog

mysql> show variables like '%log_bin%';
+---------------------------------+------------------------------------+
| Variable_name                   | Value                              |
+---------------------------------+------------------------------------+
| log_bin                         | ON                                 |
| log_bin_basename                | /usr/local/mysql/data/binlog       |
| log_bin_index                   | /usr/local/mysql/data/binlog.index |
| log_bin_trust_function_creators | OFF                                |
| log_bin_use_v1_row_events       | OFF                                |
| sql_log_bin                     | ON                                 |
+---------------------------------+------------------------------------+

log_bin
的值为
ON
时,表示开启了
binlog

2、开启数据库的binlog

# 修改 my.cnf 配置文件
[mysqld]
#binlog日志的基本文件名,需要注意的是启动mysql的用户需要对这个目录(/usr/local/var/mysql/binlog)有写入的权限
log_bin=/usr/local/var/mysql/binlog/mysql-bin
# 配置binlog日志的格式
binlog_format = ROW
# 配置 MySQL replaction 需要定义,不能和已有的slaveId 重复
server-id=1

3、创建具有REPLICATION SLAVE权限的用户

CREATE USER binlog_user IDENTIFIED BY 'binlog#Replication2024!';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlog_user'@'%';
FLUSH PRIVILEGES;

使用BinaryLogClient需要的权限

4、事件类型 eventType 解释

注意:不同的mysql版本事件类型可能不同,我们本地是mysql8

TABLE_MAP: 在表的 insert、update、delete 前的事件,用于记录操作的数据库名和表名。
EXT_WRITE_ROWS: 插入数据事件类型,即 insert 类型
EXT_UPDATE_ROWS: 插入数据事件类型,即 update 类型
EXT_DELETE_ROWS: 插入数据事件类型,即 delete 类型

ROTATE: 当mysqld切换到新的二进制日志文件时写入。当发出一个FLUSH LOGS 语句。或者当前二进制日志文件超过max_binlog_size。

1、TABLE_MAP 的注意事项

一般情况下,当我们向数据库中执行
insert

update

delete
事件时,一般会先有一个
TABLE_MAP
事件发出,通过这个事件,我们就知道当前操作的是那个数据库和表。
但是
如果我们操作的表上存在触发器时,那么可能顺序就会错乱,导致我们获取到错误的数据库名和表名。
解决方案

2、获取操作的列名

此处以
EXT_UPDATE_ROWS
事件为列,当我们往数据库中
update
一条记录时,触发此事件,事件内容为:

Event{header=EventHeaderV4{timestamp=1727498351000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=201, nextPosition=785678, flags=0}, data=UpdateRowsEventData{tableId=264, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5, 6, 7}, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    {before=[1, zhangsan, 张三-update, 0, [B@7b720427, [B@238552f, 1727524798000, 1727495998000], after=[1, zhangsan, 张三-update, 0, [B@21dae489, [B@2c0fff72, 1727527151000, 1727498351000]}
]}}

从上面的语句中可以看到
includedColumnsBeforeUpdate

includedColumns
这2个字段表示更新前的列名和更新后的列名,
但是这个时候展示的数字,那么如果展示具体的列名呢?
可以通过
information_schema.COLUMNS
获取。

列名获取

5、监听binlog的position

1、从最新的binlog位置开始监听

默认情况下,就是从最新的binlog位置开始监听。

BinaryLogClient client = new BinaryLogClient(hostname, port, username, password);

2、从指定的位置开始监听

BinaryLogClient client = new BinaryLogClient(hostname, port, username, password);
// binlog的文件名
client.setBinlogFilename("");
// binlog的具体位置
client.setBinlogPosition(11);

3、断点续传

这个指的是,当我们的
mysql-binlog-connector-java
程序宕机后,如果数据发生了binlog的变更,我们应该从程序上次宕机的位置的position进行监听,而不是程序重启后从最新的binlog position位置开始监听。默认情况下
mysql-binlog-connector-java
程序没有为我们实现,需要我们自己去实现。大概的实现思路为:

  1. 监听
    ROTATE
    事件,可以获取到最新的binlog文件名和位置。
  2. 记录每个事件的position的位置。

6、创建表和准备测试数据

CREATE TABLE `binlog_demo`
(
    `id`          int NOT NULL AUTO_INCREMENT COMMENT '主键',
    `user_name`   varchar(64) DEFAULT NULL COMMENT '用户名',
    `nick_name`   varchar(64) DEFAULT NULL COMMENT '昵称',
    `sex`         tinyint     DEFAULT NULL COMMENT '性别 0-女 1-男 2-未知',
    `address`     text COMMENT '地址',
    `ext_info`    json        DEFAULT NULL COMMENT '扩展信息',
    `create_time` datetime    DEFAULT NULL COMMENT '创建时间',
    `update_time` timestamp NULL DEFAULT NULL COMMENT '修改时间',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uidx_username` (`user_name`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='测试binlog'

-- 0、删除数据
truncate table binlog_demo;

-- 1、添加数据
insert into binlog_demo(user_name, nick_name, sex, address, ext_info, create_time, update_time)
values ('zhangsan', '张三', 1, '地址', '[
  "aaa",
  "bbb"
]', now(), now());

-- 2、修改数据
update binlog_demo
set nick_name   = '张三-update',
    sex         = 0,
    address     = '地址-update',
    ext_info    = '{
      "ext_info": "扩展信息"
    }',
    create_time = now(),
    update_time = now()
where user_name = 'zhangsan';

-- 3、删除数据
delete
from binlog_demo
where user_name = 'zhangsan';

4、功能实现

通过
mysql-binlog-connector-java
库,当
数据库
中的
表数据
发生
变更
时,
进行监听

1、从最新的binlog位置开始监听

1、引入jar包

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- 监听 mysql binlog -->
    <dependency>
        <groupId>com.zendesk</groupId>
        <artifactId>mysql-binlog-connector-java</artifactId>
        <version>0.29.2</version>
    </dependency>
</dependencies>

2、监听binlog数据

package com.huan.binlog;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 初始化 binary log client
 *
 * @author huan.fu
 * @date 2024/9/22 - 16:23
 */
@Component
public class BinaryLogClientInit {

    private static final Logger log = LoggerFactory.getLogger(BinaryLogClientInit.class);

    private BinaryLogClient client;

    @PostConstruct
    public void init() throws IOException, TimeoutException {
        /**
         * # 创建用户
         * CREATE USER binlog_user IDENTIFIED BY 'binlog#Replication2024!';
         * GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlog_user'@'%';
         * FLUSH PRIVILEGES;
         */
        String hostname = "127.0.0.1";
        int port = 3306;
        String username = "binlog_user";
        String password = "binlog#Replication2024!";
        // 创建 BinaryLogClient客户端
        client = new BinaryLogClient(hostname, port, username, password);
        // 这个 serviceId 不可重复
        client.setServerId(12);

        // 反序列化配置
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                // 将日期类型的数据反序列化成Long类型
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
        );

        client.setEventDeserializer(eventDeserializer);
        client.registerEventListener(new BinaryLogClient.EventListener() {
            @Override
            public void onEvent(Event event) {
                EventType eventType = event.getHeader().getEventType();
                log.info("接收到事件类型: {}", eventType);
                log.warn("接收到的完整事件: {}", event);
                log.info("============================");
            }
        });
        client.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() {
            @Override
            public void onConnect(BinaryLogClient client) {
                log.info("客户端连接到 mysql 服务器 client: {}", client);
            }

            @Override
            public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
                log.info("客户端和 mysql 服务器 通讯失败 client: {}", client);
            }

            @Override
            public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
                log.info("客户端序列化失败 client: {}", client);
            }

            @Override
            public void onDisconnect(BinaryLogClient client) {
                log.info("客户端断开 mysql 服务器链接 client: {}", client);
            }
        });
        // client.connect 在当前线程中进行解析binlog,会阻塞当前线程
        // client.connect(xxx) 会新开启一个线程,然后在这个线程中解析binlog
        client.connect(10000);
    }

    @PreDestroy
    public void destroy() throws IOException {
        client.disconnect();
    }
}

3、测试

更新数据
从上图中可以看到,我们获取到了更新后的数据,但是具体更新了哪些列名这个我们是不清楚的。

2、获取数据更新具体的列名

此处以
更新数据为例
,大体的实现思路如下:

  1. 通过监听
    TABLE_MAP
    事件,用于获取到
    insert

    update

    delete
    语句操作前的
    数据库


  2. 通过查询
    information_schema.COLUMNS
    表获取 某个表在某个数据库中具体的列信息(比如:列名、列的数据类型等操作)。

2.1 新增common-dbutils依赖用于操作数据库

<!-- 操作数据库 -->
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>

2.2 监听TABLE_MAP事件,获取数据库和表名

  1. 定义2个
    成员变量

    database

    tableName
    用于接收数据库和表名。
/**
 * 数据库
 */
private String database;
/**
 * 表名
 */
private String tableName;
  1. 监听
    TABLE_MAP
    事件,获取数据库和表名
// 成员变量 - 数据库名
private String database;
// 成员变量 - 表名
private String tableName;

client.registerEventListener(new BinaryLogClient.EventListener() {
    @Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
        log.info("接收到事件类型: {}", eventType);
        log.info("============================");

        if (event.getData() instanceof TableMapEventData) {
            TableMapEventData eventData = (TableMapEventData) event.getData();
            database = eventData.getDatabase();
            tableName = eventData.getTable();
            log.info("获取到的数据库名: {} 和 表名为: {}", database, tableName);
        }
    }
});

监听TABLE_MAP事件,获取数据库和表名

2.3 编写工具类获取表的列名和位置信息

编写工具类获取表的列名和位置信息

/**
 * 数据库工具类
 *
 * @author huan.fu
 * @date 2024/10/9 - 02:39
 */
public class DbUtils {

    public static Map<String, String> retrieveTableColumnInfo(String database, String tableName) throws SQLException {
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/temp_work", "binlog_user", "binlog#Replication2024!");

        QueryRunner runner = new QueryRunner();
        Map<String, String> columnInfoMap = runner.query(
                connection,
                "select a.COLUMN_NAME,a.ORDINAL_POSITION from information_schema.COLUMNS a where a.TABLE_SCHEMA = ? and a.TABLE_NAME = ?",
                resultSet -> {
                    Map<String, String> result = new HashMap<>();
                    while (resultSet.next()) {
                        result.put(resultSet.getString("ORDINAL_POSITION"), resultSet.getString("COLUMN_NAME"));
                    }
                    return result;
                },
                database,
                tableName
        );
        connection.close();
        return columnInfoMap;
    }

    public static void main(String[] args) throws SQLException {
        Map<String, String> stringObjectMap = DbUtils.retrieveTableColumnInfo("temp_work", "binlog_demo");
        System.out.println(stringObjectMap);
    }
}

编写工具类获取表的列名和位置信息

2.4 以更新语句为例获取 更新的列名和对应的值

1、编写java代码获取更新后的列和值信息

client.registerEventListener(new BinaryLogClient.EventListener() {
    @Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
        log.info("接收到事件类型: {}", eventType);
        log.warn("接收到的完整事件: {}", event);
        log.info("============================");

        // 通过 TableMap 事件获取 数据库名和表名
        if (event.getData() instanceof TableMapEventData) {
            TableMapEventData eventData = (TableMapEventData) event.getData();
            database = eventData.getDatabase();
            tableName = eventData.getTable();
            log.info("获取到的数据库名: {} 和 表名为: {}", database, tableName);
        }

        // 监听更新事件
        if (event.getData() instanceof UpdateRowsEventData) {
            try {
                // 获取表的列信息
                Map<String, String> columnInfo = DbUtils.retrieveTableColumnInfo(database, tableName);
                // 获取更新后的数据
                UpdateRowsEventData eventData = ((UpdateRowsEventData) event.getData());
                // 可能更新多行数据
                List<Map.Entry<Serializable[], Serializable[]>> rows = eventData.getRows();

                for (Map.Entry<Serializable[], Serializable[]> row : rows) {
                    // 更新前的数据
                    Serializable[] before = row.getKey();
                    // 更新后的数据
                    Serializable[] after = row.getValue();
                    // 保存更新后的一行数据
                    Map<String, Serializable> afterUpdateRowMap = new HashMap<>();
                    for (int i = 0; i < after.length; i++) {
                        // 因为 columnInfo 中的列名的位置是从1开始,而此处是从0开始
                        afterUpdateRowMap.put(columnInfo.get((i + 1) + ""), after[i]);
                    }
                    log.info("监听到更新的数据为: {}", afterUpdateRowMap);
                }
            } catch (Exception e) {
                log.error("监听更新事件发生了异常");
            }
        }

        // 监听插入事件
        if (event.getData() instanceof WriteRowsEventData) {
            log.info("监听到插入事件");
        }

        // 监听删除事件
        if (event.getData() instanceof DeleteRowsEventData) {
            log.info("监听到删除事件");
        }
    }
});

2、执行更新语句

update binlog_demo
    set nick_name = '张三-update11',
        -- sex = 0,
        -- address = '地址-update1',
        -- ext_info = '{"ext_info":"扩展信息"}',
        -- create_time = now(),
        update_time = now()
where user_name = 'zhangsan';

3、查看监听到更新数据信息

查看监听到更新数据信息

3、自定义序列化字段

从下图中可知,针对
text
类型的字段,默认转换成了
byte[]
类型,那么怎样将其转换成
String
类型呢?

此处针对更新语句来演示

数据库中text类型变成了字节数组类型

3.1 自定义更新数据text类型字段的反序列

注意:断点跟踪源码发现text类型的数据映射成了blob类型,因此需要重写 deserializeBlob 方法

public class CustomUpdateRowsEventDataDeserializer extends UpdateRowsEventDataDeserializer {
    public CustomUpdateRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
        super(tableMapEventByTableId);
    }

    @Override
    protected Serializable deserializeBlob(int meta, ByteArrayInputStream inputStream) throws IOException {
        byte[] bytes = (byte[]) super.deserializeBlob(meta, inputStream);
        if (null != bytes && bytes.length > 0) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
        return null;
    }
}

3.2 注册更新数据的反序列

注意: 需要通过 EventDeserializer 来进行注册

// 反序列化配置
EventDeserializer eventDeserializer = new EventDeserializer();

Field field = EventDeserializer.class.getDeclaredField("tableMapEventByTableId");
field.setAccessible(true);
Map<Long, TableMapEventData> tableMapEventByTableId = (Map<Long, TableMapEventData>) field.get(eventDeserializer);
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, new CustomUpdateRowsEventDataDeserializer(tableMapEventByTableId)
        .setMayContainExtraInformation(true));

3.3 更新text类型的字段,看输出的结果

可以看到数据库中的text类型已经转换成String类型了

4、只订阅感兴趣的事件

// 反序列化配置
EventDeserializer eventDeserializer = new EventDeserializer();
 eventDeserializer.setCompatibilityMode(
         // 将日期类型的数据反序列化成Long类型
         EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
 );
// 表示对 删除事件不感兴趣 ( 对于DELETE事件的反序列化直接返回null )
 eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new NullEventDataDeserializer());

对于不感兴趣的事件直接使用
NullEventDataDeserializer
,可以提高程序的性能。

5、断点续传

当binlog的信息发生变更时,需要保存起来,下次程序重新启动时,读取之前保存好的binlog信息。

5.1 binlog信息持久化

此处为了模拟,将binlog的信息保存到文件中。

/**
 * binlog position 的持久化处理
 *
 * @author huan.fu
 * @date 2024/10/11 - 12:54
 */
public class FileBinlogPositionHandler {

    /**
     * binlog 信息实体类
     */
    public static class BinlogPositionInfo {
        /**
         * binlog文件的名字
         */
        public String binlogName;
        /**
         * binlog的位置
         */
        private Long position;
        /**
         * binlog的server id的值
         */
        private Long serverId;
    }

    /**
     * 保存binlog信息
     *
     * @param binlogName binlog文件名
     * @param position   binlog位置信息
     * @param serverId   binlog server id
     */
    public void saveBinlogInfo(String binlogName, Long position, Long serverId) {
        List<String> data = new ArrayList<>(3);
        data.add(binlogName);
        data.add(position + "");
        data.add(serverId + "");
        try {
            Files.write(Paths.get("binlog-info.txt"), data);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 获取 binlog 信息
     *
     * @return BinlogPositionInfo
     */
    public BinlogPositionInfo retrieveBinlogInfo() {
        try {
            List<String> lines = Files.readAllLines(Paths.get("binlog-info.txt"));
            BinlogPositionInfo info = new BinlogPositionInfo();
            info.binlogName = lines.get(0);
            info.position = Long.parseLong(lines.get(1));
            info.serverId = Long.parseLong(lines.get(2));
            return info;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

5.2、构建BinaryLogClient时,传递已存在的binlog信息

// 设置 binlog 信息
FileBinlogPositionHandler fileBinlogPositionHandler = new FileBinlogPositionHandler();
FileBinlogPositionHandler.BinlogPositionInfo binlogPositionInfo = fileBinlogPositionHandler.retrieveBinlogInfo();
if (null != binlogPositionInfo) {
    log.info("获取到了binlog 信息 binlogName: {} position: {} serverId: {}", binlogPositionInfo.binlogName,
            binlogPositionInfo.position, binlogPositionInfo.serverId);
    client.setBinlogFilename(binlogPositionInfo.binlogName);
    client.setBinlogPosition(binlogPositionInfo.position);
    client.setServerId(binlogPositionInfo.serverId);
}

5.3 更新binlog信息

// FORMAT_DESCRIPTION(写入每个二进制日志文件前的描述事件) HEARTBEAT(心跳事件)这2个事件不进行binlog位置的记录
if (eventType != EventType.FORMAT_DESCRIPTION && eventType != EventType.HEARTBEAT) {
    // 当有binlog文件切换时产生
    if (event.getData() instanceof RotateEventData) {
        RotateEventData eventData = event.getData();
        // 保存binlog position 信息
        fileBinlogPositionHandler.saveBinlogInfo(eventData.getBinlogFilename(), eventData.getBinlogPosition(), event.getHeader().getServerId());
    } else {
        // 非 rotate 事件,保存位置信息
        EventHeaderV4 header = event.getHeader();
        FileBinlogPositionHandler.BinlogPositionInfo info = fileBinlogPositionHandler.retrieveBinlogInfo();
        long position = header.getPosition();
        long serverId = header.getServerId();
        fileBinlogPositionHandler.saveBinlogInfo(info.binlogName, position, serverId);
    }
}

5.4 演示

  1. 启动程序
  2. 修改
    address
    的值为
    地址-update2
  3. 停止程序
  4. 修改
    address
    的值为
    地址-offline-update
  5. 启动程序,看能否收到 上一步修改address的值为
    地址-offline-update
    的事件
    演示结果

5、参考地址

  1. github地址 - https://github.com/osheroff/mysql-binlog-connector-java
  2. maven仓库地址https://mvnrepository.com/artifact/com.zendesk/mysql-binlog-connector-java/0.29.2
  3. TABLE_MAP事件顺序问题. - https://github.com/shyiko/mysql-binlog-connector-java/issues/67
  4. dbutils的官网 - https://commons.apache.org/proper/commons-dbutils/examples.html

MySQL数据备份是数据库管理员非常重要的工作之一。系统意外崩溃或者硬件的损坏都可能导致数据的丢失,因此MySQL管理员应该定期地备份数据,使得在意外情况发生时最大限度地减少损失。本节将介绍数据备份的3种方法。

11.1.1  使用mysqldump命令备份数据

mysqldump是MySQL提供的一个非常有用的数据库备份工具。mysqldump命令执行时,可以将数据库备份成一个文本文件,该文件中实际包含了多个CREATE和INSERT语句,使用这些语句可以重新创建表和插入数据。

mysqldump备份数据库的基本语法格式如下:

mysqldump  –u user –h host –ppassword dbname[tbname, [tbname...]]> filename.sql

user表示用户名称;host表示登录用户的主机名称;password为登录密码;dbname为需要备份的数据库名称;tbname为dbname数据库中需要备份的数据表,可以指定多张需要备份的表,如果不指定,则表示备份所有数据表;右箭头符号(>)告诉mysqldump将备份数据表的定义和数据写入备份文件;filename.sql为备份文件的名称。

1. 使用mysqldump备份单个数据库中的所有表

【例11.1】使用mysqldump命令备份单个数据库中的所有表。

为了更好地理解mysqldump工具是如何工作的,这里给出一个完整的数据库例子。首先登录MySQL,按下面数据库结构创建booksDB数据库和各个表,并插入数据记录。数据库和表定义如下:

CREATE DATABASE booksDB;

use booksDB;



CREATE TABLE books

(

bk_id INT NOT NULL PRIMARY KEY,

bk_title VARCHAR(
50) NOT NULL,

copyright YEAR NOT NULL

);

INSERT INTO books

VALUES (
11078, 'Learning MySQL', 2010),

(
11033, 'Study Html', 2011),

(
11035, 'How to use php', 2003),

(
11072, 'Teach yourself javascript', 2005),

(
11028, 'Learning C++', 2005),

(
11069, 'MySQL professional', 2009),

(
11026, 'Guide to MySQL 9.0', 2024),

(
11041, 'Inside VC++', 2011);



CREATE TABLE authors

(

auth_id INT NOT NULL PRIMARY KEY,

auth_name VARCHAR(
20),

auth_gender CHAR(
1)

);

INSERT INTO authors

VALUES (
1001, 'WriterX' ,'f'),

(
1002, 'WriterA' ,'f'),

(
1003, 'WriterB' ,'m'),

(
1004, 'WriterC' ,'f'),

(
1011, 'WriterD' ,'f'),

(
1012, 'WriterE' ,'m'),

(
1013, 'WriterF' ,'m'),

(
1014, 'WriterG' ,'f'),

(
1015, 'WriterH' ,'f');



CREATE TABLE authorbook

(

auth_id INT NOT NULL,

bk_id INT NOT NULL,

PRIMARY KEY (auth_id, bk_id),

FOREIGN KEY (auth_id) REFERENCES authors (auth_id),

FOREIGN KEY (bk_id) REFERENCES books (bk_id)

);



INSERT INTO authorbook

VALUES (
1001, 11033), (1002, 11035), (1003, 11072), (1004, 11028),

(
1011, 11078), (1012, 11026), (1012, 11041), (1014, 11069);

完成数据插入后,打开操作系统命令行输入窗口,输入如下备份命令:

C:\ >mysqldump -u root -p booksdb > C:/backup/booksdb_20240301.sql

Enter password:
**

这里要保证C盘下的backup文件夹存在,否则将提示错误信息:系统找不到指定的路径。

输入密码之后,MySQL便对数据库进行备份。使用文本查看器打开C:\backup文件夹下刚才备份过的文件,部分文件内容大致如下:

-- MySQL dump 10.13  Distrib 9.0.1, forWin64 (x86_64)--

--Host: localhost    Database: booksdb-- ------------------------------------------------------

-- Server version    9.0.1

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT*/;/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS*/;/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION*/;/*!50503 SET NAMES utf8mb4*/;/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE*/;/*!40103 SET TIME_ZONE='+00:00'*/;/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0*/;/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0*/;/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO'*/;/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0*/;--

-- Table structure fortable `authorbook`--DROP TABLE IF EXISTS `authorbook`;/*!40101 SET @saved_cs_client     = @@character_set_client*/;/*!50503 SET character_set_client = utf8mb4*/;

CREATE TABLE `authorbook` (

`auth_id`
intNOT NULL,

`bk_id`
intNOT NULL,

PRIMARY KEY (`auth_id`,`bk_id`),

KEY `bk_id` (`bk_id`),

CONSTRAINT `authorbook_ibfk_1` FOREIGN KEY (`auth_id`) REFERENCES `authors` (`auth_id`),

CONSTRAINT `authorbook_ibfk_2` FOREIGN KEY (`bk_id`) REFERENCES `books` (`bk_id`)

) ENGINE
=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;/*!40101 SET character_set_client = @saved_cs_client*/;-- -- Dumping data fortable `authorbook`--LOCK TABLES `authorbook` WRITE;/*!40000 ALTER TABLE `authorbook` DISABLE KEYS*/;

INSERT INTO `authorbook` VALUES (
1012,11026),(1004,11028),(1001,11033),(1002,11035),(1012,11041),(1014,11069),(1003,11072),(1011,11078);/*!40000 ALTER TABLE `authorbook` ENABLE KEYS*/;

UNLOCK TABLES;
-- -- Table structure fortable `authors`--DROP TABLE IF EXISTS `authors`;/*!40101 SET @saved_cs_client = @@character_set_client*/;/*!50503 SET character_set_client = utf8mb4*/;

CREATE TABLE `authors` (

`auth_id`
intNOT NULL,

`auth_name` varchar(
20) DEFAULT NULL,

`auth_gender`
char(1) DEFAULT NULL,

PRIMARY KEY (`auth_id`)

) ENGINE
=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;/*!40101 SET character_set_client = @saved_cs_client*/;-- -- Dumping data fortable `authors`--LOCK TABLES `authors` WRITE;/*!40000 ALTER TABLE `authors` DISABLE KEYS*/;

INSERT INTO `authors` VALUES (
1001,'WriterX','f'),(1002,'WriterA','f'),(1003,'WriterB','m'),(1004,'WriterC','f'),(1011,'WriterD','f'),(1012,'WriterE','m'),(1013,'WriterF','m'),(1014,'WriterG','f'),(1015,'WriterH','f');/*!40000 ALTER TABLE `authors` ENABLE KEYS*/;

UNLOCK TABLES;
-- -- Table structure fortable `books`--DROP TABLE IF EXISTS `books`;/*!40101 SET @saved_cs_client = @@character_set_client*/;/*!50503 SET character_set_client = utf8mb4*/;

CREATE TABLE `books` (

`bk_id`
intNOT NULL,

`bk_title` varchar(
50) NOT NULL,

`copyright` year NOT NULL,

PRIMARY KEY (`bk_id`)

) ENGINE
=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;/*!40101 SET character_set_client = @saved_cs_client*/;-- -- Dumping data fortable `books`--LOCK TABLES `books` WRITE;/*!40000 ALTER TABLE `books` DISABLE KEYS*/;

INSERT INTO `books` VALUES (
11026,'Guide to MySQL 9.0',2024),(11028,'Learning C++',2005),(11033,'Study Html',2011),(11035,'How to use php',2003),(11041,'Inside VC++',2011),(11069,'MySQL professional',2009),(11072,'Teach yourself javascript',2005),(11078,'Learning MySQL',2010);/*!40000 ALTER TABLE `books` ENABLE KEYS*/;

UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE*/;/*!40101 SET SQL_MODE=@OLD_SQL_MODE*/;/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS*/;/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS*/;/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT*/;/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS*/;/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION*/;/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES*/;-- Dump completed on 2024-07-25 18:39:42

可以看到,备份文件中包含了一些信息,文件开头首先表明了备份文件使用的mysqldump工具的版本号;然后是备份账户的名称和主机信息,以及备份的数据库的名称;最后是MySQL服务器的版本号,在这里为9.0.1。

接下来是一些SET语句,这些语句将一些系统变量值赋给用户自定义变量,以确保被恢复的数据库的系统变量和原来备份时的变量相同,例如:

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT*/;

该SET语句将当前系统变量character_set_client的值赋给用户自定义变量@old_character_ set_client。其他变量与此类似。

备份文件的最后几行是使用SET语句恢复服务器系统变量原来的值,例如:

/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT*/;

该语句将用户自定义的变量@old_character_set_client中保存的值赋给实际的系统变量character_set_client。

备份文件中以“--”字符开头的行为注释语句;以“/*!”开头、“*/”结尾的语句为可执行的MySQL注释,这些语句可以被MySQL执行,但在其他数据库管理系统中将被作为注释忽略,以提高数据库的可移植性。

另外,备份文件的一些可执行注释语句以数字开头,该数字代表的是MySQL版本号,表示这些语句只有在指定的MySQL版本或者比该版本高的情况下才能执行。例如“40101”,表明这些语句只有在MySQL版本号为4.01.01或者更高的条件下才可以被执行。

2. 使用mysqldump备份单个数据库中的某张表

mysqldump还可以备份数据库中的某张表。

备份某张表和备份数据库中所有表的语句的不同之处在于,要在数据库名称dbname之后指定需要备份的表名称。

【例11.2】备份booksDB数据库中的表books,SQL语句如下:

mysqldump -u root -p booksDB books > C:/backup/books_20240301.sql 

该语句创建名称为“books_20240301.sql”的备份文件,文件中包含了前面介绍的SET语句等内容;不同的是,该文件只包含表books的CREATE和INSERT语句。

3. 使用mysqldump备份多个数据库

如果要使用mysqldump备份多个数据库,就需要使用--databases参数。备份多个数据库的语法格式如下:

mysqldump  –u user –h host –p --databases  [dbname, [dbname...]] > filename.sql

使用--databases参数之后,必须指定至少一个数据库的名称,多个数据库名称之间用空格隔开。

【例11.3】使用mysqldump备份数据库booksDB和test_db,SQL语句如下:

mysqldump -u root -p --databases  booksDB test_db>C:\backup\books_testDB_20240301.sql

该语句创建名称为“books_testDB_20240301.sql”的备份文件,该文件中包含了创建两个数据库booksDB和test_db所必需的所有语句。

另外,使用--all-databases参数可以备份系统中所有的数据库,SQL语句如下:

mysqldump  –u user –h host –p --all-databases > filename.sql

使用参数--all-databases时,不需要指定数据库名称。

【例11.4】使用mysqldump备份服务器中的所有数据库,输入语句如下:

mysqldump  -u root -p --all-databases > C:/backup/alldbinMySQL.sql

该语句创建名称为“alldbinMySQL.sql”的备份文件,文件中包含了系统中所有数据库的备份信息。

如果在服务器上进行备份,并且表均为MyISAM表,就应该考虑使用MySQLhotcopy,因为可以更快地进行备份和恢复。

11.1.2  直接复制整个数据库目录

因为MySQL表保存为文件方式,所以可以直接复制MySQL数据库的存储目录和文件进行备份。MySQL的数据库目录位置不一定相同,在Windows平台下,MySQL 9.0存放数据库的目录通常为“C:\Documents and Settings\All Users\Application Data\MySQL\MySQL Server 9.0\data”或者其他用户自定义目录;在Linux平台下,数据库目录位置通常为“/var/lib/MySQL/”,不同Linux版本下目录会有所不同,读者应在自己使用的平台下查找该目录。

这是一种简单、快速、有效的备份方式。要想保持备份的一致性,备份前需要对相关表执行LOCK TABLES操作,然后对表执行FLUSH TABLES语句。FLUSH TABLES语句可以确保开始备份前将所有激活的索引页写入硬盘,这样当复制数据库目录中的文件时,将允许其他客户继续查询表。当然,也可以停止MySQL服务再进行备份操作。

这种方法虽然简单,但并不是最好的,因为这种方法对InnoDB存储引擎的表不适用。使用这种方法备份的数据最好恢复到相同版本的服务器中,不同的版本可能不兼容。

在MySQL版本号中,第一个数字表示主版本号,主版本号相同的MySQL数据库文件格式相同。

11.1.3  使用MySQLhotcopy工具快速备份

MySQLhotcopy是一个Perl脚本,最初由Tim Bunce编写并提供。它使用LOCK TABLES、FLUSH TABLES和cp或scp来快速备份数据库。它是备份数据库或单表最快的途径,但它只能运行在数据库目录所在的机器上,并且只能备份MyISAM类型的表。MySQLhotcopy在UNIX系统中运行。

MySQLhotcopy命令的语法格式如下:

mysqlhotcopy db_name_1, ... db_name_n  /path/to/new_directory

db_name_1,…,db_name_n分别为需要备份的数据库的名称;“/path/to/new_directory”指定备份文件目录。

【例11.5】使用MySQLhotcopy备份数据库test_db到“/usr/backup”目录下,SQL语句如下:

mysqlhotcopy  -u root –p test_db /usr/backup

要想执行MySQLhotcopy,必须可以访问备份的表文件,具有那些表的SELECT权限、RELOAD权限(以便能够执行FLUSH TABLES)和LOCK TABLES权限。

MySQLhotcopy只是将表所在的目录复制到另一个位置,只能用于备份MyISAM表和ARCHIVE表,备份InnoDB类型的数据表时会出现错误信息。由于它复制本地格式的文件,因此也不能移植到其他硬件或操作系统下。