2023年4月

虽然doccano的自动标注
使用默认的UIE模型
可以识别出一定的实体,但是在特定领域或者因为实体类别名不能被理解很多实体是识别不了的,所以我们可以通过自己标注的数据对模型进行微调来满足我们Auto Labeing的需求。

预处理doccano标注的数据

该章节详细说明如何通过
doccano.py
脚本对doccano平台导出的标注数据进行转换,一键生成训练/验证/测试集。

在本地部署UIE模型

下载模型压缩包:

本来是要通过一定的方法(GitZip)才能在整体的大项目仓库中下载的,不详细讲,这里直接给压缩包。

PaddleNLP-model_zoo.zip - 蓝奏云 (lanzoub.com)

抽取式任务数据转换

  • 当标注完成后,在 doccano 平台上导出
    JSONL(relation)
    形式的文件,并将其重命名为
    doccano_ext.json
    后,放入
    ./data
    目录下。
  • 通过
    doccano.py
    脚本进行数据形式转换,然后便可以开始进行相应模型训练。
python doccano.py \
    --doccano_file ./data/doccano_ext.json \
    --task_type "ext" \
    --save_dir ./data \
    --negative_ratio 5

当然也可以将这个命令保存为
tran.sh
文件

训练UIE模型

  • 使用标注数据进行小样本训练,模型参数保存在
    ./checkpoint/
    目录。

tips:
推荐使用GPU环境,否则可能会内存溢出。CPU环境下,可以修改model为
uie-tiny
,适当调下batch_size。

增加准确率的话:--num_epochs 设置大点多训练训练

可配置参数说明:

  • model_name_or_path
    :必须,进行 few shot 训练使用的预训练模型。可选择的有 "uie-base"、 "uie-medium", "uie-mini", "uie-micro", "uie-nano", "uie-m-base", "uie-m-large"。
  • multilingual
    :是否是跨语言模型,用 "uie-m-base", "uie-m-large" 等模型进微调得到的模型也是多语言模型,需要设置为 True;默认为 False。
  • output_dir
    :必须,模型训练或压缩后保存的模型目录;默认为
    None
  • device
    : 训练设备,可选择 'cpu'、'gpu' 、'npu'其中的一种;默认为 GPU 训练。
  • per_device_train_batch_size
    :训练集训练过程批处理大小,请结合显存情况进行调整,若出现显存不足,请适当调低这一参数;默认为 32。
  • per_device_eval_batch_size
    :开发集评测过程批处理大小,请结合显存情况进行调整,若出现显存不足,请适当调低这一参数;默认为 32。
  • learning_rate
    :训练最大学习率,UIE 推荐设置为 1e-5;默认值为3e-5。
  • num_train_epochs
    : 训练轮次,使用早停法时可以选择 100;默认为10。
  • logging_steps
    : 训练过程中日志打印的间隔 steps 数,默认100。
  • save_steps
    : 训练过程中保存模型 checkpoint 的间隔 steps 数,默认100。
  • seed
    :全局随机种子,默认为 42。
  • weight_decay
    :除了所有 bias 和 LayerNorm 权重之外,应用于所有层的权重衰减数值。可选;默认为 0.0;
  • do_train
    :是否进行微调训练,设置该参数表示进行微调训练,默认不设置。
  • do_eval
    :是否进行评估,设置该参数表示进行评估。

该示例代码中由于设置了参数
--do_eval
,因此在训练完会自动进行评估。

微调命令

export finetuned_model=./checkpoint/model_best

python finetune.py  \
    --device gpu \
    --logging_steps 10 \
    --save_steps 100 \
    --eval_steps 100 \
    --seed 42 \
    --model_name_or_path uie-base \
    --output_dir $finetuned_model \
    --train_path ./data/train.txt \
    --dev_path ./data/dev.txt  \
    --max_seq_length 512  \
    --per_device_eval_batch_size 16 \
    --per_device_train_batch_size  16 \
    --num_train_epochs 20 \
    --learning_rate 1e-5 \
    --label_names "start_positions" "end_positions" \
    --do_train \
    --do_eval \
    --do_export \
    --export_model_dir $finetuned_model \
    --overwrite_output_dir \
    --disable_tqdm True \
    --metric_for_best_model eval_f1 \
    --load_best_model_at_end  True \
    --save_total_limit 1

训练完成的结果:

验证UIE模型效果

通过运行以下命令进行
模型评估

python evaluate.py \
    --model_path ./checkpoint/model_best \
    --test_path ./data/dev.txt \
    --batch_size 16 \
    --max_seq_len 512

根据我们手动标注的数据训练微调后,再次测试自有标注的领域数据,返回的准确率为88%.

部署微调后的UIE模型

本地终端定制模型一键预测

paddlenlp.Taskflow
装载定制模型,通过
task_path
指定模型权重文件的路径,路径下需要包含训练好的模型权重文件
model_state.pdparams

from pprint import pprint
from paddlenlp import Taskflow
schema = ['出发地', '目的地', '费用', '时间']#根据自身实体类别修改
# 设定抽取目标和定制化模型权重路径
my_ie = Taskflow("information_extraction", schema=schema, task_path='./checkpoint/model_best')
pprint(my_ie("城市内交通费7月5日金额114广州至佛山"))

经过测试,
原本无法预测出来的类型在引入自己标注的模型之后就可以识别出来了
。自此我们可以实现标注的数据用于训练,训练的模型又可以提升我们标注的速率。

模型快速服务化部署

在UIE的服务化能力中我们提供基于PaddleNLP SimpleServing 来搭建服务化能力,通过几行代码即可搭建服务化部署能力。

在上一篇文章
如何使用doccano+flask+花生壳+服务器实现命名实体识别ner自动标注 - 孤飞 - 博客园 (cnblogs.com)
中的部署代码里,我们修改
task_path
模型路径为刚刚微调过后的即可完成识别。

from flask import Flask, request, jsonify
from paddlenlp import Taskflow


app = Flask(__name__)

# 在这里定义你想要识别的实体类型
# UIE具有zero-shot能力,所以类型可以随便定义,但是识别的好坏不一定
schema = ['出发地', '目的地', '费用', '时间']#根据自身实体类别修改

# 第一运行时,联网状态下会自动下载模型
# device_id为gpu id,如果写-1则使用cpu,如果写0则使用gpu
ie = Taskflow('information_extraction', schema=schema, device_id=0,task_path='./uie/checkpoint/model_best/')#添加了tesk_path指向新的模型


def convert(result):
    result = result[0]
    formatted_result = []
    for label, ents in result.items():
        for ent in ents:
            formatted_result.append(
                {
                    "label": label,
                    "start_offset": ent['start'],
                    "end_offset": ent['end']
                })

    return formatted_result


@app.route('/', methods=['POST'])
def get_result():
    text = request.json['text']
    print(text)
    result = ie(text)
    formatted_result = convert(result)

    return jsonify(formatted_result)


if __name__ == '__main__':
	# 这里写端口的时候一定要注意不要与已有的端口冲突
	# 这里的host并不是说访问的时候一定要写0.0.0.0,但是这里代码要写0.0.0.0,代表可以被本网络中所有的看到
	# 如果是其他机器访问你创建的服务,访问的时候要写你的ip
    app.run(host='0.0.0.0', port=88)

参考文章

PaddleNLP/doccano 数据转换 GitHub

PaddleNLP/README.md 模型微调 GitHub

信息抽取UIE(二)--小样本快速提升性能(含doccona标注-阿里云开发者社区 (aliyun.com)

解决报错:cannot import name 'strtobool' from 'paddlenlp.trainer.argparser'

Question 使用uie,标好数据再用doccano.py脚本的时候发生错误:cannot import name 'strtobool' from 'paddlenlp.trainer.argparser' · Issue #5257 · PaddlePaddle/PaddleNLP (github.com)

一、方法

  1. 什么是方法:

方法(method)是程序中最小的执行单元

  1. 实际开发中,什么时候用到方法:

重复的代码、具有独立功能的代码可以抽取到方法中

  1. 实际开发中,方法有什么好处:

可以提高代码的复用性

可以提高代码的可维护性

方法的定义格式:

public static 返回值类型 方法名(参数){
			方法体;
			return 返回值;
		
}

二、方法的调用:

方法名();

注意:先定义后调用

package com.itheima.Method;
//简单的打印出朋友的所有信息
public class Demo02 {
    public static void printGFInfo(){
        System.out.println("张三");
        System.out.println("男");
        System.out.println("18岁");
    }

    public static void main(String[] args) {
        printGFInfo();
    }
}
package com.itheima.Method;
//需求:在方法里面定义两个变量并求和打印
public class Demo03 {
    public static void sum(){
        int a = 10;
        int b = 20;
        int sum = a+b;
        System.out.println(sum);
    }

    public static void main(String[] args) {
        sum();
    }
}

当我们要计算不明确的数的时候,就可以用带参数的方法进行计算

带参数的方法定义:

单个参数格式:

public static void  方法名(参数){

}

eg:

public static void  method(int number){

}

多个参数格式:

public static void  方法名(参数1,参数2,....){

}
public static void getsum(int number1,int number2){

}

带参数方法的调用

单个参数格式:

方法名(参数);

eg:

method(10);
method(变量);

多个参数格式:

方法名(参数1,参数2,...);

eg:

getSum(10,20);
getSum(变量1,变量2);

注意:

方法调用时,参数的数量与类型必须与方法定义中小括号里面的变量一一对应,否则程序将报错

eg:

package com.itheima.Method;
//掌握带参数方法定义的格式和调用的格式
public class Demo04 {
    public static void getSum(int num1,int num2) {
        int sum = num1+num2;
        System.out.println(sum);
    }

    public static void main(String[] args) {
        getSum(12,20);
    }
}

运行结果:
32

三、形参和实参

形参:全称形式参数,是指方法定义中的参数

实参:全称实际参数,方法调用中的参数

注意:

方法调用时,行参和实参必须一一对应,否则程序将报错

方法定义的小技巧:

  1. 我要干什么?

  2. 我干这件事情需要什么才能完成?

  3. 需不需要最后得出结果,如果需要,要有返回值;不需要的话,直接定义一个空类型方法

    eg1:

package com.itheima.Method;
//需求:定义一个方法,求长方形的周长,将结果在方法中进行打印
public class Demo05 {
    public static void getLength(int length,int width){
        int c = 2*(length+width);
        System.out.println("长方形的周长="+c);
    }
    public static void main(String[] args) {
        getLength(10,20);
    }
}

运行结果:

60

eg2:

package com.itheima.Method;
//需求:定义一个方法,求圆的面积,将结果在方法中进行打印
public class Demo06 {
    public static void getArea(double r) {
        double Area=3.14*r*r;
        System.out.println(Area);
    }
    public static void main(String[] args) {
        getArea(1.5);
    }
}

运行结果:

7.0649999999999995

Uniswap V2 — 从代码解释 DeFi 协议

为了理解我们在分析代码时将要经历的不同组件,首先了解哪些是主要概念
以及
它们
的作用
是很重要的。所以,和我一起裸露吧,因为这是值得的。

我在 5 个段落中总结了您需要了解的主要重要概念,您将在本文结束时理解这些概念。

Uniswap 是一种
去中心化交易
协议
。该协议是一套持久的、不可升级的智能合约,它们共同创建了一个自动化的做市商。

Uniswap 生态系统

贡献流动性的流动性提供者、交换代币的交易员和与智能合约交互以开发代币新交互的开发人员组成。

每个 Uniswap
智能合约或对管理一个
由两个 ERC-20 代币储备组成的流动资金池。

每个流动性池重新平衡以保持 50/50 比例的加密货币资产,这反过来又决定了资产的价格。

流动性提供者
可以是任何能够向 Uniswap 交易合约提供等值的 ETH 和 ERC-20 代币的人。作为回报,他们从交易合约中获得
流动性提供者代币
(LP 代币代表流动性提供者拥有的池的份额),可用于随时提取其在流动性池中的比例。

他们存储库中的主要智能合约是:

  • UniswapV2ERC20
    — 用于 LP 令牌的扩展 ERC20 实现。它还实施了 EIP-2612 以支持链下传输批准。
  • UniswapV2Factory
    — 与 V1 类似,这是一个工厂合约,它创建配对合约并充当它们的注册表。注册表使用 create2 来生成对地址——我们将详细了解它是如何工作的。
  • UniswapV2Pair
    — 负责核心逻辑的主合约。值得注意的是,工厂只允许创建独特的货币对,以免稀释流动性。
  • UniswapV2Router
    — Uniswap UI 和其他在 Uniswap 之上工作的网络和去中心化应用程序的主要入口点。
  • UniswapV2Library
    — 一组实现重要计算的辅助函数。

在这篇文章中,我们将提及所有这些,但我们将主要关注浏览
UniswapV2Router

UniswapV2Factory
编码,尽管
UniswapV2Pair
并且
UniswapV2Library
会涉及很多。

UniswapV2Router02.sol

该合约使创建货币对、添加和删除流动性、计算所有可能的掉期变化的价格以及执行实际掉期变得更加容易。路由器适用于通过工厂合约部署的所有对

您需要在合约中创建一个实例才能调用 addLiquidity、removeLiquidity 和 swapExactTokensForTokens 函数

address private constant ROUTER = 0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D;ROUTER = 0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D;

IUniswapV2Router02 public uniswapV2Router;
uniswapV2Router = IUniswapV2Router02(ROUTER);

现在让我们看看流动性管理:

函数
addLiquidity
():

function addLiquidity(
    address tokenA,
    address tokenB,
    uint amountADesired,
    uint amountBDesired,
    uint amountAMin,
    uint amountBMin,
    address to,
    uint deadline
) external returns (uint amountA, uint amountB, uint liquidity);
  • tokenA

    tokenB
    :是我们需要获取或创建我们想要增加流动性的货币对的代币。
  • amountADesired

    amountBDesired
    是我们要存入流动资金池的金额。
  • amountAMin

    amountBMin
    是我们要存入的最小金额。
  • to
    address 是接收 LP 代币的地址。
  • 截止日期
    ,最常见的是
    block.timestamp

在内部 _addLiquidity() 中,它将
检查这两个令牌中的一对是否已经存在
,如果不存在,它将创建一个新令牌

if (IUniswapV2Factory(factory).getPair(tokenA, tokenB) == address(0)) {
    IUniswapV2Factory(factory).createPair(tokenA, tokenB);
}

然后它需要获取现有的代币数量或也称为
reserveA
and
reserveB
,我们可以通过 UniswapV2Pair 合约访问它

IUniswapV2Pair(pairFor(factory, tokenA, tokenB)).getReserves()

现在,外部函数 addLiquidity, 返回
(uint amountA, uint amountB, uint liquidity)
,那么它是如何计算的呢?

通过UniswapV2Library拿到上面提到的reserves
之后,还有一系列的检查

如果
该对不存在,并且新创建
amountA

amountB
返回一个新的,则将
amountADesired
作为
amountBDesired
参数传递(见上文)。

否则,它会做这个操作

amountBOptimal = amountADesired.mul(reserveB) / reserveA;

如果
amountB
小于
或等于

amountBDesired
那么它将返回:

(uint amountA, uint amountB) = (amountADesired, amountBOptimal)

否则,它将返回

(uint amountA, uint amountB) = (amountAOptimal, amountBDesired)

其中
amountAOptimal
的计算方式与
amountBOptimal

然后,要计算
liquidity
返回值将经过以下过程:

首先,它将使用现有/新创建的对的地址部署 UniswapV2Pair 合约。

它是如何做到的?它计算一对的 CREATE2 地址而无需进行任何外部调用:(
阅读有关 CREATE2 Opcode 的更多信息

pair = address(uint(keccak256(abi.encodePacked(address(uint(keccak256(abi.encodePacked(
    hex'ff',
    factory,
    keccak256(abi.encodePacked(token0, token1)),
    hex'96e8ac4277198ff8b6f785478aa9a39f403cb768dd02cbee326c3e7da348845f' // init code hash
))));

然后,它获取新部署合约的地址,我们需要用它来从这对代币中铸造代币。

当您向货币对添加流动性时,合约
会生成 LP 代币
;当你移除流动性时,LP 代币就会被销毁。

pairFor
因此,首先我们使用UniswapV2Library获取地址:

address pair = UniswapV2Library.pairFor(factory, tokenA, tokenB);UniswapV2Library.pairFor(factory, tokenA, tokenB);

因此,稍后可以铸造 ERC20 代币并计算返回的
流动性:

liquidity = IUniswapV2Pair(pair).mint(to);

如果您想知道为什么它最终成为 ERC20,在 mint 函数中它是
这样存储的
https://github.com/Uniswap/v2-core/blob/ee547b17853e71ed4e0101ccfd52e70d5acded58/contracts/UniswapV2Pair.sol#L112
)

uint balance0 = IERC20(token0).balanceOf(address(this));
uint balance1 = IERC20(token1).balanceOf(address(this));

****函数
removeLiquidity():

function removeLiquidity(
    address tokenA,
    address tokenB,
    uint liquidity,
    uint amountAMin,
    uint amountBMin,
    address to,
    uint deadline
) external returns (uint amountA, uint amountB);

从池中移除流动性意味着燃烧 LP 代币以换取一定数量的基础代币。

IUniswapV2Pair(pair).transferFrom(msg.sender, pair, liquidity);

然后,外部函数返回两个值
(uint amountA, uint amountB)
,这些值是使用传递给函数的参数计算的。

随提供的流动性返回的代币数量
计算如下

amount0 = liquidity.mul(balance0) / _totalSupply; 
amount1 = liquidity.mul(balance1) / _totalSupply;

然后它将
这些数量的代币转移到指定的地址

_safeTransfer(_token0, to, amount0);
_safeTransfer(_token1, to, amount1);

您的 LP 代币份额越大,销毁后获得的储备份额就越大。

上面的这些计算发生
在 burn 函数内部

IUniswapV2Pair(对).burn(对)

https://github.com/Uniswap/v2-periphery/blob/0335e8f7e1bd1e8d8329fd300aea2ef2f36dd19f/contracts/UniswapV2Router02.sol#L114
)

IUniswapV2Pair(pair).burn(to)

****函数
swapExactTokensForTokens()

function swapExactTokensForTokens(
    uint amountIn,
    uint amountOutMin,
    address[] calldata path,
    address to,
    uint deadline
) external returns (uint[] memory amounts);

Uniswap 的核心功能是交换代币,所以让我们弄清楚代码中发生了什么,以便更好地理解它

您很可能听说过流动资金池中使用的
神奇公式

X * Y = K

所以,这将首先发生在 swap 函数内部
getAmountOut()

里面用到的关键函数有:

TransferHelper.safeTransferFrom().safeTransferFrom()

代币金额发送
到配对代币的地方

在 UniswapV2Pair 合约的较低级别交换功能中,它将是

_safeTransfer(_token, to, amountOut);

这将
实际转移
回预期地址。

我知道信息量很大,但您将有足够的时间阅读所有内容,直到完全理解为止。所以……

UniswapV2Factory.sol

工厂合约是所有已部署对合约的注册表。这个合约是必要的,因为我们不希望有成对的相同代币,这样流动性就不会分成多个相同的对。

该合约还简化了配对合约的部署:无需通过任何外部调用手动部署配对合约,只需调用工厂合约中的方法即可。

好吧,让我们倒回去,因为在上面的这些行中已经说了非常重要的事情。我们把它们拆分开来分别分析:

该合约是所有已部署对合约的注册表

只部署了一个工厂合约,该合约用作 Uniswap 交易对的官方注册处。

现在,我们在代码中的什么地方看到了它以及发生了什么:

address[] public allPairs;

它有 的数组
allPairs
,如上所述,存储在这个合约中。这些对被添加到一个方法中,该方法
createPair()
通过
将新初始化的对推
送到数组来调用。

allPairs.push(pair);push(pair);

这个合约是必要的,因为
我们不想拥有成对的相同代币

mapping(address => mapping(address => address)) public getPair;

它具有该对的地址与构成该对的两个令牌的映射。这用于检查一对是否已经存在。

require(getPair[token0][token1] == address(0), 'UniswapV2: PAIR_EXISTS');

该合约还简化了配对合约的部署

这是一个更深层次的话题,但我将尝试总结一下这里发生的事情的重要性。

在以太坊中,合约可以部署合约。可以调用已部署合约的函数,该函数将部署另一个合约。

您不需要从您的计算机上编译和部署合约,您可以通过现有合约来执行此操作。

那么,Uniswap 是如何部署智能合约的呢?

通过使用操作码
CREATE2

bytes memory bytecode = type(UniswapV2Pair).creationCode;type(UniswapV2Pair).creationCode;
bytes32 salt = keccak256(abi.encodePacked(token0, token1));
assembly {
    pair := create2(0, add(bytecode, 32), mload(bytecode), salt)
}

在第一行,我们得到创建字节码
UniswapV2Pair

下一行创建了
salt
一个字节序列,用于确定性地生成新合约的地址。

最后一行是我们调用以使用+
create2
确定性地创建新地址的地方。部署。
bytecode``salt``UniswapV2Pair

并得到对地址,我们可以看到这是
createPair()
函数的返回值

function createPair(
  address tokenA, address tokenA, 
  address tokenB
) external returns (address pair)

当提供的标记不是现有的对
_addLiquidity()
时,它在内部函数中使用。

所以,这就是关于 Uniswap 代码的全部内容。

现在,为了看到我们测试的所有内容,我可以推荐您查看 Smart Contract Programmer 在他的
defi-by-example 内容
中实现的代码,他已经
在视频中进行了解释

在这里你可以看到我们可以增加流动性的方式

function addLiquidity(
  address _tokenA,
  address _tokenB,
  uint _amountA,
  uint _amountB
) external {
  IERC20(_tokenA).transferFrom(msg.sender, address(this), _amountA);
  IERC20(_tokenB).transferFrom(msg.sender, address(this), _amountB);

  IERC20(_tokenA).approve(ROUTER, _amountA);
  IERC20(_tokenB).approve(ROUTER, _amountB);

  (uint amountA, uint amountB, uint liquidity) =
    IUniswapV2Router(ROUTER).addLiquidity(
      _tokenA,
      _tokenB,
      _amountA,
      _amountB,
      1,
      1,
      address(this),
      block.timestamp
    );

  emit Log("amountA", amountA);
  emit Log("amountB", amountB);
  emit Log("liquidity", liquidity);
}

以及我们必须如何考虑
消除流动性

function removeLiquidity(address _tokenA, address _tokenB) external {
  address pair = IUniswapV2Factory(FACTORY).getPair(_tokenA, _tokenB);

  uint liquidity = IERC20(pair).balanceOf(address(this));
  IERC20(pair).approve(ROUTER, liquidity);

  (uint amountA, uint amountB) =
    IUniswapV2Router(ROUTER).removeLiquidity(
      _tokenA,
      _tokenB,
      liquidity,
      1,
      1,
      address(this),
      block.timestamp
    );

  emit Log("amountA", amountA);
  emit Log("amountB", amountB);
}

通过Github 获取更多区块链学习资料!

https://github.com/Manuel-yang/BlockChainSelfLearning

目的:分析xxl-job执行器的注册过程

流程:

  1. 获取执行器中所有被注解(
    @xxlJjob
    )修饰的
    handler
  2. 执行器注册过程
  3. 执行器中任务执行过程

版本:
xxl-job 2.3.1

建议:下载
xxl-job
源码,按流程图
debug
调试,
看堆栈信息并按文章内容理解执行流程

完整流程图:

img

查找Handler任务

部分流程图:

img

首先启动管理台界面(服务
XxlJobAdminApplication
),然后启动项目中给的执行器实例
(SpringBoot)
;

img

这个方法是扫描项目中使用
@xxlJob
注解的所有handler方法。接着往下走

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    //获取该项目中所有的bean,然后遍历
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = applicationContext.getBean(beanDefinitionName);

        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        //注意点★
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        //没有跳过本次循环继续
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }
    	//获取了当前执行器中所有@xxl-job的方法,获取方法以及对应的初始化和销毁方法
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method executeMethod = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            // regist
            registJobHandler(xxlJob, bean, executeMethod);
        }
    }
}


Spring
案例执行器中有5个
handler
:

img

XxlJobExecutor.registJobHandler()中部分源码

String name = xxlJob.value();
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}

然后进行遍历注册;开始进行名字判断:

  1. 判断bean名字是否为空
  2. 判断bean是否被注册了(存在了)

loadJobHandler
校验方式会去该方法中查找:当bean注册完成后时存放到
jobHandlerRepository
一个
map
类型中;

private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}

executeMethod.setAccessible(true);
它实现了修改对象访问权限的功能,参数为true,则表示允许调用方在使用反射时忽略Java语言的访问控制检查.

往后走会判断该注解的生命周期方法(
init和destroy
)

  1. 未设置生命周期,则直接开始注册
//注意MethodJobHandler,后面会用到
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
//添加执行器名字及对应的hob方法信息(当前类、方法、init和destroy属性)
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    return jobHandlerRepository.put(name, jobHandler);
}
  1. 有生命周期,设置init和destroy方法权限
if (xxlJob.init().trim().length() > 0) {
    try {
        initMethod = clazz.getDeclaredMethod(xxlJob.init());
        initMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
    }
}
if (xxlJob.destroy().trim().length() > 0) {
    try {
        destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
        destroyMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
    }
}

首先检查
@XxlJob
注解中的
init
属性是否存在且不为空。如果存在,则尝试获取该类中名为
init
的方法,并将其设置为
可访问状态
,以便后续调用。

同理,代码接下来也检查了
@XxlJob
注解中的
destroy
属性是否存在且不为空,如果是,则获取该类中名为
destroy
的方法,并设置其为
可访问状态

在这个过程中,如果某个方法不存在或者无法被访问,则会抛出
NoSuchMethodException
异常,并且使用
throw new RuntimeException
将其包装并抛出一个运行时异常。这样做的目的是为了提醒开发人员在任务处理器类中正确地设置
init和destroy
属性,并确保方法名称与属性值一致。

执行器的注册过程

部分流程图:

img

public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

在扫描完执行器中所有的任务后,开始进行执行器注册
XxlJobSpringExecutor中的super.start()
方法。

在初始化执行服务器启动之前,进行了四种操作,初始化日志、初始化
adminBizList
地址(可视化管理台地址)、初始化日志清除、初始化回调线程等。

这里需要注意的是第二步初始化地址,在初始化服务器启动的时候需要用到。

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

继续到
initEmbedServer
,开始初始化ip地址和端口等,
需要明白的是,这一步的参数获取方式其实是第一步读取
**XxlJobConfig**
获得的;
进行ip的校验和拼接等操作,开始进行真正的注册。

创建一个
嵌入式的HTTP服务器,
将当前执行器信息(包含应用名称和IP地址端口等)注册到注册中心,注册方式的实现在
ExecutorRegistryThread
中实现。

校验名字和注册中心,如果注册中心不可用,则等待一段时间后重新尝试连接。

// registry
while (!toStop) {
    try {
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                    registryResult = ReturnT.SUCCESS;
                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    break;
                } else {
                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                }
            } catch (Exception e) {
                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
            }

        }
    } catch (Exception e) {
        if (!toStop) {
            logger.error(e.getMessage(), e);
        }

    }

    try {
        //心跳检测,默认30s
        if (!toStop) {
            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
        }
    } catch (InterruptedException e) {
        if (!toStop) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
        }
    }
}

开启一个新线程,首先
构建注册参数(包含执行器分组、执行器名字、执行器本地地址及端口号),遍历注册中心地址,开始进行执行器注册,注册方式通过发送http的post请求。

@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}


debug
的过程中,
XxlJobRemotingUtil
执行到
int statusCode = connection.getResponseCode();
才会跳转到
JobApiController.api
中的注册地址.

// services mapping
if ("callback".equals(uri)) {
    List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
    return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registryRemove(registryParam);
} else {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}

最后进入到
JobRegistryHelper.registry()
方法中完成数据库的入库和更新操作。

通过更新语句判断该执行器是否注册,结果小于1,那么保存注册器信息,并向注册中心发送一个请求,更新当前执行器所属的应用名称、执行器名称和 IP 地址等信息,否则跳过。

public ReturnT<String> registry(RegistryParam registryParam) {
	//.......
    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            //更新注册表信息
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
                //保存执行器注册信息
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

                // fresh 刷新执行器状态
                freshGroupRegistryInfo(registryParam);
            }
        }
    });

    return ReturnT.SUCCESS;
}

至此执行器的注册流程分析完成。

执行器中的任务执行过程

img

部分流程图:

img

执行器中的任务流程比较简单,如果执行器启动的话,那么每次执行任务是通过
JobThread
通过
Cron
表达式进行操作的。

通过
handler.execute()
进行执行,是在框架内部通过反射机制调用作业处理器对象
handler
中的
execute()
方法实现的。在这个过程中,handler 对象表示被加载的作业处理器,并且已经调用了
init()
方法进行初始化。

method.invoke()
方法使用反射机制调用指定对象
target
中的方法
method
。在这个方法中,
target
表示作业处理器对象,
method
表示作业处理器中的
execute()
方法。

通过上述方法,获取到
SampleXxlJob.demoJobHandler
的任务,然后开始进行任务逻辑操作。

本文主要聊一聊云原生时代分布式转码系统实施过程中碰到的一些问题。
聊问题之前简单介绍一下我们的分布式转码方案。

云原生分布式转码

在计算资源招之即来的云计算时代,正在重构着软件架构的方方面面。

对软件架构师或者运维管理者影响比较大的一个点便是不需要在做容量规划,不需要提前评估为了应对某个活动应该准备多少台机器,这个特点也深刻影响软件架构的设计。

分布式转码方案

在之前的文章中有说到视频转码主要分为3个步骤:

  1. 切片:将输入的视频进行切片,切分成一个个较小的视频片段
  2. 转码:将一个个小的视频片段下发到不同的机器上进行转码,并行执行,充分利用多个实例的计算能力
  3. 合并:将转码后的小视频片段合并成一个视频

image.png


         切片                  转码                  合并
输入视频 ------> (n个)转码任务 ------> (n个)转码结果 -----> 输出视频


为什么要切片?

为了加快转码速度。
视频转码是一个非常耗时的操作,让不同的机器并行转码不同的视频片段,可以充分利用大规模计算资源来加快最耗时的转码流程。

举一个例子,假设1台4核8G的机器能够提供2倍速的转码速度,

  • 转码1个小时的视频则需要30分钟;
  • 如果将1个小时视频分成2个片段(每段30分钟),就能够并行在2台机器上执行,那么每个片段分别只需要15分钟就能够转码完成;
  • 如果将1个小时视频分成4个片段(每段15分钟),就能够并在在4台机器上执行,那么每个片段分别只需要7.5分钟就转码完成;

理论上对视频进行合理的切片加上充足的计算资源,能够极大的提高转码的速度。
虽然这种分布式切片转码方案优势这么明显,但是在实践过程中也发现了不少问题。
想借此文跟大家探讨一下我们碰到的部分问题以及解决方案,看看大家有没有更好的方案。
要是能够给大家带来一些帮助、少踩两个坑,目的就达到了。

碰到的问题

  1. 不聊工程上的问题,工程上的问题都比较好解决
  2. 主要聊ffmpeg切片、转码、合并过程中所遇到的问题
  3. 知识储备的原因,有些ffmpeg底层的原理可能会一笔带过。。

m3u8转码后有杂音

对于m3u8转码,我们在切片环节直接用ts文件作为切片,转码后再合并为最终的视频文件。

这么做的好处是大大缩短了切片的时间,如果用ffmpeg 进行切片需要将文件读取到内存再切成一个个小视频;
而直接用ts文件作为切片的好处是大大缩短了切片这个环节的耗时,相当于纯文本处理,把m3u8文件中的ts文件地址解析出来就行。

问题现象

m3u8转码后生成的视频有轻微的杂音,而原m3u8文件中

问题解释

因为采样率的原因,ts文件中的音视频流并不是完全对其的。如:

直接播放m3u8文件时,会将所有的ts文件都看成一个整体,即将每个ts片段中的视频流和音频流都连接起来的,所以播放时没有杂音。

以上图第一个ts为例,将ts文件转码为mp4后,缺失的音频流将默认会补齐,这就造成了杂音的出现。

解决方案

将音视频流分离,单独进行转码。

  1. 对于视频流,仍然采用切成小视频片段的方式进行转码
  2. 对于音频流,转码不耗资源,就不再进行切片,而是对整个音频流进行转码

image.png

这样m3u8转码出来就不会有杂音了。

附对应的切片跟合并命令

# 切片命令
ffmpeg -i input.mp4 -map 0:v -f segment -segment_time 15 -reset_timestamps 1 -c copy segment%d.mp4 -map 0:a -c:v vframes 1 -c:a copy audio.mp4 -y

# 合并视频命令
ffmpeg -f concat -safe 0 -i concat.txt -c copy concat.mp4

# 合并视频与音频命令
ffmpeg -i concat.mp4 -i audio.mp4 -map 0:v -map 1:a -c:v copy -c:a copy output.mp4 -y

转码后视频变长&音画不同步

问题现象

转码后的视频长度较输入视频长度变长了。
如输入视频是3600s,输出视频可能是3601s,多了1s;而且输入视频时长越长,误差越大。

问题解释

与上一个问题「m3u8转码后有杂音」的原因类似:

  1. 因为采样率的原因,切出来的视频片段中的音视频流时长不完全一致
  2. 对视频片段进行转码时取最长的流时长作为输出后的视频片段的时长
  3. 在合并环节直接将所有转码后的视频片段拼接在一起,所以输出的视频时长变长了

解决方案

与上一个问题「m3u8转码后有杂音」的解决方案类似:将音视频流分别抽离进行转码,最后在合并环节将音视频流合在一起。
这样避免了在切片环节音视频流时长互相影响。

m3u8文件切片起止时间不准

问题现象

举一个例子,从a.m3u8的第10s开始切5s视频出来,命令如下:

ffmpeg -ss 10 -t 5 -i a.m3u8 -c copy out.mp4

实际输出的out.mp4不一定从a.m3u8第10s开始的。

问题解释

在切片环节已经不准确了,合并出来的视频肯定也是不准的,所以我们要在切片环节把这个问题解决掉。

这里我引入最近大火的ChatGPT的回答:
m3u8格式的视频不支持随机访问,而且由于ts文件的长度不固定,ffmpeg很难精确定位到seek目标所在位置。

image.png

解决方案

我们是怎么解决的呢,可以参考ChatGPT给出来的4种解决方案的第4个方案:

  1. 将包含切片起止时间的最小ts文件集合筛选出来
  2. 将最小ts文件集合转封装为mp4
  3. 重新计算出在mp4上实际的起止时间,进行切片

想必大家肯定有很多疑问,我们一步一步来解释一下。

  1. 如何将包含切片起止时间的最小ts片段集合筛选出来?

    m3u8文件中的每一个ts文件都标明了该ts文件的时长,我们可以借助这个将最小的ts文件集合筛选出来

  2. 为什么要转封装为mp4?

    因为mp4支持随机访问

  3. m3u8转封装为mp4会不会出现m3u8中的音视频编码不支持mp4的情况?

    不会。可以参考维基百科:
    https://en.wikipedia.org/wiki/Comparison_of_video_container_formats

  4. 如何重新计算出在mp4上的实际起止时间?

    参考问题1的回答,可以通过每个ts文件的时间,计算出最终在mp4上实际的起止时间

PS:ChatGPT真的太强大了,要是ChatGPT早点出来会少走很多弯路。

输入视频的音视频编码不规范

问题现象

在切片环节切片失败

问题解释

用户输入的视频文件编码不规范:

  1. 视频编码正常,音频编码不规范。如将pcm编码的音频流封装到了mp4格式中
  2. 视频编码与音频编码均不规范。大概率是原视频被强行改了后缀,如将a.mp4改成a.mxf

因为切片环节设计到重新封装的操作,将1个mp4切成多个mp4就需要重新封装。
在重新封装时就会报错,不同的容器格式支持不同的音视频编码,可以参考维基百科:
https://en.wikipedia.org/wiki/Comparison_of_video_container_formats

有意思的是用户意识不到他们的视频文件有问题,因为音视频流都能够正常解码(即能够正常播放)。

解决方案

第2个问题在目前的分布式转码方案中还没有好的解决方案,再加上出现概率非常小(用户手动更改文件后缀),这里不做讨论。
主要讨论一下第1个问题的解决方案:在切片环节就将音频文件转码了。
image.png

为什么能够在切片环节对音频转码?因为音频转码不耗CPU,不会明显影响到切片环节的速度。

mxf不支持抽离音频流

问题现象

mxf格式的视频经过切片命令抽离出来的音频文件时长只有1帧(0.04s)。
因为我们切片命令里只给了1帧图片到音频文件中。

问题解释

mxf以视频流的长度作为整个视频的长度,意味着音频切片只有一帧图像的话,整个视频时长只有1帧的时长。
所以问题准确的描述应该为mxf不支持以这种方式抽离音频流。

解决方案

既然mxf不支持以这种方式抽离音频流,那么方案有2个:

  1. 将所有的视频流不转码,都copy到音频文件中,还是只对音频流进行转码。
  2. 看看有没有完全包含mxf支持的音视频编码格式并且支持这种方式抽离音频流的封装格式。

方案1相当于音频文件对输入视频进行了一次拷贝,如果输入视频特别大,那么音频文件也将会特别大。
我们最终采用的方案2。
有没有这种格式呢?即支持所有mxf 音视频编码格式,又支持这种方式抽离音频。
还真有。通过维基百科:
https://en.wikipedia.org/wiki/Comparison_of_video_container_formats
,可以发现avi格式能够支持所有mxf支持的音视频编码格式;而且经过测试,也支持这种方式将音频抽离出来。

所以对于mxf格式的视频,在切片环节切出来的音频文件设置为avi格式。

ffmpeg对图片支持不够友好

问题现象

对于头信息比较大的图片再加上不能准确读取后缀的话(如加签访问的场景),很大概率会解析失败,获取到错误的meta信息。
如将图片识别出2帧。

问题解释

ffmpeg确实对图片支持不太友好

解决方案

将图片下载到本地,再进行识别。
这种方案有一个缺点是耗时会非常长,如果业务方同步调用获取信息接口有可能会超时。

阿里云的转码方案

决定自研转码之前,我们是使用阿里云的转码服务的。
上面碰到的很多问题在阿里云转码服务中都不存在,这里面固然有阿里云在音视频转码领域的积累,但是我想也跟他们的转码方案有很大的关系。

经过长时间的观察,我猜阿里云没有使用
切片-转码-合并
这种方案,而是
使用高性能服务器,不切片直接对视频进行转码

为什么呢?因为上述问题中,基本上都是在切片环节出现的问题。尤其是「输入视频的音视频编码不规范」这个问题,只要使用
切片-转码-合并
这种方案,那么百分之百会碰到跟我们的问题。而阿里云不能使用我们这种非常规手段去解决。
因为视频封装格式、音视频编码格式非常多,出于稳定性考虑,不可能碰到问题了再case by case的去解决。

但是
使用高性能服务器,不切片直接对视频进行转码
这种方案固然能够避免很多切片环节的问题,但是也很容易造成转码任务阻塞。举一个例子:
在机器资源池不大的情况下,A用户输入了很多优先级低转码非常耗时的视频,把资源池都占满了;而随后B用户输入了1个优先级很搞的视频,虽然优先级很高,但是也得等待A用户正在转码的视频转完,让出1台机器才能执行。
这种情况对B用户体验就非常不好,在A用户转码完成之前对B用户来说服务是不可用的。

最后

好了,本篇将我们实施分布式转码过程中所碰到的比较难解的问题以及解决方案都聊了一遍。
当然,以后还会碰新的问题,我会将其放在一个系列文章里面讨论,希望能够给大家带来一些帮助或者少踩一些坑。