2024年2月

在目标检测任务中,数据集的划分通常分为训练集和验证集,以便在训练模型时评估模型的性能。这个过程对于有效训练和评估目标检测模型非常重要。下面是划分目标检测数据集的一般步骤:
``

  1. 数据集组织:
    确保你的数据集中包含图像文件(通常为 ".jpg" 或 ".png")和相应的标签文件,用于描述图像中的目标位置和类别。标签文件可以是类似于 YOLO 格式的 ".txt" 文件,也可以是其他目标检测任务中使用的格式。

  2. 划分比例选择:
    确定训练集和验证集的划分比例。通常,常见的比例为 80-90% 的数据用于训练,而剩下的 10-20% 用于验证。这个比例可以根据你的数据量和任务的特性进行调整。

  3. 数据集划分代码:
    编写代码来将数据集划分为训练集和验证集。这通常涉及将图像文件和相应的标签文件分别移动到两个不同的文件夹中。

  4. 文件夹结构:
    为了方便管理,创建一个包含两个子文件夹(例如 "images" 和 "labels")的文件夹,分别用于存储图像和标签文件。训练集和验证集各自都应该有这样的文件夹结构。

  5. 随机化和保持一一对应:
    在划分数据集之前,通常会对数据集进行随机化处理,以确保训练集和验证集中的样本都是随机选择的。在划分过程中,要保持图像和标签的一一对应关系,以确保训练和验证的一致性。

  6. 数据集划分结果验证:
    验证划分的结果是否符合预期。你可以检查训练集和验证集的文件数量,确保图像和标签仍然保持一一对应,并且确保文件夹结构正确。
    !!! 注意以下代码中all_data应含有图片与对应标签(在同一个文件夹中,且名称相同仅拓展名不同,分为.jpg和.txt文件)

import os
import random
import shutil

def split_dataset(srcDir, trainDir, valDir, split_ratio=0.9):
    """
    将数据集划分为训练集和验证集,并保存到相应的文件夹中。

    Parameters:
    - srcDir: 原始数据集文件夹路径,包含图像和标签文件。
    - trainDir: 训练集文件夹路径,包含 'images' 和 'labels' 子文件夹。
    - valDir: 验证集文件夹路径,包含 'images' 和 'labels' 子文件夹。
    - split_ratio: 数据集划分比例,默认为 0.9,表示将 90% 的数据用于训练集,10% 用于验证集。
    """
    os.makedirs(os.path.join(trainDir, 'images'), exist_ok=True)
    os.makedirs(os.path.join(trainDir, 'labels'), exist_ok=True)
    os.makedirs(os.path.join(valDir, 'images'), exist_ok=True)
    os.makedirs(os.path.join(valDir, 'labels'), exist_ok=True)

    # 获取数据集中所有文件的列表
    file_list = os.listdir(srcDir)
    random.shuffle(file_list)

    # 根据划分比例计算训练集和验证集的边界索引
    split_index = int(len(file_list) * split_ratio)
    train_files = file_list[:split_index]
    val_files = file_list[split_index:]

    # 将训练集数据移动到相应文件夹
    for file in train_files:
        if file.endswith('.jpg'):
            img_src = os.path.join(srcDir, file)
            label_src = os.path.join(srcDir, file[:-4] + '.txt')
            shutil.move(img_src, os.path.join(trainDir, 'images', file))
            shutil.move(label_src, os.path.join(trainDir, 'labels', file[:-4] + '.txt'))

    # 将验证集数据移动到相应文件夹
    for file in val_files:
        if file.endswith('.jpg'):
            img_src = os.path.join(srcDir, file)
            label_src = os.path.join(srcDir, file[:-4] + '.txt')
            shutil.move(img_src, os.path.join(valDir, 'images', file))
            shutil.move(label_src, os.path.join(valDir, 'labels', file[:-4] + '.txt'))

if __name__ == '__main__':
    # 输入文件夹路径
    srcDir = r"C:\Users\86159\Desktop\hat\all_data"  
    trainDir = r'C:\Users\86159\Desktop\hat\train'  
    valDir = r'C:\Users\86159\Desktop\hat\val'  

    # 调用函数划分数据集
    split_dataset(srcDir, trainDir, valDir)

本文重点分析 Nebula Siwi 智能问答思路,具体代码可参考[2],使用的数据集为 Basketballplayer[3]。部分数据和 schema 如下所示:


一.智能问答可实现的功能


1.Nebula Siwi 源码整体结构

主要包括前段(Vue)和后端(Flask)代码结构,整体结构如下所示:


2.Basketballplayer 数据集介绍

Basketballplayer 数据集简介可参考文献[7]
NebulaGraph 手工和 Python 操作
,将该数据集通过 Python 脚本导入 NebulaGraph 中参考脚本
import_data.py
,当然也可以通过 NebulaGraph 命令进行导入(不做介绍,参考文献[8]
什么是 nGQL - NebulaGraph Database 手册
)。如下所示:

说明:因为本地数据库已经有了一个 basketballplayer 的图空间了,索引本次导入数据使用的是 basketballplayers 图空间。


3.QA 可实现的功能

"""
Sorry I don't understand your questions for now. Here are supported question patterns:  
# 对不起,我现在还不明白你的问题。以下是支持的问题模式:

relation:  # 关系
    - What is the relationship between Yao Ming and Lakers?  # 姚明和湖人的关系是什么?
    - How does Yao Ming and Lakers connected?  # 姚明和湖人是怎么连接的?
serving:  # 服务
    - Which team had Yao Ming served?  # 姚明曾经效力过哪些球队?
friendship:  # 友谊
    - Whom does Tim Duncan follow?  # 邓肯关注了哪些人?
    - Who are Yao Ming's friends?  # 姚明的朋友有哪些?
"
""

使用微信开发工具测试效果,如下所示:


二.智能问题功能实现思路


1.前端实现思路[4]


1.1
安装依赖包

WebStorm 打开 Vue 项目后,执行命令安装包
npm install --save-dev @vue/cli-service
,然后再执行命令
npm install

npm run dev
,如下所示:


1.2
修改 package.json 文件

需要说明的是需要修改 package.json,如下所示:

{
  "name""siwi_frontend",
  "version""1.0.0",
  "description""",
  "scripts": {
    "i""npm install",
    "dev""vue-cli-service serve",
    "build""vue-cli-service build"
  },
  "dependencies": {
    "@vue/cli-service-global""^4.5.19",
    "axios""^0.21.1",
    "vue-bot-ui""^0.2.6",
    "vue-web-speech""^0.2.3"
  },
  "devDependencies": {
    "@vue/cli-service""^5.0.8"
  }
}


1.3
增加
vue.config.js
文件

同时还需要增加一个
vue.config.js
文件,用来解决前端 8080 端口到后端 5000 端口的跨域问题,如下所示:

module.exports = {
    devServer: {
        proxy: 'http://localhost:5000',
    },
};


1.4
通过浏览器进行对话

使用浏览器打开链接 http://localhost:8080/,如下所示:


2.后端实现思路[2]

PyCharm 打开 Flask 项目,直接运行
python F:/ConversationSystem/nebula-siwi/src/siwi/app/__init__.py
,如下所示:


三.后端代码具体实现


1.整体思路

整体流程应该从
F:\ConversationSystem\nebula-siwi\src\siwi\app\__init__.py
看起,初始化连接池和机器人(初始化分类器和动作),然后启动 Flask 服务。如下所示:

其中,初始化连接池使用的 Python 类库为 nebula3,目录结构如下所示:

初始化机器人又包括初始化分类器(classifier)和动作(action)2 个部分。


2.分类器(classifier)实现思路


2.1 增加 main()函数

加了一个 main()函数来测试分类器类,如下所示:


2.2 根据 sentence 识别 intent

分类器实现比较简单,主要是根据 sentence 识别 intent,实现代码如下所示:

def get_matched_intents(self, sentence: str) -> tuple:
    """
    根据sentence识别intent
    "
""

    intents_matched = set()
    for word in self.intents_map.keys():
        if word in sentence:
            intents_matched.add(
                self.intents_map[word])


2.3 根据 sentence 返回 entity

除此之外,还有根据 sentence 返回 entity。如下所示:

def get_matched_entities(self, sentence: str) -> dict:
    """
    消耗一个句子与ahocorasick匹配
    返回一个dict:{entity: entity_type}
    "
""
    entities_matched = []
    for item in self.entity_tree.iter(sentence):
        entities_matched.append(item[1][1])
    return {
        entity: self.entity_type_map[entity] for entity in entities_matched
        }

self.entity_tree.iter(sentence)

ahocorasick.Automaton()
类的一个方法,用于在 Aho-Corasick 自动机中迭代搜索输入的字符串。在这段代码中,
self.entity_tree
是一个 Aho-Corasick 自动机,
sentence
是要搜索的字符串。
iter()
方法会在自动机中搜索
sentence
,并返回一个迭代器。这个迭代器的每个元素都是一个元组,包含两个元素:匹配的单词在
sentence
中的结束位置和该单词在
add_word()
方法中设置的值。

例如,如果在自动机中添加了单词 "apple",并将其值设置为
(0, "apple")
,然后在字符串 "I have an apple" 中搜索,那么
iter()
方法将返回一个迭代器,其中包含一个元组
(13, (0, "apple"))
。这表示 "apple" 在 "I have an apple" 中的结束位置是 13,其值是
(0, "apple")

因此,
self.entity_tree.iter(sentence)
被用于在输入的句子中搜索实体,这段代码将在句子中搜索所有的实体,并将找到的实体添加到
entities_matched
列表中。这里的实体类型包括球员(player)和球队(team)2 种类型。


2.4 自动机实现过程

(1)self.entity_tree 自动机对象

重点详细介绍下 self.entity_tree 是实现过程,如下所示:

self.entity_tree = ahocorasick.Automaton()  # 构建实体树,自动机
for index, entity in enumerate(self.entity_type_map.keys()):  # 遍历实体类型映射
    self.entity_tree.add_word(entity, (index, entity))  # 添加实体
self.entity_tree.make_automaton()  # 构建自动机

(2)
ahocorasick.Automaton()
自动机介绍

ahocorasick.Automaton()
是一个来自
pyahocorasick
库的方法,用于创建一个 Aho-Corasick 自动机。Aho-Corasick 算法是一种用于在输入文本中查找多个模式串的高效算法。

在这段代码中,
self.entity_tree = ahocorasick.Automaton()
创建了一个 Aho-Corasick 自动机实例,并将其赋值给
self.entity_tree
。这个自动机将用于后续的实体识别和意图识别。

Aho-Corasick 自动机的工作原理是,首先构建一个有向图(通常称为 "trie" 或 "前缀树"),其中每个节点代表一个模式串的前缀。然后,对于输入文本中的每个字符,自动机都会沿着图的边移动,匹配尽可能长的模式串。

这种方法的优点是,无论模式串的数量或长度如何,匹配过程的时间复杂度都是线性的,即与输入文本的长度成正比。这使得 Aho-Corasick 算法非常适合于处理大量模式串和大量输入文本的情况。

(3)
add_word()
方法介绍

add_word()

ahocorasick.Automaton()
类的一个方法,用于向 Aho-Corasick 自动机中添加单词(或者说模式串)。


add_word()
方法中,需要传入两个参数:
word

value

word
是想要添加到自动机中的单词,
value
是与这个单词关联的值。这个值可以是任何想要的数据类型,它将在后续的搜索过程中返回。
add_word()
方法被用于添加实体到自动机中:

for index, entity in enumerate(self.entity_type_map.keys()):  # 遍历实体类型映射
    self.entity_tree.add_word(entity, (index, entity))  # 添加实体

在这段代码中,
entity
是要添加的单词,
(index, entity)
是与这个单词关联的值。这样,在后续的搜索过程中,当匹配到这个单词时,就可以返回这个值,从而知道这个单词在原始数据中的位置和内容。

(4)
make_automaton()
方法介绍

make_automaton()

ahocorasick.Automaton()
类的一个方法,用于构建 Aho-Corasick 自动机。在添加所有单词(或模式串)到 Aho-Corasick 自动机后,需要调用
make_automaton()
方法来构建自动机。这个方法会创建额外的失败链接,这些链接在搜索过程中用于在不匹配的情况下跳转到其它可能的匹配位置。

在本文的代码中,
make_automaton()
方法在添加所有实体到自动机后被调用:

self.entity_tree.make_automaton()  # 构建自动机

这段代码将构建一个完整的 Aho-Corasick 自动机,该自动机可以用于在输入文本中高效地查找实体。


2.5 意图(intent)到动作(action)映射


3.动作(action)实现思路


3.1 增加 main()函数

加了一个 main()函数来测试各种动作类,如下所示:

然后就是根据意图(intent)+ 实体(entity)得到动作(action)的类型,包括
FallbackAction(SiwiActionBase)、RelationshipAction(SiwiActionBase)、ServeAction(SiwiActionBase)和FollowAction(SiwiActionBase)

其中,
self.intent_map
对应的数据结构和内容,如下所示:


3.2
FallbackAction(SiwiActionBase)

class FallbackAction(SiwiActionBase):
    def __init__(self, intent):
        super().__init__(intent)  # 使用intent初始化SiwiActionBase

    def execute(self, connection_pool=None):
        """
        TBD: query some information via nbi_api in fallback case:
        https://github.com/swar/nba_api/blob/master/docs/examples/Basics.ipynb
        "
""
        return """
Sorry I don't understand your questions for now. Here are supported question patterns:  
# 对不起,我现在还不明白你的问题。以下是支持的问题模式:

relation:  # 关系
    - What is the relationship between Yao Ming and Lakers?  # 姚明和湖人的关系是什么?
    - How does Yao Ming and Lakers connected?  # 姚明和湖人是怎么连接的?
serving:  # 服务
    - Which team had Yao Ming served?  # 姚明曾经效力过哪些球队?
friendship:  # 友谊
    - Whom does Tim Duncan follow?  # 邓肯关注了哪些人?
    - Who are Yao Ming's friends?  # 姚明的朋友有哪些?
"
""

这个是 fallback 对应的动作 FallbackAction,就是当 intent 不在 relation、serving 和 friendship 中时,要提示用户应该怎么提问。


3.3
RelationshipAction(SiwiActionBase)

以"Yao Ming 和 Rockets 的关系是什么?"为例进行介绍,核心代码如下所示:

query = (  # 查询语句,查找两个实体之间的关系
    f'USE basketballplayer;'
    f'FIND NOLOOP PATH '
    f'FROM "{self.left_vid}" TO "{self.right_vid}" '
    f'OVER * BIDIRECT UPTO 4 STEPS YIELD path AS p;'
    )
# 举个例子
USE basketballplayer;
FIND NOLOOP PATH FROM "player133" TO "team202" OVER * BIDIRECT UPTO 4 STEPS YIELD path AS p;

在图中查找从球员"player133"到球队"team202"的最长 4 步内无循环路径,并将路径存储在变量"path"中,以别名"p"返回。详细命令解释如下所示:

命令 解释
FIND NOLOOP PATH 这部分指定了查询的类型,即查找无循环路径的图查询。
FROM "player133" TO "team202" 指定了查询的起始点和终点,即从"player133"(球员编号为 133)出发,找到通往"team202"(球队编号为 202)的路径。
OVER * 这部分表示沿着所有类型的边进行遍历。通常,"*"用于表示所有边的类型(follow 和 serve)。
BIDIRECT 表示查询是双向的,即可以沿着边的两个方向进行遍历。
UPTO 4 STEPS 限定了路径的最大步数为 4 步,即查找包括最多 4 个边的路径。
YIELD path AS p 这部分定义了查询结果的输出格式,将路径存储在一个名为"path"的变量中,并使用别名"p"返回。

输出结果如下所示:

[DEBUG] RelationshipAction intent: {'entities': {'Yao Ming': 'player133', 'Rockets': 'team202'}, 'intents': ['relationship']}
[DEBUG] query for RelationshipAction :
        USE basketballplayers;FIND NOLOOP PATH FROM "player133" TO "team202" OVER * BIDIRECT UPTO 4 STEPS YIELD path AS p;
There are at least 6 relations between Yao Ming and Rockets, one relation path is: Yao Ming serves Rockets.

使用 NebulaGraph Studio 查询结果,如下所示:


3.4
ServeAction(SiwiActionBase)

以"姚明曾经效力过哪些球队?"为例进行介绍,核心代码如下所示:

query = (
    f'USE basketballplayers;'
    f'MATCH p=(v)-[e:serve*1]->(v1) '
    f'WHERE id(v) == "{ self.player0_vid }" '
    f'    RETURN p LIMIT 100;'
    )
#举个例子
USE basketballplayers;
MATCH p=(v)-[e:serve*1]->(v1) WHERE id(v) == "player133" RETURN p LIMIT 100;

这个 Nebula Graph 查询的目的是查找从指定球员节点(ID 为"player133")出发,通过"serve"边到达其他节点的路径,并返回这些路径。查询结果将包含路径信息,其中路径由节点 v、边 e 和节点 v1 组成。这个查询最多返回 100 条符合条件的路径。详细命令解释,如下所示:

命令 解释
MATCH p=(v)-[e:serve*1]->(v1) 这部分指定了查询的模式。它创建了一个模式 p,其中包含了一个从节点 v 出发、经过边 e(带有 serve 标签,*1 表示 1 跳,即一步)到达节点 v1 的路径。这表示查询从一个球员节点(v)通过"serve"边到达另一个节点(v1)的路径。
WHERE id(v) == "player133" 这部分是一个过滤条件,限定了节点 v 的 ID 必须等于"player133"。这用于筛选起始节点是"player133"的路径。
RETURN p 这部分指定了要返回的结果。在这里,返回整个路径 p。
LIMIT 100 这是一个可选的限制条件,用于限制返回的结果数量,防止返回太多结果。

接口调用如下所示:


3.5
FollowAction(SiwiActionBase)

以"姚明的朋友有哪些?或姚明关注了哪些人?"为例进行介绍,核心代码如下所示:

query = (
    f'USE basketballplayers;'
    f'MATCH p=(v)-[e:follow*1]->(v1) '
    f'WHERE id(v) == "{ self.player0_vid }" '
    f'    RETURN p LIMIT 100;'
    )
# 举个例子
USE basketballplayers;
MATCH p=(v)-[e:follow*1]->(v1) WHERE id(v) == "player133" RETURN p LIMIT 100;

这个 Nebula Graph 查询的目的是查找从指定球员节点(ID 为"player133")出发,通过"follow"边到达其它节点的路径,并返回这些路径。查询结果将包含路径信息,其中路径由节点 v、边 e 和节点 v1 组成。这个查询最多返回 100 条符合条件的路径。详细命令解释,如下所示:

命令 解释
MATCH p=(v)-[e:follow*1]->(v1) 这部分指定了查询的模式。它创建了一个模式 p,其中包含了一个从节点 v 出发、经过边 e(带有 follow 标签,*1 表示 1 跳,即一步)到达节点 v1 的路径。这表示查询从一个球员节点(v)通过"follow"边到达另一个节点(v1)的路径。
WHERE id(v) == "player133" 这部分是一个过滤条件,限定了节点 v 的 ID 必须等于"player133"。这用于筛选起始节点是"player133"的路径。
RETURN p 这部分指定了要返回的结果。在这里,返回整个路径 p。
LIMIT 100
:
这是一个可选的限制条件,用于限制返回的结果数量,防止返回太多结果。

接口调用如下所示:


参考文献

[1] Nebula Siwi 基于图数据库的智能问答助手:
https://siwei.io/nebula-siwi/

[2] Nebula Siwi GitHub:
https://github.com/wey-gu/nebula-siwi/

[3] 示例数据 Basketballplayer:
https://docs.nebula-graph.com.cn/2.6.2/3.ngql-guide/1.nGQL-overview/1.overview/#basketballplayer

[4] Siwi Frontend:
https://github.com/wey-gu/nebula-siwi/tree/main/src/siwi_frontend

[5] pyahocorasick:
https://pyahocorasick.readthedocs.io/en/latest/#aho-corasick-methods

[6] NLP 模式高效匹配技术总结:
https://hub.baai.ac.cn/view/22048

[7] NebulaGraph 手工和 Python 操作:
https://z0yrmerhgi8.feishu.cn/wiki/YnYDwyU05iT0SAkZNtocArffniM

[8] 什么是 nGQL:
https://docs.nebula-graph.com.cn/3.6.0/3.ngql-guide/1.nGQL-overview/1.overview/

[9] Nebula Siwi:基于图数据库的智能问答助手思路分析(源码链接):
https://github.com/ai408/nlp-engineering/tree/main/``知识工程-知识图谱/NebulaGraph实战/19-Nebula Siwi:基于图数据库的智能问答助手思路分析





NLP工程化

1.本公众号以对话系统为中心,专注于Python/C++/CUDA、ML/DL/RL和NLP/KG/DS/LLM领域的技术分享。
2.本公众号Roadmap可查看飞书文档:https://z0yrmerhgi8.feishu.cn/wiki/Zpewwe2T2iCQfwkSyMOcgwdInhf

NLP工程化(公众号)

NLP工程化(星球号)

RocketMQ—RocketMQ发送同步、异步、单向、延迟、批量、顺序、批量消息、带标签消息

发送同步消息

同步消息

生产者发送消息,mq进行确认,然后返回给生产者状态。这就是同步消息。

前文demo程序就是发送的同步消息。

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。

代码如下:

/**
     * 异步消息测试
     */
@Test
public void simpleAsyncProducer() throws Exception {
    //创建一个生产者,并指定一个组名
    DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //启动
    producer.start();


    //指定topic,创建一个消息
    Message message = new Message("asyncTopic1", "这是一条异步消息".getBytes());

    //发送异步消息,并设置回调内容
    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("回调内容,发送成功");
        }

        @Override
        public void onException(Throwable throwable) {
            log.info("回调内容,发送失败");
        }
    });


    log.info("主线程执行中=========");

    System.in.read();
}

运行结果

从运行结果可以看到是不同的线程输出的内容。

发送单向消息

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送。

代码如下:

@Test
public void oneWayMessageTest() throws Exception {
    //创建一个生产者,并指定一个组名
    DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //启动
    producer.start();


    //指定topic,创建一个消息
    Message message = new Message("onewayTopic1", "这是一条单向消息".getBytes());

    //发送单向消息
    producer.sendOneway(message);

    producer.shutdown();
}

发送延迟消息

消息放入mq后,过一段时间,才会被监听到,然后消费.

比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

代码如下

@Test
public void msMessageTest() throws Exception{
    //创建一个生产者,并指定一个组名
    DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //启动
    producer.start();

    //指定topic,创建一个消息
    Message message = new Message("msTopic1", "这是一条单向消息".getBytes());
    //给消息设置一个延迟时间
    message.setDelayTimeLevel(3);

    //发送延时消息
    producer.sendOneway(message);

    producer.shutdown();
}

延时等级如下:

消息延时等级

发送批量消息

代码如下:

@Test
public void testBatchProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-batch-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    // 启动实例
    producer.start();
    List<Message> msgs = Arrays.asList(
        new Message("batchTopicTest", "我是一组消息的A消息".getBytes()),
        new Message("batchTopicTest", "我是一组消息的B消息".getBytes()),
        new Message("batchTopicTest", "我是一组消息的C消息".getBytes())

    );
    SendResult send = producer.send(msgs);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

这些消息会被放到同一个队列中。

发送顺序消息

可以想象一个场景,我们在网上购物时,需要先完成下订单操作,然后再去发短信,再进行发货,需要保证顺序的。

前文我们讲的都是
并发消息
,这种消息并不能完成上述的场景逻辑。比如一个topic里有10个消息,分别在4个队列中;

  • 如果消费者,同时有20个线程在消费,可能A线程拿到消息1了,B线程拿到消息2了,但是B线程可能完成的比A线程早,这就没办法上述场景的顺序了。
  • 如果消费者只有一个线程,轮询消费四个队列中的消息时,也不能保证是网购场景中的顺序的。

这就要引出
顺序消息
:把消费者变为单线程,把下订单消息、发短信消息、发货消息放到同一个队列就可以了。

代码

消息封装成实体类如下:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageModel {
    //订单id
    private String orderId;
    //用户id
    private String userId;
    //消息描述
    private String description;
}

发送顺序消息的生产者代码如下:

/**
     * 顺序消息
     */

@Test
public void testOrderlyProducer() throws Exception {

    List<MessageModel> messageModelList = Arrays.asList(
        //用户1的订单
        new MessageModel("order-666666","user-1","下单"),
        new MessageModel("order-666666","user-1","发短信"),
        new MessageModel("order-666666","user-1","发货"),

        //用户2的订单
        new MessageModel("order-222","user-2","下单"),
        new MessageModel("order-222","user-2","发短信"),
        new MessageModel("order-222","user-2","发货")
    );

    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-orderly-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    // 启动实例
    producer.start();

    //发送顺序消息时 发送时相同用户的消息要保证有序,并且发到同一个队列里
    messageModelList.forEach(
        messageModel->{
            Message message = new Message("orderlyTopic", messageModel.toString().getBytes());
            try {
                //发送消息,相同订单号去相同队列
                producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
                        //producer.send(message,selector,arg),第三个参数订单号会传给selector要实现的方法的arg
                        //在这里选择队列
                        int hashCode = Math.abs(arg.toString().hashCode());
                        int index = hashCode % mqs.size();
                        return mqs.get(index);
                    }
                }, messageModel.getOrderId());
            } catch (Exception e) {
                log.error("有错误发生",e);
            }
        }
    );
    // 关闭实例
    producer.shutdown();
    log.info("发送完成");
}

消费顺序消息的消费者代码如下:

//消费者
@Test
public void orderlyConsumer() throws Exception {
    //创建一个消费者,并指定一个组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-orderly-consumer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //订阅一个主题 *号表示订阅这个主题中所有的消息
    consumer.subscribe("orderlyTopic","*");
    //设置一个监听器(一直监听,异步回调方式)
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            log.info("线程id"+Thread.currentThread().getId());
            log.info("消息内容:"+new String(msgs.get(0).getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });

    //启动消费者
    consumer.start();

    //挂起当前jvm,防止主线程结束,让监听器一直监听
    System.in.read();

}

运行结果如下:

运行结果

可以看到同一个订单是顺序消费的。

其他问题

如果我们的消息消费失败了怎么办?

如果是并发模式,消费失败会进行重试,重试16次后还会没消费成功,会被放到死信队列里。

如果是顺序模式,如果重试失败,会无限重试,是int的最大值。

发送带标签的消息,消息过滤

如果我们有衣服订单的消息、手机订单的消息,如果我们只使用topic进行区分,就要使用两个topic;但是它们都是订单,所以在同一个topic中会好一些,Rocketmq就提供了消息过滤功能,通过tag或者key进行区分。

生产者代码如下:

@Test
public void testTagProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-tag-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    // 启动实例
    producer.start();
    Message messageTopic1 = new Message("tagTopic", "tag1", "这是topic1的消息".getBytes());
    Message messageTopic2 = new Message("tagTopic", "tag2", "这是topic2的消息".getBytes());
    producer.send(messageTopic1);
    producer.send(messageTopic2);
    // 关闭实例
    producer.shutdown();
}

消费tag1的消费者

//消费tag1的消费者
@Test
public void tagConsumer1() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    consumer.subscribe("tagTopic", "tag1");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("我是tag1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

消费tag1和tag2的消费者

//消费tag1和tag2的消费者
@Test
public void tagConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    consumer.subscribe("tagTopic", "tag1 || tag2");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("我是tag1和tag2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

带key的消息

消息都会有自己的MessageId的,如下图:

messageid

那我们能否指定id呢?

在发送消息时可以指定key:

@Test
public void testKeyProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-key-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);

    String key = UUID.randomUUID().toString();

    // 启动实例
    producer.start();
    Message messageTopic1 = new Message("keyTopic", "tag1",key, "这是topic1的消息".getBytes());
    producer.send(messageTopic1);
    // 关闭实例
    producer.shutdown();
}

消费者获取key:

@Test
public void testKeyConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    consumer.subscribe("keyTopic","*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("我们设置的key:" + msgs.get(0).getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

输出如下:

输出

1.死锁

img
要提交快照的时候由于没有人取走applyCh通道里面的东西,导致死锁。

具体解释:

2D的测试代码中在日志达到一定大小时会调用snapshot,该函数需要申请rf.mu这个互斥锁。而在提交普通的日志条目时,错误地没有先释放锁,导致snapshot无法进行下去,相关的进程卡在rf.mu这个锁上,无法完成快照,更无法处理applyCh通道的下一个日志条目。这导致了向通道中提交日志条目也会因为applyCh已满而被阻塞。

2.快速定位错误

img
可以看到打印的日志中出现了“whe: u4"的信息,就可以推知:相关的错误发现在被u4标记的代码处,

在访问日志具体条目的代码处,传入相应的标记,这样当调用getEntry函数失败时,能快速定位目标。

3.由于截断日志增加的处理

3.1

img

发生u4报错,定位到相应代码。

在leader方,由于prevLogIndex处的日志条目被截取,小于rf.log.start(), 在运行getEntry函数时发生报错。

解决方法:

1.设置prevLogIndex = rf.log.start()

2.应发送给follower的日志条目被删除,直接发送快照给follower。

3.2

出错的点在于follower这边:leader方发送出去的时候prevLogIndex没有低于其自身的start,故没有发送快照,但是接收方收到日志条目之后,由于已经截断了日志,并不能匹配prevLogIndex。

显然接收方对这种情况也需要处理,并不能仅仅返回个error就完事了。

解决方案:

设置XLen为start()+1, 即日志中的第一个条目,leader在收到回应的时候会执行nextIndex[i] = XLen, 这样就将nextIndex设置为follower方的日志第一个条目。

3.3

当prevLogIndex等于start时候,由于不匹配可能导致添加条目无法成功。

在截断日志的时候需要设置占位的条目的term为snapshot.term;无论是安装快照的时候,还是自己截断日志、生成快照的时候.

4关于应用(apply)时索引的思考

这是崩溃之前发送的消息,可以看出发送的最后一个索引号是223

这是重启之后,接收快照后而开始apply的快照,索引为229。

显然:

当start/restart后, 如果先发送的是日志条目,索引只能从1开始;但是如果是快照的话,索引可以从任意值开始,而其后的日志条目的索引值只需从该值递增即可。

一个有趣的发现:

TestSnapshotInstallUnCrash2D 每次只会使一个server崩溃,而TestSnapshotAllCrash2D 将会使得所有server同时崩溃;前者会使得commitIndex能够维持在正确值,而不回退,而后者会使得commitIndex从0开始。

5.发现不能通过从快照中恢复的测试

增加打印语句之后,发现restart之时都是没有带上快照的,这已经很说明问题了。

我并没有实现从稳定存储中读取快照的方法。

增添语句后,即可通过所有测试。

最终效果

整个实验2大致需要花费364秒左右,还是很满意的。