2024年11月

自研
原生鸿蒙NEXT5.0 API12 ArkTS
仿微信
app聊天
模板
HarmonyOSChat

harmony-wechat
原创重磅实战
纯血鸿蒙OS ArkUI+ArkTs仿微信App
聊天实例。包括
聊天、通讯录、我、朋友圈
等模块,实现类似
微信消息UI布局、编辑器光标处输入文字+emo表情图片/GIF动图、图片预览、红包、语音/位置UI、长按语音面板
等功能。

版本信息

DevEco Studio 5.0.3.906HarmonyOS5.0.0API12 Release SDK
commandline
-tools-windows-x64-5.0.3.906

纯血鸿蒙OS元年已来,华为大力推广自主研发的全场景分布操作系统HarmonyOS,赶快加入鸿蒙原生应用开发,未来可期!

基于鸿蒙os
ArkTs

ArkUI
实现下拉刷新、右键长按/下拉菜单、自定义弹窗、朋友圈等功能。

项目框架结构

基于最新版
DevEco Studio 5.0.3.906
编码工具构建鸿蒙app聊天项目模板。

https://developer.huawei.com/consumer/cn/deveco-studio/

HarmonyOS-Chat聊天app项目已经发布到我的原创作品集,有需要的可以去拍哈~

https://gf.bilibili.com/item/detail/1107424011

如果大家想快速的入门到进阶开发,先把官方文档撸一遍,然后找个实战项目案例练练手。

华为鸿蒙os开发官网
https://developer.huawei.com/consumer/cn/
HarmonyOS开发设计规范
https://developer.huawei.com/consumer/cn/design/
ArkUI方舟UI框架
https://developer.huawei.com/consumer/cn/doc/harmonyos-references-V5/arkui-declarative-comp-V5

路由页面JSON文件

HarmonyOS ArkUI自定义顶部导航条

项目中所有顶部标题导航栏均是自定义封装ArkUI组件实现功能效果。之前有写过一篇专门的分享介绍,感兴趣的可以去看看下面这篇文章。

HarmonyOS NEXT 5.0自定义增强版导航栏组件|鸿蒙ArkUI自定义标题栏

https://www.cnblogs.com/xiaoyan2017/p/18517517

Index.ets入口模板

//自定义页面
@Builder customPage() {if(this.pageIndex === 0) {
IndexPage()
}
else if(this.pageIndex === 1) {
FriendPage()
}
else if(this.pageIndex === 2) {
MyPage()
}
}

build() {
Navigation() {
this.customPage()
}
.toolbarConfiguration(
this.customToolBar)
.height(
'100%')
.width(
'100%')
.backgroundColor($r(
'sys.color.background_secondary'))
.expandSafeArea([SafeAreaType.SYSTEM], [SafeAreaEdge.TOP, SafeAreaEdge.BOTTOM])
}

//自定义底部菜单栏
@Builder customToolBar() {
Row() {
Row() {
Badge({
count:
8,
style: {},
position: BadgePosition.RightTop
}) {
Column({space:
2}) {
SymbolGlyph($r(
'sys.symbol.ellipsis_message_fill'))Text('聊天').fontSize(12)}
}
}
.layoutWeight(
1)
.justifyContent(FlexAlign.Center)
.onClick(()
=>{this.pageIndex = 0})

Row() {
Column({space:
2}) {
SymbolGlyph($r(
'sys.symbol.person_2'))Text('通讯录').fontSize(12)}
}
.layoutWeight(
1)
.justifyContent(FlexAlign.Center)
.onClick(()
=>{this.pageIndex = 1})

Row() {
Badge({
value:
'',
style: { badgeSize:
8, badgeColor: '#fa2a2d'}
}) {
Column({space:
2}) {
SymbolGlyph($r(
'sys.symbol.person_crop_circle_fill_1'))Text('我').fontSize(12)}
}
}
.layoutWeight(
1)
.justifyContent(FlexAlign.Center)
.onClick(()
=>{this.pageIndex = 2})
}
.height(
56)
.width(
'100%')
.backgroundColor($r(
'sys.color.background_secondary'))
.borderWidth({top:
1})
.borderColor($r(
'sys.color.background_tertiary'))
}

HarmonyOS实现登录/注册/倒计时验证

登录模板

/**
* 登录模板
* @author andy
*/import { router, promptAction } from'@kit.ArkUI'@Entry
@Component
struct Login {
@State name: string
= ''@State pwd: string= '' //提交 handleSubmit() {if(this.name === '' || this.pwd === '') {
promptAction.showToast({ message:
'账号或密码不能为空'})
}
else{//登录接口逻辑... promptAction.showToast({ message:'登录成功'})
setTimeout(()
=>{
router.replaceUrl({ url:
'pages/Index'})
},
2000)
}
}

build() {
Column() {
Column({space:
10}) {
Image(
'pages/assets/images/logo.png').height(50).width(50)
Text(
'HarmonyOS-Chat').fontSize(18).fontColor('#0a59f7')
}
.margin({top:
50})
Column({space:
15}) {
TextInput({placeholder:
'请输入账号'})
.onChange((value)
=>{this.name =value
})
TextInput({placeholder:
'请输入密码'}).type(InputType.Password)
.onChange((value)
=>{this.pwd =value
})
Button(
'登录').height(45).width('100%')
.linearGradient({ angle:
135, colors: [['#0a59f7', 0.1], ['#07c160', 1]] })
.onClick(()
=>{this.handleSubmit()
})
}
.margin({top:
30})
.width(
'80%')
Row({space:
15}) {
Text(
'忘记密码').fontSize(14).opacity(0.5)
Text(
'注册账号').fontSize(14).opacity(0.5)
.onClick(()
=>{
router.pushUrl({url:
'pages/views/auth/Register'})
})
}
.margin({top:
20})
}
.height(
'100%')
.width(
'100%')
.expandSafeArea([SafeAreaType.SYSTEM], [SafeAreaEdge.TOP, SafeAreaEdge.BOTTOM])
}
}

Stack({alignContent: Alignment.End}) {
TextInput({placeholder:
'验证码'})
.onChange((value)
=>{this.code =value
})
Button(`${
this.codeText}`).enabled(!this.disabled).controlSize(ControlSize.SMALL).margin({right: 5})
.onClick(()
=>{this.handleVCode()
})
}

鸿蒙arkts实现60s倒计时验证码

//验证码参数
@State codeText: string = '获取验证码'@State disabled:boolean = false@State time: number= 60

//获取验证码
handleVCode() {if(this.tel === '') {
promptAction.showToast({ message:
'请输入手机号'})
}
else if(!checkMobile(this.tel)) {
promptAction.showToast({ message:
'手机号格式错误'})
}
else{
const timer
= setInterval(() =>{if(this.time > 0) {this.disabled = true this.codeText = `获取验证码(${this.time--})`
}
else{
clearInterval(timer)
this.codeText = '获取验证码' this.time = 5 this.disabled = false}
},
1000)
}
}

鸿蒙os下拉刷新/九宫格图像/长按菜单

  • 下拉刷新组件
Refresh({
refreshing: $$
this.isRefreshing,
builder:
this.customRefreshTips
}) {
List() {
ForEach(
this.queryData, (item: RecordArray) =>{
ListItem() {
//... }
.stateStyles({pressed:
this.pressedStyles, normal: this.normalStyles})
.bindContextMenu(
this.customCtxMenu, ResponseType.LongPress)
.onClick(()
=>{//... })
}, (item: RecordArray)
=>item.cid.toString())
}
.height(
'100%')
.width(
'100%')
.backgroundColor(
'#fff')
.divider({ strokeWidth:
1, color: '#f5f5f5', startMargin: 70, endMargin: 0})
.scrollBar(BarState.Off)
}
.pullToRefresh(
true)
.refreshOffset(
64)//当前刷新状态变更时触发回调 .onStateChange((refreshStatus: RefreshStatus) =>{
console.info(
'Refresh onStatueChange state is ' +refreshStatus)this.refreshStatus =refreshStatus
})
//进入刷新状态时触发回调 .onRefreshing(() =>{
console.log(
'onRefreshing...')
setTimeout(()
=>{this.isRefreshing = false},2000)
})
  • 自定义刷新提示
@State isRefreshing: boolean = false@State refreshStatus: number= 1

//自定义刷新tips
@Builder customRefreshTips() {
Stack() {
Row() {
if(this.refreshStatus == 1) {
SymbolGlyph($r(
'sys.symbol.arrow_down')).fontSize(24)
}
else if(this.refreshStatus == 2) {
SymbolGlyph($r(
'sys.symbol.arrow_up')).fontSize(24)
}
else if(this.refreshStatus == 3) {
LoadingProgress().height(
24)
}
else if(this.refreshStatus == 4) {
SymbolGlyph($r(
'sys.symbol.checkmark')).fontSize(24)
}
Text(`${
this.refreshStatus == 1 ? '下拉刷新':this.refreshStatus == 2 ? '释放更新':this.refreshStatus == 3 ? '加载中...':this.refreshStatus == 4 ? '完成' : ''}`).fontSize(16).margin({left:10})
}
.alignItems(VerticalAlign.Center)
}
.align(Alignment.Center)
.clip(
true)
.constraintSize({minHeight:
32})
.width(
'100%')
}
  • 长按右键菜单

.bindContextMenu(
this
.customCtxMenu, ResponseType.LongPress)

//自定义长按右键菜单
@Builder customCtxMenu() {
Menu() {
MenuItem({
content:
'标为已读'})
MenuItem({
content:
'置顶该聊天'})
MenuItem({
content:
'不显示该聊天'})
MenuItem({
content:
'删除'})
}
}
  • 下拉菜单

.bindMenu([ ... ])

Image($r('app.media.plus')).height(24).width(24)
.bindMenu([
{
icon: $r(
'app.media.message_on_message'),
value:
'发起群聊',
action: ()
=>{}
},
{
icon: $r(
'app.media.person_badge_plus'),
value:
'添加朋友',
action: ()
=> router.pushUrl({url: 'pages/views/friends/AddFriend'})
},
{
icon: $r(
'app.media.line_viewfinder'),
value:
'扫一扫',
action: ()
=>{}
},
{
icon: $r(
'app.media.touched'),
value:
'收付款',
action: ()
=>{}
}
])

HarmonyOS arkui自定义dialog弹框组件



支持参数配置如下:

//标题(支持字符串|自定义组件)
@BuilderParam title: ResourceStr | CustomBuilder =BuilderFunction//内容(字符串或无状态组件内容)
@BuilderParam message: ResourceStr | CustomBuilder =BuilderFunction//响应式组件内容(自定义@Builder组件是@State动态内容)
@BuilderParam content: () => void =BuilderFunction//弹窗类型(android | ios | actionSheet)
@Prop type: string//是否显示关闭图标
@Prop closable: boolean
//关闭图标颜色
@Prop closeColor: ResourceColor//是否自定义内容
@Prop custom: boolean
//自定义操作按钮
@BuilderParam buttons: Array<ActionItem> | CustomBuilder = BuilderFunction

调用方式非常简单。

//自定义退出弹窗
logoutController: CustomDialogController = newCustomDialogController({
builder: HMPopup({
type:
'android',
title:
'提示',
message:
'确定要退出当前登录吗?',
buttons: [
{
text:
'取消',
color:
'#999'},
{
text:
'退出',
color:
'#fa2a2d',
action: ()
=>{
router.replaceUrl({url:
'pages/views/auth/Login'})
}
}
]
}),
maskColor:
'#99000000',
cornerRadius:
12,
width:
'75%'})
//自定义公众号弹窗
@Builder customQRContent() {
Column({space:
15}) {
Image(
'pages/assets/images/qrcode.png').height(150).objectFit(ImageFit.Contain)
Text(
'扫一扫,加我公众号').fontSize(14).opacity(.5)
}
}
qrController: CustomDialogController
= newCustomDialogController({
builder: HMPopup({
message:
this.customQRContent,
closable:
true}),
cornerRadius:
12,
width:
'70%'})

好了,以上就是harmonyos next实战开发聊天app的一些知识分享,希望对大家有所帮助~

整个项目涉及到的知识点非常多,限于篇幅就先分享到这里。感谢大家的阅读与支持。

https://www.cnblogs.com/xiaoyan2017/p/18396212

https://www.cnblogs.com/xiaoyan2017/p/18437155

https://www.cnblogs.com/xiaoyan2017/p/18467237

目录

参考资料

什么是WebRTC?

  • WebRTC(Web实时通信)技术
  • 浏览器之间交换任意数据,而无需中介
  • 不需要用户安装插件或任何其他第三方软件

能做什么?

与Media Capture和Streams API一起
  • 支持音频和视频会议
  • 文件交换
  • 屏幕共享
  • 身份管理
  • 以及与传统电话系统的接口,包括支持发送DTMF(按键拨号)信号

架构图

个人理解(类比)

官方文档晦涩难懂,所以按照自己的思路,整理总结。

核心知识点

先整理官方核心知识点,这里不理解,没关系,我们继续按自己的思路总结
  • ICE(框架)允许您的Web浏览器与对等端连接
  • STUN(协议)用于发现您的公共地址并确定路由器中阻止与对等体直接连接的任何限制
  • NAT 用于为您的设备提供公共IP地址
  • TURN 是指通过打开与TURN服务器的连接并通过该服务器中继所有信息来绕过对称NAT限制
  • SDP 从技术上讲会话描述协议(SDP并不是一个真正的协议,而是一种数据格式)

核心知识点类比

我们使用餐厅(或者其他)来类比WebRTC核心概念, 想象一下,你现在正在餐厅里面。
顾客(用户)
可以直接与
厨房(服务器)
进行交流,而不需要通过
服务员(中介)
。在这个餐厅里,顾客可以
点菜(发送音视频请求)

享用美食(接受音视频流)

还可以与其他顾客(其他用户)直接交流(数据传输)
,而这一切都
不要
额外的
工具或设备(插件)

备注:如果你明白,上面描述,那我们就继续。

ICE框架

想象一下ICE就像餐厅
整体布局和设计
,它确保
顾客(用户)
能够顺利找到座位并与
厨房(对等端)
建立联系。ICE负责协调顾客和厨房之间的所有连接方式,确保他们能顺利交流。

STUN(协议)

STUN就像餐厅门口
接待员
,负责帮助顾客找到餐厅的公共入口。接待员会
告诉顾客他们的公共地址(公共IP地址)
,并帮助他们了解是否有任何
障碍(比如路由器的限制、防火墙等)阻止他们直接进入餐厅(与对等端直接连接)

NAT(网络地址转换)

NAT就像餐厅外墙,它为餐厅提供一个
公共门牌号(公共IP地址)
。虽然餐厅内部有很多
桌子(设备)
,但外面的人只知道这个公共门牌号,而不知道内部具体位置。

TURN

TURN就像餐厅的
外卖服务
。如果顾客无法直接进入餐厅(由于对称NAT限制),他们可以选择通过
外卖服务(TURN服务器)来获取食物
。所有的订单和交流通过外卖服务进行,这样即使顾客无法直接到达餐厅,他们仍然可以享用美食。

SDP(会话描述协议)

SDP就像餐厅的
菜单
,它描述了可供选择的菜品和饮料(音视频流的格式和参数),虽然菜单本身不是一个真正协议,但它提供顾客和厨房之间所需的信息,以便他们能达成共识,确保顾客点的菜品能够被厨房正确准备。

WebRTC的核心API

  • getUserMedia(点菜):

这个API就像顾客在餐厅里点菜。顾客告诉厨房他们想要什么(音频或视频),厨房就会准备好这些食材(获取用户的音频和视频流)。

  • RTCPeerConnection(厨房的工作台):

这个API就像厨房的工作台,负责处理顾客的订单(建立连接)。它确保顾客和厨房之间的交流顺畅,处理音视频流的传输,就像厨房准备和发送食物一样。

  • RTCDataChannel(顾客之间的交流):
    这个API就像顾客之间的对话。顾客可以直接与其他顾客交流(传输数据),比如分享他们的用餐体验或交换食谱,而不需要通过服务员。
    总结
    在这个餐厅的类比中,WebRTC就像一个高效的餐厅,顾客可以直接与厨房和其他顾客交流,享受美食和分享信息,而不需要中介的干预。核心API则是实现这一切的工具,帮助顾客点菜、厨房准备食物和顾客之间的交流。这样,WebRTC使得实时通信变得简单而高效。

现在开始做饭

如果你看到这里,恭喜你,我们达成共识,现在开始做饭。

准备阶段

环境准备

安装Docker、Nginx、Nodejs等,请查询其他文档
  • 一台服务器
  • Debian 12 x86_64 操作系统
  • Docker
  • Nginx
  • Nodejs

服务器搭建

首先我们需要两个服务,STUN/TURN、Signal Server, What's Signal Server? 别紧张我待会,会解释现在我们先专注与STUN/TURN,再次之前我们需要了解  Coturn TURN server(开源框架,感谢开发人员)
  • STUN/TURN
  • Signal Server 信令服务

Coturn TURN server(开源服务) 部署

对的你没有看错,就一行命令,这就是我为什么推荐使用Docker的原因,详细的Dockerfile请看 参考资料
docker run -d --network=host coturn/coturn

测试

打开我们的测试网站
https://webrtc.github.io/samples/src/content/peerconnection/trickle-ice/
添加服务器,等等我们的密码哪里来的?

用户名和密码

用户名和密码在Dockerfile文件里面,我使用的是默认配置,没设置任何配置文件,所以密码是默认密码,自己可以修改
https://github.com/coturn/coturn/blob/master/docker/coturn/debian/Dockerfile

Signal Server信令服务

想象一下,在这个餐厅中,顾客(用户)需要与厨房(对等端)进行交流,但他们并不能直接看到厨房内部情况,信令服务器就像餐厅的接待员或前台,负责协调顾客之间的交流和信息传递。
  • 传递消息,比如顾客A想与顾客B进行视频通话,顾客A请求会先发送到信令服务器,然后由信令服务器转发给顾客B

信令服务与客户端源代码

注意事项: WebRTC需要使用 SSL/TLS 证书,也就是https 协议。

测试

总结

  • 搭建Signal Server信令服务
  • 搭建STUN/TURN 服务
  • Docker 部署Coturn TURN server(节省大量部署时间)

常见问题

  • 稍后补充

源代码

系列文章

DateHistogram
用于根据日期或时间数据进行分桶聚合统计。它允许你将时间序列数据按照指定的时间间隔进行分组,从而生成统计信息,例如每小时、每天、每周或每月的数据分布情况。

Elasticsearch 就支持 DateHistogram 聚合,在关系型数据库中,可以使用
GROUP BY
配合日期函数来实现时间分桶。但是当数据基数特别大时,或者时间分桶较多时,这个聚合速度就非常慢了。如果前端想呈现一个时间分桶的 Panel,这个后端接口的响应速度将非常感人。

我决定用 Flink 做一个实时的 DateHistogram。

实验设计

场景就设定为从 Kafka 消费数据,由 Flink 做实时的时间分桶聚合,将聚合结果写入到 MySQL。

源端-数据准备

Kafka 中的数据格式如下,为简化程序,测试数据做了尽可能的精简:

testdata.json

{
	"gid" : "001254500828905",
	"timestamp" : 1620981709790
}

KafkaDataProducer
源端数据生成程序,要模拟数据乱序到达的情况:

package org.example.test.kafka;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Random;
import java.util.Scanner;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.fastjson.JSONObject;

public class KafkaDataProducer {

	private static final String TEST_DATA = "testdata.json";
	private static final String GID = "gid";
	private static final String TIMESTAMP = "timestamp";
	
	// 定义日志颜色
	public static final String reset = "\u001B[0m";
	public static final String red = "\u001B[31m";
	public static final String green = "\u001B[32m";
	
	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		String topic = "trace-2024";
		props.put("bootstrap.servers", "127.0.0.1:9092");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		
		InputStream inputStream = KafkaDataProducer.class.getClassLoader().getResourceAsStream(TEST_DATA);
		Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8.name());
        String content = scanner.useDelimiter("\\A").next();
        scanner.close();
		JSONObject jsonContent = JSONObject.parseObject(content);
		
		int totalNum = 2000;
		Random r = new Random();
		for (int i = 0; i < totalNum; i++) {
			// 对时间进行随机扰动,模拟数据乱序到达
			long current = System.currentTimeMillis() - r.nextInt(60) * 1000;
			jsonContent.put(TIMESTAMP, current);
			producer.send(new ProducerRecord<String, String>(topic, jsonContent.toString()));
			// wait some time
			Thread.sleep(2 * r.nextInt(10));
			System.out.print("\r" + "Send " + green + (i + 1) + "/" + totalNum + reset + " records to Kafka");
		}
		Thread.sleep(2000);
		producer.close();
		System.out.println("发送记录总数: " + totalNum);
	}
}

目标端-表结构设计

MySQL 的表结构设计:

CREATE TABLE `flink`.`datehistogram` (
  `bucket` varchar(255) PRIMARY KEY,
  `count` bigint
);

bucket 列用于存储时间分桶,形如
[09:50:55 - 09:51:00]
,count 列用于存储对应的聚合值。

实现

maven 依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.2-1.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.72</version>
    </dependency>
</dependencies>

BucketCount
类用于转换 Kafka 中的数据为时间分桶格式,并便于聚合:

package org.example.flink.data;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BucketCount {

	private long timestamp;
	private String bucket;
	private long count;

	public BucketCount(long timestamp) {
		this.timestamp = timestamp;
		this.bucket = formatTimeInterval(timestamp);
		this.count = 1;
	}
	
	public BucketCount(String bucket, long count) {
		this.bucket = bucket;
		this.count = count;
	}
	
	/**
	 * 将时间戳格式化为时间区间格式
	 * 
	 * @param time
	 * @return 例如 [11:28:00 — 11:28:05]
	 */
	private String formatTimeInterval(long time) {
        // 定义输出的日期时间格式
        DateTimeFormatter outputFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");

        // 将时间戳转换为 LocalDateTime 对象
        LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneId.systemDefault()).toLocalDateTime();

        // 提取秒数并计算区间开始时间
        int seconds = dateTime.getSecond();
        int intervalStartSeconds = (seconds / 5) * 5;

        // 创建区间开始和结束时间的 LocalDateTime 对象
        LocalDateTime intervalStartTime = dateTime.withSecond(intervalStartSeconds);
        LocalDateTime intervalEndTime = intervalStartTime.plusSeconds(5);

        // 格式化区间开始和结束时间为字符串
        String startTimeString = intervalStartTime.format(outputFormatter);
        String endTimeString = intervalEndTime.format(outputFormatter);

        // 返回格式化后的时间区间字符串
        return startTimeString + "-" + endTimeString;
    }
    
    // 省略Getter, Setter
}

RealTimeDateHistogram
类完成流计算:

package org.example.flink;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.example.flink.data.BucketCount;

import com.alibaba.fastjson.JSONObject;

public class RealTimeDateHistogram {

	public static void main(String[] args) throws Exception {
		// 1. prepare
		Configuration configuration = new Configuration();
		configuration.setString("rest.port", "9091");
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
		env.enableCheckpointing(2 * 60 * 1000);
		// 使用rocksDB作为状态后端
		env.setStateBackend(new EmbeddedRocksDBStateBackend());
		
		// 2. Kafka Source
		KafkaSource<String> source = KafkaSource.<String>builder()
			.setBootstrapServers("127.0.0.1:9092")
			.setTopics("trace-2024")
			.setGroupId("group-01")
		    .setStartingOffsets(OffsetsInitializer.latest())
		    .setProperty("commit.offsets.on.checkpoint", "true")
		    .setValueOnlyDeserializer(new SimpleStringSchema())
		    .build();

		DataStreamSource<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
				"Kafka Source");
		sourceStream.setParallelism(2);	// 设置source算子的并行度为2
		
		// 3. 转换为易于统计的BucketCount对象结构{ bucket: 00:00, count: 200 }
		SingleOutputStreamOperator<BucketCount> mapStream = sourceStream
			.map(new MapFunction<String, BucketCount>() {
				@Override
				public BucketCount map(String value) throws Exception {
					JSONObject jsonObject = JSONObject.parseObject(value);
					long timestamp = jsonObject.getLongValue("timestamp");
					return new BucketCount(timestamp);
				}
			});
		mapStream.name("Map to BucketCount");
		mapStream.setParallelism(2);	// 设置map算子的并行度为2
		
		// 4. 设置eventTime字段作为watermark,要考虑数据乱序到达的情况
		SingleOutputStreamOperator<BucketCount> mapStreamWithWatermark = mapStream
			.assignTimestampsAndWatermarks(
			    WatermarkStrategy.<BucketCount>forBoundedOutOfOrderness(Duration.ofSeconds(60))
					.withIdleness(Duration.ofSeconds(60))
					.withTimestampAssigner(new SerializableTimestampAssigner<BucketCount>() {
						@Override
						public long extractTimestamp(BucketCount bucketCount, long recordTimestamp) {
							// 提取eventTime字段作为watermark
							return bucketCount.getTimestamp();
						}
					}));
		mapStreamWithWatermark.name("Assign EventTime as Watermark");
		
		// 5. 滚动时间窗口聚合
		SingleOutputStreamOperator<BucketCount> windowReducedStream = mapStreamWithWatermark
			.windowAll(TumblingEventTimeWindows.of(Time.seconds(5L)))	// 滚动时间窗口
			.trigger(ProcessingTimeTrigger.create()) // ProcessingTime触发器
			.allowedLateness(Time.seconds(120))  	 // 数据延迟容忍度, 允许数据延迟乱序到达
			.reduce(new ReduceFunction<BucketCount>() {
				@Override
				public BucketCount reduce(BucketCount bucket1, BucketCount bucket2) throws Exception {
					// 将两个bucket合并,count相加
					return new BucketCount(bucket1.getBucket(), bucket1.getCount() + bucket2.getCount());
				}
			});
		windowReducedStream.name("Window Reduce");
		windowReducedStream.setParallelism(1);		// reduce算子的并行度只能是1
		
		// 6. 将结果写入到数据库
		DataStreamSink<BucketCount> sinkStream = windowReducedStream.addSink(
				JdbcSink.sink("insert into flink.datehistogram(bucket, count) values (?, ?) "
						+ "on duplicate key update count = VALUES(count);",
						(statement, bucketCount) -> {
							statement.setString(1, bucketCount.getBucket());
                            statement.setLong(2, bucketCount.getCount());
                        },
						JdbcExecutionOptions.builder()
                        		.withBatchSize(1000)
                        		.withBatchIntervalMs(200)
                        		.withMaxRetries(5)
                        		.build(), 
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            	.withUrl("jdbc:mysql://127.0.0.1:3306/flink")
                            	.withUsername("username")
                            	.withPassword("password")
                            	.build())
				);
		sinkStream.name("Sink DB");
		sinkStream.setParallelism(1);				// sink算子的并行度只能是1
		
		// 执行
		env.execute("Real-Time DateHistogram");
	}
}

几个关键点

window

Flink 的 window 将数据源沿着时间边界,切分成有界的数据块,然后对各个数据块进行处理。下图表示了三种窗口类型:

窗口划分策略比较
  • 固定窗口(又名滚动窗口)
    固定窗口在时间维度上,按照固定长度将无界数据流切片,是一种对齐窗口。窗口紧密排布,首尾无缝衔接,均匀地对数据流进行切分。

  • 滑动窗口
    滑动时间窗口是固定时间窗口的推广,由窗口大小和窗口间隔两个参数共同决定。当窗口间隔小于窗口大小时,窗口之间会出现重叠;当窗口间隔等于窗口大小时,滑动窗口蜕化为固定窗口;当窗口间隔大于窗口大小时,得到的是一个采样窗口。与固定窗口一样,滑动窗口也是一种对齐窗口。

  • 会话窗口
    会话窗口是典型的非对齐窗口。会话由一系列连续发生的事件组成,当事件发生的间隔超过某个超时时间时,意味着一个会话的结束。会话很有趣,例如,我们可以通过将一系列时间相关的事件组合在一起来分析用户的行为。会话的长度不能先验地定义,因为会话长度在不同的数据集之间永远不会相同。

EventTime

数据处理系统中,通常有两个时间域:

  • 事件时间:事件发生的时间,即业务时间。
  • 处理时间:系统发现事件,开始对事件进行处理的时间。

根据事件时间划分窗口
的方式在事件本身的发生时间备受关注时显得格外重要。下图所示为将无界数据根据事件时间切分成 1 小时固定时间窗口:

根据事件时间划分固定窗口

要特别注意箭头中所示的两个事件,两个事件根据处理时间所在的窗口,跟事件时间发生的窗口不是同一个。如果基于处理时间划分窗口的话,结果就是错的。只有基于事件时间进行计算,才能保证数据的正确性。

当然,天下没有免费的午餐。事件时间窗口功能很强大,但由于迟到数据的原因,窗口的存在时间比窗口本身的大小要长很多,导致的两个明显的问题是:

  • 缓存:事件时间窗口需要存储更长时间内的数据。
  • 完整性:基于事件时间的窗口,我们也不能判断什么时候窗口的数据都到齐了。Flink 通过 watermark,能够推断一个相对精确的窗口结束时间。但是这种方式并不能得到完全正确的结果。因此,Flink 还支持让用户能定义何时输出窗口结果,并且定义当迟到数据到来时,如何更新之前窗口计算的结果。

reduce

Reduce 算子基于 ReduceFunction 对集合中的元素进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。常用的聚合方法如 average, sum, min, max, count 都可以使用 reduce 实现。

效果预览

从效果图中可以看出,Sum Panel 中的 stat value(为 DateHistogram 中每个 Bucket 对应值的加和)和 Kafka 端的数据跟进的非常紧,代表 Flink 的处理延迟非常低。向 Kafka 中总计压入的数据量和 Flink 输出的数据总数一致,代表数据的统计结果是准确的。此外,最近一段时间的柱状图都在实时变化,代表 Flink 对迟到的数据按照 EventTime 进行了准确处理,把数据放到了准确的 date bucket 中。

iOS
动态链接器
dyld
中有一个神秘的变量
__dso_handle
:

// dyld/dyldMain.cpp
static const MachOAnalyzer* getDyldMH()
{
#if __LP64__
    // 声明 __dso_handle
    extern const MachOAnalyzer __dso_handle;
    return &__dso_handle;
#else
    ...
#endif // __LP64__
}

这个函数内部声明了一个变量
__dso_handle
,其类型是
struct MachOAnalyzer

查看
struct MachOAnalyzer
的定义,它继承自
struct mach_header
:

image

struct mach_header
正是
XNU
内核里面,定义的
Mach-O
文件头:

// EXTENERL_HEADERS/mach-o/loader.h
struct mach_header {
	uint32_t	magic;		/* mach magic number identifier */
	cpu_type_t	cputype;	/* cpu specifier */
	cpu_subtype_t	cpusubtype;	/* machine specifier */
	uint32_t	filetype;	/* type of file */
	uint32_t	ncmds;		/* number of load commands */
	uint32_t	sizeofcmds;	/* the size of all the load commands */
	uint32_t	flags;		/* flags */
};

从上面函数
getDyldMH
的名字来看,它返回
dyld
这个
Mach-O
文件的文件头,而这确实也符合变量
__dso_handle
的类型定义。

但是奇怪的事情发生了,搜遍整个
dyld
源码库,都无法找到变量
__dso_handle
的定义。所有能搜到的地方,都只是对这个变量
__dso_handle
的声明。

众所周知,动态连接器
dyld
本身是静态链接的。

也就是说,动态连接器
dyld
本身是不依赖任何其他动态库的。

因此,这个变量
__dso_handle
不可能定义在其他动态库。

既然这样,动态链接器
dyld
本身是如何静态链接通过的呢?

答案只可能是静态链接器
ld
在链接过程中做了手脚。

查看静态链接器
ld
的源码,也就是
llvm
的源码,可以找到如下代码:

// lld/MachO/SyntheticSections.cpp
void macho::createSyntheticSymbols() {
  // addHeaderSymbol 的 lamba 表达式
  auto addHeaderSymbol = [](const char *name) {
    symtab->addSynthetic(name, in.header->isec, /*value=*/0,
                         /*isPrivateExtern=*/true, /*includeInSymtab=*/false,
                         /*referencedDynamically=*/false);
  };

  ...

  // The Itanium C++ ABI requires dylibs to pass a pointer to __cxa_atexit
  // which does e.g. cleanup of static global variables. The ABI document
  // says that the pointer can point to any address in one of the dylib's
  // segments, but in practice ld64 seems to set it to point to the header,
  // so that's what's implemented here.
  addHeaderSymbol("___dso_handle");
}

上面代码定义了一个
addHeaderSymbol

lamda
表达式,然后使用它添加了一个符号,这个符号正是
__dso_handle

调用
addHeaderSymbol
上方的注释使用
chatGPT
翻译过来如下:

Itanium C++ ABI 要求动态库传递一个指向 __cxa_atexit 的指针,该函数负责例如静态全局变量的清理。ABI 文档指出,指针可以指向动态库的某个段中的任意地址,但实际上,ld64(苹果的链接器)似乎将其设置为指向头部,所以这里实现了这种做法。

注释中提到的
Itanium C++ ABI
最初是为英特尔和惠普联合开发的
Itanium
处理器架构设计的。

但其影响已经超过了最初设计的架构范围,并被广泛用于其他架构,比如
x86

x86-64
上的多种编译器,包括
GCC

Clang

而且,注释中还提到,
__dso_handle
在苹果的实现里,是指向了
Mach-O
的头部。

至此,谜底解开~。

概述

使用 explain 输出 SELECT 语句执行的详细信息,包括以下信息:

  • 表的加载顺序
  • sql 的查询类型
  • 可能用到哪些索引,实际上用到哪些索引
  • 读取的行数

Explain 执行计划包含字段信息如下:分别是 id、select_type、table、partitions、type、possible_keys、key、key_len、ref、rows、filtered、Extra 12个字段。

通过explain extended + show warnings可以在原本explain的基础上额外提供一些查询优化的信息,得到优化以后的可能的查询语句(不一定是最终优化的结果)。

测试环境:

CREATE TABLE `blog` (
  `blog_id` int NOT NULL AUTO_INCREMENT COMMENT '唯一博文id--主键',
  `blog_title` varchar(255) NOT NULL COMMENT '博文标题',
  `blog_body` text NOT NULL COMMENT '博文内容',
  `blog_time` datetime NOT NULL COMMENT '博文发布时间',
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  `blog_state` int NOT NULL COMMENT '博文状态--0 删除 1正常',
  `user_id` int NOT NULL COMMENT '用户id',
  PRIMARY KEY (`blog_id`)
) ENGINE=InnoDB AUTO_INCREMENT=17 DEFAULT CHARSET=utf8

CREATE TABLE `user` (
  `user_id` int NOT NULL AUTO_INCREMENT COMMENT '用户唯一id--主键',
  `user_name` varchar(30) NOT NULL COMMENT '用户名--不能重复',
  `user_password` varchar(255) NOT NULL COMMENT '用户密码',
  PRIMARY KEY (`user_id`),
  KEY `name` (`user_name`)
) ENGINE=InnoDB AUTO_INCREMENT=17 DEFAULT CHARSET=utf8

CREATE TABLE `discuss` (
  `discuss_id` int NOT NULL AUTO_INCREMENT COMMENT '评论唯一id',
  `discuss_body` varchar(255) NOT NULL COMMENT '评论内容',
  `discuss_time` datetime NOT NULL COMMENT '评论时间',
  `user_id` int NOT NULL COMMENT '用户id',
  `blog_id` int NOT NULL COMMENT '博文id',
  PRIMARY KEY (`discuss_id`)
) ENGINE=InnoDB AUTO_INCREMENT=61 DEFAULT CHARSET=utf8

id

表示查询中执行select子句或者操作表的顺序,id的值越大,代表优先级越高,越先执行

explain select discuss_body 
from discuss 
where blog_id = (
    select blog_id from blog where user_id = (
        select user_id from user where user_name = 'admin'));

三个表依次嵌套,发现最里层的子查询 id最大,最先执行。

select_type

表示 select 查询的类型,主要是用于区分各种复杂的查询,例如:普通查询、联合查询、子查询等。

  • SIMPLE:表示最简单的 select 查询语句,在查询中不包含子查询或者交并差集等操作。
  • PRIMARY:查询中最外层的SELECT(存在子查询的外层的表操作为PRIMARY)。
  • SUBQUERY:子查询中首个SELECT。
  • DERIVED:被驱动的SELECT子查询(子查询位于FROM子句)。
  • UNION:在SELECT之后使用了UNION

table

查询的表名,并不一定是真实存在的表,有别名显示别名,也可能为临时表。当from子句中有子查询时,table列是<derivenN>的格式,表示当前查询依赖 id为N的查询,会先执行 id为N的查询。

partitions

查询时匹配到的分区信息,对于非分区表值为NULL,当查询的是分区表时,partitions显示分区表命中的分区情况。

type

查询使用了何种类型,它在 SQL优化中是一个非常重要的指标

访问效率:const > eq_ref > ref > range > index > ALL

system

当表仅有一行记录时(系统表),数据量很少,往往不需要进行磁盘IO,速度非常快。比如,Mysql系统表proxies_priv在Mysql服务启动时候已经加载在内存中,对这个表进行查询不需要进行磁盘 IO。

const

单表操作的时候,查询使用了主键或者唯一索引。

eq_ref

多表关联查询的时候,主键和唯一索引作为关联条件。如下图的sql,对于user表(外循环)的每一行,user_role表(内循环)只有一行满足join条件,只要查找到这行记录,就会跳出内循环,继续外循环的下一轮查询。

ref

查找条件列使用了索引而且不为主键和唯一索引。虽然使用了索引,但该索引列的值并不唯一,这样即使使用索引查找到了第一条数据,仍然不能停止,要在目标值附近进行小范围扫描。但它的好处是不需要扫全表,因为索引是有序的,即便有重复值,也是在一个非常小的范围内做扫描。

ref_or_null

类似 ref,会额外搜索包含NULL值的行

index_merge

使用了索引合并优化方法,查询使用了两个以上的索引。新建comment表,id为主键,value_id为非唯一索引,执行explain select content from comment where value_id = 1181000 and id > 1000;,执行结果显示查询同时使用了id和value_id索引,type列的值为index_merge。

range

有范围的索引扫描,相对于index的全索引扫描,它有范围限制,因此要优于index。像between、and、>、<、in和or都是范围索引扫描。

index

index包括select索引列,order by主键两种情况。

order by主键。这种情况会按照索引顺序全表扫描数据,拿到的数据是按照主键排好序的,不需要额外进行排序。

select索引列。type为index,而且extra字段为using index,也称这种情况为索引覆盖。所需要取的数据都在索引列,无需回表查询。

all

全表扫描,查询没有用到索引,性能最差。

possible_keys

此次查询中可能选用的索引。但这个索引并不定一会是最终查询数据时所被用到的索引。

key

此次查询中确切使用到的索引

ref

ref 列显示使用哪个列或常数与key一起从表中选择数据行。常见的值有const、func、NULL、具体字段名。当 key 列为 NULL,即不使用索引时。如果值是func,则使用的值是某个函数的结果。

以下SQL的执行计划ref为const,因为使用了组合索引(user_id, blog_id),where user_id = 13中13为常量

mysql> explain select blog_id from user_like where user_id = 13;
+----+-------------+-----------+------------+------+---------------+------+---------+-------+------+----------+-------------+
| id | select_type | table     | partitions | type | possible_keys | key  | key_len | ref   | rows | filtered | Extra       |
+----+-------------+-----------+------------+------+---------------+------+---------+-------+------+----------+-------------+
|  1 | SIMPLE      | user_like | NULL       | ref  | ul1,ul2       | ul1  | 4       | const |    2 |   100.00 | Using index |
+----+-------------+-----------+------------+------+---------------+------+---------+-------+------+----------+-------------+

而下面这个SQL的执行计划ref值为NULL,因为key为NULL,查询没有用到索引。

mysql> explain select user_id from user_like where status = 1;
+----+-------------+-----------+------------+------+---------------+------+---------+------+------+----------+-------------+
| id | select_type | table     | partitions | type | possible_keys | key  | key_len | ref  | rows | filtered | Extra       |
+----+-------------+-----------+------------+------+---------------+------+---------+------+------+----------+-------------+
|  1 | SIMPLE      | user_like | NULL       | ALL  | NULL          | NULL | NULL    | NULL |    6 |    16.67 | Using where |
+----+-------------+-----------+------------+------+---------------+------+---------+------+------+----------+-------------+

rows

估算要找到所需的记录,需要读取的行数。评估SQL 性能的一个比较重要的数据,mysql需要扫描的行数,很直观的显示 SQL 性能的好坏,一般情况下 rows 值越小越好

filtered

存储引擎返回的数据在经过过滤后,剩下满足条件的记录数量的比例

extra

表示额外的信息说明。为了方便测试,这里新建两张表。

CREATE TABLE `t_order` (
  `id` int NOT NULL AUTO_INCREMENT,
  `user_id` int DEFAULT NULL,
  `order_id` int DEFAULT NULL,
  `order_status` tinyint DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_userid_order_id_createdate` (`user_id`,`order_id`,`create_date`)
) ENGINE=InnoDB AUTO_INCREMENT=99 DEFAULT CHARSET=utf8

CREATE TABLE `t_orderdetail` (
  `id` int NOT NULL AUTO_INCREMENT,
  `order_id` int DEFAULT NULL,
  `product_name` varchar(100) DEFAULT NULL,
  `cnt` int DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_orderid_productname` (`order_id`,`product_name`)
) ENGINE=InnoDB AUTO_INCREMENT=152 DEFAULT CHARSET=utf8

using where

表示在查询过程中使用了WHERE条件进行数据过滤。当一 个查询中包含WHERE条件时,MySQL会根据该条件过滤出满足条件的数据行,然后再进行后续的操作。这个过程 就被称为"Using Where”。

表示查询的列未被索引覆盖,,且where筛选条件是索引列前导列的一个范围,或者是索引列的非前导列,或者是非索引列。对存储引擎返回的结果进行过滤(Post-filter,后过滤),一般发生在MySQL服务器,而不是存储引擎层,因此需要回表查询数据。

using index

查询的列被索引覆盖,并且where筛选条件符合最左前缀原则,通过索引查找就能直接找到符合条件的数据,
不需要回表
查询数据。

Using where&Using index

查询的列被索引覆盖,但无法通过索引查找找到符合条件的数据,不过可以通过索引扫描找到符合条件的数据,也不需要回表查询数据。

包括两种情况(组合索引为(user_id, orde)):

where筛选条件不符合最左前缀原则

where筛选条件是索引列前导列的一个范围

null

查询的列未被索引覆盖,并且where筛选条件是索引的前导列,也就是用到了索引,但是部分字段未被索引覆盖,必须回表查询这些字段,Extra中为NULL。

using index condition

索引下推(index condition pushdown,ICP),先使用where条件过滤索引,过滤完索引后找到所有符合索引条件的数据行,随后用 WHERE 子句中的其他条件去过滤这些数据行。

对于联合索引(a, b),在执行 select * from table where a > 1 and b = 2 语句的时候,只有 a 字段能用到索引,那在联合索引的 B+Tree 找到第一个满足条件的主键值(ID 为 2)后,还需要判断其他条件是否满足(看 b 是否等于 2),那是在联合索引里判断?还是回主键索引去判断呢?
MySQL 5.6 引入的索引下推优化(index condition pushdown), 可以在联合索引遍历过程中,对联合索引中包含的字段先做判断,直接过滤掉不满足条件的记录,减少回表次数。

不使用ICP的情况(set optimizer_switch='index_condition_pushdown=off'),如下图,在步骤4中,没有使用where条件过滤索引:

使用ICP的情况(set optimizer_switch='index_condition_pushdown=on'):

下面的例子使用了ICP:

explain select user_id, order_id, order_status from t_order where user_id > 1 and user_id < 5\G;

关掉ICP之后(set optimizer_switch='index_condition_pushdown=off'),可以看到extra列为using where,不会使用索引下推。

using temporary

使用了临时表保存中间结果,常见于 order by 和 group by 中。典型的,当group by和order by同时存在,且作用于不同的字段时,就会建立临时表,以便计算出最终的结果集

filesort

文件排序。表示无法利用索引完成排序操作,以下情况会导致filesort:

  • order by 的字段不是索引字段
  • select 查询字段不全是索引字段
  • select 查询字段都是索引字段,但是 order by 字段和索引字段的顺序不一致

using join buffer

Block Nested Loop,需要进行嵌套循环计算。两个关联表join,关联字段均未建立索引,就会出现这种情况。比如内层和外层的type均为ALL,rows均为4,需要循环进行
4*4
次计算。常见的优化方案是,在关联字段上添加索引,避免每次嵌套循环计算。

面试题专栏

Java面试题专栏
已上线,欢迎访问。

  • 如果你不知道简历怎么写,简历项目不知道怎么包装;
  • 如果简历中有些内容你不知道该不该写上去;
  • 如果有些综合性问题你不知道怎么答;

那么可以私信我,我会尽我所能帮助你。