2024年3月

之前对LLM 推理和应用了解不多,因此抽时间梳理了一下,我们从模型量化,模型推理,以及开发平台等三个层面来梳理分析。

模型量化

模型训练时为了进度,采用的32位浮点数,因此占用的空间较大,一些大的模型需要很大的显存才能加载,且计算推理过程较慢。为了减少内存占用,提升推理速度,可以将高精度的参数转为低精度的参数,例如从 32 位的浮点数转换为 8 位整数,这个技术就叫做模型量化。

模型量化是一种将浮点计算转成低比特定点计算的技术,可以有效的降低模型计算强度、参数大小和内存消耗,但往往带来巨大的精度损失。尤其是在极低比特(<4bit)、二值网络(1bit)、甚至将梯度进行量化时,带来的精度挑战更大。

量化带来的好处

  • 保持精度:量化会损失精度,这相当于给网络引入了噪声,但是神经网络一般对噪声是不太敏 感的,只要控制好量化的程度,对高级任务精度影响可以做到很小。
  • 加速计算:传统的卷积操作都是使用FP32浮点,低比特的位数减少少计算性能也更高,INT8 相 对比 FP32 的加速比可达到3倍甚至更高
  • 节省内存:与 FP32 类型相比,FP16、INT8、INT4 低精度类型所占用空间更小,对应存储空间 和传输时间都可以大幅下降。
  • 节能和减少芯片面积:每个数使用了更少的位数,做运算时需要搬运的数据量少了,减少了访 存开销(节能),同时所需的乘法器数目也减少(减少芯片面积)

量化的方法与原理

主要有三种量化方法:

  • 量化训练 (Quant Aware Training, QAT)
    • 量化训练让模型感知量化运算对模型精度带来的影响,通过 finetune 训练降低量化误差。
  • 动态离线量化 (Post Training Quantization Dynamic, PTQ Dynamic)
    • 动态离线量化仅将模型中特定算子的权重从FP32类型映射成 INT8/16 类型。
  • 静态离线量化 (Post Training Quantization Static, PTQ Static)
    • 静态离线量化使用少量无标签校准数据,采用 KL 散度等方法计算量化比例因子

模型量化的原理是,实现浮点数与定点数据转换。

如上图所示,将一个更大范围的浮点数,转换为范围更小的数。

FP16/INT8/INT4

在huggingface上去查看模型时,会看到一些带有
fp16

int8

int4
后缀的模型,比如
Llama-2-7B-fp16

chatglm-6b-int8

chatglm2-6b-int4
,其实这些模型就是量化后的模型,
fp16
表示模型的量化精度。

  • FP32(单精度浮点数):使用32位二进制表示,其中1位用于sign,8位用于exponent,23位用于fraction。其数值范围大约是1.18e-38到3.40e38,精度大约是6到9位有效数字
  • FP16(半精度浮点数):使用16位二进制表示,其中1位用于sign,5位用于exponent,10位用于fraction。其数值范围为
    [5.96×10^-8, 65504]
    ,但实际能表示的最大正值为65504,最小正值约为0.0000000596(非规格表示下),符号位为0时代表正数
  • INT8,八位整型占用1个字节,INT8是一种定点计算方式,代表整数运算,一般是由浮点运算量化而来。在二进制中一个“0”或者“1”为一bit,INT8则意味着用8bit来表示一个数字
  • int4占用4个字节(32位)

量化精度从高到低排列顺序是:
fp16
>
int8
>
int4
,量化的精度越低,模型的大小和推理所需的显存就越小,但模型的能力也会越差。

业界有一些开源的量化模型格式,下面来介绍。

GGML

https://github.com/ggerganov/ggml

GGML全称是
Georgi Gerganov Machine Learning
,是由Georgi Gerganov开发的一个张量库(tensor library),Georgi Gerganov开源项目
llama.cpp
的创始人。

GGML是一个C写的库,可以将LLM转为为GGML格式,通过量化等技术让LLM方便进行加载和推理

  • 采用
    量化技术
    ,将原有大模型预训练结果量化(即将原有大模型FP16精度压缩成INT8、INT6精度
  • 二进制文件编码
    ,将量化后的预训练结果通过一种指定的格式变成一个二进制文件

特性:

  • 用 C 语言编写
  • 支持 16 位浮点数
  • 支持整数量化(4 位、5 位、8 位等)
  • 自动微分
  • ADAM 和 L-BFGS 优化器
  • 针对 Apple 芯片进行了优化
  • 在 x86 架构上利用 AVX/AVX2 内在函数
  • 在 ppc64 架构上利用 VSX 内在函数
  • 无第三方依赖项
  • 运行时不进行内存分配

在 HuggingFace 上,如果看到模型名称带有
GGML
字样的,比如
Llama-2-13B-chat-GGML
,说明这些模型是经过 GGML 量化的。有些 GGML 模型的名字除了带有
GGML
字样外,还带有
q4

q4_0

q5
等,比如
Chinese-Llama-2-7b-ggml-q4
,这里面的
q4
其实指的是 GGML 的量化方法,从
q4_0
开始往后扩展,有
q4_0

q4_1

q5_0

q5_1

q8_0
,在
这里
可以看到各种方法量化后的数据。

GGUF

GGML是基础的文件格式,没有版本控制或数据对齐,适用于不需要考虑文件版本兼容性或内存对齐优化的场景。2023年8月份,Georgi Gerganov创建一个新的大模型文件格式GGUF,全称GPT-Generated Unified Format,用以取代GGML格式。GGUF 与 GGML 相比,GGUF 可以在模型中添加额外的信息,而原来的 GGML 模型是不可以的,同时 GGUF 被设计成可扩展,这样以后有新功能就可以添加到模型中,而不会破坏与旧模型的兼容性。

但这个功能是
Breaking Change
,也就是说 GGML 新版本以后量化出来的模型都是 GGUF 格式的,这意味着旧的 GGML 格式以后会慢慢被 GGUF 格式取代,而且也不能将老的 GGML 格式直接转成 GGUF 格式。

GPTQ

GPTQ
是一种模型量化的方法,可以将语言模型量化成 INT8、INT4、INT3 甚至 INT2 的精度而不会出现较大的性能损失,在 HuggingFace 上如果看到模型名称带有
GPTQ
字样的,比如
Llama-2-13B-chat-GPTQ
,说明这些模型是经过 GPTQ 量化的。以
Llama-2-13B-chat
为例,该模型全精度版本的大小为 26G,使用 GPTQ 进行量化成 INT4 精度后的模型大小为 7.26G。

现在更流行的一个 GPTQ 量化工具是
AutoGPTQ
,它可以量化任何 Transformer 模型而不仅仅是
Llama
,现在 Huggingface 已经将 AutoGPTQ 集成到了 Transformers 中。

GPTQ vs GGML

GPTQ 和 GGML 是现在模型量化的两种主要方式,在实际运用中如何选择呢?

两者有以下几点异同:

  • GPTQ 在
    GPU
    上运行较快,而 GGML 在
    CPU
    上运行较快
  • 同等精度的量化模型,GGML 的模型要比 GPTQ 的稍微大一些,但是两者的推理性能基本一致
  • 两者都可以量化 HuggingFace 上的 Transformer 模型

因此,如果目标模型是在 GPU 上运行,那么优先使用GPTQ 进行量化,如果你的模型是在 CPU 上运行,那么建议使用 GGML 进行量化

模型推理框架

llama.cpp

llama.cpp 是GGML作者创始开发的一款纯C/C++的模型推理引擎,支持量化推理,支持多种设备、操作系统,最早是为了支持llama的推理,现在已经支持主流的开源模型。

llama.cpp 的一个显著特点是其对硬件的高效利用。无论是Windows/Linux用户还是macOS用户,都可以通过编译优化来提高模型推理的速度。对于Windows/Linux用户,推荐与BLAS(或cuBLAS如果有GPU)一起编译,可以显著提升prompt处理速度。而macOS用户则无需额外操作,因为llama.cpp 已对ARM NEON做优化,并且已自动启用BLAS。M系列芯片推荐使用Metal启用GPU推理,以显著提升速度。

llama.cpp 支持在本地CPU上部署量化后的模型,也就是结合上面提到的GGML,这样在超低配的硬件也能运行LLM。

chatglm_cpp

https://github.com/li-plus/chatglm.cpp

国产的chatglm模型开源后,有作者参考llama.cpp,开发了支持chatglm推理的chatglm_cpp,底层依然是基于GGML,当前支持
ChatGLM-6B
,
ChatGLM2-6B
,
ChatGLM3-6B
,
CodeGeeX2
,
Baichuan-13B
,
Baichuan-7B
,
Baichuan-13B
,
Baichuan2
,
InternLM
这些国产开源模型。

vLLM

https://github.com/vllm-project/vllm

vLLM 是来自加州大学伯克利分校的,面向GPU的大模型推理框架。

vLLm 运行大模型非常快主要使用以下方法实现的:

  1. 通过PageAttention 对attention key & value 内存进行有效的管理
  2. 连续批处理
  3. 优化的CUDA kernels

当前支持NVIDIA GPUs 和 AMD GPUs,量化方面支持
GPTQ
,
AWQ
,
SqueezeLLM
, FP8 KV Cache

MLC LLM

https://github.com/mlc-ai/mlc-llm

M
achine
L
earning
C
ompilation for
L
arge
L
anguage
M
odels (MLC LLM) 是一个高性能的通用部署解决方案,支持任何大语言模型的原生部署。 MLC LLM支持以下平台和硬件: AMD GPU、 NVIDIA GPU、 Apple GPU、 Intel GPU、 Linux / Win、 macOS、 Web 浏览器、 iOS / iPadOS、 Android。

这个框架是陈天奇(tvm发起者)团队开发的,最大的特性是可以部署到iOS 和 Android 设备上,还能在浏览器上运行SD模型和LLM模型。

DeepSpeed

微软出品的高性能推理框架,DeepSpeed-FastGen 利用分块 KV 缓存和动态分割融合连续批处理,提供了比vLLM更好的吞吐。

DeepSpeed-FastGen 支持的模型:

DeepSpeed 沿用了业界主流的
分块 KV 缓存, 连续批处理
技术,同时引入了
动态 SplitFuse
技术,这是一种新颖的提示和生成组合策略,利用动态提示和生成分解, 统一来进一步改善连续批处理和系统吞吐量。详情可参见
https://github.com/microsoft/DeepSpeed/blob/master/blogs/deepspeed-fastgen/chinese/README.md

推理框架小结

  • 如果CPU推理,llama.cpp 结合模型int4量化,最佳的选择
  • GPU推理,微软的
    DeepSpeed-FastGen
    是一个好的选择
  • 手机终端推理,MLC LLM可以作为候选

大模型应用开发平台

之所以称之为开发平台,是这些工具除了支持基本的模型推理,还有标准化的api,以及配套管理工具,可以方便去开发和管理AI应用。

Xorbits Inference

https://github.com/xorbitsai/inference/blob/main/README_zh_CN.md

Xorbits Inference(Xinference)是一个性能强大且功能全面的分布式推理框架。可用于大语言模型(LLM),语音识别模型,多模态模型等各种模型的推理。通过 Xorbits Inference,你可以轻松地一键部署你自己的模型或内置的前沿开源模型。无论你是研究者,开发者,或是数据科学家,都可以通过 Xorbits Inference 与最前沿的 AI 模型,发掘更多可能。

官方介绍的主要功能:

building-a-simple-redis-server-with-python

前几天我想到,写一个简单的东西会很整洁
雷迪斯
-像数据库服务器。虽然我有很多 WSGI应用程序的经验,数据库服务器展示了一种新颖 挑战,并被证明是学习如何工作的不错的实际方法 Python中的套接字。在这篇文章中,我将分享我在此过程中学到的知识。

我项目的目的是
编写一个简单的服务器
我可以用 我的任务队列项目称为
Huey
。 Huey使用Redis作为默认存储引擎来跟踪被引用的工作, 完成的工作和其他结果。就本职位而言, 我进一步缩小了原始项目的范围,以免造成混乱 使用代码的水域,您可以很容易地自己写,但是如果您 很好奇,你可以看看
最终结果 这里
(
文件
)。

我们将要构建的服务器将能够响应以下命令:

  • GET
    <key>
  • SET
    <key>
    <value>
  • DELETE
    <key>
  • FLUSH
  • MGET
    <key1>
    ...
    <keyn>
  • MSET
    <key1>
    <value1>
    ...
    <keyn>
    <valuen>

我们还将支持以下数据类型:

  • Strings
    and
    Binary Data
  • Numbers
  • NULL
  • Arrays (which may be nested)
  • Dictionaries (which may be nested)
  • Error messages

为了异步处理多个客户端,我们将使用
gevent
, 但是您也可以使用标准库的
SocketServer
模块与 要么
ForkingMixin

ThreadingMixin

骨架

让我们为服务器设置一个框架。我们需要服务器本身,以及 新客户端连接时要执行的回调。另外,我们需要 某种逻辑来处理客户端请求并发送响应。

这是一个开始:

from gevent import socket
from gevent.pool import Pool
from gevent.server import StreamServer

from collections import namedtuple
from io import BytesIO
from socket import error as socket_error


# We'll use exceptions to notify the connection-handling loop of problems.
class CommandError(Exception): pass
class Disconnect(Exception): pass

Error = namedtuple('Error', ('message',))


class ProtocolHandler(object):
    def handle_request(self, socket_file):
        # Parse a request from the client into it's component parts.
        pass

    def write_response(self, socket_file, data):
        # Serialize the response data and send it to the client.
        pass


class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

    def connection_handler(self, conn, address):
        # Convert "conn" (a socket object) into a file-like object.
        socket_file = conn.makefile('rwb')

        # Process client requests until client disconnects.
        while True:
            try:
                data = self._protocol.handle_request(socket_file)
            except Disconnect:
                break

            try:
                resp = self.get_response(data)
            except CommandError as exc:
                resp = Error(exc.args[0])

            self._protocol.write_response(socket_file, resp)

    def get_response(self, data):
        # Here we'll actually unpack the data sent by the client, execute the
        # command they specified, and pass back the return value.
        pass

    def run(self):
        self._server.serve_forever()

希望以上代码相当清楚。我们已经分开了担忧,以便 协议处理属于自己的类,有两种公共方法:
handle_request

write_response
。服务器本身使用协议 处理程序解压缩客户端请求并将服务器响应序列化回 客户。The
get_response()
该方法将用于执行命令 由客户发起。

仔细查看代码
connection_handler()
方法,你可以 看到我们在套接字对象周围获得了类似文件的包装纸。这个包装器 让我们抽象一些
怪癖
通常会遇到使用原始插座的情况。函数输入 无穷循环,读取客户端的请求,发送响应,最后 客户端断开连接时退出循环(由
read()
返回 一个空字符串)。

我们使用键入的异常来处理客户端断开连接并通知用户 错误处理命令。例如,如果用户做错了 对服务器的格式化请求,我们将提出一个
CommandError
, 哪个是 序列化为错误响应并发送给客户端。

在继续之前,让我们讨论客户端和服务器将如何通信。

线程

我面临的第一个挑战是如何处理通过 线。我在网上找到的大多数示例都是毫无意义的回声服务器,它们进行了转换 套接字到类似文件的对象,并且刚刚调用
readline()
。如果我想 用新线存储一些腌制的数据或字符串,我需要一些 一种序列化格式。

在浪费时间尝试发明合适的东西之后,我决定阅读 有关文档
Redis协议
, 其中 事实证明实施起来非常简单,并且具有 支持几种不同的数据类型。

Redis协议使用请求/响应通信模式与 客户。来自服务器的响应将使用第一个字节来指示 数据类型,然后是数据,以回车/线路进给终止。

数据类型.png

让我们填写协议处理程序的类,使其实现Redis 协议。

class ProtocolHandler(object):
    def __init__(self):
        self.handlers = {
            '+': self.handle_simple_string,
            '-': self.handle_error,
            ':': self.handle_integer,
            '$': self.handle_string,
            '*': self.handle_array,
            '%': self.handle_dict}

    def handle_request(self, socket_file):
        first_byte = socket_file.read(1)
        if not first_byte:
            raise Disconnect()

        try:
            # Delegate to the appropriate handler based on the first byte.
            return self.handlers[first_byte](socket_file)
        except KeyError:
            raise CommandError('bad request')

    def handle_simple_string(self, socket_file):
        return socket_file.readline().rstrip('\r\n')

    def handle_error(self, socket_file):
        return Error(socket_file.readline().rstrip('\r\n'))

    def handle_integer(self, socket_file):
        return int(socket_file.readline().rstrip('\r\n'))

    def handle_string(self, socket_file):
        # First read the length ($<length>\r\n).
        length = int(socket_file.readline().rstrip('\r\n'))
        if length == -1:
            return None  # Special-case for NULLs.
        length += 2  # Include the trailing \r\n in count.
        return socket_file.read(length)[:-2]

    def handle_array(self, socket_file):
        num_elements = int(socket_file.readline().rstrip('\r\n'))
        return [self.handle_request(socket_file) for _ in range(num_elements)]

    def handle_dict(self, socket_file):
        num_items = int(socket_file.readline().rstrip('\r\n'))
        elements = [self.handle_request(socket_file)
                    for _ in range(num_items * 2)]
        return dict(zip(elements[::2], elements[1::2]))

对于协议的序列化方面,我们将执行与上述相反的操作: 将Python对象转换为序列化的对象!

class ProtocolHandler(object):
    # ... above methods omitted ...
    def write_response(self, socket_file, data):
        buf = BytesIO()
        self._write(buf, data)
        buf.seek(0)
        socket_file.write(buf.getvalue())
        socket_file.flush()

    def _write(self, buf, data):
        if isinstance(data, str):
            data = data.encode('utf-8')

        if isinstance(data, bytes):
            buf.write('$%s\r\n%s\r\n' % (len(data), data))
        elif isinstance(data, int):
            buf.write(':%s\r\n' % data)
        elif isinstance(data, Error):
            buf.write('-%s\r\n' % error.message)
        elif isinstance(data, (list, tuple)):
            buf.write('*%s\r\n' % len(data))
            for item in data:
                self._write(buf, item)
        elif isinstance(data, dict):
            buf.write('%%%s\r\n' % len(data))
            for key in data:
                self._write(buf, key)
                self._write(buf, data[key])
        elif data is None:
            buf.write('$-1\r\n')
        else:
            raise CommandError('unrecognized type: %s' % type(data))

将协议处理保持在其自己的类中的另一个好处是 我们可以重复使用
handle_request

write_response
建立方法 客户端库。

执行命令


Server
我们模拟的课程现在需要
get_response()
方法 已实施。命令将假定由客户端以简单方式发送 字符串或命令参数数组,因此
data
传递给
get_response()
将是字节或列表。为了简化处理,如果
data
这是一个简单的字符串,我们将通过拆分将其转换为列表 空格。

第一个参数将是命令名称,并带有任何其他参数 属于指定命令。就像我们对第一个的映射一样 字节给处理者
ProtocolHandler
, 让我们创建一个的映射 命令回调
Server
:

class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

        self._commands = self.get_commands()

    def get_commands(self):
        return {
            'GET': self.get,
            'SET': self.set,
            'DELETE': self.delete,
            'FLUSH': self.flush,
            'MGET': self.mget,
            'MSET': self.mset}

    def get_response(self, data):
        if not isinstance(data, list):
            try:
                data = data.split()
            except:
                raise CommandError('Request must be list or simple string.')

        if not data:
            raise CommandError('Missing command')

        command = data[0].upper()
        if command not in self._commands:
            raise CommandError('Unrecognized command: %s' % command)

        return self._commands[command](*data[1:])

我们的服务器快完成了! 我们只需要执行六个命令 在
get_commands()
方法:

class Server(object):
    def get(self, key):
        return self._kv.get(key)

    def set(self, key, value):
        self._kv[key] = value
        return 1

    def delete(self, key):
        if key in self._kv:
            del self._kv[key]
            return 1
        return 0

    def flush(self):
        kvlen = len(self._kv)
        self._kv.clear()
        return kvlen

    def mget(self, *keys):
        return [self._kv.get(key) for key in keys]

    def mset(self, *items):
        data = zip(items[::2], items[1::2])
        for key, value in data:
            self._kv[key] = value
        return len(data)

而已! 我们的服务器现在可以开始处理请求了。在下一个 本节,我们将实现一个客户端与服务器进行交互。

客户端

要与服务器交互,让我们重新使用
ProtocolHandler
类到 实现一个简单的客户端。客户端将连接到服务器并发送 命令编码为列表。我们将同时使用
write_response()

handle_request()
编码请求和处理服务器响应的逻辑 分别。

class Client(object):
    def __init__(self, host='127.0.0.1', port=31337):
        self._protocol = ProtocolHandler()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.connect((host, port))
        self._fh = self._socket.makefile('rwb')

    def execute(self, *args):
        self._protocol.write_response(self._fh, args)
        resp = self._protocol.handle_request(self._fh)
        if isinstance(resp, Error):
            raise CommandError(resp.message)
        return resp


execute()
方法上,我们可以传递任意参数列表,这些参数将被编码为数组并发送到服务器。来自服务器的响应被解析并作为Python对象返回。为了方便起见,我们可以为各个命令编写客户端方法:

class Client(object):
    # ...
    def get(self, key):
        return self.execute('GET', key)

    def set(self, key, value):
        return self.execute('SET', key, value)

    def delete(self, key):
        return self.execute('DELETE', key)

    def flush(self):
        return self.execute('FLUSH')

    def mget(self, *keys):
        return self.execute('MGET', *keys)

    def mset(self, *items):
        return self.execute('MSET', *items)

为了测试我们的客户端,让我们配置Python脚本以启动服务器 直接从命令行执行时:

测试服务器

要测试服务器,只需从命令行执行服务器的Python模块即可。在另一个终端中,打开Python解释器并导入
Client
来自服务器模块的类。安装客户端将打开连接,您可以开始运行命令!

>>> from server_ex import Client
>>> client = Client()
>>> client.mset('k1', 'v1', 'k2', ['v2-0', 1, 'v2-2'], 'k3', 'v3')
3
>>> client.get('k2')
['v2-0', 1, 'v2-2']
>>> client.mget('k3', 'k1')
['v3', 'v1']
>>> client.delete('k1')
1
>>> client.get('k1')
>>> client.delete('k1')
0
>>> client.set('kx', {'vx': {'vy': 0, 'vz': [1, 2, 3]}})
1
>>> client.get('kx')
{'vx': {'vy': 0, 'vz': [1, 2, 3]}}
>>> client.flush()
2

完整代码

from gevent import socket
from gevent.pool import Pool
from gevent.server import StreamServer

from collections import namedtuple
from io import BytesIO
from socket import error as socket_error
import logging


logger = logging.getLogger(__name__)


class CommandError(Exception): pass
class Disconnect(Exception): pass

Error = namedtuple('Error', ('message',))


class ProtocolHandler(object):
    def __init__(self):
        self.handlers = {
            '+': self.handle_simple_string,
            '-': self.handle_error,
            ':': self.handle_integer,
            '$': self.handle_string,
            '*': self.handle_array,
            '%': self.handle_dict}

    def handle_request(self, socket_file):
        first_byte = socket_file.read(1)
        if not first_byte:
            raise Disconnect()

        try:
            # Delegate to the appropriate handler based on the first byte.
            return self.handlers[first_byte](socket_file)
        except KeyError:
            raise CommandError('bad request')

    def handle_simple_string(self, socket_file):
        return socket_file.readline().rstrip('\r\n')

    def handle_error(self, socket_file):
        return Error(socket_file.readline().rstrip('\r\n'))

    def handle_integer(self, socket_file):
        return int(socket_file.readline().rstrip('\r\n'))

    def handle_string(self, socket_file):
        # First read the length ($<length>\r\n).
        length = int(socket_file.readline().rstrip('\r\n'))
        if length == -1:
            return None  # Special-case for NULLs.
        length += 2  # Include the trailing \r\n in count.
        return socket_file.read(length)[:-2]

    def handle_array(self, socket_file):
        num_elements = int(socket_file.readline().rstrip('\r\n'))
        return [self.handle_request(socket_file) for _ in range(num_elements)]
    
    def handle_dict(self, socket_file):
        num_items = int(socket_file.readline().rstrip('\r\n'))
        elements = [self.handle_request(socket_file)
                    for _ in range(num_items * 2)]
        return dict(zip(elements[::2], elements[1::2]))

    def write_response(self, socket_file, data):
        buf = BytesIO()
        self._write(buf, data)
        buf.seek(0)
        socket_file.write(buf.getvalue())
        socket_file.flush()

    def _write(self, buf, data):
        if isinstance(data, str):
            data = data.encode('utf-8')

        if isinstance(data, bytes):
            buf.write('$%s\r\n%s\r\n' % (len(data), data))
        elif isinstance(data, int):
            buf.write(':%s\r\n' % data)
        elif isinstance(data, Error):
            buf.write('-%s\r\n' % error.message)
        elif isinstance(data, (list, tuple)):
            buf.write('*%s\r\n' % len(data))
            for item in data:
                self._write(buf, item)
        elif isinstance(data, dict):
            buf.write('%%%s\r\n' % len(data))
            for key in data:
                self._write(buf, key)
                self._write(buf, data[key])
        elif data is None:
            buf.write('$-1\r\n')
        else:
            raise CommandError('unrecognized type: %s' % type(data))


class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

        self._commands = self.get_commands()

    def get_commands(self):
        return {
            'GET': self.get,
            'SET': self.set,
            'DELETE': self.delete,
            'FLUSH': self.flush,
            'MGET': self.mget,
            'MSET': self.mset}

    def connection_handler(self, conn, address):
        logger.info('Connection received: %s:%s' % address)
        # Convert "conn" (a socket object) into a file-like object.
        socket_file = conn.makefile('rwb')

        # Process client requests until client disconnects.
        while True:
            try:
                data = self._protocol.handle_request(socket_file)
            except Disconnect:
                logger.info('Client went away: %s:%s' % address)
                break

            try:
                resp = self.get_response(data)
            except CommandError as exc:
                logger.exception('Command error')
                resp = Error(exc.args[0])

            self._protocol.write_response(socket_file, resp)

    def run(self):
        self._server.serve_forever()

    def get_response(self, data):
        if not isinstance(data, list):
            try:
                data = data.split()
            except:
                raise CommandError('Request must be list or simple string.')

        if not data:
            raise CommandError('Missing command')

        command = data[0].upper()
        if command not in self._commands:
            raise CommandError('Unrecognized command: %s' % command)
        else:
            logger.debug('Received %s', command)

        return self._commands[command](*data[1:])

    def get(self, key):
        return self._kv.get(key)

    def set(self, key, value):
        self._kv[key] = value
        return 1

    def delete(self, key):
        if key in self._kv:
            del self._kv[key]
            return 1
        return 0

    def flush(self):
        kvlen = len(self._kv)
        self._kv.clear()
        return kvlen

    def mget(self, *keys):
        return [self._kv.get(key) for key in keys]

    def mset(self, *items):
        data = zip(items[::2], items[1::2])
        for key, value in data:
            self._kv[key] = value
        return len(data)


class Client(object):
    def __init__(self, host='127.0.0.1', port=31337):
        self._protocol = ProtocolHandler()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.connect((host, port))
        self._fh = self._socket.makefile('rwb')

    def execute(self, *args):
        self._protocol.write_response(self._fh, args)
        resp = self._protocol.handle_request(self._fh)
        if isinstance(resp, Error):
            raise CommandError(resp.message)
        return resp

    def get(self, key):
        return self.execute('GET', key)

    def set(self, key, value):
        return self.execute('SET', key, value)

    def delete(self, key):
        return self.execute('DELETE', key)

    def flush(self):
        return self.execute('FLUSH')

    def mget(self, *keys):
        return self.execute('MGET', *keys)

    def mset(self, *items):
        return self.execute('MSET', *items)


if __name__ == '__main__':
    from gevent import monkey; monkey.patch_all()
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)
    Server().run()

各位看官,如对你有帮助欢迎点赞,收藏,转发,关注公众号【Python魔法师】获取更多Python魔法知识~

qrcode.jpg

开源项目地址:
https://gitee.com/lowcodexaf/rules-engine-editor

前言

本项目是基于XAFBlazor的规则引擎编辑器,规则引擎采用的是微软开源的RulesEngine

RulesEngine项目地址:
https://github.com/microsoft/RulesEngine

背景

在软件开发中,规则引擎扮演着至关重要的角色。它允许开发人员将业务规则从代码中分离出来,以便更灵活地管理和调整规则。然而,随着规则节点数量和复杂性的增加,手动编辑规则变得越来越繁琐。因此,我决定开发一款规则引擎编辑器。

演示

功能

规则节点

每一个规则是一个节点,使得规则的创建、编辑和管理变得更加简单明了。每个规则节点不仅可以表示一个独立的规则,还可以容纳子规则,形成复杂的规则结构,从而满足各种业务场景下的需求。

规则关系

规则之间的关系可以灵活地进行定义,支持或与并的逻辑关系,同时子规则的结果决定了父级规则的结果,也就是说如果一个规则包含了子规则,那它返回的结果是子规则的结果,同时忽略它自身的表达式。

规则验证

编辑器内置了规则验证,可以对规则进行实时的验证,并在验证失败时及时提供反馈。当规则验证失败时,编辑器会在节点的右上角显示一个红色图标,将鼠标悬停在图标上,即可查看详细的验证结果,单击图标将会弹出一个验证结果对话框。

规则设置

规则的设置可以通过多种方式进行,菜单的属性菜单项、双击规则节点,它都会弹出一个规则设置对话框。

规则操作

包括规则节点的上移、下移、置顶、置底等,还可以对规则节点进行复制,剪切,粘贴等操作

导航功能

可以通过平移、缩放、适应内容等操作,自由地调整画布的显示,以便更好地查看和编辑规则。此外,编辑器还提供了MiniMap功能,能够在画布上显示全局视图,并支持通过MiniMap对画布进行平移或缩放,进一步提高了编辑效率。

LDBC(Linked Data Benchmark Council)Social Network Benchmark,简称 LDBC SNB,是一种针对社交网络场景的评估图数据库性能的基准测试。

LDBC 简介

除了 Social Network Benchmark,LDBC 旗下目前还有其他几种基准测试:Graphalytics Benchmark,Financial Benchmark 和 Semantic Publish Benchmark,分别针对图分析、金融和 RDF 的场景。Social Network Benchmark 是 LDBC 最早的提出的基准测试,已经成为国内外最主流的图数据库基准测试,在国内很多图数据库招标也会将 LDBC SNB 作为性能测试的一项。但需要说明的是,LDBC 本身作为一个非盈利组织,只提供官方审计。
不同图数据库可能受到运行环境以及基准测试的相关参数影响,因此测试结果的横向对比没有任何意义

LDBC SNB 主要包括三个主要部分:

  1. Data Generator
    :这是一个数据生成工具,用于生成具有社交网络特性的大规模复杂数据。这些数据包括人、帖子、评论、地理位置、组织和其他一些社交网络的典型实体和关系。
  2. Interactive Workload
    :主要针对 OLTP,模拟了用户在社交网络上的日常活动,例如发布帖子、添加好友、点赞等。读请求以查询以一到两跳为主,同时可能会伴随一些写请求。
  3. Business Intelligence Workload
    :主要针对 OLAP,模拟了对社交网络数据进行深入分析,以全图查询为主。例如分析用户的社交行为、社区的形成和演变,以及其他一些需要复杂分析和大量数据处理的任务。

LDBC SNB
的论文里还提到了一个 SNB Algorithms,顾名思义主要是跑图算法的,如 PageRank、社区发现、广度搜索等。但论文是 2015 年发表的,当时描述这个场景还在起草中,目前已经将这部分移到了 Graphalytics Benchmark。

此外,想要运行 LDBC SNB 测试,还需要一个官方提供的 Driver。不同的数据库需要基于 Driver 的接口实现相应的 Connector,用来连接 Driver 和数据库。之后 Driver 会根据 Benchmark 的相关参数生成 Workload(这里可以理解为一系列的查询语句),并驱动待测数据库执行这些查询语句,最终得到性能测试结果。

整个 LDBC SNB 基准测试的流程如下,主要分成
准备阶段

基准测试

结果输出
这三个阶段。

准备阶段主要执行数据生成,包括初次导入的全量数据,以及后续实时更新的数据。此外在官方审计中,还需要在 SF10 Dataset 上进行 Validation,因此这一阶段也会生成用于校验的数据。

基准测试阶段会先在 SF10 Dataset(在本文文末介绍了何为 SF)上进行 Validation,之后会在 SF30 或者 SF100 Dataset 进行性能测试。Validation 的过程就是在数据导入之后,由 Driver 根据之前准备阶段的一系列 query 和期望结果,对数据库的查询结果进行校验,以确保数据库的查询结果正确。Validation 的这个过程没有时间要求。而之后的性能测试分为导入、预热、性能测试,数据库可以有 30 分钟的预热时间,而在性能测试至少要持续两个小时,最终将测试结果汇总并输出。

figure

由于篇幅限制,我们这一系列重点介绍 SNB Interactive Workload 相关内容。这一篇,我们主要会结合论文,介绍 SNB 的 Schema 以及数据生成,也就是准备阶段。

LDBC SNB Schema 生成

为了和 SNB 中的数据命名统一,本文相关名称我会用英文,所以读起来可能会有些怪怪的。为了降低理解成本,每个英文单词首次出现后面会跟随对应的中文注释讲解。

SNB 的数据主要是模拟了一个类似 Facebook 的社交网络。其中数据都是围绕
Person
(人)构建而来,
Person
之间会构成
Friendship
(情谊)网络。每个
Person
可能会有若干
Forum
(特定讨论区),
Person
可以在
Forum
中下面发送若干
Post
(帖子),其他
Person
可能会
likes
(点赞)其中一些
Message
(消息)。

以上这些元素的数据量主要会受
Person
和时间的影响:

  • 有更多朋友的人会发送更多的评论或点赞
  • 时间越长,会结交更多的朋友,评论或点赞数量也会上升

还有一部分数据不会随
Person
数量而变化,主要包括一些
Organization
(组织,这里主要是学校)以及
Place
(地方,这里主要是居住城市、国家等地理信息)。这部分数据会在数据生成时起一些作用,比如在同一时期在同一个学校上学的人更有可能称为朋友。

SNB 的完整 Schema 如下图所示:

figure

大多数图数据库在进行测试时候,会将实体建模为点,而不同关系会建模为边。但这只是一个惯例,
SNB 的数据建模和实际数据库中的 Schema 可以不同,只要数据库能够完成相应 Workload 的查询即可

LDBC SNB 数据生成

SNB 的一个重要部分是 Data Generator(下文称为 DataGen),用来生成满足上面 Schema 的数据。Generator 生成的数据由以下三个参数决定:

  1. Person
    的个数
  2. 模拟多少年的数据
  3. 从哪一年开始模拟

根据官方文档,DataGen 生成的数据有以下性质:

  • 现实性
    :生成数据模拟了一个真实的社交网络。一方面,生成数据中的属性、基数、数据相关性和分布经过精心设置,从而能够模拟 Facebook 等真实社交网络。另一方面,其原始数据来自于
    DBpedia
    ,保证数据中的属性值真实且相关。
  • 可扩展性
    :针对不同规模和预算的系统,DataGen 能够生成不同大小的数据集(GB 到 TB 级),此外 DataGen 可以在单机,或者是一个集群中完成数据生成。
  • 确定性
    :无论用来生成数据的机器数量多少、机器配置是高还是低,DataGen 生成的数据都是确定且相同的。这一重要功能确保了任意一个数据系统都能使用相同的数据集,保证不同系统环境之间的测评比较公平且基准测试结果可重复。
  • 易用性
    :DataGen 被设计得尽可能易于使用。

整个数据生成的流程图如下所示,我们会分解为几部分介绍:

figure

生成属性分布

第一步是初始化。DataGen 使用的原始数据来自于 DBpedia,针对每一个属性,DataGen 会根据以下方面决定属性的分布:

  • 有多少种可能的属性值
  • 每一种属性值出现的概率

最终将属性的分布情况作为资源文件以及 DataGen 的参数保存下来。

生成 Person 和 Friendship

前面也提到过,SNB 的 Schema 的核心是
Person
,这也体现在数据生成过程中。接下来 DataGen 就会生成所有
Person
,以及
Person
中一部分后续操作所需要的信息,比如每个
Person
有多少
Friendship
(这个值非常重要,其分布满足 Power law(幂定律)),
Person
所就读的大学,
Person
所就职的公司等。

接下来,DataGen 会创建每个
Person

Friendship
关系(即流程图中的
knows
)。和真实社交网络一样,有相同兴趣或者行为的人,很有可能会连接在一起。为了模拟这样的社交网络,SNB 在生成
Friendship
时会考虑以下三个维度:

  1. Person
    所就读的大学,就读时间,以及大学所在城市
  2. Person
    的兴趣
  3. 每个
    Person
    会生成一个随机值,随机值越相近代表其越类似(这是为了模拟不是所有朋友都是通过大学和兴趣结交的)

三个维度分别占每个
Person

Friendship
关系权重的 45%,45% 和 10%,也就将
Person
之间建边的过程分成了三个子步骤。

DataGen 会依次根据三个维度将所有
Person
进行排序(每次只按一个维度进行排序),然后将排序过后的
Person
切分为不相交的多个部分,分发给不同 Worker 进程。即便是切分之后,每个 Worker 线程负责的
Person
可能也可能超过内存大小。因此,Worker 线程会维护一个滑动窗口,滑动窗口内的
Person
之间建立
Friendship
关系的概率满足几何分布。

如下图所示:

figure

假设现在根据就读大学这个维度进行了排序,得到了一个
Person
有序序列。之后 Worker 就会维护一个滑动窗口,每次为滑动窗口最左侧的人生成
Friendship
关系(上图当前是 P2),滑动窗口内的其他人和窗口第一个人建立
Friendship
的比例满足几何分布。

直到滑动窗口的第一个人建立了足够多的
Friendship
之后,滑动窗口的起点会移到下一个人。

这里没有深究滑动窗口的大小、几何分布的参数甚至是随机生成器的参数,不知道在出现滑动窗口内无法生成足够多 Friendship 关系时,DataGen 如何处理。

将三个维度都经过排序、分发、按滑动窗口建边之后,DataGen 就进入了下一阶段。

生成社交活动

生成完
Person

Friendship
之后,DataGen 就开始生成每个
Person
的社交活动,包括
Forum

Post

Comment
。这部分数据也有一些相关性存在:

  1. 有越多
    Friendship

    Person
    在社交网络上会越活跃
  2. 每个
    Person
    更可能在自己感兴趣或者就读大学相关的
    Forum
    进行
    Post
    或者
    Comment
  3. 社交活动和时间是有相关性的,比如接近世界杯,足球相关的讨论就会激增

最终输出

经过以上步骤之后,DataGen 完成了数据生成,模拟的社交网络图会分成两部分进行输出:

  • Dataset:90% 的数据用于初始导入
  • Update Streams:10% 的数据用于后续实时更新

除此之外,还会生成后续 Workload 中请求的参数(主要是起点)。关于参数生成我们会在下一篇详细解释,这里简单描述一下 SNB 的读请求 Workload。Interactive Workload 主要的查询希望在一秒以内得到查询结果,所有读 query 都是从图中的一个点出发,获取很小一部分的子图信息。另外,因不同起点的出入度不同,基本上也就决定了这次读请求会访问的数据量。

为了测试不同系统和场景,SNB 定义了比例因子(Scale Factor,即所谓的 SF)用来控制最终生成的数据量大小。比如,SF1 原始数据大小为 1 GB,同理 SF0.1 和 SF300 的大小为 100 MB 和 300 GB。不同比例因子的各个类型的点边数据量如下表所示:

figure

最终生成的 Dataset 分为两大类:Static 和 Dynamic,格式都是 CSV。根据 DataGen 配置的线程数量大小,最终生成的数据也会分为多个分片。Static 包含
Organization

Place

Tag
等,都是基于 DBpedia 生成的静态数据,其数量不会随着比例因子变化而变化。换而言之,这部分数据与
Person
的个数无关。而 Dynamic 部分主要包括
Person

knows
(即前面数据生成部分描述的
Friendship
)、
Forum

Post

Comment
等。

而 Update Streams 中包含了所有更新的操作,主要就是模拟实时注册新用户、评论、点赞、加好友等等行为。

Reference

到这里准备阶段大概就介绍完了,在准备阶段最终生成的请求参数部分我们会在下一篇讲述Workload时再展开。

关于 NebulaGraph
NebulaGraph 是一款开源的分布式图数据库,自 2019 年开源以来,先后被美团、京东、360 数科、快手、众安金融等多家企业采用,应用在智能推荐、金融风控、数据治理、知识图谱等等应用场景。GitHub 地址:
https://github.com/vesoft-inc/nebula

作者:
critical27

前言

经常有小伙伴在技术群里问:
有什么好用的Redis可视化管理工具推荐的吗?
, 今天大姚给大家分享一款我一直在用的开源、免费(MIT License)、跨平台的Redis可视化管理工具:Another Redis Desktop Manager。

Redis介绍

Redis (Remote Dictionary Server) 是一个使用 C 语言编写的,开源的 (遵守 BSD 协议) 高性能的、支持网络、可基于内存亦可持久化的日志型、Key-Value的NoSQL数据库。

工具介绍

Another Redis Desktop Manager是一款更快、更好、更稳定的Redis桌面(GUI)管理客户端,兼容Windows、Mac、Linux,性能出众,轻松加载海量键值。

支持哨兵, 集群, ssh通道, ssl认证, stream, subscribe订阅, 树状视图, 命令行, 以及暗黑模式; 多种格式化方式, 甚至能够自定义格式化脚本, 满足你的一切需求。

工具下载

工具界面展示