2023年4月

坊间有传MacOs系统不适合机器(ml)学习和深度(dl)学习,这是板上钉钉的刻板印象,就好像有人说女生不适合编程一样的离谱。现而今,无论是
Pytorch框架的MPS模式
,还是最新的Tensorflow2框架,都已经可以在M1/M2芯片的Mac系统中毫无桎梏地使用GPU显卡设备,本次我们来分享如何在苹果MacOS系统上安装和配置Tensorflow2框架(CPU/GPU)。

Tensorflow2深度学习环境安装和配置

首先并不需要任何虚拟环境,直接本地安装Python3.10即可,请参见:
一网成擒全端涵盖,在不同架构(Intel x86/Apple m1 silicon)不同开发平台(Win10/Win11/Mac/Ubuntu)上安装配置Python3.10开发环境
,这里不再赘述。

随后安装Tensorflow本体:

pip3 install tensorflow-macos

这里系统会自动选择当前Python版本的Tensorflow安装包:

➜  ~ pip install tensorflow-macos  
Collecting tensorflow-macos  
  Downloading tensorflow_macos-2.12.0-cp310-cp310-macosx_12_0_arm64.whl (200.8 MB)  
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200.8/200.8 MB 4.7 MB/s eta 0:00:00

安装包大小为200兆左右,如果下载不了,可以选择在pip官网直接下载基于python3.10的安装包:pypi.org/project/tensorflow-macos/#files

然后直接将whl文件拖拽到终端安装即可。

接着安装Tensorflow的GPU插件:tensorflow-metal,它是一个TensorFlow的后端,使用苹果的Metal图形API来加速神经网络计算。Metal是一种高性能图形和计算API,专门为苹果设备的GPU设计,可以实现更快的神经网络计算。使用tensorflow-metal可以显著提高在苹果设备上运行TensorFlow的性能,尤其是在使用Macs M1和M2等基于苹果芯片的设备时。

pip3 install --user tensorflow-metal

注意这里安装命令必须带上--user参数,否则可能会报这个错误:



Non-OK-status: stream_executor::MultiPlatformManager::RegisterPlatform( std::move(cplatform)) status: INTERNAL: platform is already registered with name: "METAL"


安装好之后,在Python终端运行命令:

import tensorflow  
tensorflow.config.list_physical_devices()

程序返回:

>>> import tensorflow  
>>> tensorflow.config.list_physical_devices()  
[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'), PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

可以看到,Tensorflow用于计算的物理设备既支持CPU,也支持GPU,也就是显卡。

接着,在编写一个完整的测试脚本 test.py:

import sys  
import tensorflow.keras  
import pandas as pd  
import sklearn as sk  
import scipy as sp  
import tensorflow as tf  
import platform  
print(f"Python Platform: {platform.platform()}")  
print(f"Tensor Flow Version: {tf.__version__}")  
print(f"Keras Version: {tensorflow.keras.__version__}")  
print()  
print(f"Python {sys.version}")  
print(f"Pandas {pd.__version__}")  
print(f"Scikit-Learn {sk.__version__}")  
print(f"SciPy {sp.__version__}")  
gpu = len(tf.config.list_physical_devices('GPU'))>0  
print("GPU is", "available" if gpu else "NOT AVAILABLE")

这里打印出深度学习场景下常用的库和版本号:

➜  chatgpt_async git:(main) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/chatgpt_async/tensof_test.py"  
Python Platform: macOS-13.3.1-arm64-arm-64bit  
Tensor Flow Version: 2.12.0  
Keras Version: 2.12.0  
  
Python 3.10.9 (main, Dec 15 2022, 17:11:09) [Clang 14.0.0 (clang-1400.0.29.202)]  
Pandas 1.5.2  
Scikit-Learn 1.2.0  
SciPy 1.10.0  
GPU is available

一望而知,在最新的macOS-13.3.1系统中,基于Python3.10.9玩儿Tensorflow2.1没有任何问题。

至此,Tensorflow2就配置好了。

Tensorflow框架GPU和CPU测试

为什么一定要让Tensorflow支持GPU?GPU或图形处理单元与CPU类似,同样具有许多核心,允许它们同时进行更快的计算(并行性)。这个特性非常适合执行大规模的数学计算,如计算图像矩阵、计算特征值、行列式等等。

简而言之,GPU可以以并行方式运行代码并获得简明的结果,同时由于能够处理高强度的计算,因此可以比CPU更快的获得计算结果。

这里我们通过CIFAR-10项目进行测试,TensorFlow CIFAR-10项目是一个经典的计算机视觉项目,旨在训练一个模型,能够对CIFAR-10数据集中的图像进行分类。CIFAR-10数据集包含60,000张32x32像素的彩色图像,分为10个类别,每个类别包含6,000张图像。该项目的目标是训练一个深度神经网络模型,能够对这些图像进行准确的分类:

import tensorflow as tf  
from tensorflow import keras  
import numpy as np  
import matplotlib.pyplot as plt  
(X_train, y_train), (X_test, y_test) = keras.datasets.cifar10.load_data()  
  
X_train_scaled = X_train/255  
X_test_scaled = X_test/255  
# one hot encoding labels  
y_train_encoded = keras.utils.to_categorical(y_train, num_classes = 10, dtype = 'float32')  
y_test_encoded = keras.utils.to_categorical(y_test, num_classes = 10, dtype = 'float32')  
  
def get_model():  
    model = keras.Sequential([  
        keras.layers.Flatten(input_shape=(32,32,3)),  
        keras.layers.Dense(3000, activation='relu'),  
        keras.layers.Dense(1000, activation='relu'),  
        keras.layers.Dense(10, activation='sigmoid')      
    ])  
    model.compile(optimizer='SGD',  
              loss='categorical_crossentropy',  
              metrics=['accuracy'])  
    return model

首先测试CPU性能:

%%timeit -n1 -r1  
# CPU  
with tf.device('/CPU:0'):  
    model_cpu = get_model()  
    model_cpu.fit(X_train_scaled, y_train_encoded, epochs = 10)

这段代码使用了%%timeit -n1 -r1魔术命令来测试在CPU上训练模型的时间。-n1表示只运行一次,-r1表示只运行一轮。如果没有指定这些参数,则会运行多次并计算平均值。/CPU:0指的是第一个CPU(如果计算机只有一个CPU,则是唯一的CPU)。

这里使用get_model()函数获取模型,使用model_cpu.fit()方法在CPU上训练模型,使用X_train_scaled和y_train_encoded作为输入数据,并在10个epoch内进行训练。最后,使用%%timeit命令来测试训练模型所需的时间,以便比较不同设备的性能。

程序返回:

50000/50000 [==========================] - 80s 2ms/sample  
  
14min 9s

需要14分钟。

接着测试GPU性能:

%%timeit -n1 -r1  
# GPU  
with tf.device('/GPU:0'):  
    model_gpu = get_model()  
    model_gpu.fit(X_train_scaled, y_train_encoded, epochs = 10)

程序返回:

50000/50000 [==========================] - 11s 227us/sample  
  
1min 55s

一分多钟,很明显在GPU上训练模型比在CPU上训练模型更快,因为GPU可以同时处理多个任务。

结语

苹果MacOs系统可以承担深度学习任务,但术业有专攻,算力层面还是比不上配置N卡的其他平台,这是不争的事实。没错,更好的选择是RTX3090,甚至是4090,但一块RTX4090显卡的价格是1500刀左右,这还意味着CPU、内存、主板和电源都得单买,而一台m2芯片的Mac book air的价格是多少呢?

本文地址
https://www.cnblogs.com/zichliang/p/17303759.html

github地址:
https://github.com/dgrijalva/jwt-go

何为 jwt token?

什么是JSON Web Token?
JSON Web Token(JWT)是一个开放标准(RFC 7519),它定义了一种紧凑且自包含的方式,用于在各方之间以JSON方式安全地传输信息。由于此信息是经过数字签名的,因此可以被验证和信任。可以使用秘密(使用HMAC算法)或使用RSA或ECDSA的公钥/私钥对对JWT进行签名。
直白的讲jwt就是一种用户认证(区别于session、cookie)的解决方案。

jwt的优势与劣势

优点:

  1. 多语言支持
  2. 通用性好,不存在跨域问题
  3. 数据签名相对安全。
  4. 不需要服务端集中维护token信息,便于扩展。

缺点:
1、用户无法主动登出,只要token在有效期内就有效。这里可以考虑redis设置同token有效期一直的黑名单解决此问题。

2、token过了有效期,无法续签问题。可以考虑通过判断旧的token什么时候到期,过期的时候刷新token续签接口产生新token代替旧token

JWT的构成

Header是头部
Jwt的头部承载两部分信息:
声明类型,这里是jwt
声明加密的算法 通常直接使用 HMAC SHA256

Playload(载荷又称为Claim)

playload可以填充两种类型数据
简单来说就是 比如用户名、过期时间等,

  1. 标准中注册的声明

iss: 签发者
sub: 面向的用户
aud: 接收方
exp: 过期时间
nbf: 生效时间
iat: 签发时间
jti: 唯一身份标识

  1. 自定义声明

Signature(签名)

是由header、payload 和你自己维护的一个 secret 经过加密得来的
签名的算法:

HMACSHA256(
    base64UrlEncode(header) + "." +
    base64UrlEncode(payload),
    secret
)

golang-jwt/jwt

安装

go get -u github.com/golang-jwt/jwt/v4

这里注意 **最新版是V5 但是我们使用的V4, V5 的用法 也一样 不过需要实现Claims的接口方法 一共有六个左右。并且更加严谨了 **

注册声明结构体

注册声明是JWT声明集的结构化版本,仅限于注册声明名称

type JwtCustomClaims struct {
	ID   int
	Name string
	jwt.RegisteredClaims
}

生成Token

首先需要初始化Clamins 其次在初始化结构体中注册并且设置好过期时间 主题 以及生成时间等等。。
然后会发现 jwt.RegisteredClaims
在这个方法中 还需要实现Claims接口 还需要定义几个方法

如上图所示
然后我们使用
使用HS256 的签名加密方法使用指定的签名方法和声明创建一个新的[Token]
代码如下

// 本文地址 https://www.cnblogs.com/zichliang/p/17303759.html
// GenerateToken 生成Token
func GenerateToken(id int, name string) (string, error) {
	// 初始化
	iJwtCustomClaims := JwtCustomClaims{
		ID:   id,
		Name: name,
		RegisteredClaims: jwt.RegisteredClaims{
			// 设置过期时间 在当前基础上 添加一个小时后 过期
			ExpiresAt: jwt.NewNumericDate(time.Now().Add(viper.GetDuration("jwt.TokenExpire") * time.Millisecond)),
			// 颁发时间 也就是生成时间
			IssuedAt: jwt.NewNumericDate(time.Now()),
			//主题
			Subject: "Token",
		},
	}
	
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, iJwtCustomClaims)
	return token.SignedString(stSignKey)
}

还有一个小坑 这里的
stsignKey
必须是byte字节的
所以我们在设置签名秘钥 必须要使用byte强转

像这个样子。

然后我们去执行
传入一个ID 和一个name

token, _ := utils.GenerateToken(1, "张三")
fmt.Println(token)


得到如下值
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6MSwiTmFtZSI6IuW8oOS4iSIsIlJlZ2lzdGVyZWRDbGFpbXMiOnsic3ViIjoiVG9rZW4iLCJleHAiOjE2ODExODI2MDYsImlhdCI6MTY4MTE4MjYwNn19.AmOf60S2xby6GmlGgNo4Q5b01cRoAqXWhGorzxbJ2-Q

解析Token

https://jwt.io/
在写代码之前,我们把上面的token丢到上面网站中解析一下

可以发现 有三部分被解析出来了

  1. Header 告诉我们用的是什么算法,类型是什么
  2. PayLoad 我们自定义的一些数据
  3. Signature 之后服务器解析做的签名验证

代码解析token

  1. 声明一个空的数据声明
  2. 调用 jwt.ParseWithClaims 方法
  3. 传入token 数据声明接口,
  4. 判断Token是否有效
  5. 返回token
// ParseToken 解析token
func ParseToken(tokenStr string) (JwtCustomClaims, error) {
	// 声明一个空的数据声明
	iJwtCustomClaims := JwtCustomClaims{}
	//ParseWithClaims是NewParser().ParseWithClaims()的快捷方式
	//第一个值是token ,
	//第二个值是我们之后需要把解析的数据放入的地方,
	//第三个值是Keyfunc将被Parse方法用作回调函数,以提供用于验证的键。函数接收已解析但未验证的令牌。
	token, err := jwt.ParseWithClaims(tokenStr, &iJwtCustomClaims, func(token *jwt.Token) (interface{}, error) {
		return stSignKey, nil
	})

	// 判断 是否为空 或者是否无效只要两边有一处是错误 就返回无效token
	if err != nil && !token.Valid {
		err = errors.New("invalid Token")
	}
	return iJwtCustomClaims, err
}

返回成功如下图所示

由于我们主动抛了个错,那我们如果手动传入错的token 看他是否会抛出错误提示呢?

jwtCustomClaim, err := utils.ParseToken(token + "12312323123")

结果:

答案是会。

完整代码

完整代码
package utils

import (
	"errors"
	"fmt"
	"github.com/golang-jwt/jwt/v4"
	"github.com/spf13/viper"
	"time"
)

// 把签发的秘钥 抛出来
var stSignKey = []byte(viper.GetString("jwt.SignKey"))

// JwtCustomClaims 注册声明是JWT声明集的结构化版本,仅限于注册声明名称
type JwtCustomClaims struct {
	ID               int
	Name             string
	RegisteredClaims jwt.RegisteredClaims
}

func (j JwtCustomClaims) Valid() error {
	return nil
}

// GenerateToken 生成Token
func GenerateToken(id int, name string) (string, error) {
	// 初始化
	iJwtCustomClaims := JwtCustomClaims{
		ID:   id,
		Name: name,
		RegisteredClaims: jwt.RegisteredClaims{
			// 设置过期时间 在当前基础上 添加一个小时后 过期
			ExpiresAt: jwt.NewNumericDate(time.Now().Add(viper.GetDuration("jwt.TokenExpire") * time.Minute)),
			// 颁发时间 也就是生成时间
			IssuedAt: jwt.NewNumericDate(time.Now()),
			//主题
			Subject: "Token",
		},
	}
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, iJwtCustomClaims)
	return token.SignedString(stSignKey)
}

// ParseToken 解析token
func ParseToken(tokenStr string) (JwtCustomClaims, error) {
	iJwtCustomClaims := JwtCustomClaims{}
	//ParseWithClaims是NewParser().ParseWithClaims()的快捷方式
	token, err := jwt.ParseWithClaims(tokenStr, &iJwtCustomClaims, func(token *jwt.Token) (interface{}, error) {
		return stSignKey, nil
	})

	if err == nil && !token.Valid {
		err = errors.New("invalid Token")
	}
	return iJwtCustomClaims, err
}

func IsTokenValid(tokenStr string) bool {
	_, err := ParseToken(tokenStr)
	fmt.Println(err)
	if err != nil {
		return false
	}
	return true
}

dgrijalva/jwt-go

安装

go get -u "github.com/dgrijalva/jwt-go"

生成JWT

这里需要传入用户名和密码
然后根据SHA256 去进行加密 从而吧payload生成token

// 本文地址 https://www.cnblogs.com/zichliang/p/17303759.html
func Macke(user *Userinfo) (token string, err error) {
	claims := jwt.MapClaims{ //创建一个自己的声明
		"name": user.Username,
		"pwd":  user.Password,
		"iss":  "lva",
		"nbf":  time.Now().Unix(),
		"exp":  time.Now().Add(time.Second * 4).Unix(),
		"iat":  time.Now().Unix(),
	}

	then := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)

	token, err = then.SignedString([]byte("gettoken"))

	return
}

制定解析规则


在自己写的这个函数中 我们点进源码看返回值

解析方法使用此回调函数提供用于验证的键。函数接收已解析但未验证的令牌。
这允许您使用令牌Header中的属性(例如' kid ')来识别使用哪个键。

上述是源码的意思 而本人理解是制定一个类型规则然后去做解析。不然源码不知道你是制作token 还是解析token

func secret() jwt.Keyfunc {
	//按照这样的规则解析
	return func(t *jwt.Token) (interface{}, error) {
		return []byte("gettoken"), nil
	}
}

解析token

首先需要传入一个token,然后把解析规则传入
然后需要验证Token的正确性以及有效性。
如果二者都是没问题的
然后才能解析出 用户名和密码 或者是其他的一些值

// 解析token
func ParseToken(token string) (user *Userinfo, err error) {
	user = &Userinfo{}
	tokn, _ := jwt.Parse(token, secret())

	claim, ok := tokn.Claims.(jwt.MapClaims)
	if !ok {
		err = errors.New("解析错误")
		return
	}
	if !tokn.Valid {
		err = errors.New("令牌错误!")
		return
	}
	//fmt.Println(claim)
	user.Username = claim["name"].(string) //强行转换为string类型
	user.Password = claim["pwd"].(string)  //强行转换为string类型
	return
}

完整代码

完整代码
// 本文地址 https://www.cnblogs.com/zichliang/p/17303759.html
package main

import (
	"errors"
	"fmt"
	"github.com/dgrijalva/jwt-go"
	"time"
)

type Userinfo struct {
	Username string `json:"username"`
	Password string `json:"password"`
}

// Macke 生成jwt 需要传入 用户名和密码
func Macke(user *Userinfo) (token string, err error) {
	claims := jwt.MapClaims{ //创建一个自己的声明
		"name": user.Username,
		"pwd":  user.Password,
		"iss":  "lva",
		"nbf":  time.Now().Unix(),
		"exp":  time.Now().Add(time.Second * 4).Unix(),
		"iat":  time.Now().Unix(),
	}

	then := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)

	token, err = then.SignedString([]byte("gettoken"))

	return
}

// secret 自己解析的秘钥
func secret() jwt.Keyfunc {
	//按照这样的规则解析
	return func(t *jwt.Token) (interface{}, error) {
		return []byte("gettoken"), nil
	}
}

// 解析token
func ParseToken(token string) (user *Userinfo, err error) {
	user = &Userinfo{}
	tokn, _ := jwt.Parse(token, secret())

	claim, ok := tokn.Claims.(jwt.MapClaims)
	if !ok {
		err = errors.New("解析错误")
		return
	}
	if !tokn.Valid {
		err = errors.New("令牌错误!")
		return
	}
	//fmt.Println(claim)
	user.Username = claim["name"].(string) //强行转换为string类型
	user.Password = claim["pwd"].(string)  //强行转换为string类型
	return
}

func main() {
	var use = Userinfo{"zic", "admin*123"}
	tkn, _ := Macke(&use)
	fmt.Println("_____", tkn)
	// time.Sleep(time.Second * 8)超过时间打印令牌错误
	user, err := ParseToken(tkn)
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println(user.Username)
}

本文地址
https://www.cnblogs.com/zichliang/p/17303759.html

这里需要注意
用户请求时带上token,服务器解析token后可以获得其中的用户信息,如果token有任何改动,都无法通过验证.

大家好,我是小富~

前言

忙里偷闲学习了点技术写了点demo代码,打算提交到我那
2000Star
的Github仓库上,居然发现有5个
Issues
,最近的一条日期已经是2022/8/1了,以前我还真没留意过这些,我这人懒得很,本地代码提交成功基本就不管了。

仓库地址:
https://github.com/chengxy-nds/Springboot-Notebook

5个
Issues
中,有个标题"
优化本仓库大小的建议
"吸引我了,赶紧点进去看看啥情况,这个哥们说我这个仓库太大了,仓库700多M,半天也拉不下来,然后还给我附上了修改建议(不要太贴心哦)。

我不信

开始我还有点不信,这仓库提交的是我平时写文章中跑的代码demo,每个项目也就十来个文件几十KB,怎么会有700M这么多,不信邪的我赶紧看了下仓库大小
Setting->Repositoriess
,居然真有683M这么多。

于是我赶紧
Clone
仓库到本地,看看是不是我提交了什么大文件,下载过程确实挺折磨人,不仅慢还经常中断,在经过不知道多少次重试之后,终于下载下来了。

翻了翻各个项目发现里边的确有许多应该忽略的文件
.idea

.mvn

target
都被我提交了,直接删除文件,但并没有什么卵用,项目整体
size
变化不大,应该不是这些文件的原因。

我用
du
命令看了下各目录的文件大小,发现
./springboot-seata-transaction
目录45M,因为里边有一个jar直接删除就行了;而
.git
足足有688M,看来问题就出在这个目录,重点优化下。

这目录好大

那么
.git
目录是存放什么的?为什么会这么大呢?


Git
系统中,.git目录中存储了整个代码仓库的元数据信息(包括提交历史记录、分支、标签等)和文件对象。

我在该目录上用
du
命令看了一下,发现
objects
目录居然有683M,那么问题就是出在它身上了。

objects
目录之所以这么大,是因为它保存了仓库中的所有历史版本和数据对象,也就是
blobs
文件内容,
trees
目录结构,
commits
提交历史,它们是 Git 中的三个核心对象类型。

其中:

  • Blobs:
    每一个文件都被视为一个二进制对象(Blob 对象),它保存了文件的所有内容和一些元数据信息,如文件名、文件类型、文件大小等。当文件发生变化时,Git 会自动计算并存储新的 Blob 对象,并将其与先前的 Blob 对象做比较,以确定文件的变化情况。

  • Trees:
    目录也被视为一个对象(Tree 对象),它保存了目录中包含的所有文件和子目录的列表,每个列表项包括了文件或目录的名称、类型、权限等信息,以及对应文件或目录的 Blob 或 Tree 对象的 SHA 校验和。

  • Commits:
    我们每次提交代码,就会产生一次Commit, Commit对象保存了一次代码变更的相关信息,包括作者、提交时间、父提交记录的 SHA 值、描述信息等。每个 Commit 对象还包含指向对应代码快照的 Tree 对象的 SHA 校验和。

上边三种类型的对象组成了Git中的基本数据单元,通过这些对象的组合和链接,才构建出完整的提交历史,并跟踪代码库中每个文件的变化历史。这个目录是Git中最重要的目录之一,所以对它操作要谨慎,不然很容易丢失历史记录。

瘦身利器

尽管我们知道了大文件的位置,可具体该删除哪些文件无从下手啊,
objects
目录下都是些压缩文件,弄不好整个仓库都得报废。

好在提
Issues
的兄弟还给推荐了个Git仓库瘦身的工具,该说不说真的贴心啊。

这个Git仓库清理工具叫
BFG Repo-Cleaner
,可以帮助我们筛选、清理大文件对象,官方文档地址:
https://rtyley.github.io/bfg-repo-cleaner
,接下来看看这工具咋用。

注意
:在做以下操作之前,
一定要备份!!!
一定要备份!!!
一定要备份!!!

1、下载安装

BFG 是以java -jar的方式启动,首先安装不低于
JDK8+
的环境,并下载 BFG 的
Jar
包。

$ wget https://repo1.maven.org/maven2/com/madgag/bfg/1.14.0/bfg-1.14.0.jar

2、clone 仓库

在 clone 仓库的时候推荐使用
--mirror
参数,将源仓库的所有分支、标签以及提交历史都完整地克隆到本地,只会拉取
.git
目录的相关文件。

$ git clone --mirror git://example.com/Springboot-Notebook.git

3、查找大文件


bfg-1.14.0.jar

Springboot-Notebook.git
放在同一级目录下,执行下边的命令过滤出大于20M的文件对象。

# 查找出大于20M的需要清理的文件
$ java -jar bfg-1.14.0.jar --strip-blobs-bigger-than 20M Springboot-Notebook.git

看到过滤出了很多大文件,包含了
Issues
中兄弟提到的那个Jar包。

4、删除大文件

使用
--delete-files
命令逐一的将大文件删除,如果提示分支是被保护的可以加上
--no-blob-protection
命令执行。

# 查找出大于20M的需要清理的文件
$ java -jar bfg.jar --delete-files Cyrillic.traineddata  Springboot-Notebook.git 

5、GC 回收垃圾

上边我们虽然过滤、删除了大文件,此时仓库的大小仍然没有变化,进入Springboot-Notebook.git仓库内,执行
git reflog expire

git gc
命令,压缩 Git 库中的历史版本、清除废弃的、过期的对象,这个过程会耗费较长的时间。

# 进入目录
$ cd Springboot-Notebook.git

# 执行git gc 回收垃圾
$ git reflog expire --expire=now --all && git gc --prune=now --aggressive

# 推送
$ git push

最后推送代码到远程仓库,在看仓库的大小已经降到了108M,效果挺显著,剩下的108M应该就是我频繁提交导致的了,查了下我居然有127次提交之多。

每次提交都会生成一次快照,这些快照可能包含大量的文件和代码,频繁提交会导致版本库中快照的数量增加。Git使用的是一种增量存储方式,每次提交只存储发生了变化的文件和代码。

但是,如果频繁提交的变化较小,比如只是修改了几个字符或者行末空格,Git可能无法正确地检测出这些变化,而将整个文件都存储起来,也会增加了.git文件的大小。

彻底一点

后来我想了想这个仓库存在的意义,不就是让大家看着文章跑demo代码嘛,能
快速的下载并成功运行
才是他们在乎的,谁又会在意提交记录。那我干脆彻底一点,清理掉仓库的所有提交记录,一劳永逸。

那么操作起来就得小心一点了,毕竟2000star,别把仓库弄嘎了,既要清除一个分支上的所有提交记录,同时又不能删除这个分支本身,其实我们可以迂回一下。

我们首先使用
--orphan
命令创建一个全新的分支
new-branch
,这个新建的分支和其他分支没有任何关系,它不会包含任何先前的提交记录或者历史记录。相当于新建了一个干净的空分支,并让该分支指向一个全新的根节点。

git checkout --orphan <new-branch-name>

然后 commit 全部的项目文件到这个分支,暂不需要推到远程仓库。

git add -A
git commit -am "Initial commit"

接着删除旧的分支,并把新建的分支名改成旧分支名称,推到远程仓库就行了。

# 

git branch -D <old-branch-name>
git branch -m <old-branch-name>
git push -f origin <old-branch-name>

在推完代码后我再次去看了下仓库的大小,现在就只有6.33M了,如果不是人脸识别项目中有几个必要的SDK包,应该还能在小点。

总结

以我自己的亲身经历做为反面教材,提醒大家操作Git要谨慎,工作中严禁提交该忽略的文件;提交代码要控制节奏,不能随心所欲,尤其是团队协作开发;如果发现
.git
文件太大,推荐使用
Git LFS
来管理大文件,千万不要像我这么操作,毕竟随意删除提交历史记录,在哪个公司都是不合规的。

重点感谢下提出建议的大兄弟

1 网络同步机制

UE 提供了强大的网络同步机制:

  • RPC :可以在本地调用,对端执行
  • 属性同步:标记一个属性为
    UPROPERTY(Replicated)
    就可以自动将其修改后的值同步到客户端
  • 移动复制:Actor 开启了移动复制后会自动复制位置,旋转和速度
  • 创建和销毁:Server 创建 Actor 时根据其
    权限
    会在所有连接客户端生成远程代理
    UE 基本上都是基于 Actor 进行同步的。
    Actor 同步的前提需要标记 Actor 为 bReplicated
    。首先来了解下如何应用 UE 中的属性同步。

2 Actor 同步

2.1 如何同步一个 Actor

首先思考一下,如何创建一个 Actor 然后让他同步到各个客户端?

  • 在哪里创建?
    创建 Actor 的操作显然需要在服务端执行
    ,如果在客户端执行,这个 Actor 只会在这个客户端可见。

image.png|625

  • 如何让 Actor 同步? 标记 Actor 的
    bReplicated
    为 True。

2.2 如何同步 Actor 的属性

创建并同步完 Actor 之后,下一步是能够支持 Actor 的数据能够正常同步到客户端,首先在应用层如何支持这一操作?
假设我们有一把武器,需要同步武器的弹药数量,那么需要进行如下定义

/** weapon.h **/
class AWeapon : public  {
	UPROPERTY(replicatedUsing=OnRep_Ammo) // 可选属性,当 Ammo 成功同步后会调用该函数
	int32 Ammo; // 弹药数量
	UFUNCTION()  
	virtual void OnRep_Ammo();
	virtual void GetLifetimeReplicatedProps(TArray< FLifetimeProperty > & OutLifetimeProps) const override; // 属性复制条件控制
}
/** weapon.cpp **/
void AWeapon::GetLifetimeReplicatedProps(TArray< FLifetimeProperty > & OutLifetimeProps) const {
   Super::GetLifetimeReplicatedProps(OutLifetimeProps);  
   DOREPLIFETIME(AWeapon, Ammo); // 具体的复制属性
}

上述定义主要有如下特点:

  • Actor 支持同步时,如果有自定义需要同步的属性,
    需要重写 GetLifetimeReplicatedProps 函数,并在其中标注要复制的具体属性
  • 同步属性时,可以通过
    URPOPERTY
    宏中
    replicatedUsing
    属性来指定同步后要执行的回调函数

2.3 Actor 同步流程

现在我们需要考虑一下,Actor 的属性在什么情况下会被复制?通常来说我们只需要在 Actor 属性被修改时就需要同步到客户端,但是什么时候会被修改我们并不知道,因此引擎中会根据
Actor 复制频率
来做同步检查。参考后文中 4.4 优先级和复制频率 的内容。我们可以梳理出如下流程:

image.png|350

基本上每帧都需要检查有哪些 Actor 需要同步,显然这种检查也是比较耗时的,由此 UE 也引入了 PushModel 技术,手动标记 Actor 哪些属性已修改需要更新,从而节约检查属性的消耗。

2.4 小结

如何创建,同步一个 Actor 的应用层流程基本梳理完毕,但是显然需要知道其后面的原理,由此引出如下问题在后续的文章中解决:





Actor 同步只能从 Server 同步到 Client,Client 唯一向 Server 发送请求的方式只有 RPC,
属性同步是单向的

3 RPC 使用分析

3.1 什么是 RPC

RPC(Remote Procedure Call,远程过程调用)是一种用于实现分布式应用程序的技术。通过 RPC,可以使分布式应用程序中的各个部分像本地代码一样交互,即使它们不在同一台计算机或在不同的网络上。
在 RPC 中,一个应用程序可以调用另一个应用程序中的函数或方法,就像调用本地函数一样。这些函数和方法在不同的进程或计算机上执行,但对调用方来说,它们是透明的。调用方不需要了解远程代码的具体实现细节,只需要知道如何调用它们并处理返回值。

RPC 的使用有一些前提准则,必须满足这些条件才能调用

  1. 它们必须从 Actor 上调用。
  2. Actor 必须被复制。
  3. 如果 RPC 是从服务器调用并在客户端上执行,则只有实际拥有这个 Actor 的客户端才会执行函数。
  4. 如果 RPC 是从客户端调用并在服务器上执行,客户端就必须拥有调用 RPC 的 Actor。
  5. 多播 RPC 则是个例外:
  • 如果它们是从服务器调用,服务器将在本地和所有已连接的客户端上执行它们。
  • 如果它们是从客户端调用,则只在本地而非服务器上执行。
  • 现在,我们有了一个简单的多播事件限制机制:在特定 Actor 的网络更新期内,多播函数将不会复制两次以上。按长期计划,我们会对此进行改善,同时更好的支持跨通道流量管理与限制。

3.2 RPC 的种类

UE 中有 3 种 RPC :

  • Server : 仅在 Server 上调用
  • Client :仅在 Client 上调用
  • NetMulticast :在与服务器连接的所有客户端及服务器本身上调用
    这三种 RPC 只需要在函数调用的声明中加上对应的标记即可。

3.2.1 如何确定 RPC 在哪里被执行

当 RPC 函数在
服务器上调用
时,有如下情况:


Actor 所有权 未复制 NetMulticast Server Client
Client Owned Actor 在服务器上运行 在服务器和所有客户端上运行 在服务器上运行 在 actor 的所属客户端上运行
Server Owned Actor 在服务器上运行 在服务器和所有客户端上运行 在服务器上运行 在服务器上运行
Unowned Actor 在服务器上运行 在服务器和所有客户端上运行 在服务器上运行 在服务器上运行

当 RPC 函数在
客户端上调用
时,如下:


Actor 所有权 未复制 NetMulticast Server Client
Owned By Invoking Client 在执行调用的客户端上运行 在执行调用的客户端上运行 在服务器上运行 在执行调用的客户端上运行
Owned By a different client 在执行调用的客户端上运行 在执行调用的客户端上运行 丢弃 在执行调用的客户端上运行
Server Owned Actor 在执行调用的客户端上运行 在执行调用的客户端上运行 丢弃 在执行调用的客户端上运行
Unowned Actor 在执行调用的客户端上运行 在执行调用的客户端上运行 丢弃 在执行调用的客户端上运行


事实上最终判断 RPC 在哪里被执行,主要根据如下三个条件:

  1. 调用端是谁(Client/Server)
  2. 调用的 Actor 属于哪个连接
  3. RPC 的类型(Server/Client/NetMulticast)

举一个例子,有两个客户端 c1 和 c2 各自有 Pawn p1 和 p2,c1 的客户端上能够获取到 p2 这个对象,但是无法利用 p2 调用 RPC,因为在 c1 上 p2 只是一个普通的 Pawn,其没有对应的 c2 的 PlayerController(参考 [[总体框架#3. PlayerController|PlayerController 定义]])。也没有对应的 Connection,因此无法执行 RPC。

  1. 实际上是否会调用到对端,主要根据
    UObject::GetFunctionCallspace
    这个接口返回的枚举来判定的。
  2. 其次根据 Actor 所属的 Connection,如果 Actor 不属于任何一个 Connection(Owner 递归查找找不到 PlayerController),那么也是无法调用 RPC 的。

3.3 RPC 的使用

UE 中,一个 RPC 函数的声明和定义如下(以 Client 调用 Server 执行的 RPC 为例):

/** weapon.h **/
class AWeapon : public  {
	UFUNCTION(Server)
	void Fire();
}

/** weapon.cpp **/
void AWeapon::Fire_Implementation() {
	/** do weapon fire **/ 
}

此时只需要在 Client 端使用如下操作:

AWeapon* Weapon = GetWeapon();
Weapon->Fire();

就能直接调用 Server 端的 Fire 接口了。关于其背后实现的原理,可以参考 [[原理#4. QA#4.5 RPC 函数如何执行的|RPC函数执行原理]]。
这里需要注意一点,UE 的 RPC 是没有返回值的,统一都是 void。个人如果需要获取返回值,那么就需要一个类似协程的概念,来获取返回值,否则只能阻塞等待或者异步等待,后者显然代码可读性也不是很好。

3.4 小结

RPC 与属性同步有些不同,RPC 可以 Server To Client 也可以 Client To Server,是一种双向的通信方式,而属性同步只能 Server To Client,属于单向同步。对于 RPC 的实现,有如下问题可以再进行深究:




4 Actor 同步概念

4.1 NetRole

每个 Actor 都有一个 LocalRole 和 RemoteRole 的概念,分别对应于 Actor 在本地和在对端的 Role,Role 主要分为 3 种:

  • ROLE_SimulatedProxy
  • ROLE_AutonomousProxy
  • ROLE_Authority
    通常
    LocalRole=Authority
    只存在于服务器(但是客户端也有可能存在,比如 Spawn 一个 Actor 但是不标记为 Replicated)。关于各种 Role 常见的设置可以参考下图:
    image.png|850

4.1.1 AutonomousProxy 和 SimulatedProxy 的区别

  • AutonomousProxy 和 SimulatedProxy 基本只存在于客户端,
    ROLE_AutonomousProxy 用于处理本地玩家的输入
    ,并将这些输入发送到服务器进行处理,而
    ROLE_SimulatedProxy 用于处理其他玩家的输入
    ,并在客户端上模拟 Actor 在服务器上的运行。因此通常 AutonomousProxy 只存在于 PlayerController 和其 Possess 的 Pawn。
  • SimulatedProxy 是标准的模拟途径,通常是根据上次获得的速率对移动进行推算。当服务器为特定的 actor 发送更新时,客户端将向着新的方位调整其位置,然后利用更新的间歇,根据由服务器发送的最近的速率值来继续移动 actor。
  • AutonomousProxy 通常只用于 PlayerController 所拥有的 actor。这说明此 actor 会接收来自真人控制者的输入,所以在我们进行推算时,我们会有更多一些的信息,而且能使用真人输入内容来补足缺失的信息(而不是根据上次获得的速率来进行推算)。

4.1.2 小结

那么这个 Role 有什么用呢?个人认为有如下用处:

  • 在 C/S 模式下,基本可以认为 LocalRole 为 Authority 的 Actor 当前就是处于服务器环境下,用来区分服务器还是客户端
  • 引擎对于 AutonomousProxy 和 SimulatedProxy 做了区分,用来更好的模拟玩家输入

就目前而言,只有服务器能够向已连接的客户端同步 Actor (客户端永远都不能向服务器同步)。始终记住这一点,
只有
服务器才能看到
Role == ROLE_Authority

RemoteRole == ROLE_SimulatedProxy
或者
ROLE_AutonomousProxy

4.2 关联连接

UE 中
Actor

关联连接
的概念,即这个
Actor
属于哪个连接。在传统的 C/S 服务器中,每个客户端和服务器会有一条连接,在 UE 中会为每个连接创建一个
PlayerController
,这样这个
PlayerController
就归这条连接所有。
而如果一个
Actor
的 Owner 为
PlayerController
或者为
Pawn
并且这个
Pawn
拥有一个
PlayerController
,那么这个
Actor
就归属于拥有这个
PlayerController
的连接。
这里的关联连接有什么用呢?
考虑如下三种情况:

  • 需要确定哪个客户端将执行运行于客户端的 RPC
  • Actor 复制与连接相关性(比如 bOnlyRelevantToOwner 为 True 的 Actor,只有拥有这个 Actor 的 Connection 才会收到这个 Actor 的属性更新,比如 PlayerController)
  • 涉及 Owner 的 Actor 属性复制条件(比如 COND_OnlyOwner 只能复制给 Owner)

连接所有权

4.3 相关性

相关性是用于判断 Actor 是否需要进行同步的重要依据。其主要判断相关性的接口为
AActor::IsNetRelevantFor
。个人认为相关性最重要的一点是可以
有效的节约带宽和同步操作所带来的 CPU 消耗

比如场景的规模可能比较大,玩家特定时刻只能看到关卡中部分 Actor。被服务器认为可见或者能够影响客户端的 Actor 组会被是为该客户端的相关 Actor 组,服务器只会让客户端知道其相关组内的 Actor。

  1. 如果 Actor 是
    bAlwaysRelevant
    、归属于 Pawn 或 PlayerController、本身为 Pawn 或者 Pawn 是某些行为(如噪音或伤害)的发起者,则其具有相关性。
  2. 如果 Actor 是
    bNetUseOwnerRelevancy
    且拥有一个所有者,则使用所有者的相关性。
  3. 如果 Actor 是
    bOnlyRelevantToOwner
    且没有通过第一轮检查,则不具有相关性。
  4. 如果 Actor 被附加到另一个 Actor 的骨架模型,它的相关性将取决于其所在基础的相关性。
  5. 如果 Actor 是不可见的 (
    bHidden == true
    ) 并且它的 Root Component 并没有碰撞,那么则不具有相关性,
    • 如果没有 Root Component 的话,
      AActor::IsNetRelevantFor()
      会记录一条警告,提示是否要将它设置为
      bAlwaysRelevant=true
  6. 如果
    AGameNetworkManager
    被设置为使用基于距离的相关性,则只要 Actor 低于净剔除距离,即被视为具有相关性。

Pawn 和 PlayerController 将覆盖
AActor::IsNetRelevantFor()
并最终具有不同的相关性条件。

4.4 优先级和复制频率

4.4.1 优先级

每个 Actor 都有一个名为
NetPriority
的浮点变量。这个变量的数值越大,Actor 相对于其他"同伴"的带宽就越多。和优先级为 1.0 的 Actor 相比,优先级是 2.0 的 Actor 可以得到两倍的更新频度。唯一影响优先顺序的就是它们的比值。
计算 Actor 的当前优先级时使用了函数
AActor::GetNetPriority
。为避免出现饥荒(starvation),
AActor::GetNetPriority
使用 Actor 上次复制后经过的时间去乘以 NetPriority。同时,
GetNetPriority
函数还考虑了 Actor 与观察者的相对位置以及两者之间的距离。

4.4.2 复制频率

Actor 不是每一帧都进行复制的,每个 Actor 有个自己的每秒复制频率 NetUpdateFrequency,每次检查 Tick 的 DeltaTime > 1/NetUpdateFrequency,满足条件才可以进行下一步复制检查。
比如默认 PlayerState 每秒更新 1 次,而 Pawn 每秒更新 100 次(默认情况下服务器 30 fps 运行,基本上每帧都会做复制检查)。

定义

发布订阅模式是基于一个事件(主题)通道,希望接收通知的对象
Subscriber
(订阅者)
通过自定义事件订阅主题,被激活事件的对象
Publisher
(发布者)

通过发布主题事件的方式通知订阅者
Subscriber
(订阅者)

对象。

简单说就是
发布者与订阅者通过事件来通信
,这里的发布者是之前观察者模式中的被观察者,订阅者是观察者模式中的观察者,他们角色定位是等价的,只不过是不同的叫法。

发布订阅与观察者模式

平时我们在微博中关注某个大v,这个大v 并不关心我这个订阅者具备什么特征,我只是通过微博这个平台关注了他,他也只是把他要分享的话题通过微博发出来,我和他之间并不存在直接的联系,然后我自动就能看到这个大v发布的消息,这就是发布订阅模式。

发布订阅者模式与观察者模式类似,但是两者并不完全相同,发布订阅者模式与观察者相比多了一个中间层
事件调度中心
,用来对发布者发布的信息进行处理,再通知到各个特定的订阅者,大致过程如下图所示

发布者只是发布某事件,无需知道哪些订阅者,订阅者只需要订阅自己感兴趣的事件,无需关注发布者。

发布者完全不用感知订阅者,不用关心它怎么实现回调方法,
事件的注册和触发都发生在独立于双方的第三方平台(调度中心)上,发布-订阅模式下,实现了完全地解耦。

通过之前对观察者模式的实现,我们的
Subject

类中是持有
observer
对象的,因此并没有实现两个类的完全解耦。通过添加中间层的调度中心类,我么可以将订阅者和发布者完全解耦,两者不再有直接的关联,而是通过调度中心关联起来。下面我们实现一个发布订阅者模式。

传统写法模拟发布订阅模式

按照上面思路,我们需要写下如下三个类,然后事件中心对象是发布者、订阅者之间的桥梁,我们很快写下如下代码:

  1. 发布者 ---- 观察者模式中的【被观察者】
  2. 订阅者 ---- 观察者模式中的【订阅者】
  3. 事件中心 ---- 类似公共的一个平台
/*
发布者:发布、注册xxx事件 or 主题
订阅者:自己的行为,取消订阅,订阅
事件中心:注册发布者的某事件、取消注册发布者的某事件、注册订阅者、取消订阅者、发布事件(通知订阅者)
*/
// 发布者
class Pulisher {
  constructor (name, evtCenter) {
    this.name = name;
    this.evtCenter = evtCenter;
  }
  // 向事件调度中心-注册某事件
  register (evtName) {
    this.evtCenter.registerEvt(evtName)
  }
  unregister (evtName) {
    this.evtCenter.unRegisterEvt(evtName)
  }
  // 向事件调度中心-发布某事件
  publish (evtName, ...params) {
    this.evtCenter.publish(evtName, ...params)
  }
}

// 订阅者
class Subscriber {
  constructor (name,evtCenter) {
    this.name = name;
    this.evtCenter = evtCenter;
  }
  //订阅
  subscribe(evtName) {
    this.evtCenter.addSubscribe(evtName, this);
  }
  //取消订阅
  unSubscribe(evtName) {
    this.evtCenter.unAddSubscribe(evtName, this);
  }
  //接收
  update(params) {
    console.log(`我接收到了,${params}`);
  }
}

// 事件调度中心
class EvtCenter {
  constructor (name) {
    this.name = name;
    this.evtHandle = {}
  }
  // 注册发布者要发布的事件
  registerEvt (evtName) {
    if (!this.evtHandle[evtName]) {
      this.evtHandle[evtName] = []
    }
  }
  // 取消注册发布者要发布的事件
  unRegisterEvt (evtName) {
    delete this.evtHandle[evtName];
  }
  // 增加订阅者-注册观察者
  addSubscribe(evtName, sub) {
    if (this.evtHandle[evtName]) {
      this.evtHandle[evtName].push(sub);
    }
  }
   // 取消订阅者-移除注册观察者
   unAddSubscribe(evtName, sub) {
    this.evtHandle[evtName].forEach((item, index) => {
      if (item === sub) {
        this.evtHandle[evtName].splice(index, 1);
      }
    });
  }

  // 事件调度中心-发布某事件
  publish (evtName, ...params) {
    this.evtHandle[evtName] && this.evtHandle[evtName].forEach((item) => {
      item.update(...params);
    });
  }
}

// 测试
const evtCenter1 = new EvtCenter('报社调度中心1')

const pulisher1 = new Pulisher('报社1', evtCenter1)

const sub1 = new Subscriber('我是sub1, 我对日报感兴趣', evtCenter1)
const sub2 = new Subscriber('我是sub2, 我对日报感兴趣', evtCenter1)
const sub3 = new Subscriber('我是sub3, 我对中报感兴趣', evtCenter1)
const sub4 = new Subscriber('我是sub4, 我对晚报感兴趣', evtCenter1)

// 发布者-注册三个事件到事件中心
pulisher1.register('广州日报')
pulisher1.register('广州中报')
pulisher1.register('广州晚报')

// 订阅者可以自己订阅,当然也可以直接操作事件中心
sub1.subscribe('广州日报')
sub2.subscribe('广州日报')
sub3.subscribe('广州中报')
sub4.subscribe('广州晚报')

// 现在开始发布事件
pulisher1.publish('广州日报', '广州日报')
pulisher1.publish('广州中报', '广州中报')
pulisher1.publish('广州晚报', '广州晚报')

pulisher1.unregister('广州日报')

// 再一次发布事件
console.log('再一次发布事件,这次我取消了日报') // 没有输出广州日报
pulisher1.publish('广州日报', '广州日报')
pulisher1.publish('广州中报', '广州中报')
pulisher1.publish('广州晚报', '广州晚报')

简单写法--面向事件调度中心编程

在js中函数是第一等公民,天生适合回调函数,所以可以直接面向事件调度中心编码即可。我们要做的事情其实就是触发什么事件,执行什么动作。

// 事件调度中心
class PubSub  {
  constructor () {
    this.evtHandles = {}
  }
  // 订阅
  subscribe (evtName, callback) {
    if (!this.evtHandles[evtName]) {
      this.evtHandles[evtName] = [callback];
    }
    this.evtHandles[evtName].push(callback);
  }
  // 发布
  publish(evtName, ...arg) {
    if (this.evtHandles[evtName]) {
      for(let fn of this.evtHandles[evtName]) {
        fn.call(this, ...arg);
      }
    }
  }
  unSubscribe (evtName, fn) {     // 取消订阅
    let fnList = this.evtHandles[evtName];
    if (!fnList) return false;

    if (!fn) {
      // 不传入指定取消的订阅方法,则清空所有key下的订阅
      this.evtHandles[evtName] = []
    } else {
      fnList.forEach((item, index) => {
        if (item === fn) {
          fnList.splice(index, 1);
        }
      })
    }
  }
}
// 先订阅在发布
const pub1 = new PubSub()

// 订阅三个事件
pub1.subscribe('onWork', time => {
  console.log(`上班了:${time}`);
})
pub1.subscribe('onWork', time => {
  console.log(`上班了:${time},开始打开待办事项`);
})

pub1.subscribe('onOffWork', time => {
  console.log(`下班了:${time}`);
})

pub1.subscribe('onLaunch', time => {
  console.log(`吃饭了:${time}`);
})

// 发布对应的事件
pub1.publish('onWork', '09:00:00');
pub1.publish('onLaunch', '12:00:00');
pub1.publish('onOffWork', '18:00:00');
// 取消onWork 事件
pub1.unSubscribe('onWork');


// 取消订阅
pub1.unSubscribe('onWork');
console.log(`取消 onWork`);
pub1.publish('onWork', '09:00:00'); // 不会执行

小结

  1. 发布者不直接触及到订阅者、而是由统一的第三方来完成实际的通信的操作,叫做
    发布订阅模式

  2. 发布者
    (被观察者)
    直接操作订阅者的操作,叫做
    观察者模式
  3. 发布订阅模式,发布者完全不用感知订阅者,不用关心它怎么实现回调方法,事件的注册和触发都发生在独立于双方的第三方平台(事件调度中心)上,发布-订阅模式下,实现了完全地解耦。
  4. 发布订阅核心通过事件来通信,在调度中心中派发给具体的订阅者。