2024年7月

前言

在实际项目开发中,需求变更和项目迭代是常态。要求我们能够迅速响应,对数据库结构进行相应的调整,如添加新表、更新现有表结构或增加字段等。

为了确保团队成员之间的信息同步,实时更新和维护数据库文档变得至关重要。这不仅提升了数据库的可读性,也极大提高了开发效率和团队协作的流畅性。

SmartSQL,一款专为.NET平台设计的开源数据库文档工具,它通过简化数据库文档的查询、生成和管理流程,大大减少了手动文档维护的负担,方便信息同步。

本文将深入探讨SmartSQL的功能特性、实际应用案例,并展示如何利用这一工具优化数据库文档的管理流程。

项目介绍

SmartSQL 是一款方便、快捷的数据库文档查询、导出工具!从最初仅支持SqlServer数据库、CHM文档格式开始,通过不断地探索开发、集思广益和不断改进,又陆续支持Word、Excel、PDF、Html、Xml、Json、MarkDown等文档格式的导出。同时又扩展支持包括SqlServer、MySql、PostgreSQL、SQLite等多种数据库的文档查询和导出功能。

SmartSQL的核心优势在于其便捷性和高效率。用户可以轻松访问和理解数据库结构,工具自动生成的详尽文档支持持续更新,确保了信息的时效性和准确性。

功能架构

主要功能

工具支持哪些数据库?

目前支持的数据库 SqlServer、MySQL、Oracle、PostgreSQL、SQLite、达梦

文档的内容都包含什么?

表 序号 | 列名 | 主键 | 自增 | 数据类型 | 长度 | 允许NULL值 | 默认值 | 备注说明
视图 视图内容SQL脚本
存储过程 存储过程内容SQL脚本

支持导出哪些文档格式?

Word、Excel、PDF、Html、Xml、Json、MarkDown

更新表列的注释,有哪些方式?

通过文件➡导入备注,选择文件导入进行更新批注(注释):

  • pdm 由
    PowerDesigner
    设计数据库时产生。
  • xml 由
    Visual Studio
    设置 实体类库的项目属性,勾选
    XML文档文件
    后生成项目时产生。
  • xml 由
    SmartSQL
    的 XML导出而产生。

什么是分组管理

可以对数据库中的表、视图、存储过程进行自定义分组

可以对分组对象进行文档批量导出

功能介绍


主界面


工具菜单


快捷查询


导入导出


HTML 文档


Word 文档



Excel 文档

PDF 文档

项目地址

文件下载解压后,双击运行 SmartSQL.exe 即可

如果觉得这篇文章对你有用,欢迎加入微信公众号 [
DotNet技术匠
] 社区,与其他热爱技术的同行交流心得,共同成长。

一转眼go1.23都快发布了,时间过得真快。

不过今天我们把时间倒流回三年半之前,来关注一个在go1.16引入的关于处理目录时的优化。

对于go1.16的新变化,大家印象最深的可能是io包的大规模重构,但这个重构实际上还引进了一个优化,这篇文章要说的就是这个优化。

本文默认Linux环境,不过这个优化在BSD系统上也是通用的。

遍历目录时的优化

遍历目录是个很常见的需求,尤其是对于有大量文件的目录来说,遍历的性能直接关系到了整体程序的性能。

go1.16对于遍历目录增加了几个新接口:
os.ReadDir

(*os.File).ReadDir

filepath.WalkDir

这几个接口最大的特征是对目录项使用
fs.DirEntry
表示而不是
os.FileInfo

fs.DirEntry
是一个接口,它提供了类似
os.FileInfo
的方法:

type DirEntry interface {
        Name() string
        IsDir() bool
        Type() FileMode
        Info() (FileInfo, error)
}

它还提供了一个叫Info的方法以便获得
os.FileInfo

这个接口有什么神奇的呢?我们看下性能测试:

func IterateDir(path string) int {
    // go1.16 的 os.ReadDir 就是这么实现的,为了测试我们把它展开成对(*os.File).ReadDir的调用
	f, err := os.Open(path)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	files, err := f.ReadDir(-1)
	if err != nil {
		panic(err)
	}
	length := 0
	for _, finfo := range files {
		length = max(length, len(finfo.Name()))
	}
	return length
}

func IterateDir2(path string) int {
    // 1.16之前遍历目录的常用方法之一
	f, err := os.Open(path)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	files, err := f.Readdir(-1)
	if err != nil {
		panic(err)
	}
	length := 0
	for _, finfo := range files {
		length = max(length, len(finfo.Name()))
	}
	return length
}

func BenchmarkIter1(b *testing.B) {
	for range b.N {
		IterateDir("../test")
	}
}

func BenchmarkIter2(b *testing.B) {
	for range b.N {
		IterateDir2("../test")
	}
}

test目录是一个有5000个文件的位于Btrfs文件系统上的目录,我们的测试用例会遍历目录并找出名字最长的文件的文件名长度。

这是测试结果:

可以看到优化后的遍历比原先的快了480%。换了个函数为什么就会有这么大的提升?想知道答案的话就继续看吧。

优化的原理

继续深入前我们先看看老的接口是怎么获取到目录里的文件信息的。答案是遍历目录拿到路径,然后调用
os.Lstat
获取完整的文件信息:

func (f *File) Readdir(n int) ([]FileInfo, error) {
	if f == nil {
		return nil, ErrInvalid
	}
	_, _, infos, err := f.readdir(n, readdirFileInfo)
	if infos == nil {
		// Readdir has historically always returned a non-nil empty slice, never nil,
		// even on error (except misuse with nil receiver above).
		// Keep it that way to avoid breaking overly sensitive callers.
		infos = []FileInfo{}
	}
	return infos, err
}

这个
f.readdir
会根据第二个参数的值来改变自己的行为,根据值不同它会遵循1.16前老代码的行为或者采用新的优化方法。这个函数不同系统上的实现也不同,我们选则*nix系统上的实现看看:

func (f *File) readdir(n int, mode readdirMode) (names []string, dirents []DirEntry, infos []FileInfo, err error) {
	...

	for n != 0 {
		// 使用系统调用获得目录项的数据
        // 目录项的元信息一般是存储在目录本身的数据里的,所以读这些信息和读普通文件很类似
		if d.bufp >= d.nbuf {
			d.bufp = 0
			var errno error
			d.nbuf, errno = f.pfd.ReadDirent(*d.buf)
			runtime.KeepAlive(f)
			if errno != nil {
				return names, dirents, infos, &PathError{Op: "readdirent", Path: f.name, Err: errno}
			}
			if d.nbuf <= 0 {
				break // EOF
			}
		}

		buf := (*d.buf)[d.bufp:d.nbuf]
		reclen, ok := direntReclen(buf)
		if !ok || reclen > uint64(len(buf)) {
			break
		}
        // 注意这行
		rec := buf[:reclen]

		if mode == readdirName {
			names = append(names, string(name))
		} else if mode == readdirDirEntry {
			// 这里的代码后面再看
		} else {
			info, err := lstat(f.name + "/" + string(name))
			if IsNotExist(err) {
				// File disappeared between readdir + stat.
				// Treat as if it didn't exist.
				continue
			}
			if err != nil {
				return nil, nil, infos, err
			}
			infos = append(infos, info)
		}
	}

	if n > 0 && len(names)+len(dirents)+len(infos) == 0 {
		return nil, nil, nil, io.EOF
	}
	return names, dirents, infos, nil
}

ReadDirent对应的是Linux上的系统调用
getdents
,这个系统调用会把目录的目录项信息读取到一块内存里,之后程序可以解析这块内存里的数据来获得目录项的一些信息,这些信息一般包括了文件名,文件的类型,文件是否是目录等信息。

老代码在读取完这些信息后会利用文件名再次调用lstat,这个也是系统调用,可以获取更完整的文件信息,包括了文件的拥有者,文件的大小,文件的修改日期等。

老的代码有啥问题呢?大的问题不存在,接口也算易用,但有些小瑕疵:

  1. 大多数时间遍历目录主要是要获得目录中文件的名字或者类型等属性,显然
    os.FileInfo
    返回的信息过多了。这些用不着的信息会浪费不少内存,获取这些信息也需要额外花时间——lstat需要去进行磁盘io才能得到这些信息,而目录里的文件不像目录项信息那样紧密的存储在一起,它们是分散的,所以一一读取它们的元信息带来的负担会很大。
  2. 使用的系统调用太多了。由于我们测试目录的文件很多,但getdents可能要调用多次,这里假设为两次好了。对于每一个目录项,都需要用lstat去获取文件的详细信息,这样又有5000次系统调用,加起来是5002次。系统调用的开销是很大的,积累到5000多次则会带来肉眼可见的性能下降。实际上linux本身对lstat有优化,不会真的出现要反复进入系统调用5000次的情况,但几十到上百次还是需要的。

优化的代码其实只改了一行,是
f.readdir(n, readdirDirEntry)
,第二个参数变了。新代码会走上面注释掉的那段逻辑:

// rec := buf[:reclen] 防止你忘了rec是哪来的
de, err := newUnixDirent(f.name, string(name), direntType(rec))
if IsNotExist(err) {
	// File disappeared between readdir and stat.
	// Treat as if it didn't exist.
	continue
}
if err != nil {
	return nil, dirents, nil, err
}
dirents = append(dirents, de)

取代lstat的是函数newUnixDirent,这个函数可以不依赖额外的系统调用获取文件的一部分元数据:

type unixDirent struct {
	parent string
	name   string
	typ    FileMode
	info   FileInfo
}

func newUnixDirent(parent, name string, typ FileMode) (DirEntry, error) {
	ude := &unixDirent{
		parent: parent,
		name:   name,
		typ:    typ,
	}
    // 检测文件类型信息是否有效
	if typ != ^FileMode(0) && !testingForceReadDirLstat {
		return ude, nil
	}

	info, err := lstat(parent + "/" + name)
	if err != nil {
		return nil, err
	}

	ude.typ = info.Mode().Type()
	ude.info = info
	return ude, nil
}

文件名和类型都是在解析目录项时就得到的,因此直接设置就行。不过不是每个文件系统都支持在目录项数据里存储文件类型,所以代码里做了回退,一旦发现文件类型是无效数据就会使用lstat重新获取信息。

如果只使用文件名和文件的类型这两个信息,那么整个遍历的逻辑流程到这就结束了,文件系统提供支持的情况下不需要调用lstat。所以整个遍历只需要两次系统调用。这就是为什么优化方案会快接近五倍的原因。

对于要使用其他信息比如文件大小的用户,优化方案实际上也有好处,因为现在lstat是延迟且按需调用的:

func (d *unixDirent) Info() (FileInfo, error) {
	if d.info != nil {
		return d.info, nil
	}
    // 只会调用一次
	return lstat(d.parent + "/" + d.name)
}

这样也能尽量减少不必要的系统调用。

所以整体优化的原理是:尽量充分利用文件系统本身提供的信息+减少系统调用。要遍历的目录越大优化的效果也越明显。

优化的支持情况

上面也说了,能做到优化需要文件系统把文件类型信息存储在目录的目录项数据里。这个需要文件系统的支持。

如果文件系统不支持的话最后还是需要依赖lstat去读取具体文件的元数据。

不同文件系统的信息实在太分散,还有不少过时的,所以我花了几天看代码+查文档做了下整理:

  1. btrfs,ext2,ext4:这个几个文件系统支持优化,man pages加文件系统代码都能证实这一点
  2. OpenZFS:这个文件系统不在Linux内核里,所以man pages里没提到,但也支持优化
  3. xfs:支持优化,但得在创建文件系统时使用类似
    mkfs.xfs -f -n ftype=1
    的选项才行
  4. F2FS,EROFS:文档没提过,但看内核的代码里是支持的,代码的位置在
    xxx_readdir
    这个函数附近。
  5. fat32,exfat:文档没提过,但看内核代码发现是支持的,不过fat家族的文件类型没有那么多花样,只有目录和普通文件这两种,所以代码里很粗暴的判断目录项是否设置了dir标志,有就是目录没有统统算普通文件。这么做倒是正常的,因为fat本来就不支持别的文件类型,毕竟这个文件系统连软链接都不支持,更不用指望Unix Domain Socket和命名管道了。
  6. ntfs:支持,然而如
    注释
    所说,因为ntfs和其他文件系统处理type的方式不一样,导致虽然文件系统本身支持大部分文件类型,但type信息里只能获得文件是不是目录。所以它后面对于不是目录的文件会去磁盘上读取文件的inode然后再从inode里获取文件类型——实际上相当于执行了一次lstat,相比lstat减少了进入系统调用时的一次上下文切换,所以ntfs上优化效果会不如其他文件系统。

这么一看的话基本上主流的常见的文件系统都支持这种优化。

这也是为什么go1.16会引入这个优化,不仅支持广泛而且提升很大,免费的加速谁不爱呢。

别的语言里怎么利用这个优化

看到这里,你应该发现这个优化其实是系统层面的,golang只不过是适配了一下而已。

确实是这样的,所以这个优化不光golang能吃到,c/c++/python都行。

先说说c里怎么利用:直接用系统提供的readdir函数就行,这个函数会调用getdents,然后就能自然吃到优化了。注意事项和go的一样,需要检测文件系统是否支持设置d_type。

c++:和c一样,另外libstdc++的filesystem就是拿readdir实现的,所以用filesystem标准库也能获得优化:

// https://github.com/gcc-mirror/gcc/blob/master/libstdc++-v3/src/filesystem/dir-common.h#L270
inline file_type
get_file_type(const std::filesystem::__gnu_posix::dirent& d [[gnu::unused]])
{
#ifdef _GLIBCXX_HAVE_STRUCT_DIRENT_D_TYPE
  switch (d.d_type)
  {
  case DT_BLK:
    return file_type::block;
  case DT_CHR:
    return file_type::character;
  case DT_DIR:
    return file_type::directory;
  case DT_FIFO:
    return file_type::fifo;
  case DT_LNK:
    return file_type::symlink;
  case DT_REG:
    return file_type::regular;
  case DT_SOCK:
    return file_type::socket;
  case DT_UNKNOWN:
    return file_type::unknown;
  default:
    return file_type::none;
  }
#else
  return file_type::none;
#endif
}

// 如果操作系统以及文件系统不支持,则回退到lstat
// https://github.com/gcc-mirror/gcc/blob/master/libstdc++-v3/include/bits/fs_dir.h#L342
file_type
_M_file_type() const
{
    if (_M_type != file_type::none && _M_type != file_type::symlink)
	    return _M_type;
    return status().type();
}

唯一的区别在于如果目标文件是软连接,也会调用stat。

python:使用os.scandir可以获得优化,底层和c一样使用readdir:
https://github.com/python/cpython/blob/main/Modules/posixmodule.c#L16211,实现方法甚至类名都和golang很像,代码就不贴了。

总结

go虽然性能上一直被诟病,但在系统编程上倒是不含糊,基本常见的优化都有做,可以经常关注下新版本的release notes去看看go在这方面做的努力。

看着简单的优化,背后的可行性验证确实很复杂的,尤其是不同文件系统在怎么存储额外的元数据上很不相同,光是看代码就花了不少时间。

前面提到的ntfs在优化效果上会打点折扣,所以我特意拿Windows设备测试了下,测试条件不变:

可以看到几乎没什么区别。如果不是看了linux的ntfs驱动,我是不知道会产生这样的结果的。所以这个优化Windows上效果不理想,但在Linux和MacOS上是适用的。

大胆假设,小心求证,系统编程和性能优化的乐趣也正在于此。

参考

exfat的fuse驱动填充d_type的逻辑:
https://github.com/relan/exfat/blob/master/libexfat/utils.c#L28

Linux的ntfs驱动需要获取文件的inode才能得到正确的file type:
https://github.com/torvalds/linux/blob/master/fs/ntfs3/dir.c#L337

1 ambari + bigtop 构建大数据基础平台

1.1 参考:

1.2 参考

amabri

bigtop

打包部署

2 ambari+bigtop编译、打包、部署

操作步骤

  • 时间:2024-07-18
  • 环境准备:
    • 系统centos7
    • yum源
    • 系统基础组件
    • 防火墙、selinux设置、句柄数,时区
    • java、scala、maven、gradle、ant、nodejs、环境配置
  • 编译步骤:
    • 编译ambari:最新分支branch-2.8
    • 编译ambari-metrics:最新分支branch-3.0
    • 编译bigtop:最新分支barnch-3.3,此处采用branch-3.2
    • 打包:将以下文件copy到目标目录
      • ambari-server(ambari)
      • ambari-agent(ambari)
      • ambari-metrics-collector(ambari-metrics)
      • ambari-metrics-grafana(ambari-metrics)
      • ambari-metrics-hadoop-sink(ambari-metrics)
      • ambari-metrics-monitor(ambari-metrics)
      • bigtop打包出来的大数据组件(bigtop)
    • ambari+bigtop部署
      • 服务器准备:4c-16G-60G三台
      • 环境检查:操作系统、默认语言、时区、机器名、域名解析、网络、防火墙关闭、selinux关闭、limits句柄数、禁用交换分区、unmask设置、磁盘挂载
      • 基础设置:免密、基础软件、ntp服务、离线镜像源+httpd服务,
      • 数据库安装、ambari元数据库配置
      • 创建并配置bigtop的yum源
      • 安装启动ambari-server,初始化(生成表)

2.0 基础环境准备

2.1 ambari编译

2.2 ambari-metrics编译

2.3 bigtop编译

2.4 制作发版镜像

#创建bdp3.2文件夹-所有rpm包将都拷贝到这个文件夹
mkdir -p bdp3.2
 
#将ambari包拷贝
mkdir -p bdp3.2/ambari  # 存放ambari项目打包出来的rpm包
cp ambari/ambari-server/target/rpm/ambari-server/RPMS/x86_64/ambari-server-2.8.0.0-0.x86_64.rpm bdp3.2/ambari/
cp ambari/ambari-agent/target/rpm/ambari-agent/RPMS/x86_64/ambari-agent-2.8.0.0-0.x86_64.rpm bdp3.2/ambari/
 
#将ambari-metrics包拷贝,# 存放ambari-metrics项目打包出来的rpm包
mkdir -p bdp3.2/ambari-metrics
cp ambari-metrics/ambari-metrics-assembly/target/rpm/ambari-metrics-collector/RPMS/x86_64/ambari-metrics-collector-3.0.1-1.x86_64.rpm bdp3.2/ambari-metrics/
cp ambari-metrics/ambari-metrics-assembly/target/rpm/ambari-metrics-grafana/RPMS/x86_64/ambari-metrics-grafana-3.0.1-1.x86_64.rpm bdp3.2/ambari-metrics/
cp ambari-metrics/ambari-metrics-assembly/target/rpm/ambari-metrics-hadoop-sink/RPMS/x86_64/ambari-metrics-hadoop-sink-3.0.1-1.x86_64.rpm bdp3.2/ambari-metrics/
cp ambari-metrics/ambari-metrics-assembly/target/rpm/ambari-metrics-monitor/RPMS/x86_64/ambari-metrics-monitor-3.0.1-1.x86_64.rpm bdp3.2/ambari-metrics/
 
#将bigtop包拷贝
mkdir -p bdp3.2/bigtop-3.2.1 # 存放bigtop项目打包出来的rpm包
cp -r bigtop/output/* bdp3.2/bigtop-3.2.1
 
# 制作镜像源,在bdp3.2 目录下生成repodata(也可以将文件拷贝到安装位置后,再制作镜像源)
createrepo bdp3.2/
# bdp3.2.tar.gz,将是最终发版的包
tar zcvf bdp3.2.tar.gz bdp3.2 

# yum源模板
vim ambari.repo  #必须是此文件名,安装ambari-agent时,界面有检查项
# BIGOP-3.2.1 必须是固定字符串,大写。ambari安装大数据组件时,有检查项,否则找不到数据源,3.2.1 是bigtop.bom 中的base_version
# http://172.16.76.107/chdp3.2 替换成自己的地址,确保访问地址后,能看到上边ambari,ambari-metrics,bigtop-3.2.1 这三个目录
[BIGOP-3.2.1]
name=BIGOP-3.2.1
baseurl=http://172.16.76.107/bdp3.2
failovermethod=priority
enabled=1
gpgcheck=0

2.5 使用镜像|镜像测试

搭建yum源服务器

#将bdp3.2.tar.gz 复制到目标服务器
tar zxfv bdp3.2.tar.gz 
ln -s /to/your/path/bdp3.2  /var/wwww/html
systemctl restart httpd  # 重启httpd服务
systemctl enable httpd.service # 开机自启
访问:http://${yum_hosts}/bdp3.2, 查看根目录下是否有ambari,ambari-metrics,bigtop-3.2.1,repodata四个文件夹

客户机访问bdp3.2的yum源

1)客户机添加yum源
vim /etc/init.d/ambari.repo # 内容参考上边的yum源模板
[BIGOP-3.2.1]
name=BIGOP-3.2.1
baseurl=http://172.16.76.107/bdp3.2  ## 请替换此处IP
failovermethod=priority
enabled=1
gpgcheck=0
2)客户机刷新yum缓存(务必执行此操作,否则安装时不会报错,也安装不上)
yum clean all
yum makecache
3)查看是否能够访问bdp3.2上的安装包,
yum search ambari-server
4)查看bigtop源下的包有哪些(注意,若是只有两三个,可以去掉wc -l,看一下是具体那几个)
/usr/bin/yum list available --showduplicates --disablerepo=* --enablerepo=BIGTOP-3.2.0  |wc -l 
/usr/bin/yum list available --showduplicates --disablerepo=* --enablerepo=BIGTOP-3.2.1  |wc -l 

bdp3.2 压缩包文件结构预览

bdp3.2/
├── ambari
│   └── x86_64
│       ├── ambari-agent-2.8.0.0-0.x86_64.rpm
│       └── ambari-server-2.8.0.0-0.x86_64.rpm
├── ambari-metrics
│   └── x86_64
│       ├── ambari-metrics-collector-3.0.1-1.x86_64.rpm
│       ├── ambari-metrics-grafana-3.0.1-1.x86_64.rpm
│       ├── ambari-metrics-hadoop-sink-3.0.1-1.x86_64.rpm
│       └── ambari-metrics-monitor-3.0.1-1.x86_64.rpm
├── bigtop3.2.1
│   ├── bigtop-ambari-mpack
│   │   ├── bigtop-ambari-mpack-2.7.5.0-1.el7.src.rpm
│   │   └── noarch
│   │       └── bigtop-ambari-mpack-2.7.5.0-1.el7.noarch.rpm
│   ├── bigtop-groovy
│   │   ├── bigtop-groovy-2.5.4-1.el7.src.rpm
│   │   └── noarch
│   │       └── bigtop-groovy-2.5.4-1.el7.noarch.rpm
│   ├── bigtop-jsvc
│   │   ├── bigtop-jsvc-1.2.4-1.el7.src.rpm
│   │   └── x86_64
│   │       ├── bigtop-jsvc-1.2.4-1.el7.x86_64.rpm
│   │       └── bigtop-jsvc-debuginfo-1.2.4-1.el7.x86_64.rpm
│   ├── bigtop-select
│   │   ├── bigtop-select-3.2.1-1.el7.src.rpm
│   │   └── noarch
│   │       └── bigtop-select-3.2.1-1.el7.noarch.rpm
│   ├── bigtop-utils
│   │   ├── bigtop-utils-3.2.1-1.el7.src.rpm
│   │   └── noarch
│   │       └── bigtop-utils-3.2.1-1.el7.noarch.rpm
│   ├── flink
│   │   ├── flink_3_2_1-1.15.3-1.el7.src.rpm
│   │   └── noarch
│   │       ├── flink_3_2_1-1.15.3-1.el7.noarch.rpm
│   │       ├── flink_3_2_1-jobmanager-1.15.3-1.el7.noarch.rpm
│   │       └── flink_3_2_1-taskmanager-1.15.3-1.el7.noarch.rpm
│   ├── hadoop
│   │   ├── hadoop_3_2_1-3.3.6-1.el7.src.rpm
│   │   └── x86_64
│   │       ├── hadoop_3_2_1-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-client-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-conf-pseudo-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-debuginfo-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-doc-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-hdfs-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-hdfs-datanode-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-hdfs-dfsrouter-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-hdfs-fuse-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-hdfs-journalnode-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-hdfs-namenode-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-hdfs-secondarynamenode-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-hdfs-zkfc-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-httpfs-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-kms-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-libhdfs-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-libhdfs-devel-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-libhdfspp-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-libhdfspp-devel-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-mapreduce-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-mapreduce-historyserver-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-yarn-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-yarn-nodemanager-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-yarn-proxyserver-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-yarn-resourcemanager-3.3.6-1.el7.x86_64.rpm
│   │       ├── hadoop_3_2_1-yarn-router-3.3.6-1.el7.x86_64.rpm
│   │       └── hadoop_3_2_1-yarn-timelineserver-3.3.6-1.el7.x86_64.rpm
│   ├── hbase
│   │   ├── hbase_3_2_1-2.4.17-1.el7.src.rpm
│   │   ├── noarch
│   │   │   └── hbase_3_2_1-doc-2.4.17-1.el7.noarch.rpm
│   │   └── x86_64
│   │       ├── hbase_3_2_1-2.4.17-1.el7.x86_64.rpm
│   │       ├── hbase_3_2_1-master-2.4.17-1.el7.x86_64.rpm
│   │       ├── hbase_3_2_1-regionserver-2.4.17-1.el7.x86_64.rpm
│   │       ├── hbase_3_2_1-rest-2.4.17-1.el7.x86_64.rpm
│   │       ├── hbase_3_2_1-thrift2-2.4.17-1.el7.x86_64.rpm
│   │       └── hbase_3_2_1-thrift-2.4.17-1.el7.x86_64.rpm
│   ├── hive
│   │   ├── hive_3_2_1-3.1.3-1.el7.src.rpm
│   │   └── noarch
│   │       ├── hive_3_2_1-3.1.3-1.el7.noarch.rpm
│   │       ├── hive_3_2_1-hbase-3.1.3-1.el7.noarch.rpm
│   │       ├── hive_3_2_1-hcatalog-3.1.3-1.el7.noarch.rpm
│   │       ├── hive_3_2_1-hcatalog-server-3.1.3-1.el7.noarch.rpm
│   │       ├── hive_3_2_1-jdbc-3.1.3-1.el7.noarch.rpm
│   │       ├── hive_3_2_1-metastore-3.1.3-1.el7.noarch.rpm
│   │       ├── hive_3_2_1-server2-3.1.3-1.el7.noarch.rpm
│   │       ├── hive_3_2_1-webhcat-3.1.3-1.el7.noarch.rpm
│   │       └── hive_3_2_1-webhcat-server-3.1.3-1.el7.noarch.rpm
│   ├── kafka
│   │   ├── kafka_3_2_1-2.8.2-1.el7.src.rpm
│   │   └── noarch
│   │       ├── kafka_3_2_1-2.8.2-1.el7.noarch.rpm
│   │       └── kafka_3_2_1-server-2.8.2-1.el7.noarch.rpm
│   ├── phoenix
│   │   ├── noarch
│   │   │   └── phoenix-5.1.3-1.el7.noarch.rpm
│   │   └── phoenix-5.1.3-1.el7.src.rpm
│   ├── solr
│   │   ├── noarch
│   │   │   ├── solr_3_2_1-8.11.2-1.el7.noarch.rpm
│   │   │   ├── solr_3_2_1-doc-8.11.2-1.el7.noarch.rpm
│   │   │   └── solr_3_2_1-server-8.11.2-1.el7.noarch.rpm
│   │   └── solr_3_2_1-8.11.2-1.el7.src.rpm
│   ├── spark
│   │   ├── noarch
│   │   │   ├── spark_3_2_1-3.2.3-1.el7.noarch.rpm
│   │   │   ├── spark_3_2_1-core-3.2.3-1.el7.noarch.rpm
│   │   │   ├── spark_3_2_1-datanucleus-3.2.3-1.el7.noarch.rpm
│   │   │   ├── spark_3_2_1-external-3.2.3-1.el7.noarch.rpm
│   │   │   ├── spark_3_2_1-history-server-3.2.3-1.el7.noarch.rpm
│   │   │   ├── spark_3_2_1-master-3.2.3-1.el7.noarch.rpm
│   │   │   ├── spark_3_2_1-python-3.2.3-1.el7.noarch.rpm
│   │   │   ├── spark_3_2_1-sparkr-3.2.3-1.el7.noarch.rpm
│   │   │   ├── spark_3_2_1-thriftserver-3.2.3-1.el7.noarch.rpm
│   │   │   ├── spark_3_2_1-worker-3.2.3-1.el7.noarch.rpm
│   │   │   └── spark_3_2_1-yarn-shuffle-3.2.3-1.el7.noarch.rpm
│   │   └── spark_3_2_1-3.2.3-1.el7.src.rpm
│   ├── tez
│   │   ├── noarch
│   │   │   └── tez_3_2_1-0.10.2-1.el7.noarch.rpm
│   │   └── tez_3_2_1-0.10.2-1.el7.src.rpm
│   └── zookeeper
│       ├── x86_64
│       │   ├── zookeeper_3_2_1-3.6.4-1.el7.x86_64.rpm
│       │   ├── zookeeper_3_2_1-debuginfo-3.6.4-1.el7.x86_64.rpm
│       │   ├── zookeeper_3_2_1-native-3.6.4-1.el7.x86_64.rpm
│       │   ├── zookeeper_3_2_1-rest-3.6.4-1.el7.x86_64.rpm
│       │   └── zookeeper_3_2_1-server-3.6.4-1.el7.x86_64.rpm
│       └── zookeeper_3_2_1-3.6.4-1.el7.src.rpm
└── repodata
    ├── 15cc6a5320a9f6e38cda7b8ec54e7e28ed527df25ee6f56c1bf20cfa421dfe73-other.sqlite.bz2
    ├── 1a5c0f03d6edae3170255a282d113afe9b22f654f246ea308f162e14234cf5f8-filelists.sqlite.bz2
    ├── 28cb1ff79328b8ebeaa55021078984e8da908081b0b389e4037d9b1e5854ae22-primary.xml.gz
    ├── 5066ad167311131c46e983c0d66c4901b92a4e862b11aff3492f02f90ec2eb36-other.xml.gz
    ├── 617177ba2b017e6e6b4a62b157fe3441ea97b74455d60792e78471f938b86cb8-filelists.xml.gz
    ├── 7ea377d3a0b59ffaaef9661cb7dd3f2144bd7bc27da1a11070b6d78d5116dddd-primary.sqlite.bz2
    └── repomd.xml
37 directories, 103 files

2.6 ambari 安装

安装中可能出现的问题及解决方案

  • 问题1:yum install ambari-server后出现以下日志,软件没有安装成功,也没有任何反应
    • 解决方案:yum clean all && yum makecache
Loaded plugins:aliases,changelog,fastestmirror, kabi, langpacks, tmprepo, verify, versionlock
Loading support for RedHat kernel ABI
  • 问题2:yum源配置
    /etc/yum.repos.d/ambari.repo (必须为此名称,安装过程有校验)
[BIGTOP-3.2.1]   # 必须为此名称,和ambari-server中的配置一致,包含大小写,下一行同理
name=BIGTOP-3.2.1
baseurl=http://192.168.76.107/chdp3.2
failovermethod=priority
enabled=1
gpgcheck=0 

说在前面的话

自从AI出来之后,学习的曲线瞬间变缓了,学习的路径也有了很大的变化。
与本人来说以前大多数都先知晓理论再找相关的框架官网或博客,然后去实践Demo,再加入到代码中,也就是
理论-》实践-》应用-》技能

而现在,可以在稍微知晓概念之后,直接找AI要Demo先跑起来,应用到代码中去,然后反向结合代码去问AI每一步实现的逻辑与理论,再消化成自己技能,也就是
概念-》应用-》理论知识-》技能

故此次尝试不同的写文思路,从拿到源代码的读者角度,开始思考与实践。

依照惯例,源代码在文末,需要自取~

环境准备

  • Docker Desktop
  • VS2022 SDK .NET8
  • Elastic Search 8.0+ Kibana(Docker)
  • SqlServer (Docker 或者本地 SqlServer)
  • Chat Gpt (可选)

效果一览

实现要求

  1. 分别记录EF、非EF日志,存放至不同的路径
  2. 其中非EF日志以AOP的思想,记录执行方法的前后信息,API接口调用详情与返回结果,
  3. 把所有的消息推送至elastic search。

实现效果

Docker

Docker中启动 ES 和 SqlServer

再 F5 启动项目

日志分别存储

  • EF 日志存储在 Logs/EFSql 文件夹下
  • 非EF日志存储在 Logs/AuditLogs 文件夹下

推送至ES

所有的日志推送至Elastic Search

从Program 初见端倪

OK,相信你现在已经把代码拉下来了,如果没有想生啃代码,不用跳过此节。

.net 8 与 .net 6 可以使用顶级语句,相对于.net 5 及以前的版本,去掉了Startup作为web的配置程序,也就是上手各大项目的入口, 这里直接看Program 来探究Demo 如何配置Web,来一窥端倪吧!

using Autofac;
using Autofac.Extensions.DependencyInjection;
using Autofac.Extras.DynamicProxy;
using Microsoft.EntityFrameworkCore;
using SampleDemo.Yzh.Net.Logger;
using SampleDemo.Yzh.Net.Repository;
using SampleDemo.Yzh.Net.Service;
using SampleDemo.Yzh.Net.Core;
using Serilog;
using ILogger = Serilog.ILogger;

var builder = WebApplication.CreateBuilder(args);
// Config
builder.Services.AddSingleton(new AppSettings(builder.Configuration));
// Serilog
builder.Host.AddCustomLog();
// EF
builder.Services.AddDbContextPool<TestContext>
    (o => o.UseSqlServer(builder.Configuration.GetConnectionString("NicoLocaldbStr"))
#if DEBUG
    .EnableSensitiveDataLogging()
#endif
    .UseLoggerFactory(LoggerFactory.Create(builder => builder.AddSerilog())
    ));
// Http Info
builder.Services.AddHttpContextAccessor();
// Auto Fac
builder.Host.UseServiceProviderFactory(new AutofacServiceProviderFactory());
builder.Host.ConfigureContainer<ContainerBuilder>(containerBuilder =>
{
    // 注册AopDemo拦截器为单例模式
    containerBuilder.RegisterType<AopDemo>().SingleInstance();

    // InterceptedBy指定拦截器
    containerBuilder.RegisterType<ValuesService>().As<IValuesService>().EnableInterfaceInterceptors().InterceptedBy(typeof(AopDemo));
    // 或者在IValuesService 加标签 [Intercept(typeof(AopDemo))]
    //containerBuilder.RegisterType<ValuesService>().As<IValuesService>().EnableInterfaceInterceptors();

    //向 Autofac 容器注册 Log.Logger 这个已经存在的日志记录器实例 ,指定这个实例作为 ILogger 接口的实现 ,此ILogger为 Serilog的ILogger
    //当应用程序中的组件通过依赖注入请求 ILogger 接口的实例时,它们会接收到 Log.Logger 这个单例对象
    containerBuilder.RegisterInstance(Log.Logger).As<ILogger>();
});

builder.Services.AddControllers();
var app = builder.Build();
// Middleware
app.UseMiddleware<ResponseLoggingMiddleware>(); //返回客户端响应的日志记录
app.UseAuthorization();
app.MapControllers();

app.Run();

这里我直接把
Program.cs
51 行代码全部拉出来,由此来看大致做了些什么。

  • using 中可以看到 引用了
    Autofac
    以及他的扩展包, 引入了
    Serilog
    ,并且指定了ILogger为 Serilog.ILogger,作用暂且按下不表

  • builder.Services.AddSingleton(new AppSettings(builder.Configuration))
    这里将配置以单例的形式封装在
    AppSettings
    类中,后续取配置直接从AppSettings类中取即可

  • builder.Host.AddCustomLog();
    这里用一个扩展方法,将Serilog的注入写入AddCustomLog()方法中,后续这将是该Demo核心方法,重点看

builder.Services.AddDbContextPool<TestContext>
    (o => o.UseSqlServer(builder.Configuration.GetConnectionString("NicoLocaldbStr"))
#if DEBUG
    .EnableSensitiveDataLogging()
#endif
    .UseLoggerFactory(LoggerFactory.Create(builder => builder.AddSerilog())
    ));
  • 上述代码,以DbContext池化的方式注入EF,并且在Debug模式下开启敏感日志,配置Serilog作为日志的记录工具

  • builder.Services.AddHttpContextAccessor();
    添加访问Http请求的信息,用于后续Reponse中间件开发提供HTTP信息

  • builder.Host.UseServiceProviderFactory
    以及
    builder.Host.ConfigureContainer
    配置Auto Fac,注释详解

  • app.UseMiddleware< ResponseLoggingMiddleware >();
    ResponseLoggingMiddleware中间件返回客户端响应的日志记录

再看Serilog拓展

从上文可知,使用了
AddCustomLog()
这样的一个扩展方法,注入了Serilog,OK! ,Ctrl + F12 进入一探究竟

    public static class CustomLogger
    {
        public static IHostBuilder AddCustomLog(this IHostBuilder builder)
        {
            Log.Logger = new LoggerConfiguration()
                .ReadFrom.Configuration(AppSettings.Configuration)
                .Enrich.FromLogContext()

                // 输出到控制台
                .WriteToConsole()
                // 输出到文件
                .WriteToFile()
                // 输出到Es
                .WriteToElasticsearch()
                .CreateLogger();

            builder.UseSerilog();
            return builder;
        }
    }
  • .ReadFrom.Configuration(AppSettings.Configuration):
    指示 Serilog 从应用程序的配置文件(如 appsettings.json)中读取配置信息。这里假设有一个 AppSettings 类,它持有应用程序的配置信息。

  • .Enrich.FromLogContext():
    添加从日志上下文中丰富日志事件的能力,例如,可以自动地将关联的 HTTP 请求信息添加到日志中。

  • .WriteToConsole():
    配置 Serilog 将日志输出到控制台。

  • .WriteToFile():
    配置 Serilog 将日志输出到文件。

  • .WriteToElasticsearch():
    配置 Serilog 将日志输出到 Elasticsearch。这对于在 Elasticsearch 中集中管理和分析日志非常有用。

WriteToConsole与WriteToFile 也是拓展方法,那就继续Ctrl + F12 继续下去咯

public static class LoggerConfigurationExtensions
{
    /// <summary>
    /// 输出在控制台
    /// </summary>
    /// <param name="loggerConfiguration"></param>
    /// <returns></returns>
    public static LoggerConfiguration WriteToConsole(this LoggerConfiguration loggerConfiguration)
    {
        // 输出普通日志
        loggerConfiguration = loggerConfiguration.WriteTo.Logger(lg =>
            lg.FilterRemoveSqlLog().WriteTo.Console());

        // 输出SQL
        loggerConfiguration = loggerConfiguration.WriteTo.Logger(lg =>
            lg.FilterSqlLog().WriteTo.Console());

        return loggerConfiguration;
    }

    /// <summary>
    /// 写入文件
    /// </summary>
    /// <param name="loggerConfiguration"></param>
    /// <returns></returns>
    public static LoggerConfiguration WriteToFile(this LoggerConfiguration loggerConfiguration)
    {
        // SQL语句写入 
        loggerConfiguration = loggerConfiguration.WriteTo.Logger(lg =>
            lg.FilterSqlLog()
                .WriteTo.Async(s => s.File(LogContextStatic.Combine(LogContextStatic.EFSql, @"EFSql.txt"), rollingInterval: RollingInterval.Day,
                    outputTemplate: LogContextStatic.FileMessageTemplate, retainedFileCountLimit: 31)));

        // 非SQL写入
        loggerConfiguration = loggerConfiguration.WriteTo.Logger(lg =>
            lg.FilterRemoveSqlLog()
                .WriteTo.Async(s => s.File(LogContextStatic.Combine(LogContextStatic.AuditLogs, @"AuditLog.txt"), rollingInterval: RollingInterval.Hour,
                outputTemplate: LogContextStatic.FileMessageTemplate, retainedFileCountLimit: 31)));
        return loggerConfiguration;
    }

    /// <summary>
    /// 推送至 ES
    /// </summary>
    /// <param name="loggerConfiguration"></param>
    /// <returns></returns>
    public static LoggerConfiguration WriteToElasticsearch(this LoggerConfiguration loggerConfiguration)
    {
        var esUri = AppSettings.GetValue("ElasticConfiguration:Uri");
        loggerConfiguration = loggerConfiguration.WriteTo.Elasticsearch(new ElasticsearchSinkOptions(
                new Uri(esUri))
        {
            AutoRegisterTemplate = true,
            IndexFormat = "nico-api-log-{0:yyyy.MM.dd}",
            ModifyConnectionSettings = con => con.ServerCertificateValidationCallback((sender, certificate, chain, sslPolicyErrors) => true)
            .BasicAuthentication(
                AppSettings.GetValue("ElasticConfiguration:Es_UserName"),
                AppSettings.GetValue("ElasticConfiguration:Es_Pwd"))
        });
        return loggerConfiguration;
    }

    /// <summary>
    /// 过滤出 SQL语句的日志
    /// </summary>
    /// <param name="lc"></param>
    /// <returns></returns>
    public static LoggerConfiguration FilterSqlLog(this LoggerConfiguration lc)
    {
        return lc.Filter.ByIncludingOnly(e =>
        e.Properties.ContainsKey("SourceContext") &&
        e.Properties["SourceContext"].ToString().Contains("Microsoft.EntityFrameworkCore.Database.Command"));
    }

    /// <summary>
    /// 过滤非 SQL语句的日志
    /// </summary>
    /// <param name="lc"></param>
    /// <returns></returns>
    public static LoggerConfiguration FilterRemoveSqlLog(this LoggerConfiguration lc)
    {
        return lc.Filter.ByExcluding(e =>
        e.Properties.ContainsKey("SourceContext") &&
        e.Properties["SourceContext"].ToString().Contains("Microsoft.EntityFrameworkCore.Database.Command"));
    }
}

注释非常详尽,不再赘述,这里需要解释核心的问题,如何区别的SQL与非SQL的?

日志上下文 (SourceContext):
Serilog 允许通过日志事件的属性来进行过滤。在这个例子中,使用了 SourceContext 属性,这是一个常用于标识日志来源的属性。对于由 Entity Framework Core 生成的日志,SourceContext 通常会包含值 "Microsoft.EntityFrameworkCore.Database.Command",这表示日志条目是由 EF Core 在执行数据库命令时产生的。

过滤器 (Filter.ByIncludingOnly):
这个方法允许配置日志系统仅包含符合特定条件的日志条目。在这里,条件是日志事件的属性中必须包含 SourceContext,并且其值包含 "Microsoft.EntityFrameworkCore.Database.Command"。这样,只有 EF Core 产生的数据库命令日志会被包括进来。

如果此节看懂了,下面实践绝对难不倒读者。

部署ES

本人在部署ES的时候遇到了很多坑,看到一些大佬的博客也是一笔带过,这里用Docker desktop 一步一步全部覆盖到位,跟着文章步骤走,绝对搞得定。

启动elasticsearch 、 kibana

首先先贴上 ES官方文档启动教程
https://www.elastic.co/guide/en/elasticsearch/reference/current/run-elasticsearch-locally.html

下载elasticsearch 、 kibana 镜像,本文截至24年7月30日,使用最新的ES版本
8.14.3

镜像列表

docker创建网络

docker network create elastic-net

创建好的网络

docker中启动ES,并且记住登录的密码,加入集群的Token

docker run --name es01 --net elastic -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -t elasticsearch:8.14.3

启动之后,会在终端显示密码,还有加入集群的Token,待会在启动kibana时要用,如果不小心关掉了,可以在docker desktop中,找到es的容器日志找到。

包含了密码,Token等

到这里ES启动完成,接下来启动Kibana

docker中启动kibana,加入集群

docker run --name kib01 --net elastic -p 5601:5601 kibana:8.14.3

提示进入后台配置页面

http://0.0.0.0/5601
可能进不去(改了host文件的原因),用
http://localhost/5601即可

将token传入 ,静待部署完成, 就进入了登录界面

账号默认为 elastic 密码(小写) 则为一开始保存的 密码,这里为 _*5O7P+5FKSV7nC_EDEE
记得改为你自己的
,成功登录之后就进入后台页面

ES后台创建数据集 (DataView)

创建 Data view , 此时需要重点注意 , Index pattern这里需要填对应索引的名称,匹配成功才能在Discover中看到日志

至此 ES 搭建完成,现在要去将 Log 推送到 ES平台来

启动!!!

上文中已经将ES部署好了,现在需要将项目跑起来,将日志推送到ES平台中去。

修改项目配置

AppSetting.json

数据库使用的是SQL Server , 需要修改为你自己的SQL 链接字符串即可。

    "ConnectionStrings": {
    // SQL Server
    "NicoLocaldbStr": "Data Source=localhost;Initial Catalog=nicolocaldb;User ID=sa;Password=MyStrongPwd!2#;Trust Server Certificate=True"
  }

这里需要加张表


-- SQL Server
CREATE TABLE TestEFTable (
    Id INT PRIMARY KEY IDENTITY(1,1),        -- Id 列为主键并设置为自增
    Name NVARCHAR(255) NULL,                 -- Name 列,允许 NULL,使用 NVARCHAR 类型来支持 Unicode 字符
    LastModifiedAt DATETIMEOFFSET NULL,      -- LastModifiedAt 列,允许 NULL,使用 DATETIMEOFFSET 类型
    LastModifiedBy INT NOT NULL              -- LastModifiedBy 列,不允许 NULL
);
  "ElasticConfiguration": {
    "Uri": "https://localhost:9200",    // ES 地址
    "Es_UserName": "elastic",           // 账号
    "Es_Pwd": "_*5O7P+5FKSV7nC_EDEE"    // 密码
  }

ES 的配置同样修改为自己的配置。

原神启动!(bushi

F5启动! (以下是操作视频)
https://www.bilibili.com/video/BV1w3vUeWE4n/

总结

稍微总结一下,

  • Demo集成了Elasticsearch,Serilog,拦截器,autofac,EF。
  • 区分EF 与非EF日志的记录与存放
  • 记录完整的操作视频,包教包会!

源代码仓库
[GitHub]
https://github.com/OrzCoCo-Y/LogSampleDemo
[Gitee]
https://gitee.com/yi_zihao/LogSampleDemo

参考资料

【ChatGPT】
【严架的Maomi 框架】
https://maomi.whuanle.cn
借鉴了其中HTTP日志记录,取代了自己写中间件
【老八的哲学开源框架】
https://github.com/anjoy8/Blog.Core
借鉴了其中服务注册

Naiad论文

《Naiad: A Timely Dataflow System》

前面通过文章
《论文图谱当如是:Awesome-Graphs用200篇图系统论文打个样》
向大家介绍了论文图谱项目Awesome-Graphs,并分享了Google的
Pregel
、OSDI'12的
PowerGraph
、SOSP'13的
X-Stream
。这次向大家分享Microsoft发表在SOSP'13的另一篇关于流处理系统论文Naiad,
TimelyDataflow
是它的开源实现。该论文促进了后续的流图系统的设计与创新,从其调度框架设计中也可以看到TuGraph Analytics调度器的影子。

对图计算技术感兴趣的同学可以多做了解,也非常欢迎大家关注和参与论文图谱的开源项目:

提前感谢给项目点Star的小伙伴,接下来我们直接进入正文!

摘要

Naiad是一个可执行有环数据流的分布式数据并行系统,提供了高吞吐的批处理、低延迟的流处理,以及迭代和增量计算的能力。

1. 介绍

支持特性:

  • 循环结构化,支持反向边(feedback)。
  • 有状态的数据流节点,支持无需全局协调的生产消费能力。
  • 节点收齐特定轮次/迭代的输入后的通知机制。

2. 及时数据流

数据流图可以包含嵌套的循环结构,时间戳用于区分数据是由哪个轮次/迭代产生的。

2.1 图结构

及时数据流图包含输入/输出节点,输入节点从外部的生产者接受消息序列,输出节点将消息序列发送到外部消费者。
外部的生产者为每个消息打标了一个轮次(epoch),当没有消息需要输入时,会主动通知输入节点。
生产者也可以关闭输入节点,表示输入节点将不会再收到任何消息。
输出节点的消息也会打标这个轮次,同样当没有消息需要输出时,也会通知外部消费者。

及时数据流图里可以包含嵌套的循环上下文(loop contexts):

  • 入口点(ingress vertex):数据流图的边进入循环上下文必须经过入口点,如I。
  • 出口点(egress vertex):数据流图的边离开循环上下文必须经过出口点,如E。
  • 反馈点(feedback vertex):循环上下文内必须包含反馈点,如F。


针对上图所表达的计算语义解释:

关键概念:逻辑时间戳(logical timestamp):

  • e:消息的轮次。
  • k:循环嵌套的深度。
  • c:向量,每层循环的迭代次数。


逻辑时间戳变化规则:

  • 经过入口点:c增加一个维度,初始化为0,表示循环开始。
  • 经过反馈点:c的最后一个维度+1,表示循环次数累计。
  • 经过出口点:c的最后一个维度提出,恢复成与入口点一致。

逻辑时间戳大小比较,t1=(e1, <c1, ..., cm>),t2=(e2, <c1, ..., cn>):

  • 条件1:整数比较,e1 < e2。
  • 条件2:字符串比较,c1 + ... + cm < c1 + ... + cn。

2.2 节点计算

数据流的节点可以接收、发送带逻辑时间戳的消息(message),以及通知(notification)。

每个节点v实现两个回调函数:

  1. v.OnRecieve(Edge e, Message m, Timestamp t):接收消息。
  2. v.OnNotify(Timestamp t):接收通知。

并可以调用系统提供的两个函数:

  1. this.SendBy(Edge e, Message m, Timestamp t):发送消息。
  2. this.NotifyAt(Timestamp t):发送通知。

对于数据流边e=(u, v),u.SendBy将触发v.OnRecieve,u.NotifyAt将触发v.onNotify。
数据流系统保证v.OnNotify(t)一定发生在v.OnRecieve(e, m, t')之后,其中t' < t,即保证处理完所有t之前的消息后再处理通知,以让节点具备机会清理t之前的工作状态。
这种机制保证了消息处理不会发生时光回溯(backwards in time)。

如下示例代码描述了一个双出的数据流节点实现distinct、count算子的逻辑。

class DistinctCount<S,T> : Vertex<T>
{
    Dictionary<T, Dictionary<S,int>> counts;
    void OnRecv(Edge e, S msg, T time)
    {
        if (!counts.ContainsKey(time)) {
            counts[time] = new Dictionary<S,int>();
            this.NotifyAt(time);
        }
        if (!counts[time].ContainsKey(msg)) {
            counts[time][msg] = 0;
            this.SendBy(output1, msg, time);
        }
        counts[time][msg]++;
    }
    void OnNotify(T time)
    {
        foreach (var pair in counts[time])
        this.SendBy(output2, pair, time);
        counts.Remove(time);
    }
}

2.3 实现及时数据流

数据流处理受限于未处理的事件(events:消息、通知)和数据流图的结构。

关键概念:pointstamp:

  • u.SendBy(e, m, t):生成pointstamp (t, e)。
  • u.NotifyAt(t):生成pointstamp (t, v)。

单线程调度器实现:

  • 维护一个激活pointstamp(active pointstamp) 集合,集合大小至少为1。对于每个pointstamp,有两个计数器:
    • OC(occurrence count):未完成的pointstamp数。
    • PC(precursor count):上游激活的pointstamp数。
  • 系统初始化时,为输入节点生成第一个pointstamp,其中t=e,OC=1,PC=0。当e完成后,继续生成t=e+1的pointstamp。
  • 当激活pointstamp p时,初始化PC为上游所有激活的pointstamp数,并递增下游节点所有pointstamp的PC值。
  • 当OC[p]=0时,从active集合删除p,并递减下游节点所有pointstamp的PC值。
  • 当PC[p]=0时,表示上游没有激活的pointstamp影响到p,则称p是frontier,调度器会把所有通知发送给frontier。

OC的计算规则为:

3. 分布式实现

  • Naiad集群包含多个进程,每个进程包含多个worker,worker管理数据流节点的一个分区。
  • worker之间通过本地的共享内存或者远程TCP连接交换消息。
  • 进程遵循分布式进度追踪协议(Progress Tracking Protocol),用于协调通知的分发。

3.1 数据并行

  • 逻辑数据流图:stages+connectors。
  • connectors包含一个分区函数。
  • 运行时逻辑数据流图被展开为物理数据流图,stage被替换为一组节点,connectors被替换为一组边。

3.2 Workers

  • 分发消息优先于分发通知。
  • 分发策略多样,如基于最早的pointstamp分发降低端到端延迟。
  • worker使用共享队列进行通信。
  • 如果分发的目标节点在同一个worker,那么SendBy会直接调用目标节点的OnRecieve。
  • 如果存在环则需要强制进入队列,或者控制递归深度避免系统过载。

3.3 分布式进度追踪

  • 每个worker维护各自的状态,通过广播OC进行状态共享。
  • 优化手段:
    • 使用映射的pointstamp实现进度跟踪,以降低并发冲突和更新规模。
    • 更新广播前先进行本地聚合。

3.4 错误容忍和可用性

  • Checkpoint和Restore接口。

3.5 预防抖动

  • 网络。
  • 数据结构竞争。
  • 垃圾回收。

4. 使用Naiad写程序

5. 性能评估

6. 现实应用

  • 批量迭代图计算
  • 批量迭代机器学习
  • 流式无环计算
  • 流式迭代图分析

7. 总结

Naiad通过允许程序按需协调,支持了混合的同步+异步计算。