2024年4月

本文分享自华为云社区《
Python网络编程实践从Socket到HTTP协议的探索与实现
》,作者:柠檬味拥抱。

在当今互联网时代,网络编程是程序员不可或缺的一项技能。Python作为一种高级编程语言,提供了丰富的网络编程库,使得开发者能够轻松地实现各种网络应用。本文将介绍Python中两种主要的网络编程方式:Socket编程和基于HTTP协议的网络编程,并通过实际案例来演示它们的应用。

1. Socket编程

Socket是实现网络通信的基础。通过Socket,程序可以在网络中传输数据,实现客户端与服务器之间的通信。Python提供了
socket
模块,使得Socket编程变得简单而直观。

下面是一个简单的Socket服务器和客户端的实现:

# 服务器端
import socket

# 创建socket对象
server_socket
=socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取本地主机名
host
=socket.gethostname()
port
= 9999# 绑定端口
server_socket.bind((host, port))

# 设置最大连接数,超过后排队
server_socket.listen(
5)whileTrue:
# 建立客户端连接
client_socket, addr
=server_socket.accept()

print(
"连接地址: %s" %str(addr))

msg
= '欢迎访问Socket服务器!' + "\r\n"client_socket.send(msg.encode('utf-8'))

client_socket.close()
# 客户端
import socket

# 创建socket对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取本地主机名
host = socket.gethostname()
port = 9999

# 连接服务,指定主机和端口
client_socket.connect((host, port))

# 接收服务端发送的数据
msg = client_socket.recv(1024)

print(msg.decode('utf-8'))

client_socket.close()

运行以上代码,可以在本地搭建一个简单的Socket服务器,并通过客户端连接并接收消息。

2. HTTP协议的实践

HTTP(HyperText Transfer Protocol)是一种用于传输超媒体文档(例如HTML)的应用层协议。Python提供了多种库用于HTTP通信,其中最常用的是
requests
库。

以下是一个使用
requests
库发送HTTP GET请求的示例:

import requests

url
= 'https://api.github.com'response= requests.get(url)

print(
"状态码:", response.status_code)
print(
"响应内容:", response.text)

通过
requests.get()
函数可以发送HTTP GET请求,并获取响应的状态码和内容。

3. 使用Socket进行简单的网络通信

Socket编程在Python中是一种基础的网络通信方式,它提供了一种在网络上发送和接收数据的方法,可用于构建各种类型的网络应用程序,包括即时通讯、文件传输等。

下面是一个简单的基于Socket的聊天程序,包括服务端和客户端:

# 服务器端
import socket

server_socket
=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host
=socket.gethostname()
port
= 12345server_socket.bind((host, port))
server_socket.listen(
1)

print(
"等待客户端连接...")
client_socket, client_address
=server_socket.accept()
print(
"连接地址:", client_address)whileTrue:
data
= client_socket.recv(1024).decode('utf-8')ifnot data:breakprint("客户端消息:", data)
message
= input("服务器消息:")
client_socket.send(message.encode(
'utf-8'))

client_socket.close()
# 客户端
import socket

client_socket
=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host
=socket.gethostname()
port
= 12345client_socket.connect((host, port))whileTrue:
message
= input("客户端消息:")
client_socket.send(message.encode(
'utf-8'))
data
= client_socket.recv(1024).decode('utf-8')
print(
"服务器消息:", data)

client_socket.close()

运行以上代码,可以实现一个简单的基于Socket的聊天程序。客户端和服务器端可以互相发送消息,实现简单的即时通讯功能。

4. 使用HTTP协议进行网络通信

HTTP协议是一种应用层协议,广泛用于传输超文本文档(如HTML)的数据传输。在Python中,使用HTTP协议进行网络通信通常通过
requests
库来实现,这个库提供了简单易用的接口,方便发送HTTP请求和处理响应。

下面是一个使用
requests
库发送HTTP POST请求的示例:

import requests

url
= 'https://httpbin.org/post'data= {'key1': 'value1', 'key2': 'value2'}

response
= requests.post(url, data=data)

print(
"状态码:", response.status_code)
print(
"响应内容:", response.text)

运行以上代码,可以向指定的URL发送一个HTTP POST请求,并获取服务器返回的响应。

5. 使用Socket进行多线程网络通信

在实际应用中,往往需要处理多个客户端的连接请求。为了实现高并发处理,可以使用多线程来处理每个客户端的连接。Python的
threading
模块提供了多线程支持,可以很方便地实现多线程网络通信。

以下是一个使用多线程处理Socket连接的示例:

# 服务器端
import socket
import threading

def handle_client(client_socket):
whileTrue:
data
= client_socket.recv(1024).decode('utf-8')ifnot data:breakprint("客户端消息:", data)
message
= input("服务器消息:")
client_socket.send(message.encode(
'utf-8'))
client_socket.close()

server_socket
=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host
=socket.gethostname()
port
= 12345server_socket.bind((host, port))
server_socket.listen(
5)

print(
"等待客户端连接...")whileTrue:
client_socket, client_address
=server_socket.accept()
print(
"连接地址:", client_address)
client_thread
= threading.Thread(target=handle_client, args=(client_socket,))
client_thread.start()
# 客户端
import socket

client_socket
=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host
=socket.gethostname()
port
= 12345client_socket.connect((host, port))whileTrue:
message
= input("客户端消息:")
client_socket.send(message.encode(
'utf-8'))
data
= client_socket.recv(1024).decode('utf-8')
print(
"服务器消息:", data)

client_socket.close()

通过在服务器端的主循环中创建新的线程来处理每个客户端的连接,可以实现同时处理多个客户端的请求,提高服务器的并发处理能力。

6. 使用HTTP协议进行网络通信

HTTP(HyperText Transfer Protocol)是一种用于传输超文本文档(如HTML)的应用层协议。在网络编程中,基于HTTP协议的通信方式更为常见,特别是在Web开发和API交互中。Python提供了多种库用于HTTP通信,其中最常用的是
requests
库。

以下是一个使用
requests
库发送HTTP GET请求的示例:

import requests

url
= 'https://api.github.com'response= requests.get(url)

print(
"状态码:", response.status_code)
print(
"响应内容:", response.text)

通过
requests.get()
函数可以发送HTTP GET请求,并获取响应的状态码和内容。
requests
库还提供了丰富的参数和方法,用于处理各种HTTP请求和响应,如设置请求头、传递参数、处理Cookie等。

7. 使用Socket进行多线程网络通信

在实际应用中,往往需要处理多个客户端的连接请求。为了实现高并发处理,可以使用多线程来处理每个客户端的连接。Python的
threading
模块提供了多线程支持,可以很方便地实现多线程网络通信。

以下是一个使用多线程处理Socket连接的示例:

# 服务器端
import socket
import threading

def handle_client(client_socket):
whileTrue:
data
= client_socket.recv(1024).decode('utf-8')ifnot data:breakprint("客户端消息:", data)
message
= input("服务器消息:")
client_socket.send(message.encode(
'utf-8'))
client_socket.close()

server_socket
=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host
=socket.gethostname()
port
= 12345server_socket.bind((host, port))
server_socket.listen(
5)

print(
"等待客户端连接...")whileTrue:
client_socket, client_address
=server_socket.accept()
print(
"连接地址:", client_address)
client_thread
= threading.Thread(target=handle_client, args=(client_socket,))
client_thread.start()
# 客户端
import socket

client_socket
=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host
=socket.gethostname()
port
= 12345client_socket.connect((host, port))whileTrue:
message
= input("客户端消息:")
client_socket.send(message.encode(
'utf-8'))
data
= client_socket.recv(1024).decode('utf-8')
print(
"服务器消息:", data)

client_socket.close()

通过在服务器端的主循环中创建新的线程来处理每个客户端的连接,可以实现同时处理多个客户端的请求,提高服务器的并发处理能力。

8. 使用Socket进行多线程网络通信

在实际应用中,往往需要处理多个客户端的连接请求。为了实现高并发处理,可以使用多线程来处理每个客户端的连接。Python的
threading
模块提供了多线程支持,可以很方便地实现多线程网络通信。

以下是一个使用多线程处理Socket连接的示例:

# 服务器端
import socket
import threading

def handle_client(client_socket):
whileTrue:
data
= client_socket.recv(1024).decode('utf-8')ifnot data:breakprint("客户端消息:", data)
message
= input("服务器消息:")
client_socket.send(message.encode(
'utf-8'))
client_socket.close()

server_socket
=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host
=socket.gethostname()
port
= 12345server_socket.bind((host, port))
server_socket.listen(
5)

print(
"等待客户端连接...")whileTrue:
client_socket, client_address
=server_socket.accept()
print(
"连接地址:", client_address)
client_thread
= threading.Thread(target=handle_client, args=(client_socket,))
client_thread.start()
# 客户端
import socket

client_socket
=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host
=socket.gethostname()
port
= 12345client_socket.connect((host, port))whileTrue:
message
= input("客户端消息:")
client_socket.send(message.encode(
'utf-8'))
data
= client_socket.recv(1024).decode('utf-8')
print(
"服务器消息:", data)

client_socket.close()

通过在服务器端的主循环中创建新的线程来处理每个客户端的连接,可以实现同时处理多个客户端的请求,提高服务器的并发处理能力。

总结

本文深入介绍了Python中的网络编程,重点探讨了两种主要方式:Socket编程和基于HTTP协议的网络编程。首先,我们了解了Socket编程,它是一种底层的网络通信方式,可以实现自定义的通信协议,具有灵活性和高度控制性。我们通过示例演示了如何使用Socket编程在服务器端和客户端之间进行简单的通信,并介绍了如何使用多线程来处理多个客户端的连接请求,以提高服务器的并发处理能力。

其次,我们介绍了基于HTTP协议的网络编程,这是一种更高层次的抽象,适用于构建Web应用、访问API等场景。我们使用了
requests
库来发送HTTP请求,并获取服务器的响应,演示了如何发送GET和POST请求,并处理响应的状态码和内容。基于HTTP协议的网络编程更简单易用,适合于与现有的Web服务进行交互。

通过本文的学习,我们可以了解到Python提供了丰富的网络编程工具和库,使得开发者能够轻松实现各种网络应用。无论是底层的Socket编程还是基于HTTP协议的网络编程,都可以满足不同场景下的需求。掌握网络编程技术对于开发网络应用和系统非常重要,希望本文能够帮助读者更好地理解和应用Python中的网络编程技术,为其在项目开发中提供帮助和启发。

点击关注,第一时间了解华为云新鲜技术~

相关文章

数据库系列:MySQL慢查询分析和性能优化
数据库系列:MySQL索引优化总结(综合版)
数据库系列:高并发下的数据字段变更
数据库系列:覆盖索引和规避回表
数据库系列:数据库高可用及无损扩容
数据库系列:使用高区分度索引列提升性能
数据库系列:前缀索引和索引长度的取舍
数据库系列:MySQL引擎MyISAM和InnoDB的比较
数据库系列:InnoDB下实现高并发控制
数据库系列:事务的4种隔离级别
数据库系列:RR和RC下,快照读的区别
数据库系列:MySQL InnoDB锁机制介绍
数据库系列:MySQL不同操作分别用什么锁?
数据库系列:业内主流MySQL数据中间件梳理
数据库系列:大厂使用数据库中间件解决什么问题?
数据库系列:索引失效场景总结

1 背景

我们在之前的一篇文章《
数据库系列:MySQL InnoDB锁机制介绍
》中介绍过InnodB引擎下几种常见锁的机制和原理。而在实际的
select...for update
操作中,锁影响的范围还是有区别的,下面就详细讨论下
select
操作中的加锁规则。

2 回顾常见的锁类型

★InnoDB默认的事务隔离级别为可重复读(Repeated Read, RR),我们当下的所有介绍都是基于这个隔离级别为前提的。

  • 记录锁(Record Locks):锁定单一行记录,InnoDB 使用记录锁来实现行级锁,这样允许多个事务并发访问不同的行。
  • 间隙锁(Gap Locks):InnoDB 的特性,用于锁定一个范围,但不包括实际的记录。这主要用于防止幻读(Phantom Reads)。
  • 临键锁(Next-Key Locks):InnoDB 存储引擎的一种锁定机制,在执行查询语句时,根据查询条件所锁定的一个范围。这个范围中包含有间隙锁和记录锁。它的设计目的是为了解决幻读(Phantom Reads)。

2.1 记录锁(Record Locks)

记录锁一般在使用主键或者唯一索引进行查找时体现

记录锁,它封锁索引记录,例如:

select * from table where id=5 for update;

它会在id=1的索引记录上加锁,以阻止其他事务插入,更新,删除id=1的这一行。

需要说明的是:

select * from table where id=5;

则是快照读(SnapShot Read),它并不加锁,快照读可以参考作者这篇文章:
数据库系列:RR和RC下,快照读的区别

2.2 间隙锁(Gap Locks)

间隙锁通常在不使用唯一索引进行范围查找时出现

间隙锁,它封锁索引记录中的间隔,或者第一条索引记录之前的范围,又或者最后一条索引记录之后的范围。
延续上面的那个例子继续演示:

# 表结构
users (Id PK, Name , Company);

# 表中包含四条记录
5, Gates, Microsoft
7, Bezos, Amazon
11, Jobs, Apple
14, Elison, Oracle

执行SQL语句如下:

select * from users
    where id between 7 and 13 
    for update;

-- 假设我们要删除id在7到13之间的所有用户记录(不包括id=7和id=13)  
DELETE FROM users WHERE id BETWEEN 7 AND 13;

这样的话,会封锁数据的区间,以防止其他事务插入id=8的记录。
假设没有间隙锁,则可能够插入成功,而之前的select事务,会发现检索的结果集莫名多了一条记录,即幻影数据。
所以间隙锁主要目的用于防止幻读(Phantom Reads),避免其他事务在间隔中插入数据,导致 『不可重复读』。

如果把事务的隔离级别降级为读提交(Read Committed, RC),对,就是互联网最常用的隔离级别,间隙锁则会自动失效。

2.3 临键锁(Next-Key Locks)

临键锁(Next-Key Locks)是数据库管理系统InnoDB中的一种重要锁定机制。这种锁是查询时根据查询条件锁定的一个范围,这个范围包括间隙锁和记录锁,左开右闭,即不锁住左边界,但会锁住右边界。临键锁的主要设计目的是为了解决所谓的“幻读”问题。

# 左开右闭 示例
(-infinity, 1]
(1, 7]
(7, +infinity)

依然沿用上面的例子,InnoDB引擎,RR隔离级别:

-- 创建一个示例表  
CREATE TABLE users (  
    Id INT PRIMARY KEY,  
    Name VARCHAR(255) NOT NULL,  
    Company VARCHAR(255) NOT NULL,  
);  
  
-- 插入一些示例数据  
INSERT INTO users (id, name, company) VALUES (1, 'Alice', 'ali');
INSERT INTO users (id, name, company) VALUES (2, 'Brand', 'tencent');
INSERT INTO users (id, name, company) VALUES (3, 'Charlie', 'baidu');
  
-- 开始一个事务,并使用临键锁查询数据  
START TRANSACTION;  
SELECT * FROM users WHERE id > 1 FOR UPDATE;  
  
-- 在另一个事务中尝试插入新数据,将会被阻塞直到第一个事务释放锁 
START TRANSACTION;  
INSERT INTO users (id, name, age) VALUES (4, 'David', 30);  
COMMIT;  
  
-- 第一个事务提交后,第二个事务可以继续执行插入操作  
COMMIT;

临键锁的主要目的,也是为了避免幻读(Phantom Read),在事务隔离级别为可重复读的情况下,InnoDB存储引擎默认使用临键锁。这种锁提供了一种有效的机制来保证在并发环境中数据的完整性和一致性。
如果把事务的隔离级别降级为RC,临键锁则也会失效。

3 不同select操作的加锁规则

3.0 前置条件

# 表结构(姓名、公司、工号)
userinfo (Id PK, username, company, usercode);

# 表中包含四条记录
5, Gates, Microsoft, 24
7, Bezos, Amazon,35
11, Jobs, Apple,37
14, Elison, Oracle,38

3.1 主键检索

1. 记录存在的情况

# 5是存在的记录,行锁
mysql> select * from userinfo where id=5 for update;

mysql> update userinfo set username = "Brand" where id = 5;
ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction

# X 排他锁
# RECORD 记录锁
mysql> select * from performance_schema.data_lock_waits;
+---------------+-------------+
| lock_mode     | lock_type|
+---------------+-------------+
| X             | RECORD   |
+---------------+-------------+

2. 记录不存在的情况

# 6是不存在的记录,间隙锁,锁住的区间为(5,7),对应上面的前置条件
mysql> select * from userinfo where id = 6 for update;

mysql>  insert into user values(6, 'Brand', 'Ali',100);
ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transactio

# X 排他锁 + Gap 间隙锁
# RECORD 记录锁
mysql> select * from performance_schema.data_lock_waits;
+---------------+-------------+
| lock_mode     | lock_type|
+---------------+-------------+
| X,GAP         | RECORD   |
+---------------+-------------+

3.2 唯一索引检索

与主键检索结果一致,因为这两种都是可以唯一确定索引值和区间范围的。

3.3 普通索引检索

1. 记录存在的情况

# 24是存在的记录,更新行锁,插入间隙锁。24要算在内,锁住的区间为 usercode的[24,35),对应上面的前置条件
mysql> select * from userinfo where usercode = 24 for update;

mysql> insert into user values(6, 'Brand', 'Ali',25);
ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction

# X 排他锁
# RECORD 记录锁  + Gap 间隙锁
mysql> select * from performance_schema.data_lock_waits;
+---------------+-------------+
| lock_mode     | lock_type|
+---------------+-------------+
| X,GAP         | RECORD   |
+---------------+-------------+

2. 记录不存在的情况

# 25是不存在的记录,间隙锁,锁住的区间为 usercode的(24,35),对应上面的前置条件
mysql> select * from userinfo where id = 25 for update;

mysql>  insert into user values(6, 'Brand', 'Ali',26);
ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transactio

# X 排他锁 + Gap 间隙锁
# RECORD 记录锁
mysql> select * from performance_schema.data_lock_waits;
+---------------+-------------+
| lock_mode     | lock_type|
+---------------+-------------+
| X,GAP         | RECORD   |
+---------------+-------------+

3.4 索引的范围检索

索引包括主键(默认)、唯一索引和其他普通索引

mysql> select * from userinfo where id > 4 for update;

mysql> insert into user values(66, 'Brand', 'Ali',25);
ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction

# X 排他锁 + Gap 间隙锁
# RECORD 记录锁
mysql> select * from performance_schema.data_lock_waits;
+---------------+-------------+
| lock_mode     | lock_type|
+---------------+-------------+
| X,GAP         | RECORD   |
+---------------+-------------+

可以对 id <= 4 的数据进行更新(如果有的话),而且他的数据都会被锁住,锁住的Id字段的范围是为:

(5, 7], (7, 11], (11,14], (14, +infinity) 

3.5 普通检索(无索引)

表锁,因为需要扫描整张表。扫描期间所有的操作都不能被获取或变更。

4 总结

  • 事务隔离级别为可重复读(Repeated Read, RR)
  • 以主键或唯一索引作为查询条件,有存在值(记录)时是行锁,不存在值时触发间隙锁。
  • 普通索引作为查询条件,恒定间隙锁。
  • 索引作为查询条件,并以范围取值时,产生间隙锁。
  • 无索引时的普通检索,产生表锁。

前言

前段时间我们从
SkyWalking
切换到了
OpenTelemetry
,与此同时之前使用 SkyWalking 编写的插件也得转移到 OpenTelemetry 体系下。

我也写了相关介绍文章:
实战:
如何优雅的从 SkyWalking 切换到 OpenTelemetry

好在 OpenTelemetry 社区也提供了 Extensions 的扩展开发,我们可以不用去修改社区发行版:
opentelemetry-javaagent.jar
的源码也可以扩展其中的能力。

比如可以:

  • 修改一些 trace,某些 span 不想记录等。
  • 新增 metrics

这次我准备编写的插件也是和 metrics 有关的,因为 pulsar 的 Java sdk 中并没有暴露客户端的一些监控指标,所以我需要在插件中拦截到一些关键函数,然后执行暴露出指标。

截止到本文编写的时候, Pulsar 社区也已经将
Java-client
集成
了 OpenTelemetry,后续正式发版后我这个插件也可以光荣退休了。


由于 OpenTelemetry 社区还处于高速发展阶段,我在中文社区没有找到类似的参考文章(甚至英文社区也没有,只有一些 example 代码,或者是只有去社区成熟插件里去参考代码)

其中也踩了不少坑,所以觉得非常有必要分享出来帮助大家减少遇到同类问题的机会。

开发流程

OpenTelemetry extension 的写法其实和
skywalking
相似,都是用的
bytebuddy
这个字节码增强库,只是在一些 API 上有一些区别。

创建项目

首先需要创建一个 Java 项目,这里我直接参考了官方的示例,使用了 gradle 进行管理(理论上 maven 也是可以的,只是要找到在 gradle 使用的 maven 插件)。

这里贴一下简化版的
build.gradle
文件:

plugins {
    id 'java'
    id "com.github.johnrengelman.shadow" version "8.1.1"
    id "com.diffplug.spotless" version "6.24.0"
}

group = 'com.xx.otel.extensions'
version = '1.0.0'

ext {
    versions = [
            // this line is managed by .github/scripts/update-sdk-version.sh
            opentelemetrySdk           : "1.34.1",

            // these lines are managed by .github/scripts/update-version.sh
            opentelemetryJavaagent     : "2.1.0-SNAPSHOT",
            opentelemetryJavaagentAlpha: "2.1.0-alpha-SNAPSHOT",

            junit                      : "5.10.1"
    ]

    deps = [
    // 自动生成服务发现 service 文件
            autoservice: dependencies.create(group: 'com.google.auto.service', name: 'auto-service', version: '1.1.1')
    ]
}

repositories {
    mavenLocal()
    maven { url "https://maven.aliyun.com/repository/public" }
    mavenCentral()
}

configurations {
    otel
}


dependencies {

    implementation(platform("io.opentelemetry:opentelemetry-bom:${versions.opentelemetrySdk}"))

    /*
    Interfaces and SPIs that we implement. We use `compileOnly` dependency because during
    runtime all necessary classes are provided by javaagent itself.
     */
    compileOnly 'io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:1.34.1'
    compileOnly 'io.opentelemetry.instrumentation:opentelemetry-instrumentation-api:1.32.0'
    compileOnly 'io.opentelemetry.javaagent:opentelemetry-javaagent-extension-api:1.32.0-alpha'

    //Provides @AutoService annotation that makes registration of our SPI implementations much easier
    compileOnly deps.autoservice
    annotationProcessor deps.autoservice

    // https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client
    compileOnly 'org.apache.pulsar:pulsar-client:2.8.0'

}

test {
    useJUnitPlatform()
}

然后便是要创建 javaagent 的一个核心类:

@AutoService(InstrumentationModule.class)  
public class PulsarInstrumentationModule extends InstrumentationModule {
    public PulsarInstrumentationModule() {
        super("pulsar-client-metrics", "pulsar-client-metrics-2.8.0");
    }	
}

在这个类中定义我们插件的名称,同时使用
@AutoService
注解可以在打包的时候帮我们在
META-INF/services/
目录下生成 SPI 服务发现的文件:

这是一个 Google 的插件,本质是插件是使用 SPI 的方式进行开发的。

关于 SPI 以前也写过一篇文章,不熟的朋友可以用作参考:

创建 Instrumentation

之后就需要创建自己的 Instrumentation,这里可以把它理解为自己的拦截器,需要配置对哪个类的哪个函数进行拦截:

public class ProducerCreateImplInstrumentation implements TypeInstrumentation {

    @Override
    public ElementMatcher<TypeDescription> typeMatcher() {
        return named("org.apache.pulsar.client.impl.ProducerBuilderImpl");
    }
    @Override
    public void transform(TypeTransformer transformer) {
        transformer.applyAdviceToMethod(
                isMethod()
                        .and(named("createAsync")),
                ProducerCreateImplInstrumentation.class.getName() + "$ProducerCreateImplConstructorAdvice");
    }

比如这就是对
ProducerBuilderImpl
类的 createAsync 创建函数进行拦截,拦截之后的逻辑写在了
ProducerCreateImplConstructorAdvice
类中。

值得注意的是对一些继承和实现类的拦截方式是不相同的:

@Override  
public ElementMatcher<TypeDescription> typeMatcher() {  
    return extendsClass(named(ENHANCE_CLASS));  
    // return implementsInterface(named(ENHANCE_CLASS));
}

从这两个函数名称就能看出,分别是针对继承和实现类进行拦截的。

这里的 API 比 SkyWalking 的更易读一些。

之后需要把我们自定义的 Instrumentation 注册到刚才的 PulsarInstrumentationModule 类中:

    @Override
    public List<TypeInstrumentation> typeInstrumentations() {
        return Arrays.asList(
                new ProducerCreateImplInstrumentation(),
                new ProducerCloseImplInstrumentation(),
                );
    }

有多个的话也都得进行注册。

编写切面代码

之后便是编写我们自定义的切面逻辑了,也就是刚才自定义的
ProducerCreateImplConstructorAdvice
类:

    public static class ProducerCreateImplConstructorAdvice {

        @Advice.OnMethodEnter(suppress = Throwable.class)
        public static void onEnter() {
            // inert your code
            MetricsRegistration.registerProducer();
        }

        @Advice.OnMethodExit(suppress = Throwable.class)
        public static void after(
                @Advice.Return CompletableFuture<Producer> completableFuture) {
            try {
                Producer producer = completableFuture.get();
                CollectionHelper.PRODUCER_COLLECTION.addObject(producer);
            } catch (Throwable e) {
                System.err.println(e.getMessage());
            }
        }
    }

可以看得出来其实就是两个核心的注解:

  • @Advice.OnMethodEnter
    切面函数调用之前
  • @Advice.OnMethodExit
    切面函数调用之后

还可以在
@Advice.OnMethodExit
的函数中使用
@Advice.Return
获得函数调用的返回值。

当然也可以使用
@Advice.This
来获取切面的调用对象。

编写自定义 metrics

因为我这个插件的主要目的是暴露一些自定义的 metrics,所以需要使用到
io.opentelemetry.api.metrics
这个包:

这里以 Producer 生产者为例,整体流程如下:

  • 创建生产者的时候将生产者对象存储起来
  • OpenTelemetry 框架会每隔一段时间回调一个自定义的函数
  • 在这个函数中遍历所有的 producer 获取它的监控指标,然后暴露出去。

注册函数:

public static void registerObservers() {  
    Meter meter = MetricsRegistration.getMeter();  
  
    meter.gaugeBuilder("pulsar_producer_num_msg_send")  
            .setDescription("The number of messages published in the last interval")  
            .ofLongs()  
            .buildWithCallback(  
                    r -> recordProducerMetrics(r, ProducerStats::getNumMsgsSent));


private static void recordProducerMetrics(ObservableLongMeasurement observableLongMeasurement, Function<ProducerStats, Long> getter) {  
    for (Producer producer : CollectionHelper.PRODUCER_COLLECTION.list()) {  
        ProducerStats stats = producer.getStats();  
        String topic = producer.getTopic();  
        if (topic.endsWith(RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX)) {  
            continue;  
        }        observableLongMeasurement.record(getter.apply(stats),  
                Attributes.of(PRODUCER_NAME, producer.getProducerName(), TOPIC, topic));  
    }}

回调函数,在这个函数中遍历所有的生产者,然后读取它的监控指标。

这样就完成了一个自定义指标的暴露,使用的时候只需要加载这个插件即可:

java -javaagent:opentelemetry-javaagent.jar \
     -Dotel.javaagent.extensions=ext.jar
     -jar myapp.jar

-Dotel.javaagent.extensions=/extensions
当然也可以指定一个目录,该目录下所有的 jar 都会被作为 extensions 被加入进来。

打包

使用
./gradlew build
打包,之后可以在
build/libs/
目录下找到生成物。

当然也可以将 extension 直接打包到
opentelemetry-javaagent.jar
中,这样就可以不用指定
-Dotel.javaagent.extensions
参数了。

具体可以在 gradle 中加入以下 task:

task extendedAgent(type: Jar) {
  dependsOn(configurations.otel)
  archiveFileName = "opentelemetry-javaagent.jar"
  from zipTree(configurations.otel.singleFile)
  from(tasks.shadowJar.archiveFile) {
    into "extensions"
  }
  //Preserve MANIFEST.MF file from the upstream javaagent
  doFirst {
    manifest.from(
      zipTree(configurations.otel.singleFile).matching {
        include 'META-INF/MANIFEST.MF'
      }.singleFile
    )
  }
}

具体可以参考这里的配置:
https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/examples/extension/build.gradle#L125

踩坑

看起来这个开发过程挺简单的,但其中的坑还是不少。

NoClassDefFoundError

首先第一个就是我在调试过程中出现
NoClassDefFoundError
的异常。

但我把打包好的 extension 解压后明明是可以看到这个类的。

排查一段时间后没啥头绪,我就从头仔细阅读了开发文档:

发现我们需要重写
getAdditionalHelperClassNames
函数,用于将我们外部的一些工具类加入到应用的 class loader 中,不然在应用在运行的时候就会报
NoClassDefFoundError
的错误。

因为是字节码增强的关系,所以很多日常开发觉得很常见的地方都不行了,比如:

  • 如果切面类是一个内部类的时候,必须使用静态函数
  • 只能包含静态函数
  • 不能包含任何字段,常量。
  • 不能使用任何外部类,如果要使用就得使用
    getAdditionalHelperClassNames
    额外加入到 class loader 中(这一条就是我遇到的问题)
  • 所有的函数必须使用
    @Advice
    注解

以上的内容其实在文档中都有写:

所以还是得仔细阅读文档。

缺少异常日志

其实上述的异常刚开始都没有打印出来,只有一个现象就是程序没有正常运行。

因为没有日志也不知道如何排查,也怀疑是不是运行过程中报错了,所以就尝试把
@Advice
注解的函数全部 try catch ,果然打印了上述的异常日志。

之后我注意到了注解的这个参数,原来在默认情况下是不会打印任何日志的,需要手动打开。

比如这样:
@Advice.OnMethodExit(suppress = Throwable.class)

调试日志

最后就是调试功能了,因为我这个插件的是把指标发送到 OpenTelemetry-collector ,再由它发往
VictoriaMetrics/Prometheus
;由于整个链路比较长,我想看到最终生成的指标是否正常的干扰条件太多了。

好在 OpenTelemetry 提供了多种 metrics.exporter 的输出方式:

  • -Dotel.metrics.exporter=otlp (default),默认通过 otlp 协议输出到 collector 中。
  • -Dotel.metrics.exporter=logging,以 stdout 的方式输出到控制台,主要用于调试
  • -Dotel.metrics.exporter=logging-otlp
  • -Dotel.metrics.exporter=prometheus,以 Prometheus 的方式输出,还可以配置端口,这样也可以让 Prometheus 进行远程采集,同样的也可以在本地调试。

采用哪种方式可以根据环境情况自行选择。

Opentelemetry-operator 配置 extension

最近在使用
opentelemetry-operator
注入 agent 的时候发现 operator 目前并不支持配置 extension,所以在社区也提交了一个
草案
,下周会尝试提交一个 PR 来新增这个特性。

这个需求我在 issue 列表中找到了好几个,时间也挺久远了,不太确定为什么社区还为实现。

目前 operator 只支持在自定义镜像中配置
javaagent.jar
,无法配置 extension:

这个原理在之前的
文章
中有提到。

apiVersion: opentelemetry.io/v1alpha1
kind: Instrumentation
metadata:
  name: my-instrumentation
spec:
  java:
    image: your-customized-auto-instrumentation-image:java

我的目的是可以在自定义镜像中把 extension 也复制进去,类似于这样:

FROM busybox

ADD open-telemetry/opentelemetry-javaagent.jar /javaagent.jar

# Copy extensions to specify a path.
ADD open-telemetry/ext-1.0.0.jar /ext-1.0.0.jar

RUN chmod -R go+r /javaagent.jar
RUN chmod -R go+r /ext-1.0.0.jar

然后在 CRD 中配置这个
extension
的路径:

apiVersion: opentelemetry.io/v1alpha1
kind: Instrumentation
metadata:
  name: my-instrumentation
spec:
  java:
    image: custom-image:1.0.0
    extensions: /ext-1.0.0.jar
    env:
    # If extension.jar already exists in the container, you can only specify a specific path with this environment variable.
      - name: OTEL_EXTENSIONS_DIR
        value: /custom-dir

这样 operator 在拿到 extension 的路径时,就可以在环境变量中加入
-Dotel.javaagent.extensions=${java.extensions}
参数,从而实现自定义 extension 的目的。

总结

整个过程其实并不复杂,只是由于目前用的人还不算多,所以也很少有人写教程或者文章,相信用不了多久就会慢慢普及。

这里有一些官方的
example
可以参考。

参考链接:

引言

Redis作为一款高效的内存数据存储系统,凭借其优异的读写性能和丰富的数据结构支持,被广泛应用于缓存层以提升整个系统的响应速度和吞吐量。尤其是在与关系型数据库(如MySQL、PostgreSQL等)结合使用时,通过将热点数据存储在Redis中,可以在很大程度上缓解数据库的压力,提高整体系统的性能表现。

然而,在这种架构中,一个不容忽视的问题就是如何确保Redis缓存与数据库之间的双写一致性。所谓双写一致性,是指当数据在数据库中发生变更时,能够及时且准确地反映在Redis缓存中,反之亦然,以避免出现因缓存与数据库数据不一致导致的业务逻辑错误或用户体验下降。尤其在高并发场景下,由于网络延迟、并发控制等因素,保证双写一致性变得更加复杂。

在实际业务开发中,若不能妥善处理好缓存与数据库的双写一致性问题,可能会带来诸如数据丢失、脏读、重复读等一系列严重影响系统稳定性和可靠性的后果。本文将尝试剖析这一问题,介绍日常开发中常用的一些策略和模式,并结合具体场景分析不同的解决方案,为大家提供一些有力的技术参考和支持。

image.png

谈谈分布式系统中的一致性

分布式系统中的一致性指的是在多个节点上存储和处理数据时,确保系统中的数据在不同节点之间保持一致的特性。在分布式系统中,一致性通常可以分为以下几个类别:

  1. 强一致性

    所有节点在任何时间都看到相同的数据。任何更新操作都会立即对所有节点可见,保证了数据的强一致性。这意味着,如果一个节点完成了写操作,那么所有其他节点读取相同的数据之后,都将看到最新的结果。强一致性通常需要付出更高的代价,例如增加通信开销和降低系统的可用性。

  2. 弱一致性:
    系统中的数据在某些情况下可能会出现不一致的状态,但最终会收敛到一致状态。弱一致性下的系统允许在一段时间内,不同节点之间看到不同的数据状态。弱一致性通常用于需要在性能和一致性之间进行权衡的场景,例如缓存系统等。

  3. 最终一致性:
    是弱一致性的一种特例,它保证了在经过一段时间后,系统中的所有节点最终都会达到一致状态。尽管在数据更新时可能会出现一段时间的不一致,但最终数据会收敛到一致状态。最终一致性通常通过一些技术手段来实现,例如基于版本向量或时间戳的数据复制和同步机制。

除此之外,还有一些其他的一致性类别,例如:因果一致性,顺序一致性,基于本篇文章讨论的重点,我们暂且只讨论以上三种一致性类别。

什么是双写一致性问题?

在分布式系统中,双写一致性主要指在一个数据同时存在于缓存(如Redis)和持久化存储(如数据库)的情况下,任何一方的数据更新都必须确保另一方数据的同步更新,以保持双方数据的一致状态。这一问题的核心在于如何在并发环境下正确处理缓存与数据库的读写交互,防止数据出现不一致的情况。

典型场景分析

  1. 写数据库后忘记更新缓存

    当直接对数据库进行更新操作而没有相应地更新缓存时,后续的读请求可能仍然从缓存中获取旧数据,导致数据的不一致。

  2. 删除缓存后数据库更新失败:
    在某些场景下,为了保证数据新鲜度,会在更新数据库前先删除缓存。但如果数据库更新过程中出现异常导致更新失败,那么缓存将长时间处于空缺状态,新的查询将会直接命中数据库,加重数据库压力,并可能导致数据版本混乱。

  3. 并发环境下读写操作的交错执行

    在高并发场景下,可能存在多个读写请求同时操作同一份数据的情况。比如,在删除缓存、写入数据库的过程中,新的读请求获取到了旧的数据库数据并放入缓存,此时就出现了数据不一致的现象。

  4. 主从复制延迟与缓存失效时间窗口冲突

    对于具备主从复制功能的数据库集群,主库更新数据后,存在一定的延迟才将数据同步到从库。如果在此期间缓存刚好过期并重新从数据库加载数据,可能会从尚未完成同步的从库读取到旧数据,进而导致缓存与主库数据的不一致。

数据不一致不仅会导致业务逻辑出错,还可能引发用户界面展示错误、交易状态不准确等问题,严重时甚至会影响系统的正常运行和用户体验。

解决双写一致性问题的主要策略

在解决Redis缓存与数据库双写一致性问题上,有多种策略和模式。我们主要介绍以下几种主要的策略:

Cache Aside Pattern(旁路缓存模式)

Cache Aside Pattern 是一种在分布式系统中广泛采用的缓存和数据库协同工作策略,在这个模式中,数据以数据库为主存储,缓存作为提升读取效率的辅助手段。也是日常中比较常见的一种手段。其工作流程如下:
image.png

由上图我们可以看出Cache Aside Pattern的工作原理:

  • 读取操作
    :首先尝试从缓存中获取数据,如果缓存命中,则直接返回;否则,从数据库中读取数据并将其放入缓存,最后返回给客户端。
  • 更新操作
    :当需要更新数据时,首先更新数据库,然后再清除或使缓存中的对应数据失效。这样一来,后续的读请求将无法从缓存获取数据,从而迫使系统从数据库加载最新的数据并重新填充缓存。

我们从更新操作上看会发现两个很有意思的问题:

为什么操作缓存的时候是删除旧缓存而不是直接更新缓存?

我们举例模拟下并发环境下的更新DB&缓存:

  • 线程A先发起一个写操作,第一步先更新数据库,然后更新缓存
  • 线程B再发起一个写操作,第二步更新了数据库,然后更新缓存
    当以上两个线程的执行,如果严格先后顺序执行,那么对于更新缓存还是删除缓存去操作缓存都可以,但是如果两个线程同时执行时,由于网络或者其他原因,导致线程B先执行完更新缓存,然后线程A才会更新缓存。如下图:
    image.png

这时候缓存中保存的就是线程A的数据,而数据库中保存的是线程B的数据。这时候如果读取到的缓存就是脏数据。但是如果使用删除缓存取代更新缓存,那么就不会出现这个脏数据。这种方式可以简化并发控制、保证数据一致性、降低操作复杂度,并能更好地适应各种潜在的异常场景和缓存策略。尽管这种方法可能会增加一次数据库访问的成本,但在实际应用中,考虑到数据的一致性和系统的健壮性,这是值得付出的折衷。

并且在写多读少的情况下,数据很多时候并不会被读取到,但是一直被频繁的更新,这样也会浪费性能。实际上,写多的场景,用缓存也不是很划算。只有在读多写少的情况下使用缓存才会发挥更大的价值。

为什么是先操作数据库再操作缓存?

在操作缓存时,为什么要先操作数据库而不是先操作缓存?我们同样举例模拟两个线程,线程A写入数据,先删除缓存在更新DB,线程B读取数据。流程如下:

  1. 线程A发起一个写操作,第一步删除缓存
  2. 此时线程B发起一个读操作,缓存中没有,则继续读DB,读出来一个老数据
  3. 然后线程B把老数据放入缓存中
  4. 线程A更新DB数据

image.png

所以这样就会出现缓存中存储的是旧数据,而数据库中存储的是新数据,这样就出现脏数据,所以我们一般都采取先操作数据库,在操作缓存。这样后续的读请求从数据库获取最新数据并重新填充缓存。这样的设计降低了数据不一致的风险,提升了系统的可靠性。同时,这也符合CAP定理中对于一致性(Consistency)和可用性(Availability)权衡的要求,在很多场景下,数据一致性被优先考虑。

Cache Aside Pattern相对简单直观,容易理解和实现。只需要简单的判断和缓存失效逻辑即可,对已有系统的改动较小。并且由于缓存是按需加载的,所以不会浪费宝贵的缓存空间存储未被访问的数据,同时我们可以根据实际情况决定何时加载和清理缓存。

尽管Cache Aside Pattern在大多数情况下可以保证最终一致性,但它并不能保证强一致性。在数据库更新后的短暂时间内(还未开始操作缓存),如果有读请求发生,缓存中仍是旧数据,但是实际数据库中已是最新数据,造成短暂的数据不一致。在并发环境下,特别是在更新操作时,有可能在更新数据库和删除缓存之间的时间窗口内,新的读请求加载了旧数据到缓存,导致不一致。

Read-Through/Write-Through(读写穿透)

Read-Through 和 Write-Through 是两种与缓存相关的策略,它们主要用于缓存系统与持久化存储之间的数据交互,旨在确保缓存与底层数据存储的一致性。

Read-Through(读穿透)

Read-Through 是一种在缓存中找不到数据时,自动从持久化存储中加载数据并回填到缓存中的策略。具体执行流程如下:

  • 客户端发起读请求到缓存系统。
  • 缓存系统检查是否存在请求的数据。
  • 如果数据不在缓存中,缓存系统会透明地向底层数据存储(如数据库)发起读请求。
  • 数据库返回数据后,缓存系统将数据存储到缓存中,并将数据返回给客户端。
  • 下次同样的读请求就可以直接从缓存中获取数据,提高了读取效率。

image.png

整体简要流程类似
Cache Aside Pattern
,但在缓存未命中的情况下,Read-Through 策略会自动隐式地从数据库加载数据并填充到缓存中,而无需应用程序显式地进行数据库查询。

Cache Aside Pattern 更多地依赖于应用程序自己来管理缓存与数据库之间的数据流动,包括缓存填充、失效和更新。而Read-Through Pattern 则是在缓存系统内部实现了一个更加自动化的过程,使得应用程序无需关心数据是从缓存还是数据库中获取,以及如何保持两者的一致性。在Read-Through 中,缓存系统承担了更多的职责,实现了更紧密的缓存与数据库集成,从而简化了应用程序的设计和实现。

Write-Through(写穿透)

Write-Through 是一种在缓存中更新数据时,同时将更新操作同步到持久化存储的策略。具体流程如下:

  • 当客户端向缓存系统发出写请求时,缓存系统首先更新缓存中的数据。
  • 同时,缓存系统还会把这次更新操作同步到底层数据存储(如数据库)。
  • 当数据在数据库中成功更新后,整个写操作才算完成。
  • 这样,无论是从缓存还是直接从数据库读取,都能得到最新一致的数据。

image.png

Read-Through 和 Write-Through 的共同目标是确保缓存与底层数据存储之间的一致性,并通过自动化的方式隐藏了缓存与持久化存储之间的交互细节,简化了客户端的处理逻辑。这两种策略经常一起使用,以提供无缝且一致的数据访问体验,特别适用于那些对数据一致性要求较高的应用场景。然而,需要注意的是,虽然它们有助于提高数据一致性,但在高并发或网络不稳定的情况下,仍然需要考虑并发控制和事务处理等问题,以防止数据不一致的情况发生。

Write behind (异步缓存写入)

Write Behind(异步缓存写入),也称为 Write Back(回写)或 异步更新策略,是一种在处理缓存与持久化存储(如数据库)之间数据同步时的策略。在这种模式下,当数据在缓存中被更新时,并非立即同步更新到数据库,而是将更新操作暂存起来,随后以异步的方式批量地将缓存中的更改写入持久化存储。其流程如下:

  • 应用程序首先在缓存中执行数据更新操作,而不是直接更新数据库。
  • 缓存系统会将此次更新操作记录下来,暂存于一个队列(如日志文件或内存队列)中,而不是立刻同步到数据库。
  • 在后台有一个独立的进程或线程定期(或者当队列积累到一定大小时)从暂存队列中取出更新操作,然后批量地将这些更改写入数据库。

image.png

使用 Write Behind 策略时,由于更新并非即时同步到数据库,所以在异步处理完成之前,如果缓存或系统出现故障,可能会丢失部分更新操作。并且对于高度敏感且要求强一致性的数据,Write Behind 策略并不适用,因为它无法提供严格的事务性和实时一致性保证。Write Behind 适用于那些可以容忍一定延迟的数据一致性场景,通过牺牲一定程度的一致性换取更高的系统性能和扩展性。

解决双写一致性问题的3种方案

以上我们主要讲解了解决双写一致性问题的主要策略,但是每种策略都有一定的局限性,所以我们在实际运用中,还要结合一些其他策略去屏蔽上述策略的缺点。

1. 延时双删策略

延时双删策略主要用于解决在高并发场景下,由于网络延迟、并发控制等原因造成的数据库与缓存数据不一致的问题。

当更新数据库时,首先删除对应的缓存项,以确保后续的读请求会从数据库加载最新数据。
但是由于网络延迟或其他不确定性因素,删除缓存与数据库更新之间可能存在时间窗口,导致在这段时间内的读请求从数据库读取数据后写回缓存,新写入的缓存数据可能还未反映出数据库的最新变更。

所以为了解决这个问题,延时双删策略在第一次删除缓存后,设定一段短暂的延迟时间,如几百毫秒,然后在这段延迟时间结束后再次尝试删除缓存。这样做的目的是确保在数据库更新传播到所有节点,并且在缓存中的旧数据彻底过期失效之前,第二次删除操作可以消除缓存中可能存在的旧数据,从而提高数据一致性。

public class DelayDoubleDeleteService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private TaskScheduler taskScheduler;

    public void updateAndScheduleDoubleDelete(String key, String value) {
        // 更新数据库...
        updateDatabase(key, value);

        // 删除缓存
        redisTemplate.delete(key);

        // 延迟执行第二次删除
        taskScheduler.schedule(() -> {
            redisTemplate.delete(key);
        }, new CronTrigger("0/1 * * * * ?")); // 假设1秒后执行,实际应根据需求设置定时表达式
    }

    // 更新数据库的逻辑
    private void updateDatabase(String key, String value) {
        
    }
}

这种方式可以较好地处理网络延迟导致的数据不一致问题,较少的并发写入数据库和缓存,降低系统的压力。但是,延迟时间的选择需要权衡,过短可能导致实际效果不明显,过长可能影响用户体验。并且对于极端并发场景,仍可能存在数据不一致的风险。

2. 删除缓存重试机制

删除缓存重试机制是在删除缓存操作失败时,设定一个重试策略,确保缓存最终能被正确删除,以维持与数据库的一致性。

在执行数据库更新操作后,尝试删除关联的缓存项。如果首次删除缓存失败(例如网络波动、缓存服务暂时不可用等情况),系统进入重试逻辑,按照预先设定的策略(如指数退避、固定间隔重试等)进行多次尝试。直到缓存删除成功,或者达到最大重试次数为止。通过这种方式,即使在异常情况下也能尽量保证缓存与数据库的一致性。

@Service
public class RetryableCacheService {

    @Autowired
    private CacheManager cacheManager;

    @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000L))
    public void deleteCacheWithRetry(String key) {
        ((org.springframework.data.redis.cache.RedisCacheManager) cacheManager).getCache("myCache").evict(key);
    }

    public void updateAndDeleteCache(String key, String value) {
        // 更新数据库...
        updateDatabase(key, value);

        // 尝试删除缓存,失败时自动重试
        deleteCacheWithRetry(key);
    }

    // 更新数据库的逻辑,此处仅示意
    private void updateDatabase(String key, String value) {
        // ...
    }
}

这种重试方式确保缓存删除操作的成功执行,可以应对网络抖动等导致的临时性错误,提高数据一致性。但是可能占用额外的系统资源和时间,重试次数过多可能会阻塞其他操作。

监听并读取biglog异步删除缓存

在数据库发生写操作时,将变更记录在binlog或类似的事务日志中,然后使用一个专门的异步服务或者监听器订阅binlog的变化(比如Canal),一旦检测到有数据更新,便根据binlog中的操作信息定位到受影响的缓存项。讲这些需要更新缓存的数据发送到消息队列,消费者处理消息队列中的事件,异步地删除或更新缓存中的对应数据,确保缓存与数据库保持一致。

@Service
public class BinlogEventHandler {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void handleBinlogEvent(BinlogEvent binlogEvent) {
        // 解析binlogEvent,获取需要更新缓存的key
        String cacheKey = deriveCacheKeyFromBinlogEvent(binlogEvent);

        // 发送到RocketMQ
        rocketMQTemplate.asyncSend("cacheUpdateTopic", cacheKey, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 发送成功处理
            }

            @Override
            public void onException(Throwable e) {
                // 发送失败处理
            }
        });
    }

    // 从binlog事件中获取缓存key的逻辑,这里仅为示意
    private String deriveCacheKeyFromBinlogEvent(BinlogEvent binlogEvent) {
        // ...
    }
}

@RocketMQMessageListener(consumerGroup = "myConsumerGroup", topic = "cacheUpdateTopic")
public class CacheUpdateConsumer {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void onMessage(MessageExt messageExt) {
        String cacheKey = new String(messageExt.getBody());
        redisTemplate.delete(cacheKey);
    }
}

这种方法的好处是将缓存的更新操作与主业务流程解耦,避免阻塞主线程,同时还能处理数据库更新后由于网络问题或并发问题导致的缓存更新滞后情况。当然,实现这一策略相对复杂,需要对数据库的binlog机制有深入理解和定制开发。

总结

在分布式系统中,为了保证缓存与数据库双写一致性,可以采用以下方案:

  1. 读取操作


    • 先尝试从缓存读取数据,若缓存命中,则直接返回缓存中的数据。
    • 若缓存未命中,则从数据库读取数据,并将数据放入缓存。
  2. 更新操作


    • 在更新数据时,首先在数据库进行写入操作,确保主数据库数据的即时更新。
    • 为了减少数据不一致窗口,采用异步方式处理缓存更新,具体做法是监听数据库的binlog事件,异步进行删除缓存。
    • 在一主多从的场景下,为了确保数据一致性,需要等待所有从库的binlog事件都被处理后才删除缓存(确保全部从库均已更新)。

同时,还需注意以下要点:

  • 对于高并发环境,可能需要结合分布式锁、消息队列或缓存失效延时等技术,进一步确保并发写操作下的数据一致性。
  • 异步处理binlog时,务必考虑异常处理机制和重试策略,确保binlog事件能够正确处理并执行缓存更新操作。

本文已收录于我的个人博客:
码农Academy的博客,专注分享Java技术干货,包括Java基础、Spring Boot、Spring Cloud、Mysql、Redis、Elasticsearch、中间件、架构设计、面试题、程序员攻略等

这几天忙于编程。上次发布了壁纸管理器的插件版(
https://www.cnblogs.com/lzhdim/p/18074135
),然后整理和添加了一下相关的壁纸图片文件,虽然在管理器中也能浏览壁纸并设置,但是还是看图软件更加方便,双击图片文件就能打开浏览了。笔者用的Windows 11操作系统,也自带了图片查看器,不过出于兴趣爱好,于是就想用C#开发一个图片浏览的软件,经过3天的努力,于是就有了此应用,然后有了此博文以进行记录,让更多的读者能够对应用的代码进行复用。

此应用为笔者自己开发的小作品,需要的读者自己复用代码。

1、
项目目录;

2、
源码介绍;

1) 主窗体内;

目前主要的操作还是在主窗体内。

2) 配置窗体内;

3) 注册默认的图片类型应用;

3、
运行界面;

1) 主窗体;

2) 配置窗体;

3) 关于窗体;

4) 系统菜单;

4、
使用介绍;

1) 将EXE文件放到目录里;

2) 双击打开EXE文件,能够拖动图片文件到窗体里进行浏览;

3) 或者打开EXE文件,点击左上角的菜单选择配置,打开配置窗体,对支持的图片文件类型进行注册此应用为默认打开图片类型的应用;

4) 如果不想打开配置窗体注册类型支持,也能够在图片文件上鼠标右键菜单,在打开方式里选择该EXE应用文件,选择始终,这样该类型的图片文件就默认用此应用进行打开了;

5) 移动鼠标到图片显示栏的左侧或者右侧,能够对该图片文件所在目录的其它文件进行浏览;

6) 移动鼠标到窗体底部将有图片文件操作菜单栏,能够对文件进行相关操作;

7) 双击左上角的菜单图标,或者双击图片显示栏能够关闭该应用;

5、
源码下载;

提供源码下载:
https://download.csdn.net/download/lzhdim/89132884

6、
其它建议;

这个只是一个简单的例子。其它的还能够添加浏览时的文件的管理功能,以及窗体的拖放改变大小的功能(这个在另一个博文中会介绍),这个例子只是抛砖引玉,希望能够有所帮助。

上面介绍了笔者用C#开发的一个图片浏览管理器的小应用,请需要的读者自己去复用该应用里的代码,或者直接在此应用的基础上进行完善或者扩展等功能。谢谢大家。