2024年11月

探讨 Unity 实时多人游戏的现状,不同游戏类型中的不同网络架构。

网络架构模式

游戏开发者使用各种网络架构模式来确保多人游戏中玩家之间可靠且快速的互动。每种模式都有其自身的优缺点,选择合适的模式取决于您正在使用的特定游戏类型和互动场景。

在本节中,我们将讨论以下模式:锁步、回滚、快照插值和延迟补偿。此外,我们将讨论最适合不同类型或游戏的模式。

锁帧(LockStep)

锁帧(LockStep)
是最古老的网络游戏同步方法之一,如今仍然经常使用。虽然这种架构可以采用多种形式,但我们将重点介绍最常见的实现,然后讨论必要的条件、限制和可能的配置。

在锁步架构中,每个玩家将其输入发送到其他所有玩家,然后在收到所有玩家的输入后立即推进其模拟——就是这样!游戏在互联网上完全同步,使用非常少的带宽,每个玩家始终看到事件的展开方式与其他所有人完全相同。

为了使游戏与锁步架构兼容,必须满足几个条件。让我们讨论一下。

确定性

锁步架构的主要要求是游戏模拟必须支持严格的位级[确定性](https://www.whatgamesare.com/determinism.html)。由于网络代码只同步输入,因此游戏模拟必须在每个帧中使用相同的输入数据在每台机器上计算出相同的结果。否则,游戏模拟将不同步、发散并偏离彼此——最终导致游戏看起来完全不同。

这通常是一个难以满足的条件,游戏必须经过精心设计才能保持确定性。开发人员通常在每个帧或帧中的多个点检查游戏状态校验和,并将这些校验和在参与者之间进行比较,以帮助跟踪和修复在游戏测试期间发生不同步的非确定性来源。

此外,任何使用浮点运算的游戏(即大多数现代游戏)都需要额外的考虑。如果您的游戏在多个平台上运行,由于平台和编译器之间的差异,浮点确定性可能特别难以实现。每个编译器可以使用不同的指令集、重新排序指令或自动矢量化。每个系统都可以以不同的方式实现超越函数,例如余弦、正弦和正切。所有这些都可能导致平台之间甚至构建之间出现不同步。一些开发人员完全使用定点运算或软件模拟浮点运算来实现其游戏模拟,以绕过使用浮点数产生的非确定性。

其他非确定性来源包括使用不同的种子生成随机数或以不同的顺序处理对象,例如物理中的接触。所有这些考虑因素也极大地限制了可以用作游戏模拟一部分的第三方库,例如碰撞检测库、物理引擎等。因此,这对于许多类型的游戏和引擎来说可能不切实际。

固定帧率

锁步还要求所有玩家同步每个输入和滴答所代表的时间单位。换句话说,游戏将以固定的帧率进行。一些游戏将渲染帧率锁定为与固定模拟帧率匹配。其他游戏允许以任意帧率进行渲染,并在固定滴答结果之间显示插值。

问题和限制

虽然锁步架构是最容易实现的架构之一,并且没有视觉伪影,但它存在一些问题和限制。在某些情况下,可以通过对游戏实现做出一些妥协来克服这些限制。在其他情况下,这些限制可能只是限制了这种架构对特定游戏的适用性。

输入延迟

锁步通过等待直到收到所有相关信息才推进,从而防止任何延迟引起的视觉伪影。这种等待有一个缺点:输入延迟。

“输入延迟”通常是指用户按下按钮到在屏幕上看到响应之间的时间。从用户的角度来看,当他们在屏幕上看到该输入的结果时,他们会感知到对他们输入的响应。如果用户必须等到所有其他玩家收到并处理他们的输入才能看到此响应,这会导致明显的输入延迟。

在对反应性要求很高的游戏中,例如射击游戏,输入延迟可能是关键因素,因为它会使游戏响应速度变慢,更难玩。

回滚

接下来,
回滚
是一种流行的网络代码架构,广泛应用于现代竞技游戏,尤其是在格斗游戏中。

回滚可以看作是经典锁步架构的扩展。在回滚架构中,玩家在每一帧发送他们的命令,并在不等待其他玩家的命令的情况下继续他们的游戏。游戏在不等待来自远程玩家的数据的情况下进行——这被称为“客户端预测”,因为远程输入数据尚不可知,客户端必须对其他玩家的未来行动做出假设。

与锁步架构(它提供帧到帧的完美顺序,但存在输入延迟)相比,回滚架构提供即时命令响应,但以顺序为代价。

在回滚过程中,游戏会进行,并且一旦本地玩家的命令发送出去,就会立即显示出来。但是,由于尚未收到来自远程玩家的命令,因此显示的信息是预测。一旦收到来自远程玩家的数据,就必须将此新信息与预测进行协调。如果信息与预测不匹配,则游戏将回滚以“纠正”错误。

什么是“回滚”?

假设在客户端准备渲染第 7 帧时,收到了来自远程玩家的第 5 帧的新数据。在这种情况下,客户端需要执行以下步骤:

  1. 加载/恢复整个游戏的状态,使其处于第 4 帧,即所有命令都已知的上一个不可预测帧。

  2. 使用第 5 帧的原始本地命令以及从远程玩家新收到的命令,将游戏移动到第 5 帧。

  3. 通过对每一帧应用本地命令继续向前推进,直到我们到达当前帧(在我们的示例中,这是第 7 帧)。

图片

完成这些步骤后,我们可能会注意到,远程玩家在第 5 帧中的操作可能导致的结果与预测的结果不同;现在将在游戏中显示这些更正后的结果。

因此,在这种情况下,“回滚”意味着游戏“回滚”到所有内容都已知的最后一帧,然后使用新信息“重播”向前。这提供了纠正预测错误的能力。

需要注意的是,为了使此方法有效,游戏必须能够快速保存和恢复其完整状态,以及使用任意命令向前移动任意数量的帧。根据游戏的复杂性,这可能需要大量的计算资源。

快照插值

快照插值
是一种由游戏 Quake 推广的技术,此后已广泛应用于源自 Quake 的游戏和游戏引擎。事实上,这种模型特别适合射击游戏。

快照插值方法基于游戏对象两个独立的时间流的概念:一个反映过去对象的状态,另一个反映未来对象预期状态。

客户端(玩家)将其命令发送到服务器,服务器处理这些命令,更改游戏状态,然后将游戏当前状态的“快照”发送回客户端。此快照包含有关游戏中的所有对象在创建快照时状态的信息。

为了保持游戏的响应速度,客户端会立即将部分命令应用于某些对象,预测其行为。这会导致对象在玩家的屏幕上同时以不同的状态存在:

  • 插值对象
    以它们在过去某个时间点的状态表示。

  • 预测对象
    以它们在未来某个时间点预期存在的状态表示。

这会产生一种有趣的动态,例如,当您预测的角色试图躲避一个正在插值的传入弹丸时。这可能比看起来更难,因为您的角色正在未来移动,而弹丸则在过去。

尽管如此,这种模型还是有一些优点:

  • 游戏会立即响应玩家命令;与锁步模型不同,不需要输入延迟。

  • 与完整的回滚架构相比,客户端需要更少的处理时间。

  • 对象在从服务器获得的已知状态之间进行插值,因此对象只通过它们已经处于的状态。

需要注意的是,为了有效地使用这种模型,游戏必须能够快速处理和传输游戏状态的快照,以及立即响应玩家命令。

不同游戏类型的最佳架构

每种游戏类型都会对网络延迟、稳定性和吞吐量施加其自身的限制。例如,MMO 游戏需要高吞吐量、低延迟和高稳定性。同时,第一人称射击 (FPS) 游戏需要低延迟和高吞吐量。在这两种类型中,建议使用具有权威游戏服务器和基于状态和输入的数据交换的服务器-客户端拓扑。

下表提供了不同视频游戏类型的网络需求、数据交换格式、推荐的网络拓扑和网络模式的比较概述。

图片

通过分析此表,我们可以得出一些结论:

  1. 网络需求在很大程度上取决于类型:例如,RTS 和 FPS 游戏需要低延迟才能确保流畅且逼真的游戏体验。

  2. 数据交换格式也因类型而异:大多数游戏使用状态和输入交换,但有些游戏,例如 RTS 和动作游戏,只使用输入。

  3. 网络模式和拓扑是根据每种类型的特定需求选择的:动作和体育游戏通常使用预测和回滚,而 MMO 和 RPG 游戏通常使用快照预测和插值。

继续前进

每款游戏都有其自身的一套需求和类型交叉,这会对其网络互动施加其自身的限制。在许多情况下,选择经过验证的解决方案将比独立开发更好。

在这篇文章中,我们回顾了实时多人游戏中网络架构的主要模式。我们还汇总了一个针对不同类型推荐解决方案的通用表格。当然,这些建议并非最终的和通用的,但它们可以作为您选择架构解决方案时的起点。

自研
原生鸿蒙NEXT5.0 API12 ArkTS
仿微信
app聊天
模板
HarmonyOSChat

harmony-wechat
原创重磅实战
纯血鸿蒙OS ArkUI+ArkTs仿微信App
聊天实例。包括
聊天、通讯录、我、朋友圈
等模块,实现类似
微信消息UI布局、编辑器光标处输入文字+emo表情图片/GIF动图、图片预览、红包、语音/位置UI、长按语音面板
等功能。

版本信息

DevEco Studio 5.0.3.906HarmonyOS5.0.0API12 Release SDK
commandline
-tools-windows-x64-5.0.3.906

纯血鸿蒙OS元年已来,华为大力推广自主研发的全场景分布操作系统HarmonyOS,赶快加入鸿蒙原生应用开发,未来可期!

基于鸿蒙os
ArkTs

ArkUI
实现下拉刷新、右键长按/下拉菜单、自定义弹窗、朋友圈等功能。

项目框架结构

基于最新版
DevEco Studio 5.0.3.906
编码工具构建鸿蒙app聊天项目模板。

https://developer.huawei.com/consumer/cn/deveco-studio/

HarmonyOS-Chat聊天app项目已经发布到我的原创作品集,有需要的可以去拍哈~

https://gf.bilibili.com/item/detail/1107424011

如果大家想快速的入门到进阶开发,先把官方文档撸一遍,然后找个实战项目案例练练手。

华为鸿蒙os开发官网
https://developer.huawei.com/consumer/cn/
HarmonyOS开发设计规范
https://developer.huawei.com/consumer/cn/design/
ArkUI方舟UI框架
https://developer.huawei.com/consumer/cn/doc/harmonyos-references-V5/arkui-declarative-comp-V5

路由页面JSON文件

HarmonyOS ArkUI自定义顶部导航条

项目中所有顶部标题导航栏均是自定义封装ArkUI组件实现功能效果。之前有写过一篇专门的分享介绍,感兴趣的可以去看看下面这篇文章。

HarmonyOS NEXT 5.0自定义增强版导航栏组件|鸿蒙ArkUI自定义标题栏

https://www.cnblogs.com/xiaoyan2017/p/18517517

Index.ets入口模板

//自定义页面
@Builder customPage() {if(this.pageIndex === 0) {
IndexPage()
}
else if(this.pageIndex === 1) {
FriendPage()
}
else if(this.pageIndex === 2) {
MyPage()
}
}

build() {
Navigation() {
this.customPage()
}
.toolbarConfiguration(
this.customToolBar)
.height(
'100%')
.width(
'100%')
.backgroundColor($r(
'sys.color.background_secondary'))
.expandSafeArea([SafeAreaType.SYSTEM], [SafeAreaEdge.TOP, SafeAreaEdge.BOTTOM])
}

//自定义底部菜单栏
@Builder customToolBar() {
Row() {
Row() {
Badge({
count:
8,
style: {},
position: BadgePosition.RightTop
}) {
Column({space:
2}) {
SymbolGlyph($r(
'sys.symbol.ellipsis_message_fill'))Text('聊天').fontSize(12)}
}
}
.layoutWeight(
1)
.justifyContent(FlexAlign.Center)
.onClick(()
=>{this.pageIndex = 0})

Row() {
Column({space:
2}) {
SymbolGlyph($r(
'sys.symbol.person_2'))Text('通讯录').fontSize(12)}
}
.layoutWeight(
1)
.justifyContent(FlexAlign.Center)
.onClick(()
=>{this.pageIndex = 1})

Row() {
Badge({
value:
'',
style: { badgeSize:
8, badgeColor: '#fa2a2d'}
}) {
Column({space:
2}) {
SymbolGlyph($r(
'sys.symbol.person_crop_circle_fill_1'))Text('我').fontSize(12)}
}
}
.layoutWeight(
1)
.justifyContent(FlexAlign.Center)
.onClick(()
=>{this.pageIndex = 2})
}
.height(
56)
.width(
'100%')
.backgroundColor($r(
'sys.color.background_secondary'))
.borderWidth({top:
1})
.borderColor($r(
'sys.color.background_tertiary'))
}

HarmonyOS实现登录/注册/倒计时验证

登录模板

/**
* 登录模板
* @author andy
*/import { router, promptAction } from'@kit.ArkUI'@Entry
@Component
struct Login {
@State name: string
= ''@State pwd: string= '' //提交 handleSubmit() {if(this.name === '' || this.pwd === '') {
promptAction.showToast({ message:
'账号或密码不能为空'})
}
else{//登录接口逻辑... promptAction.showToast({ message:'登录成功'})
setTimeout(()
=>{
router.replaceUrl({ url:
'pages/Index'})
},
2000)
}
}

build() {
Column() {
Column({space:
10}) {
Image(
'pages/assets/images/logo.png').height(50).width(50)
Text(
'HarmonyOS-Chat').fontSize(18).fontColor('#0a59f7')
}
.margin({top:
50})
Column({space:
15}) {
TextInput({placeholder:
'请输入账号'})
.onChange((value)
=>{this.name =value
})
TextInput({placeholder:
'请输入密码'}).type(InputType.Password)
.onChange((value)
=>{this.pwd =value
})
Button(
'登录').height(45).width('100%')
.linearGradient({ angle:
135, colors: [['#0a59f7', 0.1], ['#07c160', 1]] })
.onClick(()
=>{this.handleSubmit()
})
}
.margin({top:
30})
.width(
'80%')
Row({space:
15}) {
Text(
'忘记密码').fontSize(14).opacity(0.5)
Text(
'注册账号').fontSize(14).opacity(0.5)
.onClick(()
=>{
router.pushUrl({url:
'pages/views/auth/Register'})
})
}
.margin({top:
20})
}
.height(
'100%')
.width(
'100%')
.expandSafeArea([SafeAreaType.SYSTEM], [SafeAreaEdge.TOP, SafeAreaEdge.BOTTOM])
}
}

Stack({alignContent: Alignment.End}) {
TextInput({placeholder:
'验证码'})
.onChange((value)
=>{this.code =value
})
Button(`${
this.codeText}`).enabled(!this.disabled).controlSize(ControlSize.SMALL).margin({right: 5})
.onClick(()
=>{this.handleVCode()
})
}

鸿蒙arkts实现60s倒计时验证码

//验证码参数
@State codeText: string = '获取验证码'@State disabled:boolean = false@State time: number= 60

//获取验证码
handleVCode() {if(this.tel === '') {
promptAction.showToast({ message:
'请输入手机号'})
}
else if(!checkMobile(this.tel)) {
promptAction.showToast({ message:
'手机号格式错误'})
}
else{
const timer
= setInterval(() =>{if(this.time > 0) {this.disabled = true this.codeText = `获取验证码(${this.time--})`
}
else{
clearInterval(timer)
this.codeText = '获取验证码' this.time = 5 this.disabled = false}
},
1000)
}
}

鸿蒙os下拉刷新/九宫格图像/长按菜单

  • 下拉刷新组件
Refresh({
refreshing: $$
this.isRefreshing,
builder:
this.customRefreshTips
}) {
List() {
ForEach(
this.queryData, (item: RecordArray) =>{
ListItem() {
//... }
.stateStyles({pressed:
this.pressedStyles, normal: this.normalStyles})
.bindContextMenu(
this.customCtxMenu, ResponseType.LongPress)
.onClick(()
=>{//... })
}, (item: RecordArray)
=>item.cid.toString())
}
.height(
'100%')
.width(
'100%')
.backgroundColor(
'#fff')
.divider({ strokeWidth:
1, color: '#f5f5f5', startMargin: 70, endMargin: 0})
.scrollBar(BarState.Off)
}
.pullToRefresh(
true)
.refreshOffset(
64)//当前刷新状态变更时触发回调 .onStateChange((refreshStatus: RefreshStatus) =>{
console.info(
'Refresh onStatueChange state is ' +refreshStatus)this.refreshStatus =refreshStatus
})
//进入刷新状态时触发回调 .onRefreshing(() =>{
console.log(
'onRefreshing...')
setTimeout(()
=>{this.isRefreshing = false},2000)
})
  • 自定义刷新提示
@State isRefreshing: boolean = false@State refreshStatus: number= 1

//自定义刷新tips
@Builder customRefreshTips() {
Stack() {
Row() {
if(this.refreshStatus == 1) {
SymbolGlyph($r(
'sys.symbol.arrow_down')).fontSize(24)
}
else if(this.refreshStatus == 2) {
SymbolGlyph($r(
'sys.symbol.arrow_up')).fontSize(24)
}
else if(this.refreshStatus == 3) {
LoadingProgress().height(
24)
}
else if(this.refreshStatus == 4) {
SymbolGlyph($r(
'sys.symbol.checkmark')).fontSize(24)
}
Text(`${
this.refreshStatus == 1 ? '下拉刷新':this.refreshStatus == 2 ? '释放更新':this.refreshStatus == 3 ? '加载中...':this.refreshStatus == 4 ? '完成' : ''}`).fontSize(16).margin({left:10})
}
.alignItems(VerticalAlign.Center)
}
.align(Alignment.Center)
.clip(
true)
.constraintSize({minHeight:
32})
.width(
'100%')
}
  • 长按右键菜单

.bindContextMenu(
this
.customCtxMenu, ResponseType.LongPress)

//自定义长按右键菜单
@Builder customCtxMenu() {
Menu() {
MenuItem({
content:
'标为已读'})
MenuItem({
content:
'置顶该聊天'})
MenuItem({
content:
'不显示该聊天'})
MenuItem({
content:
'删除'})
}
}
  • 下拉菜单

.bindMenu([ ... ])

Image($r('app.media.plus')).height(24).width(24)
.bindMenu([
{
icon: $r(
'app.media.message_on_message'),
value:
'发起群聊',
action: ()
=>{}
},
{
icon: $r(
'app.media.person_badge_plus'),
value:
'添加朋友',
action: ()
=> router.pushUrl({url: 'pages/views/friends/AddFriend'})
},
{
icon: $r(
'app.media.line_viewfinder'),
value:
'扫一扫',
action: ()
=>{}
},
{
icon: $r(
'app.media.touched'),
value:
'收付款',
action: ()
=>{}
}
])

HarmonyOS arkui自定义dialog弹框组件



支持参数配置如下:

//标题(支持字符串|自定义组件)
@BuilderParam title: ResourceStr | CustomBuilder =BuilderFunction//内容(字符串或无状态组件内容)
@BuilderParam message: ResourceStr | CustomBuilder =BuilderFunction//响应式组件内容(自定义@Builder组件是@State动态内容)
@BuilderParam content: () => void =BuilderFunction//弹窗类型(android | ios | actionSheet)
@Prop type: string//是否显示关闭图标
@Prop closable: boolean
//关闭图标颜色
@Prop closeColor: ResourceColor//是否自定义内容
@Prop custom: boolean
//自定义操作按钮
@BuilderParam buttons: Array<ActionItem> | CustomBuilder = BuilderFunction

调用方式非常简单。

//自定义退出弹窗
logoutController: CustomDialogController = newCustomDialogController({
builder: HMPopup({
type:
'android',
title:
'提示',
message:
'确定要退出当前登录吗?',
buttons: [
{
text:
'取消',
color:
'#999'},
{
text:
'退出',
color:
'#fa2a2d',
action: ()
=>{
router.replaceUrl({url:
'pages/views/auth/Login'})
}
}
]
}),
maskColor:
'#99000000',
cornerRadius:
12,
width:
'75%'})
//自定义公众号弹窗
@Builder customQRContent() {
Column({space:
15}) {
Image(
'pages/assets/images/qrcode.png').height(150).objectFit(ImageFit.Contain)
Text(
'扫一扫,加我公众号').fontSize(14).opacity(.5)
}
}
qrController: CustomDialogController
= newCustomDialogController({
builder: HMPopup({
message:
this.customQRContent,
closable:
true}),
cornerRadius:
12,
width:
'70%'})

好了,以上就是harmonyos next实战开发聊天app的一些知识分享,希望对大家有所帮助~

整个项目涉及到的知识点非常多,限于篇幅就先分享到这里。感谢大家的阅读与支持。

https://www.cnblogs.com/xiaoyan2017/p/18396212

https://www.cnblogs.com/xiaoyan2017/p/18437155

https://www.cnblogs.com/xiaoyan2017/p/18467237

目录

参考资料

什么是WebRTC?

  • WebRTC(Web实时通信)技术
  • 浏览器之间交换任意数据,而无需中介
  • 不需要用户安装插件或任何其他第三方软件

能做什么?

与Media Capture和Streams API一起
  • 支持音频和视频会议
  • 文件交换
  • 屏幕共享
  • 身份管理
  • 以及与传统电话系统的接口,包括支持发送DTMF(按键拨号)信号

架构图

个人理解(类比)

官方文档晦涩难懂,所以按照自己的思路,整理总结。

核心知识点

先整理官方核心知识点,这里不理解,没关系,我们继续按自己的思路总结
  • ICE(框架)允许您的Web浏览器与对等端连接
  • STUN(协议)用于发现您的公共地址并确定路由器中阻止与对等体直接连接的任何限制
  • NAT 用于为您的设备提供公共IP地址
  • TURN 是指通过打开与TURN服务器的连接并通过该服务器中继所有信息来绕过对称NAT限制
  • SDP 从技术上讲会话描述协议(SDP并不是一个真正的协议,而是一种数据格式)

核心知识点类比

我们使用餐厅(或者其他)来类比WebRTC核心概念, 想象一下,你现在正在餐厅里面。
顾客(用户)
可以直接与
厨房(服务器)
进行交流,而不需要通过
服务员(中介)
。在这个餐厅里,顾客可以
点菜(发送音视频请求)

享用美食(接受音视频流)

还可以与其他顾客(其他用户)直接交流(数据传输)
,而这一切都
不要
额外的
工具或设备(插件)

备注:如果你明白,上面描述,那我们就继续。

ICE框架

想象一下ICE就像餐厅
整体布局和设计
,它确保
顾客(用户)
能够顺利找到座位并与
厨房(对等端)
建立联系。ICE负责协调顾客和厨房之间的所有连接方式,确保他们能顺利交流。

STUN(协议)

STUN就像餐厅门口
接待员
,负责帮助顾客找到餐厅的公共入口。接待员会
告诉顾客他们的公共地址(公共IP地址)
,并帮助他们了解是否有任何
障碍(比如路由器的限制、防火墙等)阻止他们直接进入餐厅(与对等端直接连接)

NAT(网络地址转换)

NAT就像餐厅外墙,它为餐厅提供一个
公共门牌号(公共IP地址)
。虽然餐厅内部有很多
桌子(设备)
,但外面的人只知道这个公共门牌号,而不知道内部具体位置。

TURN

TURN就像餐厅的
外卖服务
。如果顾客无法直接进入餐厅(由于对称NAT限制),他们可以选择通过
外卖服务(TURN服务器)来获取食物
。所有的订单和交流通过外卖服务进行,这样即使顾客无法直接到达餐厅,他们仍然可以享用美食。

SDP(会话描述协议)

SDP就像餐厅的
菜单
,它描述了可供选择的菜品和饮料(音视频流的格式和参数),虽然菜单本身不是一个真正协议,但它提供顾客和厨房之间所需的信息,以便他们能达成共识,确保顾客点的菜品能够被厨房正确准备。

WebRTC的核心API

  • getUserMedia(点菜):

这个API就像顾客在餐厅里点菜。顾客告诉厨房他们想要什么(音频或视频),厨房就会准备好这些食材(获取用户的音频和视频流)。

  • RTCPeerConnection(厨房的工作台):

这个API就像厨房的工作台,负责处理顾客的订单(建立连接)。它确保顾客和厨房之间的交流顺畅,处理音视频流的传输,就像厨房准备和发送食物一样。

  • RTCDataChannel(顾客之间的交流):
    这个API就像顾客之间的对话。顾客可以直接与其他顾客交流(传输数据),比如分享他们的用餐体验或交换食谱,而不需要通过服务员。
    总结
    在这个餐厅的类比中,WebRTC就像一个高效的餐厅,顾客可以直接与厨房和其他顾客交流,享受美食和分享信息,而不需要中介的干预。核心API则是实现这一切的工具,帮助顾客点菜、厨房准备食物和顾客之间的交流。这样,WebRTC使得实时通信变得简单而高效。

现在开始做饭

如果你看到这里,恭喜你,我们达成共识,现在开始做饭。

准备阶段

环境准备

安装Docker、Nginx、Nodejs等,请查询其他文档
  • 一台服务器
  • Debian 12 x86_64 操作系统
  • Docker
  • Nginx
  • Nodejs

服务器搭建

首先我们需要两个服务,STUN/TURN、Signal Server, What's Signal Server? 别紧张我待会,会解释现在我们先专注与STUN/TURN,再次之前我们需要了解  Coturn TURN server(开源框架,感谢开发人员)
  • STUN/TURN
  • Signal Server 信令服务

Coturn TURN server(开源服务) 部署

对的你没有看错,就一行命令,这就是我为什么推荐使用Docker的原因,详细的Dockerfile请看 参考资料
docker run -d --network=host coturn/coturn

测试

打开我们的测试网站
https://webrtc.github.io/samples/src/content/peerconnection/trickle-ice/
添加服务器,等等我们的密码哪里来的?

用户名和密码

用户名和密码在Dockerfile文件里面,我使用的是默认配置,没设置任何配置文件,所以密码是默认密码,自己可以修改
https://github.com/coturn/coturn/blob/master/docker/coturn/debian/Dockerfile

Signal Server信令服务

想象一下,在这个餐厅中,顾客(用户)需要与厨房(对等端)进行交流,但他们并不能直接看到厨房内部情况,信令服务器就像餐厅的接待员或前台,负责协调顾客之间的交流和信息传递。
  • 传递消息,比如顾客A想与顾客B进行视频通话,顾客A请求会先发送到信令服务器,然后由信令服务器转发给顾客B

信令服务与客户端源代码

注意事项: WebRTC需要使用 SSL/TLS 证书,也就是https 协议。

测试

总结

  • 搭建Signal Server信令服务
  • 搭建STUN/TURN 服务
  • Docker 部署Coturn TURN server(节省大量部署时间)

常见问题

  • 稍后补充

源代码

系列文章

DateHistogram
用于根据日期或时间数据进行分桶聚合统计。它允许你将时间序列数据按照指定的时间间隔进行分组,从而生成统计信息,例如每小时、每天、每周或每月的数据分布情况。

Elasticsearch 就支持 DateHistogram 聚合,在关系型数据库中,可以使用
GROUP BY
配合日期函数来实现时间分桶。但是当数据基数特别大时,或者时间分桶较多时,这个聚合速度就非常慢了。如果前端想呈现一个时间分桶的 Panel,这个后端接口的响应速度将非常感人。

我决定用 Flink 做一个实时的 DateHistogram。

实验设计

场景就设定为从 Kafka 消费数据,由 Flink 做实时的时间分桶聚合,将聚合结果写入到 MySQL。

源端-数据准备

Kafka 中的数据格式如下,为简化程序,测试数据做了尽可能的精简:

testdata.json

{
	"gid" : "001254500828905",
	"timestamp" : 1620981709790
}

KafkaDataProducer
源端数据生成程序,要模拟数据乱序到达的情况:

package org.example.test.kafka;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Random;
import java.util.Scanner;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.fastjson.JSONObject;

public class KafkaDataProducer {

	private static final String TEST_DATA = "testdata.json";
	private static final String GID = "gid";
	private static final String TIMESTAMP = "timestamp";
	
	// 定义日志颜色
	public static final String reset = "\u001B[0m";
	public static final String red = "\u001B[31m";
	public static final String green = "\u001B[32m";
	
	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		String topic = "trace-2024";
		props.put("bootstrap.servers", "127.0.0.1:9092");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		
		InputStream inputStream = KafkaDataProducer.class.getClassLoader().getResourceAsStream(TEST_DATA);
		Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8.name());
        String content = scanner.useDelimiter("\\A").next();
        scanner.close();
		JSONObject jsonContent = JSONObject.parseObject(content);
		
		int totalNum = 2000;
		Random r = new Random();
		for (int i = 0; i < totalNum; i++) {
			// 对时间进行随机扰动,模拟数据乱序到达
			long current = System.currentTimeMillis() - r.nextInt(60) * 1000;
			jsonContent.put(TIMESTAMP, current);
			producer.send(new ProducerRecord<String, String>(topic, jsonContent.toString()));
			// wait some time
			Thread.sleep(2 * r.nextInt(10));
			System.out.print("\r" + "Send " + green + (i + 1) + "/" + totalNum + reset + " records to Kafka");
		}
		Thread.sleep(2000);
		producer.close();
		System.out.println("发送记录总数: " + totalNum);
	}
}

目标端-表结构设计

MySQL 的表结构设计:

CREATE TABLE `flink`.`datehistogram` (
  `bucket` varchar(255) PRIMARY KEY,
  `count` bigint
);

bucket 列用于存储时间分桶,形如
[09:50:55 - 09:51:00]
,count 列用于存储对应的聚合值。

实现

maven 依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.2-1.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.72</version>
    </dependency>
</dependencies>

BucketCount
类用于转换 Kafka 中的数据为时间分桶格式,并便于聚合:

package org.example.flink.data;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BucketCount {

	private long timestamp;
	private String bucket;
	private long count;

	public BucketCount(long timestamp) {
		this.timestamp = timestamp;
		this.bucket = formatTimeInterval(timestamp);
		this.count = 1;
	}
	
	public BucketCount(String bucket, long count) {
		this.bucket = bucket;
		this.count = count;
	}
	
	/**
	 * 将时间戳格式化为时间区间格式
	 * 
	 * @param time
	 * @return 例如 [11:28:00 — 11:28:05]
	 */
	private String formatTimeInterval(long time) {
        // 定义输出的日期时间格式
        DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");

        // 将时间戳转换为 LocalDateTime 对象
        LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneId.systemDefault()).toLocalDateTime();

        // 提取秒数并计算区间开始时间
        int seconds = dateTime.getSecond();
        int intervalStartSeconds = (seconds / 5) * 5;

        // 创建区间开始和结束时间的 LocalDateTime 对象
        LocalDateTime intervalStartTime = dateTime.withSecond(intervalStartSeconds);
        LocalDateTime intervalEndTime = intervalStartTime.plusSeconds(5);

        // 格式化区间开始和结束时间为字符串
        String startTimeString = intervalStartTime.format(outputFormatter);
        String endTimeString = intervalEndTime.format(outputFormatter);

        // 返回格式化后的时间区间字符串
        return startTimeString + "-" + endTimeString;
    }
    
    // 省略Getter, Setter
}

RealTimeDateHistogram
类完成流计算:

package org.example.flink;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.example.flink.data.BucketCount;

import com.alibaba.fastjson.JSONObject;

public class RealTimeDateHistogram {

	public static void main(String[] args) throws Exception {
		// 1. prepare
		Configuration configuration = new Configuration();
		configuration.setString("rest.port", "9091");
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
		env.enableCheckpointing(2 * 60 * 1000);
		// 使用rocksDB作为状态后端
		env.setStateBackend(new EmbeddedRocksDBStateBackend());
		
		// 2. Kafka Source
		KafkaSource<String> source = KafkaSource.<String>builder()
			.setBootstrapServers("127.0.0.1:9092")
			.setTopics("trace-2024")
			.setGroupId("group-01")
		    .setStartingOffsets(OffsetsInitializer.latest())
		    .setProperty("commit.offsets.on.checkpoint", "true")
		    .setValueOnlyDeserializer(new SimpleStringSchema())
		    .build();

		DataStreamSource<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
				"Kafka Source");
		sourceStream.setParallelism(2);	// 设置source算子的并行度为2
		
		// 3. 转换为易于统计的BucketCount对象结构{ bucket: 00:00, count: 200 }
		SingleOutputStreamOperator<BucketCount> mapStream = sourceStream
			.map(new MapFunction<String, BucketCount>() {
				@Override
				public BucketCount map(String value) throws Exception {
					JSONObject jsonObject = JSONObject.parseObject(value);
					long timestamp = jsonObject.getLongValue("timestamp");
					return new BucketCount(timestamp);
				}
			});
		mapStream.name("Map to BucketCount");
		mapStream.setParallelism(2);	// 设置map算子的并行度为2
		
		// 4. 设置eventTime字段作为watermark,要考虑数据乱序到达的情况
		SingleOutputStreamOperator<BucketCount> mapStreamWithWatermark = mapStream
			.assignTimestampsAndWatermarks(
			    WatermarkStrategy.<BucketCount>forBoundedOutOfOrderness(Duration.ofSeconds(60))
					.withIdleness(Duration.ofSeconds(60))
					.withTimestampAssigner(new SerializableTimestampAssigner<BucketCount>() {
						@Override
						public long extractTimestamp(BucketCount bucketCount, long recordTimestamp) {
							// 提取eventTime字段作为watermark
							return bucketCount.getTimestamp();
						}
					}));
		mapStreamWithWatermark.name("Assign EventTime as Watermark");
		
		// 5. 滚动时间窗口聚合
		SingleOutputStreamOperator<BucketCount> windowReducedStream = mapStreamWithWatermark
			.windowAll(TumblingEventTimeWindows.of(Time.seconds(5L)))	// 滚动时间窗口
			.trigger(ProcessingTimeTrigger.create()) // ProcessingTime触发器
			.allowedLateness(Time.seconds(120))  	 // 数据延迟容忍度, 允许数据延迟乱序到达
			.reduce(new ReduceFunction<BucketCount>() {
				@Override
				public BucketCount reduce(BucketCount bucket1, BucketCount bucket2) throws Exception {
					// 将两个bucket合并,count相加
					return new BucketCount(bucket1.getBucket(), bucket1.getCount() + bucket2.getCount());
				}
			});
		windowReducedStream.name("Window Reduce");
		windowReducedStream.setParallelism(1);		// reduce算子的并行度只能是1
		
		// 6. 将结果写入到数据库
		DataStreamSink<BucketCount> sinkStream = windowReducedStream.addSink(
				JdbcSink.sink("insert into flink.datehistogram(bucket, count) values (?, ?) "
						+ "on duplicate key update count = VALUES(count);",
						(statement, bucketCount) -> {
							statement.setString(1, bucketCount.getBucket());
                            statement.setLong(2, bucketCount.getCount());
                        },
						JdbcExecutionOptions.builder()
                        		.withBatchSize(1000)
                        		.withBatchIntervalMs(200)
                        		.withMaxRetries(5)
                        		.build(), 
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            	.withUrl("jdbc:mysql://127.0.0.1:3306/flink")
                            	.withUsername("username")
                            	.withPassword("password")
                            	.build())
				);
		sinkStream.name("Sink DB");
		sinkStream.setParallelism(1);				// sink算子的并行度只能是1
		
		// 执行
		env.execute("Real-Time DateHistogram");
	}
}

几个关键点

window

Flink 的 window 将数据源沿着时间边界,切分成有界的数据块,然后对各个数据块进行处理。下图表示了三种窗口类型:

窗口划分策略比较
  • 固定窗口(又名滚动窗口)
    固定窗口在时间维度上,按照固定长度将无界数据流切片,是一种对齐窗口。窗口紧密排布,首尾无缝衔接,均匀地对数据流进行切分。

  • 滑动窗口
    滑动时间窗口是固定时间窗口的推广,由窗口大小和窗口间隔两个参数共同决定。当窗口间隔小于窗口大小时,窗口之间会出现重叠;当窗口间隔等于窗口大小时,滑动窗口蜕化为固定窗口;当窗口间隔大于窗口大小时,得到的是一个采样窗口。与固定窗口一样,滑动窗口也是一种对齐窗口。

  • 会话窗口
    会话窗口是典型的非对齐窗口。会话由一系列连续发生的事件组成,当事件发生的间隔超过某个超时时间时,意味着一个会话的结束。会话很有趣,例如,我们可以通过将一系列时间相关的事件组合在一起来分析用户的行为。会话的长度不能先验地定义,因为会话长度在不同的数据集之间永远不会相同。

EventTime

数据处理系统中,通常有两个时间域:

  • 事件时间:事件发生的时间,即业务时间。
  • 处理时间:系统发现事件,开始对事件进行处理的时间。

根据事件时间划分窗口
的方式在事件本身的发生时间备受关注时显得格外重要。下图所示为将无界数据根据事件时间切分成 1 小时固定时间窗口:

根据事件时间划分固定窗口

要特别注意箭头中所示的两个事件,两个事件根据处理时间所在的窗口,跟事件时间发生的窗口不是同一个。如果基于处理时间划分窗口的话,结果就是错的。只有基于事件时间进行计算,才能保证数据的正确性。

当然,天下没有免费的午餐。事件时间窗口功能很强大,但由于迟到数据的原因,窗口的存在时间比窗口本身的大小要长很多,导致的两个明显的问题是:

  • 缓存:事件时间窗口需要存储更长时间内的数据。
  • 完整性:基于事件时间的窗口,我们也不能判断什么时候窗口的数据都到齐了。Flink 通过 watermark,能够推断一个相对精确的窗口结束时间。但是这种方式并不能得到完全正确的结果。因此,Flink 还支持让用户能定义何时输出窗口结果,并且定义当迟到数据到来时,如何更新之前窗口计算的结果。

reduce

Reduce 算子基于 ReduceFunction 对集合中的元素进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。常用的聚合方法如 average, sum, min, max, count 都可以使用 reduce 实现。

效果预览

从效果图中可以看出,Sum Panel 中的 stat value(为 DateHistogram 中每个 Bucket 对应值的加和)和 Kafka 端的数据跟进的非常紧,代表 Flink 的处理延迟非常低。向 Kafka 中总计压入的数据量和 Flink 输出的数据总数一致,代表数据的统计结果是准确的。此外,最近一段时间的柱状图都在实时变化,代表 Flink 对迟到的数据按照 EventTime 进行了准确处理,把数据放到了准确的 date bucket 中。

iOS
动态链接器
dyld
中有一个神秘的变量
__dso_handle
:

// dyld/dyldMain.cpp
static const MachOAnalyzer* getDyldMH()
{
#if __LP64__
    // 声明 __dso_handle
    extern const MachOAnalyzer __dso_handle;
    return &__dso_handle;
#else
    ...
#endif // __LP64__
}

这个函数内部声明了一个变量
__dso_handle
,其类型是
struct MachOAnalyzer

查看
struct MachOAnalyzer
的定义,它继承自
struct mach_header
:

image

struct mach_header
正是
XNU
内核里面,定义的
Mach-O
文件头:

// EXTENERL_HEADERS/mach-o/loader.h
struct mach_header {
	uint32_t	magic;		/* mach magic number identifier */
	cpu_type_t	cputype;	/* cpu specifier */
	cpu_subtype_t	cpusubtype;	/* machine specifier */
	uint32_t	filetype;	/* type of file */
	uint32_t	ncmds;		/* number of load commands */
	uint32_t	sizeofcmds;	/* the size of all the load commands */
	uint32_t	flags;		/* flags */
};

从上面函数
getDyldMH
的名字来看,它返回
dyld
这个
Mach-O
文件的文件头,而这确实也符合变量
__dso_handle
的类型定义。

但是奇怪的事情发生了,搜遍整个
dyld
源码库,都无法找到变量
__dso_handle
的定义。所有能搜到的地方,都只是对这个变量
__dso_handle
的声明。

众所周知,动态连接器
dyld
本身是静态链接的。

也就是说,动态连接器
dyld
本身是不依赖任何其他动态库的。

因此,这个变量
__dso_handle
不可能定义在其他动态库。

既然这样,动态链接器
dyld
本身是如何静态链接通过的呢?

答案只可能是静态链接器
ld
在链接过程中做了手脚。

查看静态链接器
ld
的源码,也就是
llvm
的源码,可以找到如下代码:

// lld/MachO/SyntheticSections.cpp
void macho::createSyntheticSymbols() {
  // addHeaderSymbol 的 lamba 表达式
  auto addHeaderSymbol = [](const char *name) {
    symtab->addSynthetic(name, in.header->isec, /*value=*/0,
                         /*isPrivateExtern=*/true, /*includeInSymtab=*/false,
                         /*referencedDynamically=*/false);
  };

  ...

  // The Itanium C++ ABI requires dylibs to pass a pointer to __cxa_atexit
  // which does e.g. cleanup of static global variables. The ABI document
  // says that the pointer can point to any address in one of the dylib's
  // segments, but in practice ld64 seems to set it to point to the header,
  // so that's what's implemented here.
  addHeaderSymbol("___dso_handle");
}

上面代码定义了一个
addHeaderSymbol

lamda
表达式,然后使用它添加了一个符号,这个符号正是
__dso_handle

调用
addHeaderSymbol
上方的注释使用
chatGPT
翻译过来如下:

Itanium C++ ABI 要求动态库传递一个指向 __cxa_atexit 的指针,该函数负责例如静态全局变量的清理。ABI 文档指出,指针可以指向动态库的某个段中的任意地址,但实际上,ld64(苹果的链接器)似乎将其设置为指向头部,所以这里实现了这种做法。

注释中提到的
Itanium C++ ABI
最初是为英特尔和惠普联合开发的
Itanium
处理器架构设计的。

但其影响已经超过了最初设计的架构范围,并被广泛用于其他架构,比如
x86

x86-64
上的多种编译器,包括
GCC

Clang

而且,注释中还提到,
__dso_handle
在苹果的实现里,是指向了
Mach-O
的头部。

至此,谜底解开~。