2024年1月

经过一段时间慢如蜗牛的准备,园子的第一款鼠标垫终于在园子的第一个网店(
博客园淘宝店
)上架啦,不知道这款简陋鼠标垫,是否是您值得拥有的周边。

鼠标垫定价:
¥19.00
,VIP会员优惠价:
¥12.00
,PLUS会员优惠价:
¥1.00

经历了万事开头难的
起名失败
,网店运营与
周边制作
的小白,印制过程的小意外(厂家库存材料不够),这款简陋鼠标垫终于登上了简装淘宝店的展台。


会员力量
点亮园子希望的同时,希望从这款鼠标垫开始的周边能给希望增添亮光。

注:
1)下单购买的鼠标垫由生产厂家直接从广东惠州发货
2)如果您不想在淘宝店购买,可以加园子的
企业微信
购买

一、实现方案

单独贴代码可能容易混乱,所以这里只讲实现思路,代码放在最后汇总了下。

想要实现一个简单的工业园区、主要包含的内容是一个大楼、左右两片停车位、四条道路以及多个可在道路上随机移动的车辆、遇到停车位时随机选择是否要停车,简单设计图如下

二、实现步奏

2.1 引入环境,天空和地面

引入天空有三种方式:

1) 第一种通过添加天空盒导入六个不同角度的天空图片可以形成,简单方便,缺点是在两个面之间会有视觉差

2) 第二种是设置scene的背景和环境是一张天空图片来实现的,缺点图片单一,而且在天、地斜街处很生硬

3) 不需要导入外部图片,通过在一个球体上添加渐变色实现,缺点球体只有一部分是天空颜色,内部为白色,需要设定旋转范围

4) 使用Three.js中的example中的Sky.js实现,效果比较完美

引入地面:给一个大平面添加一张草地纹理即可。

2.2 创建一块地基

创建一个固定长度的平面,然后绕X轴旋转即可

2.3 布置围墙

导入一个围墙模型作为一个围墙的基本单位A,算出围墙所占的长和宽,为了完整性,可以将园区的长和宽设定为A的整数倍。

2.4 办公楼、停车场、充电桩加载

1)导入一个办公大楼模型

2)创建一个停车场类Parking.js,主要用来创建单个停车位,其中需要计算出停车位的进入点,方便以后车辆进入。

3)导入一个充电桩,每两个停车位使用一个充电桩

2.5 添加办公楼前景观、树、公交站点

1)在指定位置导入景观模型和公交站点模型

2)导入树模型,在园区前侧围墙均匀分布

2.6 铺设路面


首先道路可以细化为上下行多个车道,而车辆则是行驶在各车道的中心线位置处,所以为了方便后续车辆的控制,需要先将道路拆分,然后获取各个道路中心线和车道中心线信息

1)创建一个道路类Road.js,道路点信息传入的是图中红色点信息(图中菱形点),需要标记出从哪个点开始道路非直线,

比如点信息格式为:[{ coord: [10, 0], type: 1}, { coord: [10, 10], type: 0}, { coord: [0, ], type: 1}] ;0代表曲线点,1代表直线点

2)由于使用传入的原始道路点无法绘制出平滑的曲线而且在细化道路点的时候直线点数据细化不明显,所以需要先按照一定的间隔插入部分点信息(图中绿色五角星点)

3)根据细化后的道路点按照道路宽度向两边开始扩展点信息,扩展方式通过获取当前点A和前一个点B组成的直线,取AB上垂线且距AB直线距离均为路宽的点即可,最终得到道路左侧点A和右侧点B

4)通过ThreeJS中创建一条平滑曲线获取曲线上的多个点即可得到三条平滑的曲线A、B、C。

5)经过第四步虽然可以得到道路数据,但是无法区分上下行,仍然不满足使用,通过图二上下行车辆最后生成组合成的一条闭合轨迹应该是逆时针的,

所以需要将最后生成的A、B线顶点反转拼接成一个完整的多边形,如果是逆时针则可以得到正确的上下行路线。

6)根据道路顶点即可画出道路面以及道路边界和中心线。

2.7 添加车辆以及车辆在道路随机移动的逻辑


创建一个移动类,可以承接车辆或者行人,当前以车辆为主,主要包含移动轨迹、当前移动所在道路和车道、车位停车、驶离车位等内容。

1)创建一个Move.js类,创建对象时传入停车场对象信息、道路对象信息,方便后续移动时可以计算出轨迹信息

2)根据提供的初始位置计算出最近的道路和车道信息,与当前位置拼接在一起即可生成行动轨迹。

3)当车辆移动到道路尽头时可以获取到本道路的另外一条车道即可实现掉头

4)路口的判断:图三中,车辆由M车道途径N车道时,由M车道左侧当前位置和上一个位置组成的线段与N车道右侧车道起始或者终止点组成的线段有交集时则代表有路口,同样方法可以得到右侧道路的路口信息

5)路口处拐入其他车道的轨迹生成:根据4)可以找到转向N的车道信息,但是无法保证平稳转向,所以可以通过找到M和N的车道中心线所在直线获取到交点C,然后由A、C、B生成一条贝塞尔曲线即可平滑转弯

2.8 添加停车逻辑以及车辆驶离逻辑

1)寻找停车场:如图四,车辆在向前移动时,移动到的每个点都和所有停车场的入口B的位置判断距离,如果小于一个固定值的则代表临近车位,可以停车。

2)停车方式:根据6)获取到的停车位,同时在当前路径上继续向前取15个点的位置C、B、A组成的曲线则是倒车入口的路径线。

三、遗留问题、待优化点

1. 拐弯添加的点不多,所以在拐弯处速度较快

---  可以通过在拐弯处组成的多个点通过生成的线获取多个点来解决这个问题

2. 需要添加一个路口来管理各条之间的关系

--- 优点:(1). 有了路口后,可以解决车辆在路口移动时实时计算和其他路口的位置关系,可能会导致路口转弯混乱,通过在路口中心点生成一个外接圆,如果进入路口,则锁死移动方向,如果移出路口则解除锁定

(2). 解决在路口处,各道路绘制的边线有重叠问题,使各个道路之间能看着更平滑

缺点:最好不需要导入路口,而是由各个道路之间的相交关系计算得出,计算逻辑较为复杂。

3. 最好能添加一个停车场方便管理车位以及车辆驶入、驶离停车位

--- 添加停车场,车辆只需要和停车场的位置计算即可,不需要和每个停车位计算位置,减少冗余计算,而且车辆如果和单个停车位计算位置,可能存在从停车位A使出,途径相邻的停车位B,又会进入。

添加停车场通过给停车场添加标识即可解决这个问题

4. 车位和车道的边缘线无法加宽

--- Three.js目前的缺陷,尝试几种办法暂时没有解决

5. 没有添加车辆防碰撞功能

四、完整的代码

为了简单点,没有用Node安装依赖包,下述JS中引入的其他文件均在threeJS安装包中可以找到,拷贝过来即可。


<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>园区案例</title>
</head>

<body style="margin: 0;">
    <div id="webgl" style="border: 1px solid;"></div>
<script type="importmap">{"imports": {"three": "./three.module.js"}
}
</script> <script type="module" src="./Objects/Main.js"></script> </script> </body> </html>

主页面index.html


/**
* 办公园区
*/import* as THREE from 'three';
import { OrbitControls } from
'../OrbitControls.js';
import { GLTFLoader } from
'../GLTFLoader.js';
import { addEnviorment, segmentsIntr } from
'../Objects/Common.js';
import Move from
'./Move.js';
import Road from
'./Road.js';
import Parking from
'./Parking.js';/**
* 1. 先引入环境 天空和地面
* 2. 创建一块全区的地皮
* 3. 布置围墙
* 4. 办公楼、停车场、充电桩的位置
* 5. 添加办公楼前装饰物、树、公交站点
* 6. 铺设路面
* 7. 写动态逻辑,设置页面动态化
*/const wWidth= window.innerWidth; //屏幕宽度 const wHeight = window.innerHeight; //屏幕高度 const scene= newTHREE.Scene();
let renderer
= null;
let camera
= null;
let controls
= null;
const roadObj
= []; //存储道路数据 const moveObj = []; //存储车辆数据 //园区宽度本身 const long = 600; //园区的长 const width = 300; //园区的宽//停车场的长和宽 const [parkingW, parkingH] = [20, 30];
const parks
= []; //存储停车场数据 let everyL= 0; //单个围墙的长度 let everyW = 0; //单个围墙的厚度 let buildH = 0; //办公楼的厚度 let wallNumL = 0; //展示园区占多少个墙的长度,当前设置为最大的整数-1 let wallNumW = 0;/**
* 初始化
*/ functioninit() {
addEnvir(
true, false);
createBase();
loadWall();
setTimeout(()
=>{
loadBuildings();
setTimeout(()
=>{
loadOrnament();
},
200)
loadRoad();
loadBusAndPeople();
addClick();
},
500)
}
/**
* 添加相机等基础功能
*/ function addEnvir(lightFlag = true, axFlag = true, gridFlag = false) {//初始化相机 camera = new THREE.PerspectiveCamera(100, wWidth / wHeight, 1, 3000);
camera.position.set(
300, 100, 300);
camera.lookAt(
0, 0, 0);//创建灯光 //创建环境光 const ambientLight = new THREE.AmbientLight(0xf0f0f0, 1.0);
ambientLight.position.set(
0,0,0);
scene.add(ambientLight);
if(lightFlag) {//创建点光源 const pointLight = new THREE.PointLight(0xffffff, 1);
pointLight.decay
= 0.0;
pointLight.position.set(
200, 200, 50);
scene.add(pointLight);
}
//添加辅助坐标系 if(axFlag) {
const axesHelper
= new THREE.AxesHelper(150);
scene.add(axesHelper);
}
//添加网格坐标 if(gridFlag) {
const gridHelper
= new THREE.GridHelper(300, 25, 0x004444, 0x004444);
scene.add(gridHelper);
}
//创建渲染器 renderer = new THREE.WebGLRenderer({ antialias:true, logarithmicDepthBuffer: true});
renderer.setPixelRatio(window.devicePixelRatio);
renderer.setClearColor(
0xf0f0f0, 0.8);
renderer.setSize(wWidth, wHeight);
//设置three.js渲染区域的尺寸(像素px) renderer.render(scene, camera); //执行渲染操作 controls= newOrbitControls(camera, renderer.domElement);//设置拖动范围 controls.minPolarAngle = - Math.PI / 2;
controls.maxPolarAngle
= Math.PI / 2 - Math.PI / 360;

controls.addEventListener(
'change', () =>{
renderer.render(scene, camera);
})
//添加天空和草地 scene.add(...addEnviorment());functionrender() {//随机选择一个移动物体作为第一视角 //const cur = moveObj[3]; //if (cur) { //const relativeCameraOffset = new THREE.Vector3(0, 20, -15); //const cameraOffset = relativeCameraOffset.applyMatrix4( cur.target.matrixWorld ); //camera.position.x = cameraOffset.x; //camera.position.y = cameraOffset.y; //camera.position.z = cameraOffset.z; //// 始终让相机看向物体 //controls.target = cur.target.position; //camera.lookAt(...cur.target.position.toArray()); //} renderer.render(scene, camera);
requestAnimationFrame(render);
}
render();

document.getElementById(
'webgl').appendChild(renderer.domElement);
}
/**
* 创建园区的地基
*/ functioncreateBase() {
const baseGeo
= new THREE.PlaneGeometry(long, width);
baseGeo.rotateX(
-Math.PI / 2);
const baseMesh
= newTHREE.Mesh(
baseGeo,
newTHREE.MeshBasicMaterial({
color:
'#808080',
side: THREE.FrontSide
})
);
baseMesh.name
= 'BASE';
scene.add(baseMesh);
}
/**
* 加载围墙
*/ functionloadWall() {
const loader
= newGLTFLoader();
loader.load(
'./Objects/model/wall.gltf', (gltf) =>{
gltf.scene.scale.set(
3, 3, 3);
const source
=gltf.scene.clone();//获取单个围墙的大小 const box3 = newTHREE.Box3().setFromObject(gltf.scene);
everyL
= box3.max.x -box3.min.x;
everyW
= box3.max.z -box3.min.z;
wallNumL
= Math.floor(long / everyL) - 1;
wallNumW
= Math.floor(width / everyL) - 1;//加载后墙 //墙的起点和终点 const backS = [-long / 2, 0, -width / 2];for (let i = 0; i < wallNumL; i++) {
const cloneWall
=source.clone();
cloneWall.position.x
= backS[0] + everyL * i + everyL / 2;
cloneWall.position.z
= backS[2];
scene.add(cloneWall);
}
//加载左侧墙 const leftS = [-long / 2, 0, -width / 2];for (let i = 0; i < wallNumW; i++) {
const cloneWall
=source.clone();
cloneWall.rotateY(Math.PI
/ 2);
cloneWall.position.x
= leftS[0];
cloneWall.position.z
= leftS[2] + everyL * i + everyL / 2;
scene.add(cloneWall);
}
//加载右侧墙 const rightS = [-long / 2 + wallNumL * everyL, 0, -width / 2];for (let i = 0; i < wallNumW; i++) {
const cloneWall
=source.clone();
cloneWall.rotateY(Math.PI
/ 2);
cloneWall.position.x
= rightS[0];
cloneWall.position.z
= rightS[2] + everyL * i + everyL / 2;
scene.add(cloneWall);
}
//加载前侧墙 const frontS = [-long / 2, 0, -width / 2 + wallNumW *everyL];for (let i = 0; i < wallNumL; i++) {if (i !== Math.floor(wallNumL / 2)) {
const cloneWall
=source.clone();
cloneWall.position.x
= frontS[0] + everyL * i + everyL / 2;
cloneWall.position.z
= frontS[2];
scene.add(cloneWall);
}
}
})
}
/**
* 加载办公大楼以及停车场和充电桩
*/ functionloadBuildings() {
const loader
= newGLTFLoader();
loader.load(
'./Objects/model/buildings.gltf', (gltf) =>{
gltf.scene.scale.set(
4, 4, 4);//获取大楼的大小 const box3 = newTHREE.Box3().setFromObject(gltf.scene);
buildH
= box3.max.z -box3.min.z;
gltf.scene.position.z
= -width / 2 + buildH / 2;
scene.add(gltf.scene);
})
//添加左侧停车场 //左侧停车场起始点坐标 const leftSPos = [-long / 2 + everyW + parkingH / 2, 0, -width / 2 + everyW + parkingW / 2 + 3];for (let i = 0; i < 4; i++) {
const z
= leftSPos[2] + i *parkingW;
const parking
= newParking({
name: `A00${i
+ 1}`,
width: parkingW,
height: parkingH,
position: [leftSPos[
0], leftSPos[1] + 1, z]
})
scene.add(parking.group);
parks.push(parking);
}
//右侧充电桩起始点坐标 并预留位置给充电枪 const rightSPos = [-long / 2 + wallNumL * everyL - everyW - parkingH / 2 - 10, 0, -width / 2 + everyW + parkingW / 2 + 3];for (let i = 0; i < 4; i++) {
const parking
= newParking({
name: `B00${i
+ 1}`,
width: parkingW,
height: parkingH,
position: [rightSPos[
0], rightSPos[1] + 1, rightSPos[2] + i *parkingW],
rotate: Math.PI
})
scene.add(parking.group);
parks.push(parking);
}
//添加充电桩 const chargePos = [-long / 2 + wallNumL * everyL - everyW - 4, 0, -width / 2 + everyW + 3 +parkingW];
loader.load(
'./Objects/model/charging.gltf', (gltf) =>{for (let i = 0; i < 2; i++) {
const source
=gltf.scene.clone();
source.scale.set(
6, 6, 6);
source.rotateY(Math.PI
/ 2);
source.position.x
= chargePos[0];
source.position.y
= chargePos[1];
source.position.z
= chargePos[2] + i * 2 *parkingW;
scene.add(source);
}
})
}
/**
* 添加办公楼前装饰物、树、公交站点
*/ functionloadOrnament() {//加载办公室前方雕塑 const loader = newGLTFLoader();
loader.load(
'./Objects/model/bed.gltf', (bedGltf) =>{
bedGltf.scene.scale.set(
2, 2, 2);
bedGltf.scene.rotateY(
-Math.PI * 7 / 12);
loader.load(
'./Objects/model/sculpture.gltf', (sculGltf) =>{
sculGltf.scene.scale.set(
20, 20, 20);
sculGltf.scene.y
= sculGltf.scene.y + 4;
const group
= newTHREE.Group();
group.add(bedGltf.scene);
group.add(sculGltf.scene);
group.position.set(
0, 0, -width / 2 + everyW + buildH + 10);
scene.add(group);
});
});
//加载树木,沿街用的是柏树 loader.load('./Objects/model/songshu.gltf', (gltf) =>{
const source
=gltf.scene;
source.scale.set(
8, 8, 8);//前面墙的树木, 单个墙的中间区域放置一棵树 const frontS = [-long / 2 + everyL / 2, 0, -width / 2 + wallNumW * everyL - 5];for (let i = 0; i < wallNumL; i++) {//同样门口不放置树 if (i !== Math.floor(wallNumL / 2)) {
const temp
=source.clone();
temp.position.set(frontS[
0] + i * everyL, frontS[1], frontS[2]);
scene.add(temp);
}
}
});
//加载公交站点,位置在距离大门右侧第二单面墙处 loader.load('./Objects/model/busStops.gltf', (gltf) =>{
const source
=gltf.scene;
source.scale.set(
4, 4, 4);
gltf.scene.position.set(
-long / 2 + (Math.floor(wallNumL / 2) + 3) * everyL, 0, -width / 2 + wallNumW * everyL + everyW + 3);
scene.add(gltf.scene);
});
}
/**
* 铺设园区和园区外面的公路
* 包含公路以及部分人行道路
*/ functionloadRoad() {
const space
= 40;
const outWidth
= 40;//加载园区外面的公路 const outerP1 =[
{ coord: [
-long / 2, 0, -width / 2 + wallNumW * everyL + space], type: 1},
{ coord: [
long / 2, 0, -width / 2 + wallNumW * everyL + space], type: 1},
];
const road1
= newRoad({
name:
'road_1',
sourceCoord: outerP1,
width: outWidth,
showCenterLine:
true});
scene.add(road1.group);

const outerP2
=[
{ coord: [
-long / 2 + wallNumL * everyL + outWidth / 2 + 10, 0, -width / 2 + wallNumW * everyL + space - outWidth / 2 + 0.5], type: 1},
{ coord: [
-long / 2 + wallNumL * everyL + outWidth / 2 + 10, 0, -width / 2], type: 1},
];
const road2
= newRoad({
name:
'road_2',
sourceCoord: outerP2,
width: outWidth,
showCenterLine:
true,
zIndex:
0.8});
scene.add(road2.group);
//加载园区内的道路 const innerWidth = 25;
const color
= 0x787878;
const lineColor
= 0xc2c2c2;//加载到停车场的道路 const innerP1 =[
{ coord: [
-long / 2 + Math.floor(wallNumL / 2) * everyL + everyL / 2, 0, -width / 2 + wallNumW * everyL + space - outWidth / 2 + 0.5], type: 1},
{ coord: [
-long / 2 + Math.floor(wallNumL / 2) * everyL + everyL / 2, 0, -width / 2 + wallNumW * everyL + space - 60], type: 0},
{ coord: [
-long / 2 + Math.floor(wallNumL / 2) * everyL + everyL / 2 - innerWidth / 2, 0, -width / 2 + wallNumW * everyL + space - 60 - innerWidth / 2], type: 1},
{ coord: [
-long / 2 + parkingH + 20 + innerWidth / 2, 0, -width / 2 + wallNumW * everyL + space - 60 - innerWidth / 2], type: 0},
{ coord: [
-long / 2 + parkingH + 20, 0, -width / 2 + wallNumW * everyL + space - 60 - innerWidth], type: 1},
{ coord: [
-long / 2 + parkingH + 20, 0, -width / 2 + everyW + 10], type: 1},
];
const street1
= newRoad({
name:
'street_1',
sourceCoord: innerP1,
width: innerWidth,
showCenterLine:
true,
zIndex:
0.8,
planeColor: color,
sideColor: lineColor
});
scene.add(street1.group);
//加载到充电桩的道路 const innerP2 =[
{ coord: [
-long / 2 + Math.floor(wallNumL / 2) * everyL + everyL / 2, 0, -width / 2 + wallNumW * everyL + space - outWidth / 2 + 0.5], type: 1},
{ coord: [
-long / 2 + Math.floor(wallNumL / 2) * everyL + everyL / 2, 0, -width / 2 + wallNumW * everyL + space - 60], type: 0},
{ coord: [
-long / 2 + Math.floor(wallNumL / 2) * everyL + everyL / 2 + innerWidth / 2, 0, -width / 2 + wallNumW * everyL + space - 60 - innerWidth / 2], type: 1},
{ coord: [
-long / 2 + wallNumL * everyL - parkingH - everyW - 39, 0, -width / 2 + wallNumW * everyL + space - 60 - innerWidth / 2], type: 0},
{ coord: [
-long / 2 + wallNumL * everyL - parkingH - everyW - 39 + innerWidth / 2, 0, -width / 2 + wallNumW * everyL + space - 60 - innerWidth], type: 1},
{ coord: [
-long / 2 + wallNumL * everyL - parkingH - everyW - 39 + innerWidth / 2, 0, -width / 2 + everyW + 10], type: 1},
];
const street2
= newRoad({
name:
'street_2',
sourceCoord: innerP2,
width: innerWidth,
showCenterLine:
true,
zIndex:
0.8,
planeColor: color,
sideColor: lineColor
});
scene.add(street2.group);

roadObj.push(
road1,
road2,
street1,
street2
);

calFork();
}
/**
* 计算pointA和pointB 组成的直线与点集points是否有相交
* @param {*} points
* @param {*} pontA
* @param {*} pointB
*/ functionjudgeIntersect(points, pointA, pointB) {
let res
= { flag: false, interP: [] };for (let i = 0; i < points.length - 1; i++) {
const cur
=points[i];
const nextP
= points[i + 1];
const interP
= segmentsIntr(cur, nextP, pointA, pointB, true)if ( interP !== false) {
res.flag
= true;
res.interP
=interP;
res.index
=i;break;
}
}
returnres;
}
/**
* 计算各条道路的岔口信息并统计到道路对象中
*/ functioncalFork() {functionsetInter(cur, next, interP, corner, width) {
const circle
= new THREE.ArcCurve(corner[0], corner[2], width * 2).getPoints(20);
const cirPoints
= circle.map(e => new THREE.Vector3(e.x, 0, e.y));

cur.intersect.push({ name: next.name,
interPoint: interP,
corner: cirPoints,
cornerCenter: corner
});

next.intersect.push({
name: cur.name,
interPoint: interP,
corner: cirPoints,
cornerCenter: corner
});
}

roadObj.forEach((e, i)
=>{if (i < roadObj.length - 1) {for (let j = i + 1; j < roadObj.length; j++) {if (e.intersect.map(e => e.name).indexOf(roadObj[j].name) < 0) {
const middle
=roadObj[j].middle;//计算路牙和其他道路是否有相交 //左边路牙和下一条路的起始位置做对比 let inter = judgeIntersect(e.left, middle[0], middle[1]);if(inter.flag) {
const cornerCenter
= segmentsIntr(e.middle[inter.index], e.middle[inter.index + 1], middle[0], middle[1]);
setInter(e, roadObj[j], inter.interP, cornerCenter, roadObj[j].width);
continue;
}
//左边路牙和下一条路的终止位置做对比 inter = judgeIntersect(e.left, middle[middle.length - 1], middle[middle.length - 2])if(inter.flag) {
const cornerCenter
= segmentsIntr(e.middle[inter.index], e.middle[inter.index + 1], middle[middle.length - 1], middle[middle.length - 2]);
setInter(e, roadObj[j], inter.interP, cornerCenter, roadObj[j].width);
continue;
}
//右边路牙和下一条路的起始位置做对比 inter = judgeIntersect(e.right, middle[0], middle[1]);if(inter.flag) {
const cornerCenter
= segmentsIntr(e.middle[inter.index], e.middle[inter.index + 1], middle[0], middle[1]);
setInter(e, roadObj[j], inter.interP, cornerCenter, roadObj[j].width);
continue;
}
//右边路牙和下一条路的终止位置做对比 inter = judgeIntersect(e.right, middle[middle.length - 1], middle[middle.length - 2]);if(inter.flag) {
const cornerCenter
= segmentsIntr(e.middle[inter.index], e.middle[inter.index + 1], middle[middle.length - 1], middle[middle.length - 2]);
setInter(e, roadObj[j], inter.interP, cornerCenter, roadObj[j].width);
continue;
}
}
}
}
})
}
functionactionTemp(target, name, flag, moveName) {
const filter
= roadObj.filter(e => e.name === name)[0];

const carObject
= newMove({
name: moveName,
target: target,
roads: roadObj,
startPos: flag
? filter.left[0] : filter.right[0],
parks: parks
});
moveObj.push(carObject);
}
/**
* 加载行人和汽车
*/ functionloadBusAndPeople() {//加载汽车和公交车 const loader = newGLTFLoader();

const carId
=['car0','car2','car4','car5','bus','car3',
];
const roadIds
=['road_1','road_2','street_1','street_2','street_2','road_2',
];

carId.forEach((e, i)
=>{
loader.load(`.
/Objects/model/${e}.gltf`, (gltf) => { gltf.scene.scale.set(4, 4, 4);
scene.add(gltf.scene);
gltf.scene.name
=e;
actionTemp(gltf.scene, roadIds[i],
false, e);
});
})
}
/**
* 点击汽车驶离停车位
*/ functionaddClick() {
renderer.domElement.addEventListener(
'click', (event) =>{
const px
=event.offsetX;
const py
=event.offsetY;
const x
= (px / wWidth) * 2 - 1;
const y
= -(py / wHeight) * 2 + 1;//创建一个射线发射器 const raycaster = newTHREE.Raycaster();//.setFormCamera()计算射线投射器的射线属性ray //即在点击位置创造一条射线,被射线穿过的模型代表选中 raycaster.setFromCamera(newTHREE.Vector2(x, y), camera);

const intersects
= raycaster.intersectObjects(moveObj.map(e =>e.target));if (intersects.length > 0) {
const move
= moveObj.filter(e => e.name === intersects[0].object.parent.name || e.name === intersects[0].object.parent.parent.name)[0];if (move &&move.pause) {
move.unParkCar();
}
}
})
}

init();

控制器Main.js


import * as THREE from 'three';
import { getCurvePoint, getSidePoints, segmentsIntr, clone, isClockWise } from
'./Common.js';/**
* 移动类,实现物体如何按照路径运动以及在岔路口如何选择等功能
* 后期可以增加碰撞检测避让等功能
*/class Road {
constructor(props) {
//道路的原始点信息,通过这些点信息扩展道路 this.sourceCoord =props.sourceCoord;//道路名称 this.name =props.name;//道路宽度 this.width =props.width;//是否显示道路中心线 this.showCenterLine = props.showCenterLine === false ? false : true;//左侧路牙点集合 this.left =[];//道路中心线点集合 this.middle =[];//右侧路牙点集合 this.right =[];//道路面的颜色 this.planeColor = props.planeColor || 0x606060;//道路边线的颜色 this.sideColor = props.sideColor || 0xffffff;//道路中心线的颜色 this.middleColor = props.middleColor || 0xe0e0e0;//道路的层级 this.zIndex = props.zIndex || 0.5;//车道信息 this.lanes =[];//道路组合对象 this.group = null;//相交的道路名称 数据格式{name: ***, interPoint: [xx,xx,xx]} this.intersect =[];this.lineInsert();this.create();
}
/**
* 由于直线获取贝塞尔点的时候插入的点较少导致物体运动较快,所以在
* 平行与X、Y轴的线插入部分点,保证物体运动平滑,插入点时保证X或者Z轴间距为路宽的一半
*/lineInsert() {
const temp
=[];
const half
= this.width / 2;this.sourceCoord.forEach((cur, i) =>{
temp.push(cur);
if (i < this.sourceCoord.length - 1) {
const e
=cur.coord;
const nextP
= this.sourceCoord[i + 1].coord;//处理直线 if (cur.type === 1) {if (e[0] - nextP[0] === 0) {//平行Z轴 if (e[2] < nextP[2]) {for (let i = e[2] + half; i < nextP[2]; i +=half) {
temp.push({
coord: [e[
0], e[1], i],
type:
1});
}
}
else{for (let i = e[2] - half; i > nextP[2]; i -=half) {
temp.push({
coord: [e[
0], e[1], i],
type:
1});
}
}
}
else if (e[2] - nextP[2] === 0) {//平行X轴 if (e[0] < nextP[0]) {for (let i = e[0] + half; i < nextP[0]; i +=half) {
temp.push({
coord: [i, e[
1], e[2]],
type:
1});
}
}
else{for (let i = e[0] - half; i > nextP[0]; i -=half) {
temp.push({
coord: [i, e[
1], e[2]],
type:
1});
}
}
}
}
}
})
this.sourceCoord =temp;
}
/**
* 创建道路
*/create() {
const group
= newTHREE.Group();
const roadPoints
= this.getPoints(this.sourceCoord, this.width);this.left = roadPoints[0];this.middle = roadPoints[1];this.right = roadPoints[2];

const isWise
= isClockWise(this.left.concat(clone(this.right).reverse()));//添加左车道 this.lanes.push(newLane({
name: `${
this.name}_lane_0`,
type:
'left',
isReverse: isWise,
side:
this.left,
middle:
this.middle
}));
//添加右车道 this.lanes.push(newLane({
name: `${
this.name}_lane_1`,
type:
'right',
isReverse:
!isWise,
side:
this.right,
middle:
this.middle
}));

const outlinePoint
= roadPoints[0].concat(clone(roadPoints[2]).reverse());
outlinePoint.push(roadPoints[
0][0]);
const shape
= newTHREE.Shape();
outlinePoint.forEach((e, i)
=>{if (i === 0) {
shape.moveTo(e[
0], e[2], e[1]);
}
else{
shape.lineTo(e[
0], e[2], e[1]);
}
})
//创建道路面 const plane = newTHREE.Mesh(newTHREE.ShapeGeometry(shape),newTHREE.MeshBasicMaterial({
color:
this.planeColor,
side: THREE.DoubleSide
})
)
plane.rotateX(Math.PI
/ 2);
group.add(plane);
//创建道路边沿线 const sideL = newTHREE.Line(new THREE.BufferGeometry().setFromPoints(roadPoints[0].map(e => newTHREE.Vector3(...e))),new THREE.LineBasicMaterial({ color: this.sideColor, linewidth: 10})
);

const sideR
= newTHREE.Line(new THREE.BufferGeometry().setFromPoints(roadPoints[2].map(e => newTHREE.Vector3(...e))),new THREE.LineBasicMaterial({ color: this.sideColor, linewidth: 10})
);
group.add(sideL, sideR);
//创建道路中心虚线 if (this.showCenterLine) {
const sideM
= newTHREE.Line(new THREE.BufferGeometry().setFromPoints(roadPoints[1].map(e => newTHREE.Vector3(...e))),new THREE.LineDashedMaterial({ color: this.middleColor, linewidth: 10, gapSize: 5, dashSize: 5})
);
sideM.computeLineDistances();
group.add(sideM);
}

group.position.y
= group.position.y + this.zIndex;
group.name
= this.name;this.group =group;
}
/**
* 获取道路的顶点信息
*/getPoints(points) {
const half
= this.width / 2;//存储左中右三条线路的顶点信息 const [left, middle, right] =[[], [], []];for (let i = 0; i <points.length;) {
const e
=points[i].coord;if (points[i].type === 1) {//直线处理方式 if (i === 0) {
const nextP
= points[i + 1].coord;
const side
=getSidePoints(e, nextP, e, half);
left.push(side[
0]);
middle.push(e);
right.push(side[
1]);
}
else{
const preP
= points[i - 1].coord;
const side
=getSidePoints(preP, e, e, half);
left.push(side[
0]);
middle.push(e);
right.push(side[
1]);
}
i
++;
}
else{//曲线处理方式 const preMidP = points[i - 1].coord;
const nextMidP1
= points[i + 1].coord;
const nextMidP2
= points[i + 2].coord;//获取两侧点信息 const sideP1 =getSidePoints(preMidP, e, e, half);
const sideP2
=getSidePoints(nextMidP1, nextMidP2, nextMidP1, half);
const sideP3
=getSidePoints(nextMidP1, nextMidP2, nextMidP2, half);//左侧 const interLeft = segmentsIntr(left[left.length - 1], sideP1[0], sideP2[0], sideP3[0]);
const curveLeft
= getCurvePoint(sideP1[0], interLeft, sideP2[0]);
left.push(...curveLeft);
//中间 const interMid = segmentsIntr(middle[middle.length - 1], e, nextMidP1, nextMidP2);
const curveMid
=getCurvePoint(e, interMid, nextMidP1);
middle.push(...curveMid);
//右侧 const interRight = segmentsIntr(right[right.length - 1], sideP1[1], sideP2[1], sideP3[1]);
const curveRight
= getCurvePoint(sideP1[1], interRight, sideP2[1]);
right.push(...curveRight);
i
+= 2;
}
}
return[left, middle, right];
}
}
/**
* 车道对象
*/class Lane {
constructor(options) {
//车道名称 this.name =options.name;//标识左车道还是右车道 this.type =options.type;//行驶方向和点的方向是否一致 this.direction =options.direction;//车道中心线 this.coord = this.getCenter(options.middle, options.side, options.isReverse);
}
getCenter(middle, side, reverseFlag) {
const center
= middle.map((e, i) =>{return [(e[0] + side[i][0]) / 2, e[1], (e[2] + side[i][2]) / 2];
});
return reverseFlag ?center.reverse() : center;
}
}

export
default Road;

道路类Road.js


import * as THREE from 'three';
import { TextGeometry } from
'../TextGeometry.js';
import { FontLoader } from
'../FontLoader.js';/**
* 停车场类
*
*/class Parking {
constructor(props) {
//停车场的名称 this.name =props.name;//停车场的宽 this.width =props.width;//停车场的长 this.height =props.height;//停车场的中心位置 this.position =props.position;//停车场的顶点信息 this.coord =[];//停车场有时候需要根据不同的显示位置进行Z轴旋转 this.rotate =props.rotate;//停车位的入口点,主要确保车辆进出的方向 this.entryPoint =[];//车位已经停车标识 this.placed = false;this.create();
}
/**
* 创建道路
*/create() {
const points
=[new THREE.Vector3(-this.height / 2, this.width / 2, 0),new THREE.Vector3(-this.height / 2, -this.width / 2, 0),new THREE.Vector3(this.height / 2, -this.width / 2, 0),new THREE.Vector3(this.height / 2, this.width / 2, 0)
];
//停车场四周白色边线 const line = newTHREE.LineLoop(newTHREE.BufferGeometry().setFromPoints(points),new THREE.LineBasicMaterial({ color: 0xe0e0e0, linewidth: 1})
);
//停车场面 const plane = newTHREE.Mesh(new THREE.PlaneGeometry(-this.height, this.width),new THREE.MeshLambertMaterial({ color: 0xc0e9bf, side: THREE.DoubleSide })
);
//添加车位编号 const loader = newFontLoader();
loader.load(
'./Objects/helvetiker_regular.typeface.json', (font) =>{
const textGeo
= new TextGeometry(this.name, {
font: font,
size:
2,
height:
0});
textGeo.rotateZ(
-Math.PI / 2);
const text
= newTHREE.Mesh(
textGeo,
new THREE.MeshBasicMaterial({ color: 0xffffff})
);
text.position.x
= this.height * 3 / 8;
text.position.y
= this.name.length / 2;
text.position.z
= text.position.z + 1;
group.add(text);
});
const group
= newTHREE.Group();
group.add(line);
group.add(plane);
group.rotateX(
-Math.PI / 2);if (this.rotate) {
group.rotateZ(
this.rotate);
}
this.group =group;
group.position.set(...
this.position);
let beta
= 1;if (this.rotate ===Math.PI) {
beta
= -1;
}
this.entryPoint = [this.height * beta / 2 + this.position[0], this.position[1], this.position[2]];
}
}

export
default Parking;

停车位类Parking.js


import * as THREE from 'three';
import { calDistance, drawLine, clone, pointInPolygon, segmentsIntr } from
'./Common.js';/**
* 移动类,实现物体如何按照路径运动以及在岔路口如何选择等功能
* 后期可以增加碰撞检测避让等功能
*/class Move {
constructor(props) {
//所有道路信息 this.roads ={};
props.roads.forEach(e
=> this.roads[e.name] =e);this.name =props.name;//物体对象 this.target =props.target;//前进还是倒车 1:前进;-1:倒车 this.direction = 1;//物体初始位置 this.startPos =props.startPos;//移动轨迹 this.trace =[];//当前形式的道路 this.curRoad = null;//锁定下一步行动趋势,主要是为了解决在路口物体获取到下一步行动后再次获取行动导致功能乱掉 this.trendLock = false;this.preRoadName = null;//当前移动所在的车道 this.curLane = null;//是否暂停移动 this.pause = false;//园区停车场信息 this.parks =props.parks;this.parkObj = null;this.init();
}
/**
* 获取当前实体所在道路以及移动方向和移动轨迹
*/init() {
let minDis
={
roadName:
'', //起始位置所在道路的道路名 distance: 100000, //起始物体与车道左右边线的最小距离 curLane: null, //移动的车道 };for (let o in this.roads) {
const road
= this.roads[o];
const startLeftDis
= calDistance(road.lanes[0].coord[0], this.startPos);
const startRightDis
= calDistance(road.lanes[1].coord[0], this.startPos);
const endLeftDis
= calDistance(road.lanes[0].coord[road.lanes[0].coord.length - 1], this.startPos);
const endRightDis
= calDistance(road.lanes[1].coord[road.lanes[1].coord.length - 1], this.startPos);
const min
=Math.min(startLeftDis, startRightDis, endLeftDis, endRightDis);if (minDis.distance >min) {
minDis
={
roadName: o,
distance: min,
curLane: (min
=== startRightDis || min === endRightDis) ? road.lanes[0] : road.lanes[1],
index: (startLeftDis
=== min || startRightDis === min) ? 0 : road.left.length - 1};
}
}
this.curLane =minDis.curLane;this.curRoad = this.roads[minDis.roadName];this.trace = this.getSpreadPoint(this.curLane.coord);this.moveIndex =minDis.index;this.move();
}
/**
* 获取车道中心线
* 将车道的左右线取平均值即可
* @param {*} params
*/getLaneCenter(points1, points2) {return points1.map((e, i) =>{return [(e[0] + points2[i][0]) / 2, e[1], (e[2] + points2[i][2]) / 2];
})
}

move() {
if (!this.pause) {this.checkMoveOutCorner();this.parkCar();//如果移动到当前轨迹的最后一个点时,需要进行掉头重新寻找轨迹 const forks = this.checkFork();if (forks.length !== 0) {this.getRandomTrend(forks);
}
else if (this.moveIndex === this.trace.length - 1) {this.getTurnPoint();
}
const curPosition
= this.trace[this.moveIndex].toArray();this.target.position.set(...curPosition);if (this.direction === 1) {//前进 if (this.moveIndex !== this.trace.length - 1) {this.target.lookAt(...this.trace[this.moveIndex + 1].toArray());
}
}
else{//倒车 if (this.moveIndex !== 0) {this.target.lookAt(...this.trace[this.moveIndex - 1].toArray());
}
}
this.moveIndex ++;
}
requestAnimationFrame(
this.move.bind(this));
}
/**
* 在锁定后检查物体是否已经移出路口,如果移出则解除锁定
*/checkMoveOutCorner() {if (!this.trendLock) {return false;
}
const preObj
= this.curRoad.intersect.filter(e => e.name === this.preRoadName)[0];if (preObj && !pointInPolygon(this.trace[this.moveIndex], preObj.corner)) {this.trendLock = false;this.preRoadName = null;
}
return this.trendLock;
}
/**
* 根据提供的点信息寻找最近的点值
*/findNearIndex(points, pointA) {
let min
= 100000;
let index
= -1;for (let i = 0; i < points.length; i++) {
const dis
=calDistance(points[i], pointA);if (dis <min) {
min
=dis;
}
else{
index
= i - 1;break;
}
}
returnindex;
}
/**
* 在岔路口时随机获取下一步行动方向
*/getRandomTrend(forks) {
const isEnd
= calDistance(this.trace[this.moveIndex].toArray(), this.trace[this.trace.length - 1].toArray()) < this.curRoad.width / 2;//从多条路中随机选择一条,当前园区路况简单 路口数据目前只有一条 const randomRoad = forks[Math.floor(Math.random() *forks.length)];//分别代表掉头、转弯、直行四种情况 let types = [0, 1, 2];if(isEnd) {//如果是道路的尽头 可以选择掉头或者转弯 types = [0, 1, 2];
}
else{//如果不是道路的尽头,可以选择转弯或者直行 types = [1, 2];
}

const random
= types[Math.floor(Math.random() *types.length)];if (random === 0) {//掉头 this.trendLock = true;this.getTurnPoint();
}
else if (random === 1) {//转弯 this.trendLock = true;this.getForkPoint(randomRoad, isEnd);
}
else if (random === 2) {this.preRoadName =randomRoad;//直线 this.trendLock = true;
}
}
/**
* 在岔路口时根据获取道路轨迹信息
*/getForkPoint(name, isEnd) {this.preRoadName = this.curRoad.name;
const roadObj
= this.roads[name];
let splitPoint
=[];if(isEnd) {//如果在道路尽头转弯,随机产生是左侧还是右侧道路 const leftOrRight = Math.floor(Math.random() *roadObj.lanes.length);
const coord
=roadObj.lanes[leftOrRight].coord;this.curLane =roadObj.lanes;
const index
= this.findNearIndex(coord, this.trace[this.moveIndex].toArray());
splitPoint
= coord.slice(index + 1);//为了平滑过渡获取道路末端和当前行驶位置的交点 const corInter = segmentsIntr(splitPoint[0], splitPoint[1], this.trace[this.trace.length - 2].toArray(), this.trace[this.trace.length - 3].toArray());if(corInter) {
splitPoint.unshift(corInter);
}
splitPoint.unshift(
this.trace[this.moveIndex]);
}
else{//转弯前需要判断当前路可以向那个方向转弯,比如临近路口只能转向右车道,非临近路口需要转到对象车道 //可以根据当前点和车道点的距离来判断 const lane1Dis = calDistance(roadObj.lanes[0].coord[0], this.trace[this.moveIndex].toArray());
const lane2Dis
= calDistance(roadObj.lanes[1].coord[0], this.trace[this.moveIndex].toArray());
let temp
= null;if (lane1Dis <lane2Dis) {
temp
= clone(roadObj.lanes[0].coord);this.curLane = roadObj.lanes[0];
}
else{
temp
= clone(roadObj.lanes[1].coord);this.curLane = roadObj.lanes[1];
}
this.curRoad =roadObj;
const index
= this.findNearIndex(temp, this.trace[this.moveIndex].toArray());
splitPoint
= temp.slice(index + 1);//为了平滑过渡获取道路末端和当前行驶位置的交点 const corInter = segmentsIntr(splitPoint[0], splitPoint[1], this.trace[this.moveIndex].toArray(), this.trace[this.moveIndex + 1].toArray());if(corInter) {
splitPoint.unshift(corInter);
}
splitPoint.unshift(
this.trace[this.moveIndex]);
}
this.trace = this.getSpreadPoint(splitPoint).map(e => newTHREE.Vector3(...e));//drawLine(this.target.parent, this.trace); this.moveIndex = 0;
}
/**
* 掉头后获取道路轨迹信息
*/getTurnPoint() {
const roadObj
= this.curRoad;
const nextLane
= roadObj.lanes.filter(e => e.name !== this.curLane.name)[0]
const clonePoint
=clone(nextLane.coord);
clonePoint.unshift(
this.trace[this.moveIndex].toArray());this.trace = this.getSpreadPoint(clonePoint);this.curLane =nextLane;//drawLine(this.target.parent, this.trace); this.moveIndex = 0;
}
/**
* 获取离散点即将点与点之间更加细化,使物体运行起来更加平滑
* @param {*} points
* @returns
*/getSpreadPoint(points, beta= 1) {
const trail
= new THREE.CatmullRomCurve3([...points.map(e => newTHREE.Vector3(...e))]);return trail.getPoints(trail.getLength() *beta);
}
/**
* 将车辆停入停车场内
*/parkCar() {//是否允许停车 if (this.parkEnable) {return;
}
if (this.parkObj) {if (this.direction === -1 && this.moveIndex === this.trace.length - 2) {this.pause = true;this.parkObj.object.placed = true;
}
if (this.moveIndex === this.trace.length - 1) {this.direction = -1;this.trace = this.parkObj.backTrace;//drawLine(this.target.parent, this.trace); this.moveIndex = 0;
}
else{return;
}
}
else{
let flag
= false;
let parkObj
= null;
const frontIndex
= 13;for (let i = 0; i < this.parks.length; i++) {if (calDistance(this.parks[i].entryPoint, this.trace[this.moveIndex].toArray()) < 13 && !this.parks[i].placed) {
flag
= true;
parkObj
= this.parks[i];break;
}
}
//return flag; if(flag) {
const random
= Math.floor(Math.random() * 2);//1代表停车 if (random === 1) {
const front
= this.trace.slice(this.moveIndex, this.moveIndex + frontIndex).map(e => newTHREE.Vector3(...e));
const back
= this.getSpreadPoint([front[front.length - 1], parkObj.entryPoint, parkObj.position]);this.moveIndex = 0;this.trace =front;this.parkObj ={ object: parkObj, backTrace: back };
}
}
}
}

unParkCar() {
this.direction = 1;
const parkObj
= this.parkObj.object;
parkObj.placed
= false;
const index
= this.findNearIndex(this.curLane.coord, parkObj.entryPoint);
const temp
= this.curLane.coord.slice(index);
temp.unshift(parkObj.entryPoint);
temp.unshift(parkObj.position);
this.trace = this.getSpreadPoint(temp);this.moveIndex = 0;this.pause = false;this.parkObj = null;this.parkEnable = true;//不想再扩展功能了 就在这临时用标识符处理了下出库时不断循环查询车位的情况 setTimeout(() =>{this.parkEnable = false;
},
3000)
}
/**
* 检查岔路口
*/checkFork() {if (this.trendLock) {return[];
}
const forks
=[];for (let i = 0; i < this.curRoad.intersect.length; i++) {if (this.moveIndex < this.trace.length - 1) {
const dis1
= calDistance(this.trace[this.moveIndex].toArray(), this.curRoad.intersect[i].interPoint);
const dis2
= calDistance(this.trace[this.moveIndex + 1].toArray(), this.curRoad.intersect[i].interPoint);if (dis1 < this.curRoad.width && dis1 >dis2) {
forks.push(
this.curRoad.intersect[i].name);
}
}
}
returnforks;
}
}

export
default Move;

移动类Move.js

1、准备材料

开发板(
正点原子stm32f407探索者开发板V2.4

STM32CubeMX软件(
Version 6.10.0

野火DAP仿真器

keil µVision5 IDE(
MDK-Arm

ST-LINK/V2驱动

一台示波器

逻辑分析仪
nanoDLA

2、实验目标

使用STM32CubeMX软件配置STM32F407开发板的
DAC OUT1实现输出0-3.3V 周期为12.8ms的正弦波形

3、实验流程

3.0、前提知识

由于STM32F407的两个DAC输出通道只能自动生成三角波和噪声波,因此如果想要输出其他的波形可以自己手动定义一个周期内DAC要输出的值,并选择定时器的更新事件作为DAC输出的触发源按顺序输出

这样按照波形采样值的顺序,在每一个触发源到来的时候,手动指定DAC将要输出的值,理论上就可以输出任何我们想要输出的波形,比如正弦波,
本实验将以正弦波为例,讲解如何通过DAC的DMA输出正弦波型

当DAC参考电源引脚VREF+接VDDA(3.3V)时,可设置的DAC输出寄存器值范围为0~4095,而DAC的输出范围为0-3.3V
,要输出的正弦波sin(x)波形幅值范围为-1~1,因此可以对该波形做一些平移伸缩,将其幅值范围缩放到DAC设置范围0~4095内,变换后的正弦波公式为:y=2047*(sin(x)+1)

在该正弦波形的一个周期0-2pi内平均取128个采样点,然后按照时间先后顺序定义在数组中,每当0.1ms触发源到来的时候,我们就递归的从数组中取出一个值将其设置为DAC的输出值,直到128个采样点全部设置完毕,然后再反复从第一个重新设置,这样就可以大致实现正弦波型

因为需要频繁的从内存取出数据然后写入DAC外设,因此这里比较合适的做法是使用DMA的方式进行
,通过上述设置的DAC输出的正弦波形的周期应该为0.1ms*128=12.8ms,正弦波的幅值范围应该为0-3.3V

3.1、CubeMX相关配置

3.1.0、工程基本配置

打开STM32CubeMX软件,单击ACCESS TO MCU SELECTOR选择开发板MCU(选择你使用开发板的主控MCU型号),选中MCU型号后单击页面右上角Start Project开始工程,具体如下图所示

开始工程之后在配置主页面System Core/RCC中配置HSE/LSE晶振,在System Core/SYS中配置Debug模式,具体如下图所示

详细工程建立内容读者可以阅读“
STM32CubeMX教程1 工程建立

3.1.1、时钟树配置

系统时钟使用8MHz外部高速时钟HSE,HCLK、PCLK1和PCLK2均设置为STM32F407能达到的最高时钟频率,具体如下图所示

3.1.2、外设参数配置

在Pinout & Configuration页面左边功能分类栏目Analog中单击其中DAC

在Mode中勾选OUT1 Configuration

选择TIM6的外部触发事件作为DAC OU1输出的触发源
,不选择波形生成模式,因为本实验要生成自定义波形

具体配置如下图所示

单击Configuration中的DMA Settings选项卡对DAC的DMA请求进行设置

单击ADD按键增加DMA请求,这里可选的只有一个DAC1

选择想要使用的DMA Stream,并设置优先级,将DMA请求模式设置为循环模式,外设地址不增加,内存地址递增,数据宽度选择字Word

上述配置如下图所示

3.1.3、外设中断配置

在Pinout & Configuration页面左边System Core/NVIC中
勾选DMA1 Stream5 全局中断
,然后选择合适的中断优先级即可

3.2、生成代码

3.2.0、配置Project Manager页面

单击进入Project Manager页面,在左边Project分栏中修改工程名称、工程目录和工具链,然后在Code Generator中勾选“Gnerate peripheral initialization as a pair of 'c/h' files per peripheral”,最后单击页面右上角GENERATE CODE生成工程,具体如下图所示

详细Project Manager配置内容读者可以阅读“
STM32CubeMX教程1 工程建立
”实验3.4.3小节

3.2.1、外设初始化调用流程

在生成的工程代码主函数中新增了MX_DMA_Init()函数,该函数对DAC使用的DMA1时钟使能,由于启用了该DMA的中断,因此还对中断优先级及使能进行了配置,如下图所示

DAC的初始化调用流程与“
STM32CubeMX教程16 DAC - 输出3.3V内任意电压
”实验一致,只是因为本实验配置了DMA,因此在HAL_DAC_MspInit()函数中增加了对使用的DAC1 DMA请求的相关配置代码,如下图所示

3.2.2、外设中断调用流程

DMA全局中断事件回调函数为一个函数指针,当使用HAL_DAC_Start_DMA()函数启动DAC传输时,会将DMA全局中断事件回调函数指针指向具体的函数,这里指向了DAC_DMAConvCpltCh1()函数

在DAC_DMAConvCpltCh1()函数中最终调用了DAC OU1 DMA传输完成中断回调函数HAL_DAC_ConvCpltCallbackCh1()
,该函数为虚函数,需要用户重新实现

启用DMA的外设中断调用流程可参考“
STM32CubeMX教程12 DMA 直接内存读取
”实验3.2.2、外设中断调用流程小节,上述具体过程如下图所述

3.2.3、添加其他必要代码

采集正弦波y=2047*(sin(x)+1)的一个周期2pi内n个采样点,并将其定义在一个uint32_t 数组中,笔者这里定义了128个采样点

为什么非要正弦波函数为y=2047*(sin(x)+1)?

因为DAC的输出范围为0~4095,而sin(x)的输出范围为-1~1,因此需要采集的正弦波采样点最好缩放到0-4095范围,这样输出的波形更好显示

源代码如下所示
(注释1)

/*正弦波数据,12bit,1个周期128个点, 0-4095之间变化*/
const uint32_t userWave[] = 
{
    2047,   2147,	2248,   2347,	2446,	2544,	2641,	2737,
    2830,   2922,	3012,	3099,	3184,	3266,	3346,	3422,
    3494,   3564,	3629,	3691,	3749,	3803,	3852,	3897,
    3938,   3974,	4006,	4033,	4055,	4072,	4084,	4092,
    4094,   4092,	4084,	4072,	4055,	4033,	4006,	3974,
    3938,   3897,	3852,	3803,	3749,	3691,	3629,	3564,
    3494,   3422,	3346,	3266,	3184,	3099,	3012,	2922,
    2830,   2737,	2641,	2544,	2446,	2347,	2248,	2147,
    2047,   1947,	1846,	1747,	1648,	1550,	1453,	1357,
    1264,   1172,	1082,	995 ,	910 ,	828 ,	748 ,	672 ,
    600 ,   530 ,	465 ,	403 ,	345 ,	291 ,	242 ,	197 ,
    156 ,   120 ,	88  ,	61  ,	39  ,	22  ,	10  ,	2   ,
    0   ,   2   ,	10  ,	22  ,	39  ,	61  ,	88  ,	120 ,
    156 ,   197 ,	242 ,	291 ,	345 ,	403 ,   465,	530 ,
    600 ,   672 ,	748 ,	828 ,	910 ,	995 ,	1082,	1172,
    1264,   1357,	1453,   1550,	1648,	1747,	1846,	1947
};

在dac.c中重新实现DAC OU1 DMA传输完成中断回调函数HAL_DAC_ConvCpltCallbackCh1(),源代码如下

/*DAC OUT1 DMA传输完成中断回调函数*/
void HAL_DAC_ConvCpltCallbackCh1(DAC_HandleTypeDef *hdac)
{
    /*翻转RED_LED引脚状态*/
    HAL_GPIO_TogglePin(RED_LED_GPIO_Port,RED_LED_Pin);
}

在main.c文件主函数main中以DMA方式启动DAC输出,源代码如下

HAL_DAC_Start_DMA(&hdac,DAC_CHANNEL_1,userWave,128,DAC_ALIGN_12B_R);
HAL_TIM_Base_Start(&htim6);

4、常用函数

/*以DMA启动DAC输出*/
HAL_StatusTypeDef HAL_DAC_Start_DMA(DAC_HandleTypeDef *hdac, uint32_t Channel, const uint32_t *pData, uint32_t Length,uint32_t Alignment)
 
/*停止以DMA启动的DAC输出*/
HAL_StatusTypeDef HAL_DAC_Stop_DMA(DAC_HandleTypeDef *hdac, uint32_t Channel)
 
/*DAC OUT1 DMA传输完成时间中断回调函数*/
void HAL_DAC_ConvCpltCallbackCh1(DAC_HandleTypeDef *hdac)

5、烧录验证

烧录程序,单片机上电后,将示波器的探头挂钩与DAC OUT1引脚PA4相连接,接地环与开发板上的GND引脚连接,将示波器每格电压幅值调节为1.00V,将每格子采集时间调节为10ms,然后开启示波器对DAC OU1输出的波形采集

因为定时器溢出时间为0.1ms,而DMA传输的数据为正弦波一个周期内的128个样本点,因此生成的正弦波周期为128*0.1ms=12.8ms,这与示波器采集到的正弦波波形周期一致,如下图所示为示波器采集到的正弦波形

在DAC OUT1 DMA传输完成时间中断回调函数中翻转了RED_LED(PF9)引脚的状态,经过上述分析知道,传输完成一次所需事件应该为输出正弦波形的周期,也即12.8ms,因此使用逻辑分析仪器采集PF9引脚的状态,发现PF9引脚确实12.8ms翻转一次状态,逻辑分析仪采集的波形如下图所示

6、注释详解

注释1
:正弦波数组定义来源
DAC输出正弦波帖子

更多内容请浏览
STM32CubeMX+STM32F4系列教程文章汇总贴

深入了解Redis数据结构

Redis,作为一种高性能的内存数据库,支持多种数据结构,从简单的字符串到复杂的哈希表。在这篇博文中,我们将深入探讨Redis的一些主要数据结构,并通过详细的例子展示它们的使用。

1. 字符串 (String)

1.1 存储和获取

Redis中的字符串是二进制安全的,可以存储任何数据。让我们通过一个简单的例子来演示:

# 存储字符串
SET my_key "Hello, Redis!"

# 获取字符串
GET my_key

在这个例子中,我们使用
SET
命令将字符串"Hello, Redis!"存储在
my_key
中,并通过
GET
命令获取它。

1.2 字符串操作

Redis提供了丰富的字符串操作,比如拼接、截取等。让我们看一个例子:

# 拼接字符串
APPEND my_key ", How are you?"

# 获取更新后的字符串
GET my_key

在这里,我们使用
APPEND
命令将", How are you?"拼接到之前的字符串后面。

2. 列表 (List)

2.1 添加和获取元素

列表是一个有序的字符串元素集合。我们可以使用
LPUSH

LRANGE
来添加和获取元素:

# 添加元素到列表的头部
LPUSH my_list "Apple"
LPUSH my_list "Banana"
LPUSH my_list "Orange"

# 获取列表的元素
LRANGE my_list 0 -1

在这个例子中,我们通过
LPUSH
命令将"Apple"、"Banana"和"Orange"添加到
my_list
的头部,并通过
LRANGE
命令获取整个列表。

2.2 列表操作

Redis提供了许多列表操作,比如裁剪、弹出等。让我们看一个例子:

# 裁剪列表,保留前两个元素
LTRIM my_list 0 1

# 弹出列表的最后一个元素
RPOP my_list

# 获取更新后的列表
LRANGE my_list 0 -1

在这里,我们使用
LTRIM
命令裁剪列表,保留前两个元素,然后使用
RPOP
命令弹出最后一个元素。

3. 集合 (Set)

3.1 添加和获取元素

集合是一个无序、唯一元素的集合。我们可以使用
SADD

SMEMBERS
来添加和获取元素:

# 添加元素到集合
SADD my_set "Red"
SADD my_set "Green"
SADD my_set "Blue"

# 获取集合的所有元素
SMEMBERS my_set

在这个例子中,我们通过
SADD
命令将"Red"、"Green"和"Blue"添加到
my_set
,并通过
SMEMBERS
获取所有元素。

3.2 集合操作

Redis支持多种集合操作,比如交集、并集等。让我们看一个例子:

# 添加另一个集合
SADD my_set_2 "Green"
SADD my_set_2 "Yellow"

# 计算集合的交集
SINTER my_set my_set_2

在这里,我们通过
SINTER
命令计算
my_set

my_set_2
的交集。

4. 有序集合 (Sorted Set)

4.1 添加和获取元素

有序集合是一种集合,其中的每个元素都关联了一个分数,这使得我们可以按照分数排序元素。下面是一个示例:

# 向有序集合添加元素
ZADD my_zset 1 "Apple"
ZADD my_zset 2 "Banana"
ZADD my_zset 3 "Orange"

# 获取有序集合的所有元素
ZRANGE my_zset 0 -1 WITHSCORES

在这个例子中,我们使用
ZADD
命令向
my_zset
添加了三个元素,并通过
ZRANGE
命令获取所有元素及其分数。

4.2 有序集合操作

我们可以执行许多操作,例如查找特定排名范围的元素,或根据分数范围来查询元素。例如:

# 根据分数范围获取元素
ZRANGEBYSCORE my_zset 1 2

# 获取特定元素的排名
ZRANK my_zset "Banana"

5. 哈希 (Hash)

5.1 添加和获取元素

哈希是一种键值对集合,非常适合存储对象。以下是一个示例:

# 向哈希添加数据
HSET my_hash name "Alice"
HSET my_hash age "30"
HSET my_hash city "New York"

# 获取哈希中的所有键值对
HGETALL my_hash

在这个例子中,我们使用
HSET
命令向
my_hash
中添加了三个键值对,并用
HGETALL
获取了所有键值对。

5.2 哈希操作

哈希结构提供了丰富的操作,比如只获取所有的键或值,或者删除特定的键。例如:

# 获取所有键
HKEYS my_hash

# 获取所有值
HVALS my_hash

# 删除一个键
HDEL my_hash name

6. HyperLogLog

6.1 添加元素

HyperLogLog 是用于估计基数(集合中不重复元素的数量)的数据结构。下面是一个示例:

# 添加元素到 HyperLogLog
PFADD my_hyperloglog "Apple"
PFADD my_hyperloglog "Banana"
PFADD my_hyperloglog "Orange"

在这个例子中,我们使用
PFADD
命令向
my_hyperloglog
添加了三个元素。

6.2 估算基数

HyperLogLog 提供了估算基数的功能:

# 估算基数
PFCOUNT my_hyperloglog

这个命令返回 HyperLogLog 中不同元素的估算数量。

HyperLogLog 在处理大型数据集时非常有用,因为它能够以固定的内存消耗来估算基数,而不需要存储所有元素。

7. Bitmaps

7.1 设置和获取位

Bitmaps 是一种位图数据结构,可以用于存储和处理位信息。下面是一个简单的示例:

# 设置位
SETBIT my_bitmap 0 1
SETBIT my_bitmap 2 1

# 获取位的值
GETBIT my_bitmap 0
GETBIT my_bitmap 1

在这个例子中,我们使用
SETBIT
命令设置了位,然后使用
GETBIT
命令获取了相应位的值。

7.2 位操作

Bitmaps 还支持位操作,例如按位与、按位或、按位异或等:

# 按位与
BITOP AND result_bitmap my_bitmap1 my_bitmap2

# 按位或
BITOP OR result_bitmap my_bitmap1 my_bitmap2

# 按位异或
BITOP XOR result_bitmap my_bitmap1 my_bitmap2

这些位操作可以用于处理多个位图之间的关系。

Bitmaps 在一些场景下非常有用,例如统计用户的在线状态、记录用户的行为等。使用 Bitmaps 可以在占用较少内存的情况下高效地处理大量位信息。

8. Streams

8.1 添加消息

Streams 是一种日志数据结构,允许你按时间顺序添加、读取和消费消息。以下是一个简单的示例:

# 添加消息到 Stream
XADD mystream * name John age 30

# 添加另一条消息
XADD mystream * name Jane age 25

在这个例子中,我们使用
XADD
命令向名为
mystream
的 Stream 添加了两条消息。

8.2 读取消息

可以使用
XRANGE
命令按范围读取消息:

# 读取所有消息
XRANGE mystream - +

这将返回
mystream
中的所有消息。

Streams 在处理事件日志、消息队列等场景中非常有用,因为它允许按时间顺序组织和检索消息。

9. Geospatial 数据结构

9.1 添加地理位置

Geospatial 数据结构可以用来存储地理位置的信息,比如经度和纬度。以下是一个简单的示例:

# 添加地理位置信息
GEOADD locations 13.361389 38.116666666 "Palermo"
GEOADD locations 15.087269 37.502669 "Catania"

在这个例子中,我们使用
GEOADD
命令添加了两个地理位置信息,分别是 "Palermo" 和 "Catania"。

9.2 查询附近的位置

可以使用
GEODIST
命令计算两个位置之间的距离,或者使用
GEORADIUS
命令查找附近的位置:

# 计算两个位置之间的距离
GEODIST locations "Palermo" "Catania" km

# 查找附近的位置
GEORADIUS locations 15 37 100 km

这些命令使得在地理信息系统中进行位置相关的操作变得非常方便。

结语

通过这些详细的例子,我们深入了解了Redis的数据结构。当我们在实际项目中选择合适的数据结构时,这些例子将为我们提供有力的指导。希望这篇博文对你加深对Redis数据结构的理解有所帮助。如果你有其他关于Redis的问题,欢迎留言讨论!

原文链接:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage

译者:Kafka KIP-405是一篇非常优秀的多层存储的设计稿,不过此设计稿涉及内容很多,文章量大、严谨、知识点诸多。我们国内还没有对其有相对完整的译文,面对如此上乘的文章,译者想降低其门槛,让国内更多的人了解其设计,因此花费了诸多时间精力将此文进行了全文翻译,同时有一些可能让人产生疑惑的技术细节,译者也都打上了注释,希望可以帮助更多的人。当然如果有一些Kafka基础,且英文阅读流畅的话,译者还是建议去看原文

背景

Kafka是基础数据重要的组成部分,并且已经得到用户广泛的认可,增长势头迅猛。随着集群规模的增加,越来越多的数据将会被存储在Kafka上,其消息的保留时长、集群的弹缩、性能以及运维等日益变得越来越重要

Kafka采用append-only的日志追加模式,将数据存在在本地磁盘中。消息保留时长通过配置项
log.retention
来进行控制,既可以设置全局层面的,同时也可以设计某个topic维度的。消息保留时长能否确保数据持久化不丢失,即便是consumer短暂性宕机或不可用,当其成功重启后,只要时间没有超过
log.retention
,消息依旧能够读取

总的消息的存储量,与topic/partition数量、消息存储速率、消息保留时长相关,一个Kafka的Broker通常在本地磁盘上存储了大量的数据,例如10TB,这种大量本地存储的现象给Kafka的维护带来了巨大挑战

Kafka作为一种长期的存储服务

Kafka的普及率越来越高,也逐渐成为了很多数据的入口。它会将数据持久化下来,因此允许用户进行一些非实时的消费操作。很多用户
因为Kafka协议的简单以及消费者API的广泛采用,且允许用户将数据保留很长一段时间,这些特性都有助于Kafka日益成为了数据的source of data(SOT)

目前,Kafka一般会配置一个较短的保留时长(例如3天),然后更老的数据可以通过数据管道拷贝至更具弹缩能力的外部存储(例如HDFS)以便长期使用,结果就是客户端需要建立2种机制去读取数据,相对新的数据读取Kafka,老数据则读取HDFS

Kafka存储的提高,一般是依赖增加更多的Broker节点来实现的,但是这样同样也会导致新增了更多的内存+cpu,相对比可弹缩的外部存储来讲,这样无疑是增加了全局的开销,并且一个很多节点的集群同样增加了运维、部署的难度

Kafka本地存储以及维护的复杂性

当Kafka的一个broker坏掉了,将会用一个新的broker来替代,然后这个新节点必须从其他节点上拉取旧节点的全量数据。
同样,当新添加一个broker来横向扩展集群存储时,集群的rebalance会为新节点分配分区,这同样需要复制大量的数据。恢复及rebalance的耗时与kafka broker上的数据量呈正相关。许多多broker的集群(例如100个broker),节点故障是非常常见的情况,在恢复过程中消耗了大量的时间,这使得运维操作变得非常困难

减少每个broker上的存储数据量能够减少recovery及rebalance时间,但是这样操作的话同样需要减少消息的保留时长,这样就使得Kafka可提供的消息回溯时间变得更少

Kafka上云

本地部署的Kafka一般都会使用多个具备硬件SKU的高容量磁盘,从而最大程度提高I/O的吞吐量。而在云上,具有类似SKU的本地磁盘,要么不可用,要么非常昂贵。如果Kafka能够使用容量较小的SKU作为本地存储,那么它就更适合上云

解决方案 - Kafka分层存储

Kafka数据主要以流式方式使用尾部读取来进行消费,提供读取的层,一般都是操作系统的Page Cache,而不是穿透到磁盘。而旧的数据一般是为了回溯或者是因为consumer故障后重启后读取的,而这种情况一般不太常见。

在分层存储方法中,Kafka集群配置有两层存储:本地和远程(local and remote)。本地存储层与当前的Kafka相同,使用Kafka Broker上的本地磁盘来存储日志段。而新的远端存储层则使用一些外部存储,例如HDFS或者S3来实现。不同的存储层使用不同的日志过期时间。当开启远程存储时,本地消息的保留时长将会从几天缩短至几小时,而远端存储的消息保留时长则可能会保留更长的时间,例如几周甚至几个月。当本地日志段发生了滚动 (
译者:这里所谓的滚动rolled,可以简单理解为某个日志段写满1G了,即数据已经不会再发生变化了
),它可能就会被拷贝至远端存储,当然包含日志段相关的索引文件。这样即便是延迟敏感的数据也能获得高效的消费,因为数据都是尾部读取,且数据都会高概率命中page cache。而那些读取历史消息,或者对消息进行回溯的场景,很有可能数据已经不在本地存储了,那么它们将会去远端存储上读取

此解决方案允许在Kafka集群扩容存储时,将不再依赖于内存和CPU,使Kafka成为一个长期存储的解决方案。同时也减少了每个broker上本地存储的数据量,从而减少了集群recovery及rebalance时需要复制的数据量。broker不需要恢复远程存储层中的日志段,也不存在惰性恢复,而是远程存储层直接提供服务。这样,增加消息保留时长就不需要再扩展Kafka集群的broker数量了,同时消息总体的保留时长还可以更长,不用像当前很多集群部署的策略,需要启动一个单独的管道,将数据从Kafka拷贝至外部存储了

Goals

通过将旧数据存储在外部存储(如HDFS或S3)中,实现了将Kafka的存储扩展到了集群之外,不过Kafka的内部的协议不能有太大的变动。对于那么没有启用分层存储功能的现有用户,Kafka各类行为及操作复杂性决不能改变

Non-Goals

  • 分层存储不能取代ETL管道任务。现有的ETL管道继续按原样消费Kafka的数据,尽管Kafka有更长的消息保留时长
  • 二级存储不适用于compact类型的topic。即便是将compact类型的topic的配置项
    remote.storage.enable
    设置为true,也不能将其类型由delete改为compact
  • 二级存储不支持JBOD特性

变更

高层设计

RemoteLogManager
(RLM) 是一个新引入的组件:

  • 处理leader变更、topic partition删除等回调事件
  • 可插拔的存储管理器(即
    RemoteStorageManager
    )将处理segments的copy、read、delete事件,且其需要
    维护远端segments日志段的元数据
    (它需要知道哪些segments存储在了远端)

RemoteLogManager
是一个内部组件,不会向外暴露API

RemoteStorageManager
本身是一个接口,它定义了远端日志段及索引的生命周期。具体细节下文还会说明,我们将提供一个简单的RSM的实现来帮助大家更好的理解它。而诸如HDFS或者S3的实现应该放在他们产品的仓库中,Apache Kafka自身的仓库不会包含其具体的实现。这个设计与Kafka connnector保持一致

译者:其实这里本质上Kafka定义了一套多层存储的规范。突然想起一句话:普通的软件在编码,上流的软件在设计,顶级的软件在定义规范

RemoteLogMetadataManager
本身也是个接口,它同样定义了具有强一致语义的远端元数据的生命周期。它的默认实现是一个kafka系统内部的topic,用户如果需要使用其他远程存储介质来存储元数据的话,需要自己去扩展它

RemoteLogManager (RLM)

RLM为leader及follower启动了很多任务,具体解析可见下文

  • RLM Leader 职责
    • 它会不断地检查非active状态的LogSegments(这些LogSegments中最大的offset需要严格小于LSO,才能进行拷贝),然后将这些LogSegments及索引文件(offset/time/transaction/producer-snapshot)、leader epoch均拷贝至远端存储层
    • 提供从远端存储层查询旧数据的服务(当查询的数据在local log存储中没有时)
    • 即便是local存储已经不足(或存储的日志已经超时
      ?这里存疑
      ),也要先将日志段LogSegments拷贝至远端后,再删除
  • RLM Follower 职责
    • 通过访问RemoteLogMetdataManager来获取远端存储的log及index数据
    • 同时,它也会提供从远端存储层查询旧数据的服务

RLM提供了一个本地的有界缓存(可能是LRU淘汰策略)来存储远端的索引文件,这样可避免频繁的访问远端存储。它们存储在
log dir
目录下的
remote-log-index-cache
子目录,这些索引可以像local索引一样使用,用户可以通过设置配置项
remote.log.index.file.cache.total.size.mb
来设定此缓存的上限

在早期的设计中,还包含了通过远端存储的API拉取
LogSegments元数据的章节,(
译者:这应该是曾经讨论的某次中间版本
)它在HDFS接入时,看起来一切运行的很好。依赖远端存储来维护元数据的问题之一是:整个分层存储是需要强一致性的,它不仅影响元数据,还影响Segments日志段数据本身。其次也要考虑远端存储中存储元数据的耗时,在S3中,
frequent LIST APIs
导致了巨大的开销

译者:主要是讲为什么要将元数据与日志数据分开存储的原因。这段可能读起来有点摸不着头脑,原因是咱们没有参与他们之前的讨论,之前的某个讨论版本是想将日志的元数据信息放入远程存储的,此处不用纠结

因此需要将远端的数据本身,与元数据进行分离,其对应的管理类分别为
RemoteStorageManager

RemoteLogMetadataManager

本地及远端offset约束

以下是leader offset相关描述图

Lx = Local log start offset L
z
= Local log end offset

L
y
= Last stable offset(LSO)

R
y
= Remote log end offset

R
x
= Remote log start offset

Lz >= Ly >= L
x
and Ly >= Ry >= Rx

译者:这里不做赘述,关键一点是remote offset中的最大值,是需要 <= LSO的

Replica Manager

译者:注意,ReplicaManager是独立存在的,在没有引入多层存储的时候,它就在,不过以前只管理local存储罢了。它其实是RLM的上一层

如果配置了RLM,那么
ReplicaManager将调用RLM来分配或删除topic-partition

如果某个Broker从Leader切换为了Follower,而正在此时,RLM正在工作,它正在将某个Segment拷贝至远端,我们这个时候不会直接将其放弃掉,而是会等它完成工作。这个操作可能会导致Segment片段的重复,但是没关系,在远端存储的这些日志过期后,均会删除

译者:为什么会导致Segment片段的重复呢? 因为很有可能新的leader已经对同一份Segment进行了上传

Follower Replication

Overview

目前,followers从leaders拉取消息数据,并且尽力尝试追上leader的log-end-offset(LEO),从而将自己的状态变为in-sync副本。如果需要,follower可能还会截断自己的日志从而与leader的数据保持一致

译者:Kafka为了保证数据的高可用,make leader的过程可能会对HW以上的记录进行截断

而在
多级存储

,follower同样需要与leader的数据保持一致,follower仅复制leader中已经可用的本地存储的消息。但是他们需要为远端的Segment构建诸如「leader epoch cache」、「producer id snapshot」这些状态,甚至有必要,它们还需要对其进行截断

下面这张图对leader、follower、remote log、metadata storage 4者的关系进行了简明的概述,具体的细节将在下文展开

  1. Leader将Segment日志端及
    AuxiliaryState(含leader epoch及producer-id snapshots)拷贝至远端存储
  2. Leader将刚才上传的Segment日志段的元数据发布出去
  3. Follower从Leader拉取消息,并遵循一定的规范,这个规范在下文具体说明
  4. Follower等待Leader将元数据放入RemoteLogSegmentMetadataTopic后将其拉取下来
  5. Follower抓取相应的远端存储的元数据,并构建状态AuxiliaryState

译者:关于第2步,leader将元数据发布出去,这里需要注意的是,存储partition元数据的介质并不一定是远端存储,默认实现是,kafka将其放在了一个内置的topic中,如上文提到的,如果用户愿意,可以将其扩展为一个远程存储

而这里的partition元数据具体是指什么呢?原文并没有说明,其实就是每个Segment是存储在了本地还是远端,可根据这个元数据进行路由

Follower拉取消息协议细节

Leader epoch概念的引入,是为了解决在KIP-101及KIP-279中提到的leader切换的场景中,可能存储日志差异的问题。它(Leader epoch)是partition下的一个单调递增的整数,每当leader进行了切换,那么这个值将会累加,并且它也会存储在消息的message batch中

Leader epoch文件存在于每个broker的每个partition中,然后所有状态是in-sync的副本需要保证其有同样的leader epoch历史信息,以及相同的日志数据

Leader epoch的作用:

  • 决定日志截断(KIP-101)
  • 保证副本间的一致性(KIP-279)
  • 在发生截断后,重置消费位点(KIP-320)

在使用远端存储时,我们应该像使用本地存储一样,来处理日志及leader epoch

目前,纯本地存储的场景,follower从leader拉取消息后,通过读取message batch来构建
AuxiliaryState状态。

译者:这里需要注意,纯本地存储的case是,follower需要不断的从leader拉取消息,而这些消息会携带leader epoch 信息,从而维护自己的
leader-epoch-checkpoint
文件,kafka本身不提供专门的API来同步此文件信息,译者认为这样做也是比较合理的

而在多级存储中,follower需要读取leader构建出来的AuxiliaryState,从而获取起始offset及leader epoch。然后follower将会从这个起始offset开始拉取数据。这个起始offset可能是「local-log-start-offset」或「last-tiered-offset」。local-log-start-offset是本地存储的开始offset;last-tiered-offset是已经拷贝至远端存储的最大offset。我们来讨论下使用这两者的利弊

last-tiered-offset

  • 用这个策略明显的好处就是follower能否非常快的追上leader,因为follower只需要同步那些存在于leader本地存储中,且还没来得及放在远端的日志段
  • 而这样做的一个缺点是,follower相对于leader缺少很多本地日志段,当这个follower成为leader后,其他follower将会根据新leader的log-start-offset来截断它们的日志段

译者:关于这个缺点,是kafka自身的副本同步协议中定义的,因为follower不断地从leader拉取消息,努力跟leader保持一致,一致不仅包括offset的上端,同时也包括offset的下端

local-log-start-offset

  • 在发生leader切换时,将会保留本地日志
  • follower追赶leader,这将会花费较长的时间,当为某个partition新增一个全新follower时,就命中了这个case

基于上述原因,我们更倾向使用「
local-log-start-offset

在多层存储中,当follower来拉取数据时,leader只会返回在本地存储中存在的数据。那些已经存在在远端,且本地已经没有的日志段,follower是不会进行拉取复制的。根据「
local-log-start-offset
」机制,如果有必要的话,follower可能会截断自己的日志

译者:同上文,follower是会根据leader的
local-log-start-offset来
截断自己日志段的

当一个follower从leader拉取一个leader的本地存储已经不存在的offset时,leader将会发送一个错误码
OFFSET_MOVED_TO_TIERED_STORAGE
,然后follower将会重新从leader获取「local-log-start-offset」及「leader eopch」。follower收到leader的
local-log-start-offset
后,需要基于这个offset构建远端日志段的
AuxiliaryState,「
译者:此处注意,在纯local存储的模式下,follower是通过拉取leader的全量日志,并且在这个拉取过程中,逐步构建并维护leader-epoch-checkpoint文件的。而在多层存储的环境中,因为follower不再需要从leader处拉取全量日志,但是follower自身的leader-epoch-checkpoint文件还需要全量维护,因此就需要额外花精力去构建这个文件,否则当这个follower成为leader后,leader-epoch-checkpoint文件的部分缺失,会使其无法做出正常的判断
」这个AuxiliaryState其实就是leader的「leader eopch」及「producer-snapshot-ids」。可以通过两种方式来实现:

  • 引入一个新的协议,专门从leader中拉取这个AuxiliaryState
  • 从远端存储中获取这个AuxiliaryState

这里更推荐后者,因为本身远端存储已经保留了这个字段,且不需要在于leader的交互中引入新的协议

获取目标offset的之前的日志段的
AuxiliaryState状态需要以下2个步骤:

  • 需要拉取远端日志段的元数据
  • 需要在相应日志段中拉取诸如leader epoch的记录

当将一个日志段(segment)搬移至远端存储后,leader broker同时需要将「leader epoch sequence」以及「producer id snapshot」追加到segment所在的目录下。这些数据将会帮助follower来构建自己的「leader epoch sequence」以及「producer id snapshot」

译者:原文其实反复在强调这个事儿

因此,我们需要为这个副本引入一个相对应的新状态,可以将其定义为
BuildingRemoteLogAuxState
。follower的拉取线程就如同切换Fetching或Truncating states状态一样,在每次执行时,都需要判断一下,需要切换至哪个状态

当一个follower尝试拉取一个已经不在leader local 存储的offset时,会收到leader返回的
OffsetMovedToRemoteStorage
错误,如果follower收到了这个状态,将会:

  1. 通过调用API ListOffset来获取leader的Earliest Local Offset (ELO) 以及 leader epoch (ELO-LE)
    译者:注意,ListOffset这个API将会发生改变,其返回的出参中将会携带这些信息
  2. 截断自己的本地日志以及
    AuxiliaryState
  3. 从Fetching状态切换至BuildingRemoteLogAux状态

处于BuildingRemoteLogAux状态时,follower可以在以下两个方案中二选一:

  • 方案1:
    • 通过不断反复调用FetchEarliestOffsetFromLeader API,从而获取ELO-LE至leader中最早的leader epoch,然后构建follower本地的leader epoch。当远端存储上有很多任leader切换时,这个方案可能并不会很高效。不过这个方案的好处是,获取leader epoch的操作完全在kafka内部,当远端存储出现短暂不可用时,follower仍然可以追赶leader并进入ISR
  • 方案2:
    • RLMM(RemoteLogMetadataManager)等待远端的元数据,直到等到某个segment包含了ELO-LE
    • 抓取远端存储的leader epoch以及producer snapshot(使用远端fetcher线程)
      译者:多层存储引入的工作线程
    • 获取远端存储的leader epoch数据后,截取 [LSO, ELO] 部分,然后构建follower自己的cache

在构建完follower自己的leader epoch后,follower状态转换为Fetching,然后继续从leader的ELO开始拉取数据。我们更倾向使用方案2,即从远端存储来获取所需数据

Follower fetch 场景(包含日志截断的场景)

让我们讨论一下follower在尝试从leader复制并从远程存储构建
AuxiliaryState
状态时可能遇到的几种情况

名词定义:

OMTS : OffsetMovedToTieredStorage
译者:offset已经不在leader中,通常是一个错误

ELO : Earliest-Local-Offset
译者:local存储中最早的offset

LE-x : Leader Epoch x,
译者:leader epoch,不赘述

HW : High Watermark
译者:高水位,kafka发明的词,不赘述

seg-a-b: a remote segment with first-offset = a and last-offset = b
译者:远端存储的某个segment日志段,它的offse的区间

LE-x, y : A leader epoch sequence entry indicates leader-epoch x starts from offset y
译者:leader epoch的某个区间

场景1:全新follower

现在假设某个全新的broker刚被加入集群,然后将其指派为某个partition的follower replica,这个follower肯定是没有任何本地存储数据的。它将会从offset为0的位置开始从leader抓取数据,如果offset为0的位点在leader中不存在的话,follower将会收到错误OFFSET_MOVED_TO_TIERED_STORAGE,然后follower将会给leader发送ListOffset API,并且在入参中携带参数timestamp = EARLIEST_LOCAL_TIMESTAMP,接着会收到leader返回的ELO(Earliest-Local-Offset)
译者:多层存储需要修改ListOffset协议

follower需要等待这个offset(leader的ELO)的返回,然后构建
AuxiliaryState
状态,然后才能从leader拉取数据
译者:又强调了构建复核状态的必要

步骤1:

抓取远端segment信息,然后构建leader epoch

Broker A (Leader)

Broker B (Follower)

Remote Storage

RL metadata storage

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

6: msg 6 LE-2

7: msg 7 LE-3 (HW)



leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7




1. Fetch LE-1, 0

2. Receives OMTS

3. Receives ELO 3, LE-1

4. Fetch remote segment info and build local leader epoch sequence until ELO



leader_epochs

LE-0, 0

LE-1, 3



seg-0-2, uuid-1

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epochs:

LE-0, 0



seg 3-5, uuid-2

log:

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

epochs:

LE-0, 0

LE-1, 3

LE-2, 5

seg-0-2, uuid-1

segment epochs

LE-0, 0



seg-3-5, uuid-2

segment epochs

LE-1, 3

LE-2, 5

步骤2:

继续从leader拉取数据

Broker A (Leader)

Broker B (Follower)

Remote Storage

RL metadata storage

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

6: msg 6 LE-2

7: msg 7 LE-3 (HW)



leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7




Fetch from ELO to HW

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

6: msg 6 LE-2

7: msg 7 LE-3 (HW)

leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7

seg-0-2, uuid-1

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epochs:

LE-0, 0



seg 3-5, uuid-2

log:

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

epochs:

LE-0, 0

LE-1, 3

LE-2, 5

seg-0-2, uuid-1

segment epochs

LE-0, 0



seg-3-5, uuid-2

segment epochs

LE-1, 3

LE-2, 5

场景2:
out-of-sync follower catching up

一个follower正在尝试追赶leader,然后leader对应的日志段segment已经转移至了远端存储。我们以目标日志段是否在本地存储来分为2种情况来讨论

  • 本地segment存在,而且本地最新的offset要比leader的ELO大
    • 这种场景,本地存储已有,follower跟常规方式一样进行拉取即可
  • 本地segment不存在,或者最新的offset要比leader的ELO小
    • 这种场景,本地的日志段可能因为日志过期已经删除,或者是因为follower已经离线了很长一段时间。然后follower拉取数据时,将会收到OFFSET_MOVED_TO_TIERED_STORAGE错误,然后follower将不得不截断自己所有的本地日志,因为这些数据在leader已经标记为过期

步骤1:

out-of-sync follower (broker B) 本地的offset存储到了3

Broker A (Leader)

Broker B (Follower)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

6: msg 6 LE-2

7: msg 7 LE-3

8: msg 8 LE-3

9: msg 9 LE-3 (HW)





leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

leader_epochs

LE-0, 0

LE-1, 3

1. Because the latest leader epoch in the local storage (LE-1) does not equal the current leader epoch (LE-3). The follower starts from the Truncating state.

2. fetchLeaderEpochEndOffsets(LE-1) returns 5, which is larger than the latest local offset. With the existing truncation logic, the local log is not truncated and it moves to Fetching state.





seg-0-2, uuid-1

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epochs:

LE-0, 0



seg 3-5, uuid-2

log:

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

epochs:

LE-0, 0

LE-1, 3

LE-2, 5

seg-0-2, uuid-1

segment epochs

LE-0, 0



seg-3-5, uuid-2

segment epochs

LE-1, 3

LE-2, 5



步骤2:

leader的本地日志段因为数据过期而已经删除,然后follower开始尝试追上leader

Broker A (Leader)

Broker B (Follower)

Remote Storage

RL metadata storage

9: msg 9 LE-3

10: msg 10 LE-3

11: msg 11 LE-3 (HW)




[segments till offset 8 were deleted]




leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

leader_epochs

LE-0, 0

LE-1, 3



<Fetch State>

1. Fetch from leader LE-1, 4

2. Receives OMTS, truncate local segments.

3. Fetch ELO, Receives ELO 9, LE-3 and moves to
BuildingRemoteLogAux state





seg-0-2, uuid-1

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epochs:

LE-0, 0



seg 3-5, uuid-2

log:

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

epochs:

LE-0, 0

LE-1, 3

LE-2, 5



Seg 6-8, uuid-3, LE-3

log:

6: msg 6 LE-2

7: msg 7 LE-3

8: msg 8 LE-3

epochs:

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7

seg-0-2, uuid-1

segment epochs

LE-0, 0



seg-3-5, uuid-2

segment epochs

LE-1, 3

LE-2, 5



seg-6-8, uuid-3

segment epochs

LE-2, 5

LE-3, 7

步骤3:

删除本地数据后,将会转换为场景1一样的case

Broker A (Leader)

Broker B (Follower)

Remote Storage

RL metadata storage

9: msg 9 LE-3

10: msg 10 LE-3

11: msg 11 LE-3 (HW)




[segments till offset 8 were deleted]




leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7

1. follower rebuilds leader epoch sequence up to LE-3 using remote segment metadata and remote data

leader_epochs

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7



2. follower continue fetching from the leader from ELO (9, LE-3)

9: msg 9 LE-3

10: msg 10 LE-3

11: msg 11 LE-3 (HW)












seg-0-2, uuid-1

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epochs:

LE-0, 0



seg 3-5, uuid-2

log:

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-2

epochs:

LE-0, 0

LE-1, 3

LE-2, 5



Seg 6-8, uuid-3, LE-3

log:

6: msg 6 LE-2

7: msg 7 LE-3

8: msg 8 LE-3

epochs:

LE-0, 0

LE-1, 3

LE-2, 5

LE-3, 7

seg-0-2, uuid-1

segment epochs

LE-0, 0



seg-3-5, uuid-2

segment epochs

LE-1, 3

LE-2, 5



seg-6-8, uuid-3

segment epochs

LE-2, 5

LE-3, 7

场景3:
Multiple hard failures

步骤1:

Broker A已经将第一个segment转移至了远端存储

Broker A (Leader)

Broker B

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0 (HW)

leader_epochs

LE-0, 0

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0 (HW)

leader_epochs

LE-0, 0

seg-0-1:

log:

0: msg 0 LE-0

1: msg 1 LE-0

epoch:

LE-0, 0

seg-0-1, uuid-1

segment epochs

LE-0, 0

步骤2:

Broker A与Broker B同时崩溃,在Broker B上的一些消息(msg1及msg2)还没有及时刷盘,然后丢失了。在这种场景下,我们是可以接受数据丢失的,但是我们需要保证与KIP-101具有相同的语义

Broker A (stopped)

Broker B (Leader)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0 (HW)

leader_epochs

LE-0, 0

0: msg 0 LE-0 (HW)

1: msg 3 LE-1

leader_epochs

LE-0, 0

LE-1, 1

seg-0-1:

log:

0: msg 0 LE-0

1: msg 1 LE-0

epoch:

LE-0, 0

seg-0-1, uuid-1

segment epochs

LE-0, 0

在Broker B重启后,B丢失了msg1及msg2,然后B变成了leader,这时收到了一条新的消息 msg3 (LE-1, offset 1)

(注意:严格来讲,这个应该不属于unclean-leader-election,因为B并没有从ISR中移除,因为发生问题时,A B同时崩溃掉了)

步骤3:

重启后,broker A截断offset 1、2,然后这时收到了新的数据(LE-1, offset 1)

Broker A (follower)

Broker B (Leader)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

1: msg 3 LE-1 (HW)

leader_epochs

LE-0, 0

LE-1, 1

0: msg 0 LE-0

1: msg 3 LE-1 (HW)

leader_epochs

LE-0, 0

LE-1, 1

seg-0-1:

log:

0: msg 0 LE-0

1: msg 1 LE-0

epoch:

LE-0, 0

seg-0-1, uuid-1

segment epochs

LE-0, 0

步骤4:

Broker A (follower)

Broker B (Leader)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 3 LE-1

2: msg 4 LE-1 (HW)

leader_epochs

LE-0, 0

LE-1, 1

0: msg 0 LE-0

1: msg 3 LE-1

2: msg 4 LE-1 (HW)

leader_epochs

LE-0, 0

LE-1, 1

seg-0-1:

log:

0: msg 0 LE-0

1: msg 1 LE-0

epoch:

LE-0, 0

seg-1-1

log:

1: msg 3 LE-1

epoch:

LE-0, 0

LE-1, 1

seg-0-1, uuid-1

segment epochs

LE-0, 0



seg-1-1, uuid-2

segment epochs

LE-1, 1

收到一个新消息msg 4,然后B broker上的第二个segment(seg-1-1)传输到了远端存储

考虑在两个broker上删除offset为2的本地segment:

  • consumer拉取offset 0, LEO-0。根据本地leader epoch 缓存,offset 0 LE-0是有效的,因此broker会基于segment 0-1 返回msg 0
  • consumer拉取 offset 1,没有携带leader epoch信息。根据本地leader epoch缓存,offset 1是属于 LE-1的。因此broker将会基于segment 1-1 返回 msg 3,而不是seg-0-1的LE-0 的offset 1
  • consumer拉取 LE-0的offset 2将会被拒绝
  • consumer拉取 LE-1的offset 1将会收到远端日志段segmeng 1-1 对应的msg 3

场景4:
unclean leader election including truncation

步骤1:

Broker A (Leader)

Broker B (out-of-sync)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0 (HW)

leader_epochs

LE-0, 0

0: msg 0 LE-0 (HW)

leader_epochs

LE-0, 0

seg 0-2:

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epoch:

LE-0, 0

seg-0-2, uuid-1

segment epochs

LE-0, 0

步骤2:

Broker A (Stopped)

Broker B (Leader)

Remote Storage

RL metadata storage



0: msg 0 LE-0

1: msg 4 LE-1

2: msg 5 LE-1

(HW)

leader_epochs

LE-0, 0

LE-1, 1

seg 0-2:

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epoch:

LE-0, 0

seg 0-1:

0: msg 0 LE-0

1: msg 4 LE-1

epoch:

LE-0, 0

LE-1, 1

seg-0-2, uuid-1

segment epochs

LE-0, 0



seg-0-1, uuid-2

segment epochs

LE-0, 0

LE-1, 1

Broker A停止了,然后一个 out-of-sync的副本(broker B)成为了新的leader。基于unclean-leader-election策略,是允许数据丢失的,但是我们需要保证已有的Kafka的行为没有发生变化

我们假设 min.in_sync = 1

在HW变为2后,Broker B将其本地的日志段seg-0-1搬运至远端

步骤3:

Broker A (Stopped)

Broker B (Leader)

Remote Storage

RL metadata storage



2: msg 5 LE-1 (HW)

leader_epochs

LE-0, 0

LE-1, 1

seg 0-2:

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epoch:

LE-0, 0

seg 0-1:

0: msg 0 LE-0

1: msg 4 LE-1

epoch:

LE-0, 0

LE-1, 1

seg-0-2, uuid-1

segment epochs

LE-0, 0



seg-0-1, uuid-2

segment epochs

LE-0, 0

LE-1, 1

Broker B上的第一个本地日志段已经过期

consumer拉取offset 0 LE-0收到msg 0。这个消息既可以由远端日志段seg-0-2提供,也可以由seg-0-1提供

consumer拉取offset 1,broker发现offset 1属于leader eopch 1,因此它返回msg 4 而不是msg 1

consumer拉取offset 1 LE-1,收到远端日志段segmeng 0-1的msg 4

consumer拉取offset 2 LE-0将被拒

场景5:
log divergence in remote storage - unclean leader election

步骤1:

Broker A (Leader)

Broker B

Remote Storage

Remote Segment Metadata

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

4: msg 4 LE-0 (HW)

leader_epochs

LE-0, 0

broker A shipped one segment to remote storage






0: msg 0 LE-0

1: msg 1 LE-0

leader_epochs

LE-0, 0



broker B is out-of-sync

seg-0-3

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

epoch:

LE0, 0

seg-0-3, uuid1

segment epochs

LE-0, 0

步骤2:

在broker A宕机后,out-of-sync的broker B变成了新leader(unclean leader election)

Broker A (stopped)

Broker B (Leader)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

4: msg 4 LE-0

leader_epochs

LE-0, 0






0: msg 0 LE-0

1: msg 1 LE-0

2: msg 4 LE-1

3: msg 5 LE-1

4: msg 6 LE-1

leader_epochs

LE-0, 0

LE-1, 2



After becoming the new leader, B received several new messages, and shipped one segment to remote storage.





seg-0-3

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

epoch:

LE-0, 0

Seg-0-3

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 4 LE-1

3: msg 5 LE-1

epoch:

LE-0, 0

LE-1, 2

seg-0-3, uuid1

segment epochs

LE-0, 0



seg-0-3, uuid2

segment epochs

LE-0, 0

LE-1, 2

步骤3:

Broker B宕机,Broker A重启,但是不知道LE-1的存在(另一个unclean leader election)

Broker A (Leader)

Broker B (stopped)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

4: msg 4 LE-0

5: msg 7 LE-2

6: msg 8 LE-2

leader_epochs

LE-0, 0

LE-2, 5

1. Broker A receives two new messages in LE-2

2. Broker A ships seg-4-5 to remote storage






0: msg 0 LE-0

1: msg 1 LE-0

2: msg 4 LE-1

3: msg 5 LE-1

4: msg 6 LE-1

leader_epochs

LE-0, 0

LE-1, 2






seg-0-3

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

epoch:

LE-0, 0

seg-0-3

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 4 LE-1

3: msg 5 LE-1

epoch:

LE-0, 0

LE-1, 2

seg-4-5

epoch:

LE-0, 0

LE-2, 5

seg-0-3, uuid1

segment epochs

LE-0, 0



seg-0-3, uuid2

segment epochs

LE-0, 0

LE-1, 2



seg-4-5, uuid3

segment epochs

LE-0, 0

LE-2, 5



步骤4:

Broker B重启后丢失了所有本地数据

Broker A (Leader)

Broker B (started, follower)

Remote Storage

RL metadata storage

6: msg 8 LE-2

leader_epochs

LE-0, 0

LE-2, 5






1. Broker B fetches offset 0, and receives OMTS error.

2. Broker B receives ELO=6, LE-2

3. in BuildingRemoteLogAux state, broker B finds seg-4-5 has LE-2. So, it builds local LE cache from seg-4-5:

leader_epochs

LE-0, 0

LE-2, 5

4. Broker B continue fetching from local messages from ELO 6, LE-2

5. Broker B joins ISR

seg-0-3

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-0

epoch:

LE-0, 0

seg-0-3

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 4 LE-1

3: msg 5 LE-1

epoch:

LE-0, 0

LE-1, 2

seg-4-5

epoch:

LE-0, 0

LE-2, 5

seg-0-3, uuid1

segment epochs

LE-0, 0



seg-0-3, uuid2

segment epochs

LE-0, 0

LE-1, 2



seg-4-5, uuid3

segment epochs

LE-0, 0

LE-2, 5

consumer从broker B拉取offset 3,LE-1 被拒

consumer从broker B拉取offet 2,将会收到 msg 2

Follower转换为Leader

controller会根据某个follower的相关配置信息来判断,follower是可以被转换为leader的。当一个follower成为leader后,它需要判断从哪个日志段开始将数据拷贝至远端存储,通过遍历leader epoch的历史信息,直至到最近一次的leader epoch,然后找到已经拷贝至远端存储的最大offset。如果找不到对应的entry,那么就从前一个leader epoch中寻找,如果一直到最早的leader epoch仍然没有找到,那么就从最早的epoch开始拷贝

步骤1:

Broker A (Leader)

Broker B (Follower)

Remote Storage

RL metadata storage

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-1

6: msg 6 LE-2 (HW)

7: msg 7 LE-2

8: msg 8 LE-2




leader_epochs

LE-0, 0

LE-1, 3

LE-2, 6

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-1

6: msg 6 LE-2 (HW)









leader_epochs

LE-0, 0

LE-1, 3

LE-2, 6




seg-0-2, uuid-1

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epochs:

LE-0, 0



seg 3-4, uuid-2

log:

3: msg 3 LE-1

4: msg 4 LE-1

epochs:

LE-0, 0

LE-1, 3




seg-0-2, uuid-1

Segment epochs

LE-0, 0









seg-3-4, uuid-2

Segment epochs

LE-1, 3

步骤2:

此时Broker A宕机了,然后Broker B成为了新的leader,它现在的leader epoch是3,因此需要在远端元数据中寻找leader epoch 为2的offset的最大值,如果不存在,那么就需要找leader epoch 1 的数据,以此类推。在本例中,它找到了epoch为1,offset=4的记录,因此它需要拷贝包含了offset 5的日志段segment,因此,从seg-4-6日志段开始拷贝
译者:此处存疑,感觉可能是作者写错了,因为只有当某个日志段segment成为了非active状态时,才能从本地拷贝至远端,也就是如果seg 3-4已经拷贝到了远端,那么就表明seg 3-4只包含2条消息,所以应该是从seg-5-6拷贝才对

Broker A (Stopped)

Broker B (Leader)

Remote Storage

RL metadata storage



0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-1

6: msg 6 LE-2 (HW)

7: msg 7 LE-2

8: msg 8 LE-2




leader_epochs

LE-0, 0

LE-1, 3

LE-2, 6



0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

3: msg 3 LE-1

4: msg 4 LE-1

5: msg 5 LE-1

6: msg 6 LE-2 (HW)

7: msg 8 LE-3






leader_epochs

LE-0, 0

LE-1, 3

LE-2, 6

LE-3, 7




seg-0-2, uuid-1

log:

0: msg 0 LE-0

1: msg 1 LE-0

2: msg 2 LE-0

epochs:

LE-0, 0



seg-3-4, uuid-2

log:

3: msg 3 LE-1

4: msg 4 LE-1

epochs:

LE-0, 0

LE-1, 3



Seg-4-6, uuid-3

4: msg 4 LE-1

5: msg 5 LE-1

6: msg 6 LE-2

epochs:

LE-0, 0

LE-1, 3

LE-2, 6

seg-0-2, uuid-1

Segment epochs

LE-0, 0











seg-3-4, uuid-2

Segment epochs

LE-1, 3














seg-4-6, uuid-3

Segment epochs

LE-1, 3

LE-2, 6



事务支持

RemoteLogManager拷贝的数据均是小于LSO(last-stable-offset)的。follower可以返回那些事务的取消消息

Consumer抓取请求

任何消费请求,ReplicaManager都会去本地存储查询,如果本地存储返回OffsetOutOfRange异常,那么将会去远端存储查询,如果远端存储依旧没有对应的数据,那么kafka将会扔出TIERED_STORAGE_NOT_AVAILABLE的错误

其他API

DeleteRecords

ListOffsets

LeaderAndIsr

Stopreplica

OffsetForLeaderEpoch

LogStartOffset

RLM/RSM tasks and thread pools

远端存储(例如HDFS/S3/GCP)一般相对比本地来说,都具有高I/O延迟、低性能的特点

当远端存储变得临时不可用(长达数小时)或者延迟变高(长达几分钟),Kafka应该依旧能够正常运转。所有Kafka的操作(produce/consume local data/create topic/etc.)不应该被远端存储所影响。当远端存储不可用或超时时,consumer消费远端数据,应该收到一个响应错误

为了实现这点,我们必须在专用线程池中处理远端存储的操作,而不是在Kafka I/O线程和fetcher线程中

Remote Log Manager (RLM) Thread Pool

RLM维护其管理的topic-partition的list,当topic-partition添加/删除时,这个list将被Kafka的I/O线程更新维护。这个list中每个topic-partition都被分配了一个计划处理的时间,某个topic-partition到达处理时间后,RLM线程池就会将其发起调度

当一个新的topic-partition被指派给broker后,topic-partition就会加入这个list,于此同时,这个partition的执行时间将被初始化为0(
scheduled processing time = 0
),也就意味着这个partition将会被立即调度执行,然后从远端存储中检索查询信息

当一个partition执行结束后,它的调度时间将会被置为(now() +
remote.log.manager.task.interval.ms
),配置项
remote.log.manager.task.interval.ms
可以在broker.conf中进行配置

如果某次partition由于远端存储错误导致执行失败,它会依据规避重试算法进行重试,重试初始配置项为
remote.log.manager.task.retry.interval.ms
,最大重试时间为
remote.log.manager.task.retry.backoff.max.ms
,以及抖动配置
remote.log.manager.task.retry.jitter

当一个partition在broker上注销时,如果线程池没有调度这个partition,那么直接将其从list中移除,否则将此partition标记位
deleted
,然后会在当前调度结束后直接将其删除

线程池中的线程,在同一时刻,只会处理一个partition,遵循如下步骤:

  • 拷贝日志段segment至远端存储(leader)
    • 拷贝的日志段有如下特征
      • 非active
        译者:拷贝的所有segment都不会再产生新数据了
      • 拷贝的offset range,没有被远端存储段完全覆盖
      • last offset < last-stable-offset
        译者:拷贝LSO以下的数据
    • 如果日志段都已经就绪,那么它们将会逐个拷贝至远端存储,按照时间升序,依次拷贝。它为每个segment生成一个普遍唯一的RemoteLogSegmentId,它调用
      RLMM.putRemoteLogSementData(RemoteLogSegentMetadata RemoteLogSegmentMetadata)
      ,并在RSM上调用
      copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,LogSegmentData LogSegmentData)
      。如果成功,将会调用
      RLMM.putRemoteLogSegmentData
      ,入参为update state
  • 处理过期的远端日志段segment(leader)
    • RLM leader会根据远端超时参数配置,来计算判断segment是否应该删除。并且它会实时维护RLMM的最早的offset,它会提供包含segment ids的远端日志段的全量列表(实现类为RemoteStorageManager),同样它也会删除对应的元数据(实现类为RemoteLogMetadataManager)

Remote Storage Fetcher Thread Pool

当处理consumer拉取请求时,如果请求的offset落在了远端存储上,那么这个请求将会被加入至
RemoteFetchPurgatory(方便处理超时),而RemoteFetchPurgatory是kafka原始定义的延迟处理器kafka.server.DelayedOperationPurgatory的一种实例,与现有的producer/fetch回调类似。与此同时,这个请求将会被放入命名为“remote storage fetcher thread pool”的队列中

在这个线程池中的每个线程在同一时刻,只会处理一个fetch操作,而这个从远端拉取数据的线程:

  1. 从RLMM中找出相应的RemoteLogSegmentId,从offset索引中找出startPosition和endPosition
  2. 尝试从方法
    RSM.fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Long startPosition, Long endPosition)
    构建拉取的数据记录
    1. 如果成功,RemoteFetchPurgatory将会被通知,然后返回数据给client端
    2. 如果远端日志已经被删了,RemoteFetchPurgatory也将会被通知到,然后给client端返回一个错误
    3. 如果远端操作失败(例如远端存储短暂性不可用),拉取操作将会自动重试,直至consumer的fetch操作超时

Remote Log Metadata State transitions

COPY_SEGMENT_STARTED
- 这个状态表明segment正在向远端拷贝,但是还没有完成

COPY_SEGMENT_FINISHED
- 这个状态表明segment已经向远端拷贝完成

leader broker将segment拷贝至远端,然后将元数据状态修改为
COPY_SEGMENT_STARTED,
而一旦拷贝完成,则将状态修改为
COPY_SEGMENT_FINISHED

DELETE_SEGMENT_STARTED
-
这个状态表明segment正在进行删除操作,但是还没完成

DELETE_SEGMENT_FINISHED
-
这个状态表明segment已经成功删除完毕

当远端存储的消息过时后,Leader partition负责发布以上两个状态。 Remote Partition Removers也会发布这两个事件

DELETE_PARTITION_MARKED
- 当某个partition被controller删除时,将会被修改为这个状态。 也就意味着,所有拥有这个partition远端日志删除的操作者,都有权力去删除这个远端日志

DELETE_PARTITION_STARTED
-
这个状态表明这个partition启动了删除日志操作,但是还未完成

DELETE_PARTITION_FINISHED
-
这个状态表明这个partition完成了删除日志操作

Remote Partition Removers发布这两个状态

当一个partition被删,controller在RLMM中修改其状态为DELETE_PARTITION_MARKED,预期RLMM能有一个清理远端日志的机制

RemoteLogMetadataManager implemented with an internal topic

远端存储的segment的元数据存放在一个内部的非compact的topic中,topic name为
__remote_log_metadata,
这个topic默认初始化了50个分区,用户可以通过修改配置参数来修改这个选项

在本设计中, RemoteLogMetadataManager(RLMM)的职责是存储及拉取远端的元数据,它提供如下功能

  • 为一个partition的日志段segment提供存储元数据的功能
    译者:一个用户定义的partition一般都会有多个日志段
  • 根据一个leader epoch + offset抓取远端元数据
    译者:给定leader epoch及offet来定位元数据信息
  • 通过读取这个元数据topic,来构建partition的元数据缓存

RemoteLogMetadataManager(RLMM)可能由以下组件组成

  • Cache - 缓存
  • Producer - 生产者
  • Consumer - 消费者

某个用户创建的topic的元数据存储在
__remote_log_metadata
的partition为:

译者:也就是说,用户自己创建的topic,在内部topic
__remote_log_metadata
存储的内容是放在某个partition上的,而这个partition就是通过下面的公式来获取

Utils.toPositive(Utils.murmur2(tp.toString().getBytes(StandardCharsets.UTF_8))) % no_of_remote_log_metadata_topic_partitions

不论是当前broker是
__remote_log_metadata
对应partition的leader还是follower,RLMM均会注册,当然这些partition包含远端元数据信息

RLMM通过订阅
__remote_log_metadata
对应的partition,维护了一套自己的缓存。无论何时,当一个partition被指派到了一个新broker上,并且这个新broker的RLMM并没有订阅这个partition的元数据信息,那么这时新broker上的RLMM就一定会去订阅对应的partition远端元数据,并将其维护在自己的cache中。因此,在最坏的场景中,某个broker的RLMM可能订阅了topic上大部分的partition。在原始版本中,无论RLMM何时启动,我们都将会有一个基于文件的缓存,这个缓存保存了这个实例已经消费的全部消息。每个partition文件都有这样一个独立的文件。它将会帮助我们看到已经读取的数据,从而定位到commit offset,commit offset也可以存储在本地文件中,当我们重启broker时,也就避免了重新读取消息

译者:这里作者维护了一套存储内部特殊topic
__remote_log_metadata
的逻辑,已经读取的数据也都会以缓存的方式存储在内存中,同时也有一个local文件来持久化数据。其实这里所有的元数据信息已经存储在内部topic
__remote_log_metadata
中了,没有必要再引入一个 local 文件再做持久化,之所以这么做,是因为RLMM只是一个接口,默认实现虽然是通过一个内部topic来做的,但实际操作时,用户可能会对其扩展,是的元数据信息存储在了远端,因此引入一个local存储还是很有必要的

RLMM segment的存储开销

Topic partition's topic-id : uuid : 2 longs.

remoteLogSegmentId : uuid : 2 longs.

remoteLogSegmentMetadata : 5 longs + 1 int +1 byte + ~3 epochs(approx avg)

It has leader epochs in-memory which will be much less.

On avg: 10 longs : 10 * 8 = 80 *(other overhead 1.25) = 100 bytes

When a segment is rolled on a broker per sec. //
译者:这里假设每秒生成1G文件

retention as 30days : 60*60*24*30 ~ 2.6MM

2.6MM segments would take ~ 260MB. (This is 1% in our production env)

译者:跑了30天后,缓存文件大概占用了260MB的空间

这个开销可能并没有那么大,因为随意一个borker就会使用几个GB的内存

我们同样可以设置一个类似懒加载、有界缓存的模式来控制内存消耗。需要的话,我们甚至基于操作文件来维护都行

译者:其实这块都不是问题,每秒1G的segment,连续跑一个月,刚产生260M的内存空间,是完全可控的

Message Format

RLMM实例会发布key为null、value格式如下的消息

type : 相对应的value类型,就像schema中定义的apikey,类型是字节byte


version : schema中定义的version,类型是字节byte


data : kafka消息协议格式,以下给出schema定义


在data被序列化之前,type跟version其实已经定义好了,通过添加一个新的version,很容易扩展Schema,同样也可以比较容易地添加一个新的type及其version

Schema

{
    "apiKey": 0,
    "type": "data",
    "name": "RemoteLogSegmentMetadataRecord",
    "validVersions": "0",
    "flexibleVersions": "none",
    "fields": [
    {
        "name": "RemoteLogSegmentId",
        "type": "RemoteLogSegmentIdEntry",
        "versions": "0+",
        "about": "Unique representation of the remote log segment",
        "fields": [
        {
            "name": "TopicIdPartition",
            "type": "TopicIdPartitionEntry",
            "versions": "0+",
            "about": "Represents unique topic partition",
            "fields": [
            {
              "name": "Name",
              "type": "string",
              "versions": "0+",
              "about": "Topic name"
            },
            {
              "name": "Id",
              "type": "uuid",
              "versions": "0+",
              "about": "Unique identifier of the topic"
            },
            {
              "name": "Partition",
              "type": "int32",
              "versions": "0+",
              "about": "Partition number"
            }
          ]
        },
        {
          "name": "Id",
          "type": "uuid",
          "versions": "0+",
          "about": "Unique identifier of the remote log segment"
        }
      ]
    },
    {
      "name": "StartOffset",
      "type": "int64",
      "versions": "0+",
      "about": "Start offset  of the segment."
    },
    {
      "name": "EndOffset",
      "type": "int64",
      "versions": "0+",
      "about": "End offset  of the segment."
    },
    {
      "name": "LeaderEpoch",
      "type": "int32",
      "versions": "0+",
      "about": "Leader epoch from which this segment instance is created or updated"
    },
    {
      "name": "MaxTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Maximum timestamp with in this segment."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "SegmentLeaderEpochs",
      "type": "[]SegmentLeaderEpochEntry",
      "versions": "0+",
      "about": "Leader epoch cache.",
      "fields": [
        {
          "name": "LeaderEpoch",
          "type": "int32",
          "versions": "0+",
          "about": "Leader epoch"
        },
        {
          "name": "Offset",
          "type": "int64",
          "versions": "0+",
          "about": "Start offset for the leader epoch"
        }
      ]
    },
    {
      "name": "SegmentSizeInBytes",
      "type": "int32",
      "versions": "0+",
      "about": "Segment size in bytes"
    },
    {
      "name": "RemoteLogSegmentState",
      "type": "int8",
      "versions": "0+",
      "about": "State of the remote log segment"
    }
  ]
}
 
 
{
  "apiKey": 1,
  "type": "data",
  "name": "RemoteLogSegmentMetadataRecordUpdate",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "RemoteLogSegmentId",
      "type": "RemoteLogSegmentIdEntry",
      "versions": "0+",
      "about": "Unique representation of the remote log segment",
      "fields": [
        {
          "name": "TopicIdPartition",
          "type": "TopicIdPartitionEntry",
          "versions": "0+",
          "about": "Represents unique topic partition",
          "fields": [
            {
              "name": "Name",
              "type": "string",
              "versions": "0+",
              "about": "Topic name"
            },
            {
              "name": "Id",
              "type": "uuid",
              "versions": "0+",
              "about": "Unique identifier of the topic"
            },
            {
              "name": "Partition",
              "type": "int32",
              "versions": "0+",
              "about": "Partition number"
            }
          ]
        },
        {
          "name": "Id",
          "type": "uuid",
          "versions": "0+",
          "about": "Unique identifier of the remote log segment"
        }
      ]
    },
    {
      "name": "LeaderEpoch",
      "type": "int32",
      "versions": "0+",
      "about": "Leader epoch from which this segment instance is created or updated"
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "RemoteLogSegmentState",
      "type": "int8",
      "versions": "0+",
      "about": "State of the remote segment"
    }
  ]
}
 
 
 
{
  "apiKey": 2,
  "type": "data",
  "name": "RemotePartitionDeleteMetadataRecord",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "TopicIdPartition",
      "type": "TopicIdPartitionEntry",
      "versions": "0+",
      "about": "Represents unique topic partition",
      "fields": [
        {
          "name": "Name",
          "type": "string",
          "versions": "0+",
          "about": "Topic name"
        },
        {
          "name": "Id",
          "type": "uuid",
          "versions": "0+",
          "about": "Unique identifier of the topic"
        },
        {
          "name": "Partition",
          "type": "int32",
          "versions": "0+",
          "about": "Partition number"
        }
      ]
    },
    {
      "name": "Epoch",
      "type": "int32",
      "versions": "0+",
      "about": "Epoch (controller or leader) from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "RemotePartitionDeleteState",
      "type": "int8",
      "versions": "0+",
      "about": "Deletion state of the remote partition"
    }
  ]
}
 
package org.apache.kafka.server.log.remote.storage;
...
/**
 * It indicates the deletion state of the remote topic partition. This will be based on the action executed on this
 * partition by the remote log service implementation.
 */
public enum RemotePartitionDeleteState {
 
    /**
     * This is used when a topic/partition is determined to be deleted by controller.
     * This partition is marked for delete by controller. That means, all its remote log segments are eligible for
     * deletion so that remote partition removers can start deleting them.
     */
    DELETE_PARTITION_MARKED((byte) 0),
 
    /**
     * This state indicates that the partition deletion is started but not yet finished.
     */
    DELETE_PARTITION_STARTED((byte) 1),
 
    /**
     * This state indicates that the partition is deleted successfully.
     */
    DELETE_PARTITION_FINISHED((byte) 2);
...
}
 
 
package org.apache.kafka.server.log.remote.storage;
...
/**
 * It indicates the state of the remote log segment or partition. This will be based on the action executed on this
 * segment or partition by the remote log service implementation.
 * <p>
 */
public enum RemoteLogSegmentState {
 
    /**
     * This state indicates that the segment copying to remote storage is started but not yet finished.
     */
    COPY_SEGMENT_STARTED((byte) 0),
 
    /**
     * This state indicates that the segment copying to remote storage is finished.
     */
    COPY_SEGMENT_FINISHED((byte) 1),
 
    /**
     * This state indicates that the segment deletion is started but not yet finished.
     */
    DELETE_SEGMENT_STARTED((byte) 2),
 
    /**
     * This state indicates that the segment is deleted successfully.
     */
    DELETE_SEGMENT_FINISHED((byte) 3),
...
}

Configs

remote.log.metadata.topic.replication.factor




topic的副本因子

默认值: 3

remote.log.metadata.topic.num.partitions

topic的分区数

默认值: 50

remote.log.metadata.topic.retention.ms

topic过期时间

默认值: -1, 没有过期限制。

用户可以根据自己的情况来配置这个选项,为了避免数据丢失,这个选项的值应该大于多层存储中配置的topic的过期时间

remote.log.metadata.manager.listener.name

此项配置为了通过RemoteLogMetadataManager的实现类与本地broker建联,本地实现了接口RemoteLogMetadataManager的类名,默认配置为`org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager`,可以手动修改。相对应的endpoint接入点为配置项"bootstrap.servers"

remote.log.metadata.*

默认RLMM的实现类会创建producer及consumer的实例,常规的client端配置通常是以
remote.log.metadata.common.client.
作为前缀的,用户也可以通过指定
remote.log.metadata.producer.

remote.log.metadata.consumer.
来覆盖common配置。这些配置都将通过方法RemoteLogMetadataManager#configure(Map<String, ?> props) 来实现

例如:“rlmm.config.remote.log.metadata.producer.batch.size=100”将会设置producer的batch.size配置

remote.partition.remover.task.interval.ms

删除远端分区的任务,在执行前后两次删除任务的时间间隔,默认3600000,即 1 小时

Committed offsets file format

译者:消息提交位点的文件格式

已经提交的位点信息会存储在一个名为
_rlmm_committed_offsets
的本地文件,这个文件在log dir目录下。这个文件为每个分区都创建了一个键值对:“<partition-no> <offset>”。
_rlmm_committed_offsets
的文件内容举例:

0 2022
4 104
2 498

Internal flat-file store format of remote log metadata

RLMM存储远端日志的元数据,并为用户topic的每一个partition构建一个物化实例并存储在单独的打平文件中

打平的文件格式如下

<magic><topic-name><topic-id><metadata-topic-offset><sequence-of-serialized-entries>
 
magic:                 
    unsigned var int, version of this file format.
topic-name:            
    string, topic name.
topic-id:              
    uuid, uuid of topic
metadata-topic-offset: 
    var long, offset of the remote log metadata topic partition upto which this topic partition's remote log metadata is fetched.
serialized-entries:
   sequence of serialized entries defined as below, more types can be added later if needed.
 
Serialization of entry is done as mentioned below. This is very similar to the message format mentioned earlier for storing into the metadata topic. 
 
length    : unsigned var int, length of this entry which is sum of sizes of type, version, and data.
type      : unsigned var int, represents the value type. This value is 'apikey' as mentioned in the schema. 
version   : unsigned var int, the 'version' number of the type as mentioned in the schema. 
data      : record payload in kafka protocol message format, the schema is given below.
 
Both type and version are added before the data is serialized into record value.  Schema can be evolved by adding a new version with the respective changes. A new type can also be supported by adding the respective type and its version.
 
 
{
  "apiKey": 0,
  "type": "data",
  "name": "RemoteLogSegmentMetadataRecordStored",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "SegmentId",
      "type": "uuid",
      "versions": "0+",
      "about": "Unique identifier of the log segment"
    },
    {
      "name": "StartOffset",
      "type": "int64",
      "versions": "0+",
      "about": "Start offset  of the segment."
    },
    {
      "name": "EndOffset",
      "type": "int64",
      "versions": "0+",
      "about": "End offset  of the segment."
    },
    {
      "name": "LeaderEpoch",
      "type": "int32",
      "versions": "0+",
      "about": "Leader epoch from which this segment instance is created or updated"
    },
    {
      "name": "MaxTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Maximum timestamp with in this segment."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "SegmentLeaderEpochs",
      "type": "[]SegmentLeaderEpochEntry",
      "versions": "0+",
      "about": "Event timestamp of this segment.",
      "fields": [
        {
          "name": "LeaderEpoch",
          "type": "int32",
          "versions": "0+",
          "about": "Leader epoch"
        },
        {
          "name": "Offset",
          "type": "int64",
          "versions": "0+",
          "about": "Start offset for the leader epoch"
        }
      ]
    },
    {
      "name": "SegmentSizeInBytes",
      "type": "int32",
      "versions": "0+",
      "about": "Segment size in bytes"
    },
    {
      "name": "RemoteLogSegmentState",
      "type": "int8",
      "versions": "0+",
      "about": "State of the remote log segment"
    }
  ]
}
 
 
{
  "apiKey": 1,
  "type": "data",
  "name": "DeletePartitionStateRecord",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    {
      "name": "Epoch",
      "type": "int32",
      "versions": "0+",
      "about": "Epoch (controller or leader) from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader."
    },
    {
      "name": "EventTimestamp",
      "type": "int64",
      "versions": "0+",
      "about": "Event timestamp of this segment."
    },
    {
      "name": "RemotePartitionDeleteState",
      "type": "int8",
      "versions": "0+",
      "about": "Deletion state of the remote partition"
    }
  ]
}

译者:这里作者对这个打平文件存储的内容进行了举例,上面2个分别是RemoteLogSegmentMetadataRecordStored 及 DeletePartitionStateRecord,即远端存储的记录,以及删除partition的记录

Message Formatter for the internal topic

当从远端元数据topic中消费到消息后,
`
org.apache.kafka.server.log.remote.storage.RemoteLogMetadataFormatter
`
这个类用来格式化消息,用户可以指定格式化的property,如下文所示。这个对于debug来说非常有帮助

Internal message format

partition:<val><sep>message-offset:<val><sep>type:<RemoteLogSegmentMetadata | RemoteLogSegmentMetadataUpdate | DeletePartitionState><sep>version:<_no_><vs>event-value:<string representation of the event>
 
val: represents the respective value of the key.
sep: represents the separator, default value is: ","
 
partition : Remote log metata topic partition number. This is optional.
Use print.partition property to print it, default is false
 
message-offset : Offset of this message in remote log metadata topic. This is optional.
Use print.message.offset property to print it, default is false
 
type: Event value type, which can be one of RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, DeletePartitionState values.
 
version: Version number of the event value type. This is optional.
Use print.version property to print it, default is false
 
Use print.all.event.value.fields to print the string representation of the event which will include all the fields in the data, default property value is false.
 
Event value can be of any of the types below:
 
remote-log-segment-id is represented as "{id:<><sep>topicId:<val><sep>topicName:<val><sep>partition:<val>}" in the event value.
topic-id-partition is represented as "{topicId:<val><sep>topicName:<val><sep>partition:<val>}" in the event value.
 
For RemoteLogSegmentMetadata
default representation is "{remote-log-segment-id:<val><sep>start-offset:<val><sep>end-offset:<val><sep>leader-epoch:<val><sep>remote-log-segment-state:<COPY_SEGMENT_STARTED | COPY_SEGMENT_FINISHED | DELETE_SEGMENT_STARTED | DELETE_SEGMENT_FINISHED>}"
 
For RemoteLogSegmentMetadataUpdate
default representation is "{remote-log-segment-id:<val><sep>leader-epoch:<val><sep>remote-log-segment-state:<COPY_SEGMENT_STARTED | COPY_SEGMENT_FINISHED | DELETE_SEGMENT_STARTED | DELETE_SEGMENT_FINISHED>}"
 
For DeletePartitionState
default representation is "{topic-id-partition:<val><sep>epoch:<val><sep>remote-partition-delete-state:<DELETE_PARTITION_MARKED | DELETE_PARTITION_STARTED | DELETE_PARTITION_FINISHED>

Topic deletion lifecycle

译者:这节是讨论topic删除动作,不包括远端日志过期后的删除

当一个controller收到了一个删除topic的请求,那么将会遵循现有的Kafka删除协议,将此topic对应的所有的replicas标记为离线offline,并且停止一切拉取请求。当所有的副本replica都达到了offline状态后,controller向RLMM发布一个删除事件(调用方法
RemoteLogMetadataManager.updateRemotePartitionDeleteMetadata
),将topic标记为deleted,即设置为RemotePartitionDeleteState#DELETE_PARTITION_MARKED。因为为topic引入了uuid,因此topic的删除操作可变成异步操作
译者:这里其实引入了topicId,避免快速删除topic后又新建同名topic
。这个设计允许以后通过将删除标记发布到远程日志元数据topic中来回收远程日志。RLMM则收到DELETE_PARTITION_MARKED状态后,就触发了异步删除远端元数据的操作

默认RLMM处理删除操作使用的类为RemotePartitionRemover(RPRM)

如果某个broker是topic
__remote_log_metadata
对应partition的leader,RPRM的实例将会在这个broker上创建。当某个topic被标记为删除,那么RPRM就要负责将其从远端存储上删除。RLMM消费远端partition元数据的消息,然后从中过滤删除partition的事件。它收集那些待删除的partitions,然后调用对应的RemoteStorageManager类来删除日志segment。这个执行间隔的时间是通过配置
remote.partition.remover.task.interval.ms
来设置的(默认1小时)。一旦删除操作成功执行,那么它将会提交消费位点