wenmo8 发布的文章

场景:

最近在写代码的时候遇到提示语金额后面无效0过多的问题,比如

用户输入金额:500元,

因为数据库是有对于decimal有限制的[Column(TypeName = "numeric(18,6)")],小数位可以达到6位

则保存到数据库保存之后变成了500.000000。

提示语:变动金额不能超过500元

变动金额不能超过[从数据库中拿到的金额]元

实际上提示语变成了:变动金额不能超过500.000000元

提示语中这样子的金额看着确实有点不太友好,所以建议在提示语中直接将小数位末尾后无效的0去掉。

我也看了大多数建议的方案是:保留N位小数,去掉N未小数之后的0,

但是这样子容易弄掉精准数据:比如500.001000,保留N位小数,去掉末尾的0,

比如保留2位小数变成了:500.00 ,和实际的500.001 就有差异了,无法确认第几位小数是有效值,此方案在我使用的场景中是不可取的。

我想要达到的效果:

经过试验,归纳出以下代码:


   /// <summary>
        ///去掉小数位后面的无效的0/// </summary>
        /// <param name="num"></param>
        /// <returns></returns>
        public static string TrimEndZero(decimalnum)
{
if (num == 0) return "0";decimalfloorNumber;if (num < 0)
{
//负数部分 floorNumber =Math.Ceiling(num);
}
else{//非负数部分 floorNumber =Math.Floor(num);
}
var diff = num - floorNumber; //得到0.几的小数 if (diff == 0)
{
//是一个整数 return num.ToString("#");
}
else{//是小数 return num.ToString().TrimEnd('0');
}

}

View Code

==========================================以下是园友们推荐的写法===================================

评论区园友们推荐了很多简易的写法,大家可以去翻阅看看,找一个最适合的写法

前言

消息队列(MQ)是分布式系统中不可或缺的技术之一。

对很多小伙伴来说,刚接触MQ时,可能觉得它只是个“传话工具”,但用着用着,你会发现它简直是系统的“润滑剂”。

无论是解耦、削峰,还是异步任务处理,都离不开MQ的身影。

下面我结合实际场景,从简单到复杂,逐一拆解MQ的10种经典使用方式,希望对你会有所帮助。

1. 异步处理:让系统轻松一点

场景

小伙伴们是不是经常遇到这样的情况:用户提交一个操作,比如下单,然后要发送短信通知。

如果直接在主流程里调用短信接口,一旦短信服务响应慢,就会拖累整个操作。

用户等得不耐烦,心态直接崩了。

解决方案

用MQ,把非关键流程抽出来异步处理。下单时,直接把“发短信”这件事丢给MQ,订单服务就能立刻响应用户,而短信的事情让MQ和消费者去搞定。

示例代码

// 订单服务:生产者
Order order = createOrder(); // 订单生成逻辑
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("订单已生成,发短信任务交给MQ");

// 短信服务:消费者
@RabbitListener(queues = "sms_queue")
public void sendSms(Order order) {
    System.out.println("发送短信,订单ID:" + order.getId());
    // 调用短信服务接口
}

深度解析

这种方式的好处是:
主流程解耦,不受慢服务的拖累
。订单服务只管自己的事,短信服务挂了也没关系,MQ会把消息暂存,等短信服务恢复后继续处理。

2. 流量削峰:稳住系统别崩

场景

每年的“双十一”电商大促,用户秒杀商品时一窝蜂冲进来。

突然涌入的高并发请求,不仅会压垮应用服务,还会直接让数据库“趴窝”。

解决方案

秒杀请求先写入MQ,后端服务以稳定的速度从MQ中消费消息,处理订单。

这样既能避免系统被瞬时流量压垮,还能提升处理的平稳性。

示例代码

// 用户提交秒杀请求:生产者
rabbitTemplate.convertAndSend("seckill_exchange", "seckill_key", userRequest);
System.out.println("用户秒杀请求已进入队列");

// 秒杀服务:消费者
@RabbitListener(queues = "seckill_queue")
public void processSeckill(UserRequest request) {
    System.out.println("处理秒杀请求,用户ID:" + request.getUserId());
    // 执行秒杀逻辑
}

深度解析

MQ在这里相当于一个缓冲池,把瞬时流量均匀分布到一段时间内处理。
系统稳定性提升,用户体验更好

3. 服务解耦:减少相互牵制

场景

比如一个订单系统需要通知库存系统扣减库存,还要通知支付系统完成扣款。

如果直接用同步接口调用,服务间的依赖性很强,一个服务挂了,整个链条都会被拖垮。

解决方案

订单服务只负责把消息丢到MQ里,库存服务和支付服务各自从MQ中消费消息。

这样订单服务不需要直接依赖它们。

示例代码

// 订单服务:生产者
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("订单生成消息已发送");

// 库存服务:消费者
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
    System.out.println("扣减库存,订单ID:" + order.getId());
}

// 支付服务:消费者
@RabbitListener(queues = "payment_queue")
public void processPayment(Order order) {
    System.out.println("处理支付,订单ID:" + order.getId());
}

深度解析

通过MQ,各个服务之间可以实现松耦合。

即使库存服务挂了,也不会影响订单生成的流程,
大幅提升系统的容错能力

4. 分布式事务:保证数据一致性

场景

订单服务需要同时生成订单和扣减库存,这涉及两个不同的数据库操作。

如果一个成功一个失败,就会导致数据不一致。

解决方案

通过MQ实现分布式事务。

订单服务生成订单后,将扣减库存的任务交给MQ,最终实现数据的一致性。

示例代码

// 订单服务:生产者
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("订单创建消息已发送");

// 库存服务:消费者
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
    System.out.println("更新库存,订单ID:" + order.getId());
    // 执行扣减库存逻辑
}

深度解析

通过“最终一致性”解决了分布式事务的难题,虽然短时间内可能有数据不一致,但最终状态一定是正确的。

5. 广播通知:一条消息,通知多个服务

场景

比如商品价格调整,库存、搜索、推荐服务都需要同步更新。

如果每个服务都要单独通知,工作量会很大。

解决方案

MQ的广播模式(Fanout)可以让多个消费者订阅同一条消息,实现消息的“一发多收”。

示例代码

// 生产者:广播消息
rabbitTemplate.convertAndSend("price_update_exchange", "", priceUpdate);
System.out.println("商品价格更新消息已广播");

// 消费者1:库存服务
@RabbitListener(queues = "stock_queue")
public void updateStockPrice(PriceUpdate priceUpdate) {
    System.out.println("库存价格更新:" + priceUpdate.getProductId());
}

// 消费者2:搜索服务
@RabbitListener(queues = "search_queue")
public void updateSearchPrice(PriceUpdate priceUpdate) {
    System.out.println("搜索价格更新:" + priceUpdate.getProductId());
}

深度解析

这种模式让多个服务都能接收到同一条消息,
扩展性非常强

6. 日志收集:分布式日志集中化

场景

多个服务产生的日志需要统一存储和分析。

如果直接写数据库,可能导致性能瓶颈。

解决方案

各服务将日志写入MQ,日志分析系统从MQ中消费消息并统一处理。

示例代码

// 服务端:生产者
rabbitTemplate.convertAndSend("log_exchange", "log_key", logEntry);
System.out.println("日志已发送");

// 日志分析服务:消费者
@RabbitListener(queues = "log_queue")
public void processLog(LogEntry log) {
    System.out.println("日志处理:" + log.getMessage());
    // 存储或分析逻辑
}

7. 延迟任务:定时触发操作

场景

用户下单后,如果30分钟内未支付,需要自动取消订单。

解决方案

使用MQ的延迟队列功能,设置消息延迟消费的时间。

示例代码

// 生产者:发送延迟消息
rabbitTemplate.convertAndSend("delay_exchange", "delay_key", order, message -> {
    message.getMessageProperties().setDelay(30 * 60 * 1000); // 延迟30分钟
    return message;
});
System.out.println("订单取消任务已设置");

// 消费者:处理延迟消息
@RabbitListener(queues = "delay_queue")
public void cancelOrder(Order order) {
    System.out.println("取消订单:" + order.getId());
    // 取消订单逻辑
}

8. 数据同步:跨系统保持数据一致

场景

在一个分布式系统中,多个服务依赖同一份数据源。

例如,电商平台的订单状态更新后,需要同步到缓存系统和推荐系统。

如果让每个服务直接从数据库拉取数据,会增加数据库压力,还可能出现延迟或不一致的问题。

解决方案

利用MQ进行数据同步。订单服务更新订单状态后,将更新信息发送到MQ,缓存服务和推荐服务从MQ中消费消息并同步数据。

示例代码

订单服务:生产者

// 更新订单状态后,将消息发送到MQ
Order order = updateOrderStatus(orderId, "PAID"); // 更新订单状态为已支付
rabbitTemplate.convertAndSend("order_exchange", "order_status_key", order);
System.out.println("订单状态更新消息已发送:" + order.getId());

缓存服务:消费者

@RabbitListener(queues = "cache_update_queue")
public void updateCache(Order order) {
    System.out.println("更新缓存,订单ID:" + order.getId() + " 状态:" + order.getStatus());
    // 更新缓存逻辑
    cacheService.update(order.getId(), order.getStatus());
}

推荐服务:消费者

@RabbitListener(queues = "recommendation_queue")
public void updateRecommendation(Order order) {
    System.out.println("更新推荐系统,订单ID:" + order.getId() + " 状态:" + order.getStatus());
    // 更新推荐服务逻辑
    recommendationService.updateOrderStatus(order);
}

深度解析

通过MQ实现数据同步的好处是:

  1. 减轻数据库压力
    :避免多个服务同时查询数据库。
  2. 最终一致性
    :即使某个服务处理延迟,MQ也能保障消息不丢失,最终所有服务的数据状态一致。

9. 分布式任务调度

场景

有些任务需要定时执行,比如每天凌晨清理过期订单。

这些订单可能分布在多个服务中,如果每个服务独立运行定时任务,可能会出现重复处理或任务遗漏的问题。

解决方案

使用MQ统一分发调度任务,每个服务根据自身的业务需求,从MQ中消费任务并执行。

示例代码

任务调度服务:生产者

// 定时任务生成器
@Scheduled(cron = "0 0 0 * * ?") // 每天凌晨触发
public void generateTasks() {
    List<Task> expiredTasks = taskService.getExpiredTasks();
    for (Task task : expiredTasks) {
        rabbitTemplate.convertAndSend("task_exchange", "task_routing_key", task);
        System.out.println("任务已发送:" + task.getId());
    }
}

订单服务:消费者

@RabbitListener(queues = "order_task_queue")
public void processOrderTask(Task task) {
    System.out.println("处理订单任务:" + task.getId());
    // 执行订单清理逻辑
    orderService.cleanExpiredOrder(task);
}

库存服务:消费者

@RabbitListener(queues = "stock_task_queue")
public void processStockTask(Task task) {
    System.out.println("处理库存任务:" + task.getId());
    // 执行库存释放逻辑
    stockService.releaseStock(task);
}

深度解析

分布式任务调度可以解决:

  1. 重复执行
    :每个服务只处理自己队列中的任务。
  2. 任务遗漏
    :MQ确保任务可靠传递,防止任务丢失。

10. 文件处理:异步执行大文件任务

场景

用户上传一个大文件后,需要对文件进行处理(如格式转换、压缩等)并存储。

如果同步执行这些任务,前端页面可能会一直加载,导致用户体验差。

解决方案

用户上传文件后,立即将任务写入MQ,后台异步处理文件,处理完成后通知用户或更新状态。

示例代码

上传服务:生产者

// 上传文件后,将任务写入MQ
FileTask fileTask = new FileTask();
fileTask.setFileId(fileId);
fileTask.setOperation("COMPRESS");
rabbitTemplate.convertAndSend("file_task_exchange", "file_task_key", fileTask);
System.out.println("文件处理任务已发送,文件ID:" + fileId);

文件处理服务:消费者

@RabbitListener(queues = "file_task_queue")
public void processFileTask(FileTask fileTask) {
    System.out.println("处理文件任务:" + fileTask.getFileId() + " 操作:" + fileTask.getOperation());
    // 模拟文件处理逻辑
    if ("COMPRESS".equals(fileTask.getOperation())) {
        fileService.compressFile(fileTask.getFileId());
    } else if ("CONVERT".equals(fileTask.getOperation())) {
        fileService.convertFileFormat(fileTask.getFileId());
    }
    // 更新任务状态
    taskService.updateTaskStatus(fileTask.getFileId(), "COMPLETED");
}

前端轮询或回调通知

// 前端轮询文件处理状态
setInterval(() => {
    fetch(`/file/status?fileId=${fileId}`)
        .then(response => response.json())
        .then(status => {
            if (status === "COMPLETED") {
                alert("文件处理完成!");
            }
        });
}, 5000);

深度解析

异步文件处理的优势:

  1. 提升用户体验
    :主线程迅速返回,减少用户等待时间。
  2. 后台任务灵活扩展
    :支持多种操作逻辑,适应复杂文件处理需求。

总结

消息队列不只是传递消息的工具,更是系统解耦、提升稳定性和扩展性的利器。

在这10种经典场景中,每一种都能解决特定的业务痛点。

希望这篇文章对你理解MQ的应用场景有帮助!

最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下我的同名公众号:苏三说技术,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

关注公众号:【苏三说技术】,在公众号中回复:进大厂,可以免费获取我最近整理的10万字的面试宝典,好多小伙伴靠这个宝典拿到了多家大厂的offer。

Python
的依赖管理工具一直没有标准化,原因主要包括:

  1. 历史发展的随意性

    Python
    发展早期对于依赖管理的重视程度不足,缺乏从一开始就进行统一规划和设计的意识
  2. 社区的分散性

    Python
    社区庞大且分散,众多开发者和团队各自为政,根据自己的需求和偏好开发工具,缺乏统一的协调和整合机制
  3. 多样化的使用场景

    Python
    应用场景广泛,从 Web 开发到数据科学、机器学习、系统管理脚本等。不同场景对依赖管理有着不同的要求
  4. 向后兼容性的挑战

    Python
    语言本身非常注重向后兼容性,这在一定程度上限制了对依赖管理工具进行根本性变革的可能性
  5. 缺乏统一的治理
    :与一些编程语言(如
    Java
    有 Oracle 主导的规范制定)不同,
    Python
    没有一个强有力的单一实体来主导依赖管理工具的标准化工作
  6. 生态系统的快速变化

    Python
    生态系统发展迅速,新的库和框架不断涌现,这使得依赖关系变得越来越复杂

1. 什么是依赖管理

依赖管理工具
常用于处理软件项目中的依赖关系。

在软件开发过程中,一个项目往往会依赖于许多其他的软件库、框架或组件。

依赖管理工具
能够帮助开发者精确地指定这些依赖项的版本,自动下载和安装它们,并且可以在不同的环境中(如开发、测试、生产环境)保证依赖项的一致性。

这样可以有效避免因依赖版本混乱而导致的软件故障、兼容性问题等情况。

比如,其他编程语言的标准
依赖管理工具
有:
NodeJS

npm

Rust

cargo

Java

Maven
等等。

依赖管理工具最关键的作用是
可重复性
,意味着我们可以遵循一系列步骤,最终得到的软件项目是完全相同的。

特别是现在的项目(不管是开源的还是内部的)基本都需要多人协作,确保每个人的代码编译出来的软件运行结果一致是至关重要的。

良好的依赖管理可以对
开发

构建

部署
阶段的所有依赖关系都明确声明,并与版本控制中的代码一起跟踪。

简单来说:
应用程序=代码+所有依赖项

具体可以归纳为以下几个步骤:

  1. 创建定义文件
    :项目的描述,声明所需的依赖项和最小版本约束等
  2. 生成锁定文件
    :固定依赖项的版本和依赖项之间的关系
  3. 同步环境
    :一般都过git之类的版本管理工具互相同步
  4. 追踪定义文件和锁定文件
    :定义文件和锁定文件有变化时及时互相同步

2. 依赖管理工具对比

Python
的依赖管理工具虽然没有统一,但是有很多可供选择,下面一一分析每个工具的优缺点。

2.1. pip

pip
是自带的默认包管理器,也是使用最多的工具,它的特点是只能用来安装
Python
包。

优势


  1. Python 3.4
    起包含在
    Python
    中,无需额外安装
  2. 2013年开始引入
    wheels
    分发格式,安装速度大大提高
  3. 2020年开始加入了依赖解析算法,能够更好的保持环境的一致性

不足之处

  1. 依赖
    Python
    ,也就是说使用
    pip
    必须先安装
    Python
  2. 不能安装非Python的包
  3. 没有锁定文件

2.2. venv

用于创建虚拟环境的内置工具,在虚拟环境中可使用 pip 安装包,通过设置环境变量来隔离环境。

优势
:自
Python 3.3
起包含在
Python
中。

不足之处


  1. Python
    工具,依赖
    Python
    安装
  2. 所有环境必须使用相同的
    Python
    解释器
  3. 无法安装非
    Python

2.3. virtualenv


venv
成为
Python
内置工具前,我们通常使用
virtualenv
创建虚拟环境,可指定不同的
Python
解释器创建虚拟环境,需通过
pip
安装。

优势
:能指定不同
Python
解释器创建虚拟环境。

不足之处

venv
是一样的。

2.4. pip-tools

轻量级工具,引入锁文件机制。

需先编写
requirements.in
作为定义文件,再用
pip-compile
生成
requirements.txt
锁文件,同步环境是使用
pip-sync

优势

轻量、简单,与基本的
<font style="color:rgba(0, 0, 0, 0.85);">pip/venv</font>
工具协同工作。

不足之处


  1. Python
    工具,需安装到项目环境中,可能存在兼容性问题
  2. 只能处理
    pip
    可安装的包
  3. 定义文件
    需手动维护

2.5. Pipenv

整合了
pip

virtualenv

pip-tools
的功能,通过
Pipfile

Pipfile.lock
管理依赖和虚拟环境,自动更新文件。

优势
:轻量、简单,包装了基本的
pip/venv
工具。

不足之处


  1. Python
    工具
  2. 有自己的定义和锁文件格式
  3. 只能处理
    pip
    可安装的包
  4. 只能区分开发和非开发依赖,环境定义不够灵活

2.6. Poetry

旨在涵盖
Python
项目整个开发流程,包括项目引导、虚拟环境、依赖管理、构建和发布包。

通过
pyproject.toml
管理依赖,自动维护
poetry.lock
,支持依赖分组。

优势

  1. 一体化工具,涵盖项目开发全生命周期
  2. 有方便的命令行界面
  3. 支持依赖分组

不足之处


  1. Python
    工具,较重量级,依赖多,安装可能有问题
  2. 与其他工具互操作性差,不支持其他构建后端
  3. 不支持维护互斥环境
  4. 有自己的依赖定义和锁文件格式
  5. 只能处理
    pip
    可安装的包

2.7. PDM

类似于
Poetry
,但遵循
PEP
标准,可使用
uv
进行依赖解析和安装,其构建后端可独立使用。

uv
是后面将要介绍的另一个依赖管理工具。

优势

  1. 遵循
    PEP
    标准
  2. 可利用
    uv
    进行依赖管理

不足之处
:与
Poetry
类似,是
Python
工具,有较多依赖,存在相关缺点。

2.8. pyenv

用于安装和管理不同版本的
Python
,可在全局或项目级别激活指定版本,是简单的
shell
实用程序,不依赖
Python
安装。

优势


  1. shell
    脚本,无
    Python
    依赖
  2. 遵循
    Unix
    哲学,专注于管理
    Python
    版本

不足之处

  1. 安装新
    Python
    版本需下载并编译源代码,耗时
  2. 首次设置可能较麻烦,需安装多个构建依赖
  3. 不支持
    Windows

2.9. pipx


pip
包安装在用户级别的独立虚拟环境中,避免依赖冲突,通过
symlink
(软链接) 将入口点链接到
PATH
,方便调用。

优势
:比直接在用户级别
pip
安装工具更好,能隔离依赖,可使用不同
Python
解释器。

不足之处


  1. Python
    工具
  2. 无法安装同一工具的多个版本,所有项目需共享工具版本

2.10. uv


Rust
编写的全能工具,旨在替代多个
Python
管理工具,处理整个开发流程,包括安装包、管理虚拟环境、构建和发布等。

遵循
Python
标准,依赖定义在
pyproject.toml
,锁文件为
uv.lock
,支持任意依赖分组,能安装
pip
包作为可执行文件,可管理
Python
版本,维护全局包缓存。

优势


  1. Rust
    编写,速度极快,单二进制文件,无外部依赖
  2. 多平台支持
  3. 一体化工具,功能全面
  4. 遵循
    Python
    标准
  5. 支持选择任何构建后端
  6. 支持依赖分组。

不足之处

  1. 不支持维护多个互斥环境
  2. 只能处理
    pip
    可安装的包

2.11. Conda


Anaconda
公司开发的不同生态系统的包管理器,主要用于安装
anaconda.org
上的包,

能创建虚拟环境,与
pip
生态系统不同,对【

】 的定义更广泛,包括共享库、头文件、可执行文件等。

优势

  1. 多平台支持
  2. 有全局包缓存
  3. 包以编译二进制形式分发
  4. 依赖解析算法健壮
  5. 可在
    Conda
    环境中使用
    pip
  6. 支持全局和共享环境

不足之处

  1. 速度慢
  2. 包的下载是串行的
  3. 安装过程有些侵入性,会修改
    shell
    配置
  4. 与 “


    Python
    生态系统互操作性有限
  5. 无锁文件
  6. 构建和分发
    Conda
    包较痛苦

2.12. Mamba


Conda
的改进版,旨在解决
Conda
的痛点,如慢的依赖解析和并行下载问题,用
C++
实现,使用不同算法,推荐安装方式已改变。

速度比
Conda
快很多,其他方面和
Conda
类似。

2.13. Pixi

类似于
uv
,但针对
Conda
生态系统,用
Rust
编写,支持多平台。

通过
pyproject.toml

pixi.toml
配置,有方便的命令行界面,支持管理多个虚拟环境和定义文件,有锁文件机制,支持类似
Makefile
的项目自动化任务,可指定系统依赖,但不帮助构建包。

优势

  1. 用 Rust 编写,速度快,单二进制文件,无外部依赖
  2. 多平台支持
  3. 方便的命令行界面
  4. 全局包缓存
  5. 可下载
    Python
    二进制文件和
    anaconda.org
    上的非
    Python
  6. 能使用
    pyproject.toml

    pixi.toml
    配置
  7. 可选择任何构建后端

不足之处
:与其他工具兼容性有限,且没有遵循
Conda
的全局环境理念。

3. 工具选择建议

如此之多的依赖管理工具,我们应该如何选择呢?

如果我们的项目只有对Python包的依赖,那么推荐
uv

Pixi

如果需要维护多个互斥的环境,那么推荐
pip + venv + pip-tools + pyenv

如果需处理无法通过
pip
安装的依赖,那么建议使用
Pixi

4. 总结

Python
的依赖管理工具很多,但是大部分工具其实大同小异,只是互相做了一些小的改进。

我们选择时,除了考虑遗留项目的问题之外,尽量优先选择新出的工具。

新的工具除了会改进原有工具的缺点,还会借鉴其他语言的优秀的依赖管理工具。

目前,我个人的话,使用
uv
来管理项目比较多。

Finereport调用python服务进行大数据量导出

Aerial view of woman using a computer laptop on a marble table

背景:

在使用finereport过程中,我们发现在数据导出这块一直是一个瓶颈,闲来无事,思索一番,想出来一种场景来应对此问题。供各位大佬参考讨论,也欢迎其他大佬提供更好的解决方案。

文笔较差,大佬见谅。

废话不多说,直接上代码,案例。

正文:

首先,我们需要使用python启一个flask服务,来供传递参数,sql等。

本次举例写的固定sql,后续大家可以自行优化扩展。

1.启动flask服务

from flask import Flask, request, jsonify, send_file
import pymysql
import pandas as pd
import threading
import os
import time

app = Flask(__name__)

# 数据库连接配置
db_config = {
   'host': 'ip',
   'user': '用户名',
   'password': '密码',
   'database': '数据库'
}

# 数据库连接
def connect_db():
   return pymysql.connect(**db_config)

# 导出到CSV函数
def export_to_csv(sql, params, output_file):
   connection = connect_db()
   try:
       # 执行查询并使用参数化查询
       with connection.cursor() as cursor:
           cursor.execute(sql, params)  # 使用参数化查询
           result = cursor.fetchall()

           # 如果没有结果,返回None
           if not result:
               return None

           # 将查询结果转换为DataFrame并写入CSV
           df = pd.DataFrame(result, columns=[i[0] for i in cursor.description])
           df.to_csv(output_file, index=False, encoding='utf-8')

           print(f"CSV文件已生成: {output_file}")
           return output_file
   finally:
       connection.close()

# 后台线程处理文件导出
def generate_csv(sql, params, output_file):
   export_to_csv(sql, params, output_file)

# 这里为了验证场景使用的get请求,实际大多数情况下都需要传递参数,推荐post请求。
@app.route('/run_report', methods=['GET'])
def run_report():
   params = request.args.getlist('params')  # 获取URL查询参数
   sql = "SELECT * FROM test_table"  # 默认的SQL查询

   # 生成唯一的输出文件名(可以用时间戳或UUID)
   output_file = f'output_{int(time.time())}.csv'

   # 启动后台线程生成CSV文件
   thread = threading.Thread(target=generate_csv, args=(sql, params, output_file))
   thread.start()

   # 等待线程完成,确保文件生成后直接返回文件
   thread.join()  # 确保线程完成后返回

   # 如果文件生成成功,直接返回文件
   if os.path.exists(output_file):
       return send_file(output_file, as_attachment=True, mimetype='text/csv')
   else:
       return jsonify({'error': '生成CSV文件失败'}), 500

if __name__ == '__main__':
   app.run(debug=True)

2.创建模板:

在按钮点击事件上绑定服务地址即可。

提示这块目前做的比较粗糙,大家可以自行优化。

window.location.href="http://127.0.0.1:5000/run_report"
FR.Msg.alert("提示","数据正在下载请稍后");

3.示例模板

下载
后台导出.rar (1.99 K)

4.示例操作

img

不知道怎么上传视频,大家自己尝试吧

第一次写贴子,写的不咋好,大家见谅,感谢。顺便吐槽一句,这个发帖写作的好难使用啊。

目前仅为思路的验证,代码健壮性目前比较差,见谅。

作者:来自 vivo 互联网服务器团队- Pang Haiyun

介绍 Kafka Streams 的原理架构,常见配置以及在监控场景的应用。

一、背景

在当今大数据时代,实时数据处理变得越来越重要,而监控数据的实时性和可靠性是监控能力建设最重要的一环。随着监控业务需求的变化和技术的发展,需要能够实时处理和分析庞大的数据流。作为一种流式处理平台,Kafka Streams 为处理实时数据提供了强大的支持。本文将重点介绍如何利用 Kafka Streams 进行实时数据处理,包括其基本原理、功能和实际应用。通过本文的学习,读者将能够深入了解 Kafka Streams 的优势、在监控场景的应用及实践。

二、Kafka Streams 的基本概念

Kafka Streams 是一个开源的流式处理框架,基于 Kafka 消息队列构建,能够处理无限量的数据流。与传统的批处理不同,Kafka Streams 允许用户以流式处理的方式实时处理数据,而且处理延迟仅为毫秒级。

通过 Kafka Streams ,用户可以进行数据的实时转换、聚合、过滤等操作,同时能够与 Kafka Connect 和 Kafka Producer/Consumer 无缝集成。Kafka Streams 也是一个客户端程序库,用于处理和分析存储在 Kafka 中的数据,并将得到的数据写回 Kafka 或发送到外部系统。

Kafka、Storm、Flink 和 Spark 是大数据领域常用的工具和框架。

1、区别

  • Kafka 是一个分布式消息系统,主要用于构建实时数据管道和事件驱动的应用程序。它提供了高吞吐量、持久性、可伸缩性和容错性,主要用于数据的发布和订阅。

  • Storm 是一个分布式实时计算系统,用于处理实时数据流。它提供了低延迟、高吞吐量的实时计算能力,适用于实时数据处理和流式计算。

  • Flink 是一个流处理引擎,提供了精确一次的状态处理和事件时间处理等特性。它支持流处理和批处理,并提供了统一的 API 和运行时环境。

  • Spark 是一个通用的大数据处理框架,提供了批处理和流处理的功能。Spark 提供了丰富的数据处理和计算功能,包括 SQL 查询、机器学习、图处理等。

2、Kafka 的优势

  • 持久性和可靠性:Kafka 提供了数据持久化的功能,能够确保数据不丢失,并且支持数据的持久存储和重放。

  • 可伸缩性:Kafka 集群可以很容易地进行水平扩展,支持大规模数据处理和高并发访问。

  • 灵活性:Kafka 可以与各种不同的数据处理框架集成,作为数据源或数据目的地,使其在实时数据处理的场景中具有广泛的适用性。

总的来说,Kafka 的优势在于其高吞吐量、持久性和可靠性,以及灵活的集成能力,使其成为构建实时数据管道和事件驱动应用程序的理想选择。

2.1 Stream 处理拓扑

2.1.1 流

流是 Kafka Streams 提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。这里的 key 主要记录的是 value 的索引,决定了 Kafka 和 Kafka Streams 中数据的分区,即数据如何路由到 Topic 的特定分区。value 是主要后续处理器要处理的数据。

图片

2.1.2 处理器拓扑

处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。通过 Kafka Streams ,我们可以编写一个或多个的计算逻辑的处理器拓扑,用于对数据进行多步骤的处理。

2.1.3 流处理器

流处理器是处理器拓扑中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。

在拓扑中有两个特别的处理器:

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个 Kafka 主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • sink 处理器(Sink Processor):sink 处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的 Kafka 主题。

图片

(图片来源: Kafka 官网)

Kafka Streams 提供2种方式来定义流处理器拓扑:Kafka  Streams DSL 提供了更常用的数据转换操作,如 map 和 filter;低级别  Processor API 允许开发者定义和连接自定义的处理器,以及和状态仓库交互。处理器拓扑仅仅是流处理代码的逻辑抽象。

2.2 时间

在流处理方面有一些重要的时间概念,它们是建模和集成一些操作的重要元素,例如定义窗口的时间界限。

时间在流中的常见概念如下:

  • 事件时间 - 当一个事件或数据记录发生的时间点,就是最初创建的“源头”。

  • 处理时间 - 事件或数据消息发生在流处理应用程序处理的时间点。即,记录已被消费。处理时间可能是毫秒,小时,或天等。比原始事件时间要晚。

  • 摄取时间 - 事件或数据记录是 Kafka broker 存储在 topic 分区的时间点。与事件时间的差异是,当记录由 Kafka broker 追加到目标 topic 时,生成的摄取时间戳,而不是消息创建时间(“源头”)。与处理时间的差异是处理时间是流处理应用处理记录时的时间。比如,如果一个记录从未被处理,那么就没有处理时间,但仍然有摄取时间。

Kafka Streams 通过 TimestampExtractor 接口为每个数据记录分配一个时间戳。该接口的具体实现了基于数据记录的实际内容检索或计算获得时间戳,例如嵌入时间戳字段提供的事件时间语义,或使用其他的方法,比如在处理时返回当前的 wall-clock(墙钟)时间,从而产生了流应用程序的处理时间语义。因此开发者可以根据自己的业务需要选择执行不同的时间。例如,每条记录时间戳描述了流的时间增长(尽管记录在 stream 中是无序的)并利用时间依赖性来操作,如 join。

最后,当一个 Kafka Streams 应用程序写入记录到 Kafka 时,它将分配时间戳到新的消息。时间戳分配的方式取决于上下文:

  • 当通过处理一些输入记录(例如,在 process()函数调用中触发的 context.forward())生成新的输出记录时,输出记录时间戳直接从输入记录时间戳继承。

  • 当通过周期性函数(如 punctuate())生成新的输出记录时。输出记录时间戳被定义为流任务的当前内部时间(通过 context.timestamp() 获取)。

  • 对于聚合,生成的聚合更新的记录时间戳将被最新到达的输入记录触发更新。

本部分简要介绍了 Kafka Streams 的基本概念,下一部分将介绍 Kafka Streams 的在监控场景的应用实践。

三、Kafka Streams 在监控场景的应用

3.1 链路分布示意图

图片

3.2 示例:使用 Kafka Streams 来处理实时数据

流式处理引擎(如 Kafka Streams)与监控数据 ETL 可以为业务运维带来诸多好处,例如实时数据分析、实时监控、事件驱动的架构等。在本部分,我们将重点介绍  Kafka Streams 与监控数据 ETL 的集成,以及如何在监控数据 ETL 中利用 Kafka Streams 进行实时数据处理。

在监控数据ETL架构中,Kafka Streams 扮演着举足轻重的角色。它可以作为一个独立的数据处理服务来处理实时的数据流,并将处理结果输出到其他存储组件(例如,ES、VM等)中。同时,它也可以作为多个数据源之间的数据交换和通信的桥梁,扮演着数据总线的角色。Kafka Streams 的高可用性、高吞吐量和流式处理能力使得它成为监控数据ETL架构中的重要组件之一。

下面给出一个示例,演示了如何将 Kafka Streams 作为监控数据 ETL 来处理实时的数据。假设我们有一个监控数据流 TopicA,我们希望对这些数据进行实时的分析,并将分析结果输出到另一个 TopicB。我们可以创建一个 Kafka Streams 来处理这个需求:

//创建配置类
Properties props = new Properties();
//设置订阅者
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-service");
//设置servers地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
 
StreamsBuilder builder = new StreamsBuilder();
//构建流
KStream<String, String> userActions = builder.stream("TopicA");
//对流进行处理
KTable<String, Long> userClickCounts = userActions
 
    .filter((key, value) -> value.contains("click"))
 
    .groupBy((key, value) -> value.split(":")[0])
 
    .count();
//流写回Kafka
userClickCounts.toStream().to("TopicB", Produced.with(Serdes.String(), Serdes.Long()));
 
KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
streams.start();

在这个示例中,我们创建了一个 Kafka Streams 监控数据 ETL,用于处理实时的监控数据流。它对数据进行了过滤、分组和统计分析,并将结果输出到 TopicB。通过这个 ETL,我们可以很容易地实现实时的数据处理功能,并且能够与其他数据源和数据存储组件进行无缝的集成。

3.3 监控 ETL 的流处理示意图

图片

本部分介绍了 Kafka Streams 的在监控场景的应用实践,下一部分将深入探讨 Kafka Streams 的运作原理及实时数据处理的常见操作,并阐述 Kafka Streams 如何实现这些操作。

四、监控数据 ETL 中 Kafka Streams 的运作原理

4.1 架构

Kafka Streams 通过生产者和消费者,并利用 Kafka 自有的能力来提供数据平行性,分布式协调性,故障容错和操作简单性,从而简化了应用程序的开发,在本节中,我们将描述 Kafka Streams 是如何工作的。

下图展示了 Kafka Streams 应用程序的解剖图,让我们来看一下。

图片

(图片来源: Kafka 官网)

Kafka 消费者通过消费1个或多个 Topic 拿到数据,形成输入 Kafka 流,经过处理器拓扑对数据进行统一处理形成输出 Kafka 流,将数据写入1个或多个出流 Topic,这是 kafka 流整体的运行流程。

4.1.1 Stream 分区和任务

Kafka 分区数据的消息层用于存储和传输,Kafka Streams  分区数据用于处理, 在这两种情况下,这种分区规划和设计使数据具有弹性,可扩展,高性能和高容错的能力。Kafka Streams 使用了分区和任务的概念,基于 Kafka 主题分区的并行性模型。在并发环境里,Kafka  Streams 和 Kafka 之间有着紧密的联系:

  • 每个流分区是完全有序的数据记录队列,并映射到 Kafka 主题的分区。

  • 流的数据消息与主题的消息映射。

  • 数据记录中的 keys 决定了 Kafka 和 Kafka Streams  中数据的分区,即,如何将数据路由到指定的分区。

应用程序的处理器拓扑通过将其分成多个任务来进行扩展,更具体点说,Kafka Streams 根据输入流分区创建固定数量的任务,其中每个任务分配一个输入流的分区列表(即,Kafka 主题)。分区对任务的分配不会改变,因此每个任务是应用程序并行性的固定单位。然后,任务可以基于分配的分区实现自己的处理器拓扑;他们还可以为每个分配的分区维护一个缓冲,并从这些记录缓冲一次一个地处理消息。作为结果,流任务可以独立和并行的处理而无需手动干预。

重要的是要理解 Kafka Streams 不是资源管理器,而是可在任何地方都能“运行”的流处理应用程序库。多个实例的应用程序在同一台机器上执行,或分布多个机器上,并且任务可以通过该库自动的分发到这些运行的实例上。分区对任务的分配永远不会改变;如果一个应用程式实例失败,则这些被分配的任务将自动地在其他的实例重新创建,并从相同的流分区继续消费。

下面展示了2个分区,每个任务分配了输出流的1个分区。

图片

(图片来源: Kafka 官网)

4.1.2 线程模型

Kafka Streams 允许用户配置线程数,可用于平衡处理应用程序的实例。每个线程的处理器拓扑独立的执行一个或多个任务。例如,下面展示了一个流线程运行2个流任务。

图片

(图片来源: Kafka 官网)

启动更多的流线程或更多应用程序实例,只需复制拓扑逻辑(即复制代码到不同的机器上运行),达到并行处理处理不同的 Kafka 分区子集的目的。要注意的是,这些线程之间不共享状态。因此无需协调内部的线程。这使它非常简单在应用实例和线程之间并行拓扑。Kafka 主题分区的分配是通过 Kafka Streams 利用 Kafka 的协调功能在多个流线程之间透明处理。

如上所述,Kafka Streams 扩展流处理应用程序是很容易的:你只需要运行你的应用程序实例,Kafka Streams 负责在实例中运行的任务之间分配分区。你可以启动多个应用程序线程处理多个输入的 Kafka 主题分区。这样,所有运行中的应用实例,每个线程(即运行的任务)至少有一个输入分区可以处理。

4.1.3 故障容错

Kafka Streams 基于 Kafka 分区的高可用和副本故障容错能力。因此,当流数据持久到 Kafka,即使应用程序故障,如果需要重新处理它,它也是可用的。Kafka  Streams 中的任务利用 Kafka 消费者客户端提供的故障容错的能力来处理故障。如果任务故障,Kafka Streams 将自动的在剩余运行中的应用实例重新启动该任务。

此外,Kafka Streams 还确保了本地状态仓库对故障的稳定性。对于每个状态仓库都维持一个追踪所有的状态更新的变更日志主题。这些变更日志主题也分区,因此,每个本地状态存储实例,在任务访问仓里,都有自己的专用的变更日志分区。变更主题日志也启用了日志压缩,以便可以安全的清除旧数据,以防止主题无限制的增长。如果任务失败并在其他的机器上重新运行,则  Kafka Streams 在恢复新启动的任务进行处理之前,重放相应的变更日志主题,保障在故障之前将其关联的状态存储恢复。故障处理对于终端用户是完全透明的。

请注意,任务(重新)初始化的成本通常主要取决于通过重放状态仓库变更日志主题来恢复状态的时间。为了减少恢复时间,用户可以配置他们的应用程序增加本地状态的备用副本(即完全的复制状态)。当一个任务迁移发生时,Kafka Streams 尝试去分配任务给应用实例,提前配置了备用副本的应用实例就可以减少任务(重新)初始化的成本。

4.2 创建流

记录流(KStreams)或变更日志流(KTable或GlobalkTable)可以从一个或多个 Kafka 主题创建源流,(而 KTable 和 GlobalKTable,只能从单个主题创建源流)。

KStreamBuilder builder = new KStreamBuilder();
 
KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
GlobalKTable<String, GenericRecord> source2 = builder.globalTable("topic4", "globalStoreName");

4.3 流回写 Kafka

在处理结束后,开发者可以通过 KStream.to 和 KTable.to 将最终的结果流(连续不断的)写回 Kafka 主题。

joined.to("topic4");

如果已经通过上面的to方法写入到一个主题中,但是如果你还需要继续读取和处理这些消息,可以从输出主题构建一个新流,Kafka Streams 提供了便利的方法,through:

// equivalent to
//
// joined.to("topic4");
// materialized = builder.stream("topic4");
KStream materialized = joined.through("topic4");

4.4 流程序的配置与启执行

除了定义的 topology,开发者还需要在运行它之前在 StreamsConfig 配置他们的应用程序,Kafka Streams 配置的完整列表可以在这里找到。

Kafka Streams 中指定配置和生产者、消费者客户端类似,通常,你创建一个 java.util.Properties,设置必要的参数,并通过 Properties 实例构建一个 StreamsConfig 实例。

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
 
// Any further settings
settings.put(... , ...);
 
// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(settings);

除了 Kafka Streams 自己配置参数,你也可以为 Kafka 内部的消费者和生产者指定参数。根据你应用的需要。类似于 Streams 设置,你可以通过 StreamsConfig 设置任何消费者和/或生产者配置。请注意,一些消费者和生产者配置参数使用相同的参数名。例如,用于配置 TCP 缓冲的 send.buffer.bytes 或 receive.buffer.bytes。用于控制客户端请求重试的 request.timeout.ms 和 retry.backoff.ms。如果需要为消费者和生产者设置不同的值,可以使用 consumer. 或 producer. 作为参数名称的前缀。

Properties settings = new Properties();
 
// Example of a "normal" setting for Kafka Streams
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
// Customize a common client setting for both consumer and producer
settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
// Customize different values for consumer and producer
settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
 
// Alternatively, you can use
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
settings.put(StremasConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);

你可以在应用程序代码中的任何地方使用 Kafka Streams ,常见的是在应用程序的 main() 方法中使用。

首先,先创建一个 KafkaStreams 实例,其中构造函数的第一个参数用于定义一个 topology builder(Streams DSL的KStreamBuilder,或 Processor API 的 TopologyBuilder)。

第二个参数是上面提到的 StreamsConfig 的实例。

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
//
// OR
//
TopologyBuilder builder = ...; // when using the Processor API
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);

在这点上,内部结果已经初始化,但是处理还没有开始。你必须通过调用 start() 方法启动 Kafka Streams 线程:

// Start the Kafka Streams instance
streams.start();

捕获任何意外的异常,设置 java.lang.Thread.UncaughtExceptionHandler。每当流线程由于意外终止时,将调用此处理程序。

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public uncaughtException(Thread t, throwable e) {
        // here you should examine the exception and perform an appropriate action!
    }
);

close() 方法结束程序。

// Stop the Kafka Streams instance
streams.close();

现在,运行你的应用程序,像其他的 Java 应用程序一样(Kafka Sterams 没有任何特殊的要求)。同样,你也可以打包成 jar,通过以下方式运行:

# Start the application in class com.example.MyStreamsApp
# from the fat jar named path-to-app-fatjar.jar.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp

当应用程序实例开始运行时,定义的处理器拓扑将被初始化成1个或多个流任务,可以由实例内的流线程并行的执行。如果处理器拓扑定义了状态仓库,则这些状态仓库在初始化流任务期间(重新)构建。这一点要理解,当如上所诉的启动你的应用程序时,实际上 Kafka Streams 认为你发布了一个实例。现实场景中,更常见的是你的应用程序有多个实例并行运行(如,其他的 JVM 中或别的机器上)。在这种情况下,Kafka Streams 会将任务从现有的实例中分配给刚刚启动的新实例。

五、监控数据 ETL 中 Kafka Streams 参数及其调优

5.1 必配参数:

  1. bootstrap.servers
    :这是 Kafka 集群的地址列表,Kafka Streams 使用它来初始化与 Kafka 的连接。

  2. key.deserializer

    value.deserializer
    :这些配置定义了流中键和值的序列化和反序列化器。

  3. auto.offset.reset
    :当没有初始偏移量或偏移量无效时,这个配置定义了 Kafka Streams 如何处理。

  4. group.id
    :这对于使用 Kafka Streams 的消费者组来说很重要,它定义了消费者组的ID。

5.2 基础参数:

  1. num.stream.threads
    :定义 Kafka Streams 应用程序中的线程数,默认与处理器的逻辑核心数相等。

  2. state.dir
    :定义 Kafka Streams 存储状态的本地目录。

  3. threading.max.instances
    :定义每个主题分区的最大线程实例数,默认与分区数相等。

  4. threading.instances
    :定义每个主题分区的线程实例数,默认与分区数相等。

5.3 消费者参数:

  1. enable.auto.commit
    :自动提交偏移量,默认值为"true",建议设置为"false",以便更好地控制偏移量的提交。

  2. commit.interval.ms
    :提交偏移量的频率,默认值为5000ms,可以根据需要进行调整。

  3. max.poll.records
    :一次拉取的消息数量,默认值为1000,可以根据网络带宽和处理能力进行调整。

5.4 生产者参数:

  1. batch.size
    :批量发送消息的大小,默认值通常是16384(字节),可以根据网络带宽和 Kafka 集群的性能进行调整。

  2. linger.ms
    :消息在生产者缓冲区中的最小停留时间,默认值为100ms,可以根据需要进行调整。

  3. compression.type
    :压缩类型,可以提高网络带宽利用率,但会增加 CPU 开销。默认值为"none",可以根据需要设置为"gzip"、“snappy"或"lz4”。

对于 Kafka 的调优参数,可以根据实际的应用场景和性能需求进行调整,以达到最佳的性能和稳定性。

六、监控数据 ETL 中 Kafka Streams 的分区倾斜问题原因和解决方式

6.1 原因

分区倾斜是监控数据 ETL 的 Kafka Streams 在处理大规模数据流时遇到的常见问题。分区倾斜指的是在一个流处理应用程序中,某个分区的消息消费速度远远慢于其他分区,或某个分区的延迟积压数据远大于其他分区,导致  Kafka Streams 的实时性受到限制。

产生分区倾斜的原因可能包括:

  1. 数据分布不均匀:原始数据在 Kafka 主题的分区中分布不均匀,导致某些分区的消息量远大于其他分区。

  2. 消费者实例数量不足:在 Kafka Streams 应用程序中,消费者的实例数量不足,无法充分处理所有分区的消息。

  3. 消费者负载不均衡:消费者的负载不均衡(包括但不限于某些消费者实例处理的分区数大于其他实例),导致某些消费者实例处理的消息量远大于其他实例。

  4. 消费者实例负载不均衡:消费者实例性能不一致或性能被挤占,导致消费能力不均衡,消费速率异常小于平均消费速率

6.2 解决方案

  1. 数据均衡策略:在设计 Kafka 主题分区分配策略时,可以采用如轮询(Round-robin)或范围(Range)等均衡策略,使得数据在各个分区之间均匀分布。

  2. 增加消费者实例:根据应用程序的实际情况,适当增加消费者的实例数量,以提高整个系统的处理能力,例如扩容。

  3. 负载均衡策略:在消费者组内部实现负载均衡,如使用均匀分配消费者(Uniform Distribution Consumer)等策略,确保消费者实例之间的负载均衡,例如重启或剔除倾斜分区实例使 Kafka Streams 的分区进行重新分配。

  4. 优化消费者处理逻辑:分析消费者处理消息的速度慢的原因,优化处理逻辑,提高消费者的处理能力。

  5. 调整批次大小和窗口函数:通过调整 Kafka Streams 的批次大小和窗口函数等参数,降低消费者的处理压力。

  6. 使用侧输出:对于一些处理速度较慢的分区,可以考虑使用侧输出将部分消息引流至其他系统处理,减轻消费者负载。

七、总结

本文介绍了 Kafka Streams 在监控场景中的应用,阐述了 Kafka Streams 的基本概念,包括流、处理器拓扑、流处理器、时间概念等,举例说明了 Kafka Streams 在监控实时数据ETL中的具体应用,并详细解释了 Kafka Streams 的运作原理,包括其架构、创建流、流回写 Kafka、流程序配置与启执行等内容。文章还介绍了 Kafka Streams 的参数及其调优方法,以及可能出现的分区倾斜问题及其解决方法。

本文意在让读者对于 Kafka 流在监控业务的实际应用有所认识,并且了解 Kafka 流的基本概念和原理,阅读本文后对构建自己 Kafka 流应用程序有所帮助,能够理解在监控数据 ETL 常见分区倾斜的原理和解决方式。

引用:
Kafka 官网 https://kafka.apache.org/