2024年9月

原创文章,欢迎转载,转载请注明出处,谢谢。


0. 前言

函数是 Go 的一级公民,本文从汇编角度出发看看我们常用的一些函数在干什么。

1. 函数

1.1 main 函数

在 main 函数中计算两数之和如下:

package main

func main() {
	x, y := 1, 2
	z := x + y
	print(z)
}

使用
dlv
调试函数(不了解
dlv
的请看
Go plan9 汇编: 打通应用到底层的任督二脉
):

# dlv debug
Type 'help' for list of commands.
(dlv) b main.main
Breakpoint 1 set at 0x45feca for main.main() ./ex4.go:3
(dlv) c
> main.main() ./ex4.go:3 (hits goroutine(1):1 total:1) (PC: 0x45feca)
     1: package main
     2:
=>   3: func main() {
     4:         x, y := 1, 2
     5:         z := x + y
     6:         print(z)
     7: }

disass
查看对应的汇编指令:

(dlv) 
TEXT main.main(SB) /root/go/src/foundation/ex4/ex4.go
        ex4.go:3        0x45fec0        493b6610                cmp rsp, qword ptr [r14+0x10]
        ex4.go:3        0x45fec4        763d                    jbe 0x45ff03
        ex4.go:3        0x45fec6        55                      push rbp
        ex4.go:3        0x45fec7        4889e5                  mov rbp, rsp
=>      ex4.go:3        0x45feca*       4883ec20                sub rsp, 0x20
        ex4.go:4        0x45fece        48c744241801000000      mov qword ptr [rsp+0x18], 0x1
        ex4.go:4        0x45fed7        48c744241002000000      mov qword ptr [rsp+0x10], 0x2
        ex4.go:5        0x45fee0        48c744240803000000      mov qword ptr [rsp+0x8], 0x3
        ex4.go:6        0x45fee9        e8d249fdff              call $runtime.printlock
        ex4.go:6        0x45feee        488b442408              mov rax, qword ptr [rsp+0x8]
        ex4.go:6        0x45fef3        e86850fdff              call $runtime.printint
        ex4.go:6        0x45fef8        e8234afdff              call $runtime.printunlock
        ex4.go:7        0x45fefd        4883c420                add rsp, 0x20
        ex4.go:7        0x45ff01        5d                      pop rbp
        ex4.go:7        0x45ff02        c3                      ret
        ex4.go:3        0x45ff03        e8d8cdffff              call $runtime.morestack_noctxt
        ex4.go:3        0x45ff08        ebb6                    jmp $main.main
(dlv) regs
    Rsp = 0x000000c00003e758

相信看过
Go plan9 汇编: 打通应用到底层的任督二脉
的同学对上述汇编指令已经有一定了解的。

这里进入 main 函数,执行到
sub rsp, 0x20
指令,该指令为 main 函数开辟 0x20 字节的内存空间。继续往下执行,分别将
0x1

0x2

0x3
放到
[rsp+0x18]

[rsp+0x10]

[rsp+0x8]
处(从汇编指令好像没看到
z := x + y
的加法,合理怀疑是编译器做了优化)。

继续,
mov rax, qword ptr [rsp+0x8]

[rsp+0x8]
地址的值
0x3
放到
rax
寄存器中。然后,调用
call $runtime.printint
打印
rax
的值。实现输出两数之后。后续的指令我们就跳过了,不在赘述。

1.2 函数调用

在 main 函数中实现两数之和,我们没办法看到函数调用的过程。
接下来,定义 sum 函数实现两数之和,在 main 函数中调用 sum。重点看函数在调用时做了什么。

示例如下:

package main

func main() {
	a, b := 1, 2
	println(sum(a, b))
}

func sum(x, y int) int {
	z := x + y
	return z
}

使用
dlv
调试函数:

# dlv debug
Type 'help' for list of commands.
(dlv) b main.main
Breakpoint 1 set at 0x45feca for main.main() ./ex6.go:3
(dlv) c
> main.main() ./ex6.go:3 (hits goroutine(1):1 total:1) (PC: 0x45feca)
     1: package main
     2:
=>   3: func main() {
     4:         a, b := 1, 2
     5:         println(sum(a, b))
     6: }
     7:
     8: func sum(x, y int) int {
(dlv) disass
Sending output to pager...
TEXT main.main(SB) /root/go/src/foundation/ex6/ex6.go
        ex6.go:3        0x45fec0        493b6610                cmp rsp, qword ptr [r14+0x10]
        ex6.go:3        0x45fec4        764f                    jbe 0x45ff15
        ex6.go:3        0x45fec6        55                      push rbp
        ex6.go:3        0x45fec7        4889e5                  mov rbp, rsp
=>      ex6.go:3        0x45feca*       4883ec28                sub rsp, 0x28
        ex6.go:4        0x45fece        48c744241801000000      mov qword ptr [rsp+0x18], 0x1
        ex6.go:4        0x45fed7        48c744241002000000      mov qword ptr [rsp+0x10], 0x2
        ex6.go:5        0x45fee0        b801000000              mov eax, 0x1
        ex6.go:5        0x45fee5        bb02000000              mov ebx, 0x2
        ex6.go:5        0x45feea        e831000000              call $main.sum
        ex6.go:5        0x45feef        4889442420              mov qword ptr [rsp+0x20], rax
        ex6.go:5        0x45fef4        e8c749fdff              call $runtime.printlock
        ex6.go:5        0x45fef9        488b442420              mov rax, qword ptr [rsp+0x20]

regs
查看寄存器状态:

(dlv) regs
    Rip = 0x000000000045feca
    Rsp = 0x000000c00003e758
    Rbp = 0x000000c00003e758
    ...

继续往下分析指令的执行过程:

  1. sub rsp, 0x28
    :
    rsp
    的内存地址减
    0x28
    ,意味着 main 函数开辟
    0x28
    字节的栈空间。
  2. mov qword ptr [rsp+0x18], 0x1

    mov qword ptr [rsp+0x10], 0x2
    :将
    0x1

    0x2
    分别放到内存地址
    [rsp+0x18]

    [rsp+0x10]
    中。
  3. mov eax, 0x1

    mov ebx, 0x2
    :将
    0x1

    0x2
    分别放到寄存器
    eax

    ebx
    中。

跳转到
0x45feea
指令:

(dlv) b *0x45feea
Breakpoint 2 set at 0x45feea for main.main() ./ex6.go:5
(dlv) c
> main.main() ./ex6.go:5 (hits goroutine(1):1 total:1) (PC: 0x45feea)
     1: package main
     2:
     3: func main() {
     4:         a, b := 1, 2
=>   5:         println(sum(a, b))
     6: }
     7:
     8: func sum(x, y int) int {
     9:         z := x + y
    10:         return z
(dlv) disass
Sending output to pager...
TEXT main.main(SB) /root/go/src/foundation/ex6/ex6.go
        ex6.go:3        0x45fec0        493b6610                cmp rsp, qword ptr [r14+0x10]
        ex6.go:3        0x45fec4        764f                    jbe 0x45ff15
        ex6.go:3        0x45fec6        55                      push rbp
        ex6.go:3        0x45fec7        4889e5                  mov rbp, rsp
        ex6.go:3        0x45feca*       4883ec28                sub rsp, 0x28
        ex6.go:4        0x45fece        48c744241801000000      mov qword ptr [rsp+0x18], 0x1
        ex6.go:4        0x45fed7        48c744241002000000      mov qword ptr [rsp+0x10], 0x2
        ex6.go:5        0x45fee0        b801000000              mov eax, 0x1
        ex6.go:5        0x45fee5        bb02000000              mov ebx, 0x2
=>      ex6.go:5        0x45feea*       e831000000              call $main.sum
        ex6.go:5        0x45feef        4889442420              mov qword ptr [rsp+0x20], rax
        ex6.go:5        0x45fef4        e8c749fdff              call $runtime.printlock
        ex6.go:5        0x45fef9        488b442420              mov rax, qword ptr [rsp+0x20]
        ex6.go:5        0x45fefe        6690                    data16 nop

在执行
call $main.sum
前,让我们先看下内存分布:

image

(绿色部分表示 main 函数栈)

继续执行
call $main.sum
:

(dlv) si
> main.sum() ./ex6.go:8 (PC: 0x45ff20)
TEXT main.sum(SB) /root/go/src/foundation/ex6/ex6.go
=>      ex6.go:8        0x45ff20        55                      push rbp
        ex6.go:8        0x45ff21        4889e5                  mov rbp, rsp
        ex6.go:8        0x45ff24        4883ec10                sub rsp, 0x10
        ex6.go:8        0x45ff28        4889442420              mov qword ptr [rsp+0x20], rax
        ex6.go:8        0x45ff2d        48895c2428              mov qword ptr [rsp+0x28], rbx
        ex6.go:8        0x45ff32        48c7042400000000        mov qword ptr [rsp], 0x0
        ex6.go:9        0x45ff3a        4801d8                  add rax, rbx
        ex6.go:9        0x45ff3d        4889442408              mov qword ptr [rsp+0x8], rax
        ex6.go:10       0x45ff42        48890424                mov qword ptr [rsp], rax
        ex6.go:10       0x45ff46*       4883c410                add rsp, 0x10
        ex6.go:10       0x45ff4a        5d                      pop rbp
        ex6.go:10       0x45ff4b        c3                      ret
(dlv) regs
    Rip = 0x000000000045ff20
    Rsp = 0x000000c00003e728
    Rbp = 0x000000c00003e758

可以看到,Rsp 寄存器往下减 8 个字节,压栈开辟 8 个字节空间。继续往下分析指令:

  1. push rbp
    :将
    rbp
    寄存器的值压栈,rbp 中存储的是地址
    0x000000c00003e758
    。由于进行了压栈操作,这里的
    Rsp
    会往下减 8 个字节。
  2. mov rbp, rsp
    :将当前 rsp 的值给
    rbp

    rbp

    sum
    函数栈的栈底。
  3. sub rsp, 0x10

    rsp
    往下减
    0X10
    个字节,开辟16 个字节的空间,做为
    sum
    的函数栈,此时
    rsp
    的地址为
    0x000000c00003e710
    ,表示函数栈的栈顶。

执行到这里,我们画出内存分布图如下:

image

继续往下分析:

  1. mov qword ptr [rsp+0x20], rax

    mov qword ptr [rsp+0x28], rbx
    :分别将
    rax
    寄存器的值 1 放到
    [rsp+0x20]:0x000000c00003e730

    rbx
    寄存器的值 2 放到
    [rsp+0x28]:0x000000c00003e738
  2. mov qword ptr [rsp], 0x0
    :将 0 放到
    [rsp]
    中。
  3. add rax, rbx
    :将 rax 和 rbx 的值相加,结果放到 rax 中,相加后 rax 中的值为 3。
  4. mov qword ptr [rsp+0x8], rax
    :将 3 放到
    [rsp+0x8]
    中。
  5. mov qword ptr [rsp], rax
    :将 3 放到
    [rsp]
    中。

根据上述分析,画出内存分布图如下:

image

可以看出,传给 sum 的形参 x 和 y 实际是在 main 函数栈分配的。

继续往下执行:

  1. add rsp, 0x10

    rsp
    寄存器加
    0x10
    回收
    sum
    栈空间。
  2. pop rbp
    :将存储在
    0x000000c00003e720
    的值
    0x000000c00003e758
    移到
    rbp
    中。
  3. ret

    sum
    函数返回。

在执行
ret
指令前最后看下寄存器的状态:

(dlv) regs
    Rip = 0x000000000045ff4b
    Rsp = 0x000000c00003e728
    Rbp = 0x000000c00003e758

我们知道
Rip
寄存器存储的是运行指令所在的内存地址,那么问题就来了,当函数返回时,要执行调用函数的下一条指令:

TEXT main.sum(SB) /root/go/src/foundation/ex6/ex6.go
        ex6.go:5        0x45feea*       e831000000              call $main.sum
        ex6.go:5        0x45feef        4889442420              mov qword ptr [rsp+0x20], rax

这里我们需要
main.sum
返回后执行的下一条指令是
mov qword ptr [rsp+0x20], rax
。可是
Rip
指令怎么获得指令所在的地址
0x45feef
呢?

答案在
call $main.sum
这里,这条指令会将下一条指令压栈,在
sum
函数调用
ret
返回时,将之前压栈的指令移到
Rip
寄存器中。这个压栈的内存地址是
0x000000c00003e728
,查看其中的内容:

(dlv) print *(*int)(uintptr(0x000000c00003e728))
4587247

4587247
的十六进制就是
0x45feef

执行
ret

(dlv) si
> main.main() ./ex6.go:5 (PC: 0x45feef)
        ex6.go:4        0x45fece        48c744241801000000      mov qword ptr [rsp+0x18], 0x1
        ex6.go:4        0x45fed7        48c744241002000000      mov qword ptr [rsp+0x10], 0x2
        ex6.go:5        0x45fee0        b801000000              mov eax, 0x1
        ex6.go:5        0x45fee5        bb02000000              mov ebx, 0x2
        ex6.go:5        0x45feea*       e831000000              call $main.sum
=>      ex6.go:5        0x45feef        4889442420              mov qword ptr [rsp+0x20], rax
        ex6.go:5        0x45fef4        e8c749fdff              call $runtime.printlock
        ex6.go:5        0x45fef9        488b442420              mov rax, qword ptr [rsp+0x20]
        ex6.go:5        0x45fefe        6690                    data16 nop
        ex6.go:5        0x45ff00        e85b50fdff              call $runtime.printint
        ex6.go:5        0x45ff05        e8f64bfdff              call $runtime.printnl
(dlv) regs
    Rip = 0x000000000045feef
    Rsp = 0x000000c00003e730
    Rbp = 0x000000c00003e758

可以看到
Rip
指向了下一条指令的位置。

继续往下执行:

  1. mov qword ptr [rsp+0x20], rax
    :将 3 放到
    [rsp+0x20]
    中,
    [rsp+0x20]
    就是存放
    sum
    函数返回值的内存地址。
  2. call $runtime.printint
    :调用
    runtime.printint
    打印返回值 3。

分析完上述调用函数的过程我们可以画出函数栈调用的完整内存分布如下:

image

2. 小结

本文从汇编角度看函数调用的过程,力图做到对函数调用有个比较通透的了解。


前言

在笔者 Java 后端开发的项目经历中,MySQL 和 MongoDB 都有使用过作为后端的数据库来对业务数据进行持久化,两者没有孰优孰劣之分,都可以在合适的场景下发挥出它们的优势。

今天要分享的是一个项目重构过程中如何将数据库选型由原来的 MongoDB 改为 MySQL 的思考,涉及到业务当前的痛点、选型分析、解决的核心思路,最后会给出简单的 demo。

本篇文章侧重在于两者在表设计思维上的转换,而业务数据迁移同步的方案,下一篇文章将给出。


一、痛点所在

该项目是一个【PC端管理后台】+【移动端h5页面】为主业务框架的系统,原来的预期是:在后台配置好活动所需的参数,h5 既可以放在 app 客户端打开,也可以作为url 链接的形式直接在浏览器打开。项目一期的时候,业务方认为这样的运营活动会带来不少的流量和用户。但是到后来业务重心有所调整,引流的方式发生变化,最终导致了项目的一个重构。

主要的原因有以下几点:

  1. 总体的数据量没有预想的那么大

    活动参与人数前期预估为30w+,经历过2个线上活动后的实际总参与人数为5w+,客户端注册用户数为3w+,占全部参与人数的65%左右,远不及预期规模;

  2. 核心接口的并发也没有预想的高

    h5 端的大约 5-8 个的核心接口在实际线上活动进行的最高 QPS 只达到 200-300 左右,CPU 与 内存占用率也未达到设置的告警线(60%);

  3. MySQL 在硬件资源成本上性价比更高

    以阿里云的 RDS for MySQL 与 云数据库 MongoDB 做对比,都是集群部署 + 8核16GB + 100GB 存储 + 1年时长的规格下,前者会比后者便宜7w+RMB;

  4. MySQL 的动态数据源切换方案更成熟

    当时后端的项目已经被全部要求接入多租户改造,市面上开源的、成熟的动态数据源切换方案并不多,而完全专门支持 MongoDB 的是少之又少。

综合以上几点原因,完全放弃该项目是没必要的,但也需要适应当前业务的变化和成本控制,预计花费30人/天,即 2 个后端开发在 2-3 周内完成对该系统的重构,接口和前端页面基本无需调整。


二、选型分析

这里就正式进入技术部分了,首要对比的是两者各自的特点以及适用的场景,这对于把握整个项目的走向是至为关键的。

2.1特点对比

表2-1
对比项 MySQL MongoDB
数据模型 关系型数据库,采用表格(table)的形式存储数据,每一行是一条记录 非关系型(NoSQL)、文档型数据库,数据以文档(document)的非结构化形式存储
查询方式 使用标准的 SQL 进行查询,提供了丰富的查询条件、连接(join)、排序、分页等功能 使用基于 JSON 结构特点的的查询语句,支持大量数据的聚合、统计、分析
事务支持 支持 ACID 事务,确保在多条操作组成的事务中数据的一致性和可靠性。特别是在InnoDB引擎中,提供了完整的事务支持 4.0 版本开始引入了多文档事务支持,可以保证在一定范围内的读写操作具备ACID特性。但对于需要严格事务特性的复杂业务场景不及 MySQL 成熟
数据处理 在处理复杂查询和高并发写入时,需要依赖索引来优化性能,或者通过分区、分片等手段进行水平扩展 在水平扩展和实时数据处理方面优势很大,通过分片(sharding)技术可以轻松应对海量数据存储和高并发读写
空间占用 由于数据结构紧凑,对数据的存储通常更为节省空间,特别是对于简单数据结构和关系清晰的数据集 由于文档存储的灵活性和包含元数据等因素,通常占用空间较大
项目集成 已经有成熟的第三方 ORM 框架支持,如:Mybatis、Mybatis Plus、io.mybatis、tk.mybatis等 目前集成在 Spring Boot 项目里的增删改查都是基于 MongoRepository 和 MongoTemplate 来实现的

2.2场景对比

  • MySQL
    1. Web 应用程序:如常见的 xx 管理后台、xx 管理系统,电商 web 网站,包括一些移动端 h5 的页面等;
    2. 企业级应用:如常见的客户关系管理系统(CRM)、人力资源管理系统(HRM)和供应链管理系统(SCM)等,MySQL 提供了强大的事务支持;
    3. 嵌入式开发:需要轻量级数据库的软件、硬件和设备,MySQL 可以作为一个嵌入式数据库引擎集成到各种应用程序中,提高应用程序的可移植性;
    4. 云计算和大数据:MySQL 在云数据库服务中被广泛使用,支持云原生应用程序和分布式数据处理框架,如 Hadoop 和 Spark 等。
  • MongoDB
    1. 处理实时数据:非常适合处理移动互联网应用常见的大部分场景,如用户活动、社交互动、在线购物等;
    2. 内容管理系统(CMS):用于处理文章、稿件、评论、图片、视频等富媒体内容的存储和增删改查,支持全文搜索和实时更新;
    3. 数据聚合仓库:存储原始或半处理的业务数据,利用聚合框架进行实时数据聚合、统计分析和数据可视化;
    4. 游戏数据管理:存储玩家账户信息、游戏进度、成就、虚拟物品、社交关系等,快速计算和更新游戏排行榜数据,支持实时查询等。


三、核心思路

我们知道,在 MongoDB 中,一条数据的记录(文档)格式是 json 的 格式,即强调 key-value 的关系。

表2-2

对于一个 MongoDB 的文档来说,里面可以包含很多这个集合的属性,就像一篇文章里面有很多章节一样。

以下面这个图2-1为例子,activity 是一个完整的集合,里面包含了很多属性,id、name、status等基本属性,还有 button 和 share 等额外属性,这些属性共同构成了这个集合。

但这样的结构在 MySQL 里是不能实现的,理由很简单,MySQL 强调关系,1:1 和 1:N 是十分常见的关系。
可以看到,下面将基本属性放在 activity 作为主表,而其它额外属性分别放在了 button 表和 share 表里,同时将主表的主键 id 作为了关联表的 ac_id 外键。

图2-1

那要怎么替换才能实现呢?MongoDB 改成 MySQL 的核心在于:原有的集合关系以及嵌套关系,需要拆表成1 : N 的范式关系,用主键-外键的方式做关联查询,同时避免 join 连接查询。


四、demo 示例

下面首先分别给出实际的表设计与实体映射,包括 MongoDB 和 MySQL 的,然后再通过简单的查询代码来体现两者的区别。

4.1实体映射

4.1.1MongoDB 实体
@EqualsAndHashCode(callSuper = true)
@Data
public class Activity extends BaseEntity {
    @Id
    private String id;
    private String name;
    private ActivityStatusEnum status;
    private ReviewStatusEnum review;
    private ActivityTypeEnum type;
    private ActivityButton button;
    private ActivityShare share;
}
4.1.2MySQL 实体
@Data
public class Activity extends BaseEntity {
    @Id
    private Integer id;
    private String name;
    private Integer status;
    private Integer review;
    private Integer type;
}
@Data
public class ActivityButton extends BaseEntity {
    @Id
    private Integer id;
    private Integer acId;
    private String signUp;
    private Integer status;
    private String desc;
}
@Data
public class ActivityShare extends BaseEntity {
    @Id
    private String id;
    private Integer acId;
    private String title;
    private String iconUrl;
}

4.2查询代码

下面就根据主键 id 和状态这两个条件进行活动详情的查询。

4.2.1MongoDB 查询
    /**
     * @apiNote 通过主键id和活动状态查询活动
     * @param id 主键id
     * @return 实体
     */
    @Override
    public Avtivity getDetailById(String id) {
        return this.repository.findById(id)
                .filter(val -> ActivityStatusEnum.ON.equals(val.getStatus()))
                .orElseThrow(() -> new RuntimeException("该活动不存在!"));
    }
4.2.2MySQL 查询
    @Resource
    private ActivityShareService activityShareService;
    @Resource
    private ActivityButtonService activityButtonService;
    @Override
    public ActivityVO detail(Integer id) {
        ExampleWrapper<Activity, Serializable> wrapper = this.wrapper();
        wrapper.eq(Activity::getid, id)
                .eq(Activity::getStatus(), DataStatusEnum.NORMAL.getCode());
        Activity activity = Optional.ofNullable(this.findOne(wrapper.example()))
            .orElseThrow(() -> new RuntimeException("该活动不存在!"));
        ActivityVO vo = new ActivityVO();
        vo.setName(Optional.ofNullable(activity.getName()).orElse(StringUtils.EMPTY));
        //查两个关联表
        vo.setShare(this.activityShareService.getShare(activity.getId()));
        vo.setButton(this.activityButtonService.getButton(activity.getId()));
        return vo;
    }


五、文章小结

使用 MySQL 替换 MongoDB 的小结如下:

  1. 做技术选型时要充分考虑对比两者的特点以及应用场景,选择最合适的
  2. 如非必要,那么还是继续沿用原来的设计;一旦选择重构,那么就要考虑成本
  3. 原有的集合关系以及嵌套关系,需要拆表成1 : N 的范式关系,用主键-外键的方式做关联

最后,如有不足和错误,还请大家指正。或者你有其它想说的,也欢迎大家在评论区交流!

开心一刻

清明节那天,看到一小孩在路边烧纸
时不时地偷偷往火堆里扔几张考试卷子
边烧边念叨:爷爷呀,你岁数大了,在那边多做做题吧,对脑子好,要是有不懂的地方,就把我老师带走,让他教您!

开心一刻

前提说明

假设
MySQL 5.7.36
的库
qsl_datax

mysql5

有表
qsl_datax_source
和 数据

CREATE TABLE `qsl_datax_source`  (
  `id` bigint(20) NOT NULL COMMENT '自增主键',
  `username` varchar(255) NOT NULL COMMENT '姓名',
  `password` varchar(255) NOT NULL COMMENT '密码',
  `birth_day` date NOT NULL COMMENT '出生日期',
  `remark` text,
  PRIMARY KEY (`id`)
) ENGINE = InnoDB ;
INSERT INTO `qsl_datax_source` VALUES (1, '张三', 'z123456', '1991-01-01', '张三');
INSERT INTO `qsl_datax_source` VALUES (2, '李四', 'l123456', '1992-01-01', '李四');
INSERT INTO `qsl_datax_source` VALUES (3, '王五', 'w123456', '1993-01-01', '王五');
INSERT INTO `qsl_datax_source` VALUES (4, '麻子', 'm123456', '1994-01-01', '麻子');

需要将表中数据同步到
MySQL 8.0.30

mysql8

sql_db
库的
qsl_datax_source
表中,并且只用
JDBC
的方式,该如何实现?你们可能觉得非常简单,直接引入
mysql-connector-j
依赖

<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.33</version>
</dependency>

然后直接写同步代码

public static void main(String[] args) throws Exception {
    String url5 = "jdbc:mysql://192.168.2.118:3307/qsl_datax?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
    String url8 = "jdbc:mysql://192.168.2.118:3311/sql_db?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
    Properties pro = new Properties();
    pro.put("user", "root");
    pro.put("password", "123456");
    // 加载驱动类
    Class.forName("com.mysql.cj.jdbc.Driver");
    // 建立连接
    Connection conn5 = DriverManager.getConnection(url5, pro);
    // 查数据
    Statement statement = conn5.createStatement();
    ResultSet resultSet = statement.executeQuery("SELECT * FROM qsl_datax_source");
    StringBuilder insertSql = new StringBuilder("INSERT INTO qsl_datax_source(id,username,password,birth_day,remark) VALUES ");
    while (resultSet.next()) {
        // 拼接sql
        insertSql.append("(")
                .append(resultSet.getLong("id")).append(",")
                .append("'").append(resultSet.getString("username")).append("',")
                .append("'").append(resultSet.getString("password")).append("',")
                .append("'").append(resultSet.getString("birth_day")).append("',")
                .append("'").append(resultSet.getString("remark")).append("'")
                .append("),");
    }
    // 因为mysql5和mysql8的账密是一样的,所以用的同一个 pro
    Connection conn8 = DriverManager.getConnection(url8, pro);
    Statement stmt = conn8.createStatement();
    int count = stmt.executeUpdate(insertSql.substring(0, insertSql.length() - 1));
    System.out.println("新插入记录数:" + count);

    resultSet.close();
    statement.close();
    stmt.close();
    conn5.close();
    conn8.close();
}

执行后输出

新插入记录数:4


MySQL 8.0.30
的库
sql_db
查看表
qsl_datax_source
的数据

同驱动同步成功

同步完成,这不是有手就行吗?

行不行

一般来说,高版本的驱动会兼容低版本的数据库,但也不绝对,或者说兼容不全;MySQL版本、驱动版本、JDK版本对应关系如下

mysql版本驱动版本jdk版本对应关系

mysql-connector-j 8.0.33 驱动兼容 MySQL 5.7.36,所以上面的同步没问题,但如果 MySQL 版本很低(比如:5.1.x),例如从
MySQL 5.1.8
同步到
MySQL 8.0.30
,如上同步代码还能同步成功吗(我就不去试了,你们也别去试了,因为引申目的已经达到了),所以保险做法是

mysql-connector-j 8.0.33 操作 MySQL 8.0.30

mysql-connector-java 5.1.49 操作 MySQL 5.7.36

mysql-connector-java 5.0.x 操作 MySQL 5.0.x

所以问题就来了

如何用 mysql-connector-java 5.1.49 从 MySQL 5.7.36 查数据后,用 mysql-connector-j 8.0.33 将数据插入 MySQL 8.0.30

多驱动操作

你们肯定也觉得简单,继续引入
mysql-connector-java 5.1.49
依赖

<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.33</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>

然后调整代码

public static void main(String[] args) throws Exception {
    String url5 = "jdbc:mysql://192.168.2.118:3307/qsl_datax?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
    String url8 = "jdbc:mysql://192.168.2.118:3311/sql_db?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
    Properties pro = new Properties();
    pro.put("user", "root");
    pro.put("password", "123456");
    // 加载驱动类
    Class.forName("com.mysql.jdbc.Driver");
    // 建立连接
    Connection conn5 = DriverManager.getConnection(url5, pro);
    // 查数据
    Statement statement = conn5.createStatement();
    ResultSet resultSet = statement.executeQuery("SELECT * FROM qsl_datax_source");
    StringBuilder insertSql = new StringBuilder("INSERT INTO qsl_datax_source(id,username,password,birth_day,remark) VALUES ");
    while (resultSet.next()) {
        // 拼接sql
        insertSql.append("(")
                .append(resultSet.getLong("id")).append(",")
                .append("'").append(resultSet.getString("username")).append("',")
                .append("'").append(resultSet.getString("password")).append("',")
                .append("'").append(resultSet.getString("birth_day")).append("',")
                .append("'").append(resultSet.getString("remark")).append("'")
                .append("),");
    }
    Class.forName("com.mysql.cj.jdbc.Driver");
    // 因为mysql5和mysql8的账密是一样的,所以用的同一个 pro
    Connection conn8 = DriverManager.getConnection(url8, pro);
    Statement stmt = conn8.createStatement();
    int count = stmt.executeUpdate(insertSql.substring(0, insertSql.length() - 1));
    System.out.println("新插入记录数:" + count);

    resultSet.close();
    statement.close();
    stmt.close();
    conn5.close();
    conn8.close();
}

和之前代码对比下

多驱动使用前后代码比较

调整甚微;执行后输出结果如下

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
新插入记录数:4

如果只从结果来看,确实同步成功了,但第一行的
警告
值得得我们琢磨下

类 com.mysql.jdbc.Driver 加载中。这个类已经被弃用。新的驱动类是 com.mysql.cj.jdbc.Driver,这个驱动通过 SPI 机制已经自动注册了,不需要手动加载

从中我们会产生 2 个疑问

  1. com.mysql.jdbc.Driver 不应该是
    mysql-connector-java 5.1.49
    的吗,怎么会被弃用
  2. SPI 机制是什么,
    com.mysql.cj.jdbc.Driver
    什么时候加载的

我们先来看问题 2,关于 SPI 机制可查看

记一次 JDK SPI 配置不生效的问题 → 这么简单都不会,还是回家养猪吧

DriverManager
有静态代码块

static {
    loadInitialDrivers();
    println("JDBC DriverManager initialized");
}

loadInitialDrivers()
中有这样一段代码

loadInitialDrivers

自动加载了驱动,而驱动类中往往有类似如下代码

驱动类注册驱动实例

将驱动实例注册给
DriverManager
,所以不需要再去手动加载驱动类了

从 JDBC 4.0 开始,JDBC 驱动支持自动加载功能,不再需要调用 Class.forName 来加载驱动

我们回到问题 1,同步的告警信息

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.

肯定是
mysql-connector-j 8.0.33
告警出来的,因为
mysql-connector-java 5.1.49
没有类

com.mysql.cj.jdbc.Driver

对不对?全局搜索下

This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'

同步告警信息出处

点进去,我们会发现
mysql-connector-j 8.0.33
也有类

com.mysql.jdbc.Driver

大家看仔细了,这个 Driver 是没有把自己的实例注册进
java.sql.DriverManager

mysql8驱动com_mysql_jdbc_Driver类

这说明什么,说明是从
mysql-connector-j 8.0.33
加载的类:
com.mysql.jdbc.Driver
,而不是从
mysql-connector-java 5.1.49
加载

我们来捋一捋整个同步流程

  1. 通过 SPI 机制,会加载文件
    META-INF/services/java.sql.Driver
    中配置的类

    mysql-connector-j 8.0.33 的 java.sql.Driver 文件内容


    mysql8驱动java_sql_Driver

    mysql-connector-java 5.1.49 的 java.sql.Driver 文件内容


    mysql5驱动_java_sql_Driver

    类加载器加载
    com.mysql.cj.jdbc.Driver
    的时候,毫无疑问找到的肯定是 mysql-connector-j 8.0.33 jar包中的,而加载
    com.mysql.jdbc.Driver
    的时候,类加载器找到的却是 mysql-connector-j 8.0.33 jar包中的,而非 mysql-connector-java 5.1.49 jar包中的,所以告警了

  2. 代码中手动调用
    Class.forName("com.mysql.jdbc.Driver");
    进行类加载,根据
    双亲委派模型
    ,已经加载过的类不会再加载,所以相当于没做任何操作

    前面的告警信息不是这里触发出来的!!!不信的话可以注释掉该行代码执行下,你们会发现仍有同样的告警信息

  3. 从 MySQL5 查数据,用的驱动实际是
    com.mysql.cj.jdbc.Driver


    连接mysql5的实际驱动

    因为 DriverManager 中合适的驱动只有这一个

  4. 代码中手动调用
    Class.forName("com.mysql.cj.jdbc.Driver");
    进行类加载,根据
    双亲委派模型
    ,已经加载过的类不会再加载,所以相当于没做任何操作

  5. 从 MySQL8 查数据,用的驱动毫无疑问也只能是
    com.mysql.cj.jdbc.Driver

所以整个同步,用的都是 mysql-connector-j 8.0.33 下的驱动,mysql-connector-java 5.1.49 压根就没用到,是不是在你们的意料之外?

小孩 震惊

所以如何实现我们最初的想法?

如何用 mysql-connector-java 5.1.49 从 MySQL 5.7.36 查数据后,用 mysql-connector-j 8.0.33 将数据插入 MySQL 8.0.30

maven-shade-plugin

甲方扔给两个存在包名与类名均相同的Jar包,要在工程中同时使用怎么办?
中谈到了好些解决办法,但 maven-shade-plugin 相对而言是最优解,其具体使用可参考

maven 插件之 maven-shade-plugin,解决同包同名 class 共存问题的神器

那如何应该到当前案例中来了,其实很简单,只需要用到 maven-shade-plugin 的
重定位 class
功能即可,请看我表演

  1. 对 mysql-connector-j 8.0.33 进行 class 重定位

    新建一个工程
    mysql-jdbc
    ,没有任何代码和配置文件


    mysql-jdbc8

    只有一个
    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.qsl</groupId>
        <artifactId>mysql-jdbc8</artifactId>
        <version>8.0.33</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.mysql</groupId>
                <artifactId>mysql-connector-j</artifactId>
                <version>8.0.33</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.6.0</version>
                    <executions>
                        <execution>
                            <!-- 和 package 阶段绑定 -->
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <relocations>
                                    <relocation>
                                        <pattern>com.mysql.jdbc</pattern>
                                        <shadedPattern>com.mysql.jdbc8</shadedPattern>
                                    </relocation>
                                </relocations>
                                <filters>
                                    <filter>
                                        <artifact>com.qsl:mysql-jdbc8</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.*</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    

    mvn install
    一下,将重新打包后的 jar 部署到本地仓库


    mysql-jdbc8_8_0_30
  2. 调整示例代码的 maven 依赖

    mysql-connector-j 8.0.33 调整成 mysql-jdbc8 8.0.33,mysql-connector-java 5.1.49 原样保留

    <dependencies>
        <dependency>
            <groupId>com.qsl</groupId>
            <artifactId>mysql-jdbc8</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
    </dependencies>
    
  3. 调整同步代码

    去掉手动加载驱动,增加 connection 驱动信息版本输出

    public static void main(String[] args) throws Exception {
        String url5 = "jdbc:mysql://192.168.2.118:3307/qsl_datax?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
        String url8 = "jdbc:mysql://192.168.2.118:3311/sql_db?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
        Properties pro = new Properties();
        pro.put("user", "root");
        pro.put("password", "123456");
        // 建立连接
        Connection conn5 = DriverManager.getConnection(url5, pro);
        // 查数据
        Statement statement = conn5.createStatement();
        System.out.println("conn5 driver version: " + conn5.getMetaData().getDriverVersion());
        ResultSet resultSet = statement.executeQuery("SELECT * FROM qsl_datax_source");
        StringBuilder insertSql = new StringBuilder("INSERT INTO qsl_datax_source(id,username,password,birth_day,remark) VALUES ");
        while (resultSet.next()) {
            // 拼接sql
            insertSql.append("(")
                    .append(resultSet.getLong("id")).append(",")
                    .append("'").append(resultSet.getString("username")).append("',")
                    .append("'").append(resultSet.getString("password")).append("',")
                    .append("'").append(resultSet.getString("birth_day")).append("',")
                    .append("'").append(resultSet.getString("remark")).append("'")
                    .append("),");
        }
        // 因为mysql5和mysql8的账密是一样的,所以用的同一个 pro
        Connection conn8 = DriverManager.getConnection(url8, pro);
        System.out.println("conn8 driver version: " + conn8.getMetaData().getDriverVersion());
        Statement stmt = conn8.createStatement();
        int count = stmt.executeUpdate(insertSql.substring(0, insertSql.length() - 1));
        System.out.println("新插入记录数:" + count);
    
        resultSet.close();
        statement.close();
        stmt.close();
        conn5.close();
        conn8.close();
    }
    

处理就算完成,我们执行一下看结果

maven-shade-plugin改造后执行结果

之前的警告确实没了,但新的问题又来了:为什么驱动用的是同一个,mysql-connector-java 5.1.49 中的驱动为什么没有被用到?

一个bug改一天

mysql-connector-java 5.1.49 中的 com.mysql.jdbc.Driver 肯定是被正常加载了,并且注册到了 DriverManager 中,这点大家认同不?(不认同也没关系,后面会得到证明)那它为什么没有被使用了,我们需要跟一下
DriverManager.getConnection
的源码了;源码跟进去比较简单,我就带你们一步一步跟了,最终回来到如下方法

java.sql.DriverManager#getConnection(java.lang.String, java.util.Properties, java.lang.Class<?>)

这个方法里面有这么一段代码

for(DriverInfo aDriver : registeredDrivers) {
    // If the caller does not have permission to load the driver then
    // skip it.
    if(isDriverAllowed(aDriver.driver, callerCL)) {
        try {
            println("    trying " + aDriver.driver.getClass().getName());
            Connection con = aDriver.driver.connect(url, info);
            if (con != null) {
                // Success!
                println("getConnection returning " + aDriver.driver.getClass().getName());
                return (con);
            }
        } catch (SQLException ex) {
            if (reason == null) {
                reason = ex;
            }
        }

    } else {
        println("    skipping: " + aDriver.getClass().getName());
    }

}

我们打个断点跟一下(com.mysql.cj.jdbc.Driver 排在 com.mysql.jdbc.Driver 前面!!!)

debug_驱动列表

isDriverAllowed
作用是检查一个给定的
Driver
对象是否被允许通过指定的
ClassLoader
加载,我们不需要关注,而我们需要关注的是

Connection con = aDriver.driver.connect(url, info);

跟进去来到
com.mysql.cj.jdbc.NonRegisteringDriver#connect

debug_connect

感兴趣的可以继续跟进
ConnectionUrl.acceptsUrl(url)
,但我觉得没必要了,很明显就是根据正则表达式去匹配 url,看看是否适配,因为 MySQL5 的 url 与 MySQL8 的 URL 格式一致

String url5 = "jdbc:mysql://192.168.2.118:3307/qsl_datax?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
String url8 = "jdbc:mysql://192.168.2.118:3311/sql_db?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";

因为 com.mysql.cj.jdbc.Driver 排在 com.mysql.jdbc.Driver 前面,所以用它连接了 MySQL5 和 MySQL8,前面的问题

为什么驱动用的是同一个,mysql-connector-java 5.1.49 中的驱动为什么没有被用到?

是不是就清楚了?你们可能又有疑问了:为什么不是 com.mysql.jdbc.Driver 排在前面?这个跟类加载的顺序有关,超出了本文范围,你们自行去查阅。那还能实现最初的目的吗

用 mysql-connector-java 5.1.49 从 MySQL 5.7.36 查数据后,用 mysql-connector-j 8.0.33 将数据插入 MySQL 8.0.30

肯定是能的,看我调整下代码

public static void main(String[] args) throws Exception {
    String url5 = "jdbc:mysql://192.168.2.118:3307/qsl_datax?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
    String url8 = "jdbc:mysql://192.168.2.118:3311/sql_db?useUnicode=true&useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai";
    Properties pro = new Properties();
    pro.put("user", "root");
    pro.put("password", "123456");

    // 建立连接
    Driver driver5 = getDriver("com.mysql.jdbc.Driver");
    Connection conn5 = driver5.connect(url5, pro);
    // 查数据
    Statement statement = conn5.createStatement();
    System.out.println("conn5 driver version: " + conn5.getMetaData().getDriverVersion());
    ResultSet resultSet = statement.executeQuery("SELECT * FROM qsl_datax_source");
    StringBuilder insertSql = new StringBuilder("INSERT INTO qsl_datax_source(id,username,password,birth_day,remark) VALUES ");
    while (resultSet.next()) {
        // 拼接sql
        insertSql.append("(")
                .append(resultSet.getLong("id")).append(",")
                .append("'").append(resultSet.getString("username")).append("',")
                .append("'").append(resultSet.getString("password")).append("',")
                .append("'").append(resultSet.getString("birth_day")).append("',")
                .append("'").append(resultSet.getString("remark")).append("'")
                .append("),");
    }
    // 因为mysql5和mysql8的账密是一样的,所以用的同一个 pro
    Driver driver8 = getDriver("com.mysql.cj.jdbc.Driver");
    Connection conn8 = driver8.connect(url8, pro);
    System.out.println("conn8 driver version: " + conn8.getMetaData().getDriverVersion());
    Statement stmt = conn8.createStatement();
    int count = stmt.executeUpdate(insertSql.substring(0, insertSql.length() - 1));
    System.out.println("新插入记录数:" + count);

    resultSet.close();
    statement.close();
    stmt.close();
    conn5.close();
    conn8.close();
}

private static Driver getDriver(String driverClassName) {
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
        Driver driver = drivers.nextElement();
        if (driver.getClass().getName().equals(driverClassName)) {
            return driver;
        }
    }
    throw new RuntimeException("未找到驱动:" + driverClassName);
}

执行一下看结果

改造成功_执行结果

此时我就想说一句:还有谁?

还有谁

总结

  1. 示例代码:
    mysql-driver-demo

    不包括 mysql-jdbc8 的代码

  2. 就 MySQL 而言,mysql-connector-j 8 驱动兼容 MySQL 5.5、5.6、5.7,实际工作中是可以用 mysql-connector-j 8 去连 MySQL 5.7的

  3. SQL Server 就存在驱动不兼容的情况

    Microsoft JDBC Driver for SQL Server 支持矩阵


    SQLServer驱动兼容情况
  4. maven-shade-plugin 来实现多版本驱动的共存,简单高效,值得掌握!

    maven 插件之 maven-shade-plugin,解决同包同名 class 共存问题的神器

版本:DataX v202309  DataXWeb 2.1.3-alpha-release

DataX:

Github:https://github.com/alibaba/DataX

功能介绍文档:https://github.com/alibaba/DataX/blob/master/introduction.md

文档上虽然只写了Linux系统,但实际部署Windows也可以

JDK版本使用1.8即可

Python如果环境的版本可以选择的话,可以使用2.6或者2.7,我这边使用的是3.12.5

Maven 3.x是编译时需要的条件

一开始下载的是v202308版本,安装包下载路径:https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz

因为要Python3.x,所以替换DataX /bin目录下py文件(替换的文件在:DataXWeb:doc/datax-web/datax-python3/)

由于DataX对Mysql 只支持5.x,但是我这边的Mysql DB是 8.x的

所以下载了v202309的源码,调整代码使其支持mysql 8.x

(修改代码的步骤:https://blog.csdn.net/weixin_41640312/article/details/132019719)

然后按照github中的步骤打包即可

问题:

打包过程中发现oceanbasev10writer报错,项目的libs下缺少特定jar文件,

解决:

去master分支找到了这个jar,下载后复制,即可打包成功(打包过程非常慢,不知道是不是网络的问题)

问题:

创建了Mysql 数据源之间的迁移Job(文档中并没有说mysql的限制版本,也就没有想到支持的Mysql版本这么低)

配置是正确的,但dataX一直报错

解决:

去搜索了一下,才发现版本限制,所以切换了版本

问题:

打包好最新版后,运行Mysql Job还是报错(在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数)

解决:

在打包后的datax\conf\core.json 中core.transport.speed.byte由-1修改为2000000

DataXWeb:

Github:https://github.com/WeiYe-Jing/datax-web

众所周知,DataX是使用Python命令行运行Job的Json文件配置来同步数据源

所以使用DataXWeb来搭配使用

一开始使用的是DataXWeb v2.1.2版本,但是配置字段映射方面有点不太容易理解,就换了2.1.3-alpha-release

1. 下载源码

2. 运行datax-admin&datax-executor(按需修改配置文件)

配置文件都有说明,按照说明配置DB,以及datax.py的路径等即可。

相对来讲新版的配置比旧版更容易理解,但是,页面的数据不是很即使,操作时,还是需要刷新,不知道以后会不会调整

至于DataX创建Job的步骤我就不提了,使用DataXWeb,可以很容易创建一个Job

其他:

附上DataX支持的数据源(github上都有)

DataX的核心架构

Job通过源端切分策略,切分为多个Task,然后调用Schedule模块,根据配置的并发参数等,将Task划分为TaskGroup(默认一个TaskGroup5个Task)

每一个Task中启用一个线程,完成Reader->Channel->Writer流程

Demosaic,中文直接发翻译为去马赛克, 但是其本质上并不是去除马赛克,这让很多第一次接触这个名词或者概念的人总会想的更多。因此,一般传感器在采集信息时一个位置只采集RGB颜色中的一个通道,这样可以减少采集量,降低成本,由于人的视觉对绿色最为敏感,所以一般情况下,绿色分量会比红色和蓝色分量多一倍的信息,这样,根据RGB不同的位置排布,就产生了RGGB、GBRG、GRBG、BGGR四种常用的模式,比如RGGB模式就如下所示:




RGGB            BGGR            GBRG             GRBG

去马赛克的目的就是从这些确实的信息中尽量的完美的复原原始信息,比如上图第一个点,只有红色分量是准确的,那就要想办法通过其他的信息重构出改点的绿色和蓝色分量。

目前,关于这方面的资料也是很多的,这里我描述下目前我已经优化和处理的四个算法,并分享部分代码。

一、双线性处理

一个最为直接和简单的想法就是利用领域的相关信息通过插值来弥补当前缺失的颜色分量,我们以RGGB格式为例,参考上图(坐标都是从0开始,X轴从左向右递增,Y轴从上向下递增)。

先不考虑边缘。

比如(1,1)坐标这个点,现在已经有了B这个分量,缺少R和G这个分量,但是他四个对角线上都有R分量,因此,可以使用这个4个像素的平均值来计算改点的R分量,而在其上下左右四个方向有恰好有4个点的G分量,同理可以用这个四个的平均值来评估当前点的G值。

在考虑(1,2)这个坐标点,他的当前有效值是G分量,缺少R和B分量,而他则没有(1,1)点那么幸运,他周边的有效R和B分量都只有2个,因此,只能利用这两个有效值的平均值来评估该点的R/B分量。

其他的点类似处理。

在考虑边缘,由于边缘处的像素总有某一个方向或某二个方向缺少像素,因此,可以利用镜像的关系把缺少的那一边取镜像的位置信息来补充。

一个简单的高效的C++代码如下所示:

int IM_DecodeBayerRG8ToBGR_Bilinear_PureC(unsigned char*Src, BitmapData Dest)
{
int Status =IM_STATUS_OK;int Width = Dest.Width, Height = Dest.Height, Stride =Dest.Stride;if (((Width & 1) == 1) || ((Height & 1) == 1)) returnIM_STATUS_INVALIDPARAMETER;if ((Width < 8) || (Height < 8)) returnIM_STATUS_INVALIDPARAMETER;
unsigned
char* RowCopy = (unsigned char*)malloc((Width + 2) * 3 * sizeof(unsigned char));if (RowCopy == NULL) returnIM_STATUS_OUTOFMEMORY;
unsigned
char* First =RowCopy;
unsigned
char* Second = RowCopy + (Width + 2);
unsigned
char* Third = RowCopy + (Width + 2) * 2;
unsigned
char* SrcP = (unsigned char*)Src;
Second[
0] = SrcP[0];
memcpy(Second
+ 1, SrcP, Width * sizeof(unsigned char));
Second[Width
+ 1] = SrcP[Width - 1];
memcpy(First, Second, (Width
+ 2) * sizeof(unsigned char)); //第一行和第二行一样Third[0] =SrcP[Width];
memcpy(Third
+ 1, SrcP + Width, Width * sizeof(unsigned char));
Third[Width
+ 1] = SrcP[Width + Width - 1];for (int Y = 0; Y < Height; Y++)
{
unsigned
char* LinePD = Dest.Scan0 + Y *Stride;if (Y != 0)
{
unsigned
char* Temp = First; First = Second; Second = Third; Third =Temp;
}
if (Y == Height - 1)
{
memcpy(Third, Second, (Width
+ 2) * sizeof(unsigned char));
}
else{
Third[
0] = SrcP[(Y + 1) *Width];
memcpy(Third
+ 1, SrcP + (Y + 1) * Width, Width * sizeof(unsigned char));
Third[Width
+ 1] = SrcP[(Y + 1) * Width + Width - 1];
}
if ((Y & 1) == 0) //偶数列
{for (int X = 0; X < Width; X++, LinePD += 3)
{
int P0 = First[X], P1 = First[X + 1], P2 = First[X + 2];int P3 = Second[X], P4 = Second[X + 1], P5 = Second[X + 2];int P6 = Third[X], P7 = Third[X + 1], P8 = Third[X + 2];if ((X & 1) == 0)
{
LinePD[
0] = (P0 + P2 + P6 + P8 + 2) >> 2;
LinePD[
1] = (P1 + P3 + P5 + P7 + 2) >> 2;
LinePD[
2] =P4;
}
else{
LinePD[
0] = (P1 + P7 + 1) >> 1;
LinePD[
1] =P4;
LinePD[
2] = (P3 + P5 + 1) >> 1;
}
}
}
else{for (int X = 0; X < Width; X++, LinePD += 3)
{
int P0 = First[X], P1 = First[X + 1], P2 = First[X + 2];int P3 = Second[X], P4 = Second[X + 1], P5 = Second[X + 2];int P6 = Third[X], P7 = Third[X + 1], P8 = Third[X + 2];if ((X & 1) == 0)
{
LinePD[
0] = (P3 + P5 + 1) >> 1;
LinePD[
1] =P4;
LinePD[
2] = (P1 + P7 + 1) >> 1;
}
else{
LinePD[
0] =P4;
LinePD[
1] = (P1 + P3 + P5 + P7 + 2) >> 2;
LinePD[
2] = (P0 + P2 + P6 + P8 + 2) >> 2;
}
}
}
}
free(RowCopy);
returnIM_STATUS_OK;
}

这个算法是非常高效的,而且极易使用指令集优化,在一台普通的配置的PC上(12th Gen Intel(R) Core(TM) i7-12700F   2.10 GHz)处理1902*1080的图,大概只需要2ms,SSE优化后可以做到0.6ms,但是这个方法忽略边缘结构和通道间的相关性,从而容易导致颜色伪彩和图像模糊,比如下面这个经典的测试图,解码后的结果有点惨不忍睹。


用彩色显示RGGB格式图(即把其他通道的颜色分量设计为0)                  双线性解码后的结果

在解码后的水平和垂直方向栅栏中都可以看到明显的色彩异常。

但是由于这个算法的极度高效性,在有些要求不高的场合,依旧可以使用该算法。

二、Hamilton-Adams算法

这也是个非常经典的算法,在说明这个算法之前,必须说明下色差恒定理论。

色差恒定准则与色比恒定准则都是基于颜色通道之间的相关性,目的都是把颜色通道之间的相关性信息引入颜色插值算法,提高插值的准确性。色差相比于色比有两点优势:

第一,色差的运算简单,更容易实现。第二, 色比在G通道接近0时误差较大,色差不存在这类问题。因此,绝大多数颜色插值算法中使用了色差。

那么色差恒定理论,其最为核心的思想就是:临近的两个彩色像素,其相同颜色分量之间的差异值应该近似差不多,用公式表示如下:

R(X,Y) - R(X-1,Y)  = G(X,Y) - G(X-1,Y) = B(X,Y) - B(X-1,Y)

那这是时候如果我们已经获取了某个颜色通道的所有位置的值,通过这个公式就很容易推导出其他位置缺失的值了。

我们还是以上面的(1,1)坐标点为例,假定我们已经获取了所有G通道的数据,也就是说这个(1,1)这个点实际只有R通道数据缺失了(B数据本身就有),这个时候根据颜色恒差理论,应该有

R(1,1) - R(0,0) = G(1,1) - G(0,0)   ------> R(1,1) = G(1,1) + R(0,0) - G(0, 0)

实际上满足这个等式还有(0,2)、(2,0)、(2,2)这三个点(这三个点的红色分量是准确值),所以为了得到更好的精度,我们可以通过下式最终确定R(1,1)的值。

R(1,1) =   (G(1,1) + R(0,0) - G(0, 0) +  G(1,1) + R(0,2) - G(0, 2) +  G(1,1) + R(2,0) - G(2, 0) + G(1,1) + R(2,2) - G(2, 2)) /4

整理后即为:

R(1,1) =   G(1,1) + (R(0,0)  + R(0,2) + R(2, 0) + R(2,2) - G(0,0)  -G(0,2) -G(2, 0) -G(2,2)) / 4

对于(1,2)这个点,G通道数据本来就有,缺少R和B,那根据颜色恒差理论,应该有

R(1,2) - G(1,2) = R(0,2) - G(0,2)

B(1,2) - G(1,2) = B(1,1) - G(1,1)

同样的道理,还有

R(1,2) - G(1,2) = R(2,2) - G(2,2)

B(1,2) - G(1,2) = B(1,3) - G(1,3)

类似(1,2)的方法,把他们综合起来可以得到更为精确的结果:

R(1,2) = G(1,2) + ((R(0,2) + R(2,2) - G(0,2) - G(2,2)) / 2

B(1,2) = G(1,2) + ((B(1,1) + B(1,3) - G(0,2) - G(2,2)) / 2

以上利用颜色恒差理论,就把各个通道之间的数据关联了起来。那么前面的计算都有一个前提条件,就是绿色通道的数据都已经知道了。

我们前面说过,绿色通道的数据量本身只缺少了一半,而缺少了那一半中任何一个点的数据都可以用周边四个点的领域数据填充,因此,如果我们绿色通道就这样处理,而红色和蓝色则用颜色恒差理论借助绿色通道的数据后结果如何呢,我们就把这样做的算法叫做CDCT把,一个简单的严重这个算法的代码如下所示:

int IM_DecodeBayerRG8ToBGR_CDCT_PureC(unsigned char*Src, BitmapData Dest)
{
int Status =IM_STATUS_OK;int Width = Dest.Width, Height = Dest.Height, Stride =Dest.Stride;if (((Width & 1) == 1) || ((Height & 1) == 1)) returnIM_STATUS_INVALIDPARAMETER;if ((Width < 8) || (Height < 8)) returnIM_STATUS_INVALIDPARAMETER;

unsigned
char* Blue = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Green = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Red = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));if ((Blue == NULL) || (Green == NULL) || (Red ==NULL))
{
Status
=IM_STATUS_OUTOFMEMORY;gotoFreeMemory;
}
//先直接复制数据,也无需把通道的值单独提取出来填充,因为后续会把无效的点填充的
memcpy(Blue, Src, Height * Width * sizeof(unsigned char));
memcpy(Green, Src, Height
* Width * sizeof(unsigned char));
memcpy(Red, Src, Height
* Width * sizeof(unsigned char));//因为Green分量占了1/2像素,先填充Green像素//因为后续的Green分量涉及到了3*3的领域,对于边缘的部分,直接使用四个点的平均,单独提出来,这部分计算量很小,无需加速
for (int X = 0; X < Width; X++)
{
IM_CalcBorderGreen_CDCT(Green, Width, Height, X,
0);
IM_CalcBorderGreen_CDCT(Green, Width, Height, X, Height
- 1);
}
for (int Y = 1; Y < Height - 1; Y++)
{
IM_CalcBorderGreen_CDCT(Green, Width, Height,
0, Y);
IM_CalcBorderGreen_CDCT(Green, Width, Height, Width
- 1, Y);
}
//填充Green通道的无效位置的数据
for (int Y = 1; Y < Height - 1; Y++)
{
int Index = Y *Width;for (int X = 1; X < Width - 1; X++)
{
//偶数行和偶数列 或者奇数行和奇数列,绿色分量都是无效的
if (((X + Y) & 1) == 0)
{
Green[Index
+ X] = (Green[Index + X - Width] + Green[Index + X + Width] + Green[Index + X - 1] + Green[Index + X + 1] + 2) / 4;
}
}
}
IM_RGGB_CalcRed_CDCT_PureC(Red, Green, Width, Height);
IM_RGGB_CalcBlue_CDCT_PureC(Blue, Green, Width, Height);
Status
=IM_CombineRGB_PureC(Blue, Green, Red, Dest.Scan0, Dest.Width, Dest.Height, Dest.Stride, Width);if (Status != IM_STATUS_OK) gotoFreeMemory;
FreeMemory:
if (Blue !=NULL) free(Blue);if (Green !=NULL) free(Green);if (Red !=NULL) free(Red);returnStatus;
}

其中 IM_RGGB_CalcRed_CDCT_PureC代码如下所示:

void IM_RGGB_CalcRed_CDCT_PureC(unsigned char* Red, unsigned char* Green, int Width, intHeight)
{
//R G R G R G//G B G B G B//R G R G R G//G B G B G B//R G R G R G//G B G B G B//色差恒定原理:色差是色度信号(R和B分量)与亮度信号(G分量)的差,在图像很小的范围内,当前像素的色差与其周围点的色差是差不多的,也就是类似下面的说法//R(I,J)-G(I,J) = R(I,J + 1)-G(I,J + 1),或者写成R(I,J)-R(I,J+1) = G(I,J)-G(I,J + 1)。这样利用已经前面已经完成重构的G分量,可以重构出其他未知的R和B分量。

for (int X = 0; X < Width; X++)
{
IM_RGGB_CalcBorderRed_CDCT(Red, Green, Width, Height, X,
0);
IM_RGGB_CalcBorderRed_CDCT(Red, Green, Width, Height, X, Height
- 1);
}
for (int Y = 1; Y < Height - 1; Y++)
{
IM_RGGB_CalcBorderRed_CDCT(Red, Green, Width, Height,
0, Y);
IM_RGGB_CalcBorderRed_CDCT(Red, Green, Width, Height, Width
- 1, Y);
}
//填充Red通道的无效位置的数据
for (int Y = 1; Y < Height - 1; Y++)
{
int Index = Y *Width;for (int X = 1; X < Width - 1; X++)
{
//偶数行奇数列, 水平方向填充红色分量
if ((Y & 1) == 0 && (X & 1) == 1)
{
Red[Index
+ X] = IM_ClampToByte((Red[Index + X - 1] + Red[Index + X + 1] - Green[Index + X - 1] - Green[Index + X + 1] + 1) / 2 + Green[Index +X]);
}
//奇数行偶数列, 垂直方向填充红色分量
else if ((Y & 1) == 1 && (X & 1) == 0)
{
Red[Index
+ X] = IM_ClampToByte((Red[Index + X - Width] + Red[Index + X + Width] - Green[Index + X - Width] - Green[Index + X + Width] + 1) / 2 + Green[Index +X]);
}
//奇数行奇数列, 水平垂直方向填充红色分量
else if ((Y & 1) == 1 && (X & 1) == 1)
{
Red[Index
+ X] = IM_ClampToByte((Red[Index + X - Width - 1] + Red[Index + X - Width + 1] + Red[Index + X + Width - 1] + Red[Index + X + Width + 1] - Green[Index + X - Width - 1] - Green[Index + X - Width + 1] - Green[Index + X + Width - 1] - Green[Index + X + Width + 1] + 2) / 4 + Green[Index +X]);
}
}
}
}

IM_RGGB_CalcBlue_CDCT_PureC是类似的道理。

经过测试,这样做的结果和直接双线性相比,基本没有什么差异的,所以直接这样还是不行的,

Hamilton-Adams等人在结合绿色通道的一阶导数和周边红色和蓝色的通道的二阶倒数的基础上,对绿色分量的插值提出了如下算法,这个过程考虑到了像素之间的边缘信息:


当水平方向的梯度大于垂直方向的梯度时,使用垂直方向的有关像素计算结果,否则使用水平方形的有关值,如果两者相等,则使用平均值。

实际操作时,都会定义一个阈值,如果水平和垂直的梯度之差的绝对值小于阈值,则使用平均值,如果在阈值之外,再考虑水平和垂直之间的关系。这样能获得更为合理的结果。

实际上,仔细看看,上面每个方向的G5的计算也是利用到了颜色恒差理论的,只是只是单独利用了水平或者垂直方向的像素而已。

我们分享一下更具上述思路编写的C++代码结果:

int IM_DecodeBayerRG8ToBGR_HamiltonAdams_PureC(unsigned char* Src, BitmapData Dest,    intThreshold)
{
int Status =IM_STATUS_OK;int Width = Dest.Width, Height = Dest.Height, Stride =Dest.Stride;//宽度和高度都必须是偶数
if (((Width & 1) == 1) || ((Height & 1) == 1)) returnIM_STATUS_INVALIDPARAMETER;if ((Width < 8) || (Height < 8)) returnIM_STATUS_INVALIDPARAMETER;

unsigned
char* Blue = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Green = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Red = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));if ((Blue == NULL) || (Green == NULL) || (Red ==NULL))
{
Status
=IM_STATUS_OUTOFMEMORY;gotoFreeMemory;
}
//先直接复制数据,也无需把通道的值单独提取出来填充,因为后续会把无效的点填充的
memcpy(Blue, Src, Height * Width * sizeof(unsigned char));
memcpy(Green, Src, Height
* Width * sizeof(unsigned char));
memcpy(Red, Src, Height
* Width * sizeof(unsigned char));//因为Green分量占了1/2像素,先填充Green像素//因为后续的Green分量涉及到了5*5的领域,对于边缘的部分,直接使用四个点的平均,单独提出来,这部分计算量很小,无需加速
for (int X = 0; X < Width; X++)
{
IM_CalcBorderGreen_CDCT(Green, Width, Height, X,
0);
IM_CalcBorderGreen_CDCT(Green, Width, Height, X,
1);
IM_CalcBorderGreen_CDCT(Green, Width, Height, X, Height
- 2);
IM_CalcBorderGreen_CDCT(Green, Width, Height, X, Height
- 1);
}
for (int Y = 2; Y < Height - 2; Y++)
{
IM_CalcBorderGreen_CDCT(Green, Width, Height,
0, Y);
IM_CalcBorderGreen_CDCT(Green, Width, Height,
1, Y);
IM_CalcBorderGreen_CDCT(Green, Width, Height, Width
- 2, Y);
IM_CalcBorderGreen_CDCT(Green, Width, Height, Width
- 1, Y);
}
//处理剩下的能否安全访问领域的算法
for (int Y = 2; Y < Height - 2; Y++)
{
int IndexC = Y *Width;int IndexN1 = (Y + 1) * Width, IndexN2 = (Y + 2) *Width;int IndexP1 = (Y - 1) * Width, IndexP2 = (Y - 2) *Width;
unsigned
char* Sample = (Y & 1) == 0 ?Red : Blue;for (int X = 2; X < Width - 2; X++)
{
//只有当X和Y都是偶数或者都是奇数时才需要处理
if (((X + Y) & 1) == 0)
{
//周边蓝色或者红色分量的二阶导数
int SecDH = 2 * Sample[IndexC + X] - Sample[IndexC + X + 2] - Sample[IndexC + X - 2];int SecDV = 2 * Sample[IndexC + X] - Sample[IndexN2 + X] - Sample[IndexP2 +X];//加上绿色分量的一阶导数得到梯度
int GradH = IM_Abs(Green[IndexC + X - 1] - Green[IndexC + X + 1]) +IM_Abs(SecDH);int GradV = IM_Abs(Green[IndexP1 + X] - Green[IndexN1 + X]) +IM_Abs(SecDV);//如果垂直或者水平的梯度差不多,则计算周边的平均值
if (IM_Abs(GradV - GradH) <Threshold)
Green[IndexC
+ X] = IM_ClampToByte((Green[IndexP1 + X] + Green[IndexN1 + X] + Green[IndexC + X - 1] + Green[IndexC + X + 1] + 2) / 4 + (SecDH + SecDV + 4) / 8);//如果水平差异小一些,则利用水平方向的平均值
else if (GradH <GradV)
Green[IndexC
+ X] = IM_ClampToByte((Green[IndexC + X - 1] + Green[IndexC + X + 1] + 1) / 2 + (SecDH + 2) / 4);//否则利用垂直方向的平均值
elseGreen[IndexC+ X] = IM_ClampToByte((Green[IndexP1 + X] + Green[IndexN1 + X] + 1) / 2 + (SecDV + 2) / 4);
}
}
}
IM_RGGB_CalcRed_CDCT_SSE(Red, Green, Width, Height);
IM_RGGB_CalcBlue_CDCT_SSE(Blue, Green, Width, Height);
Status
=IM_CombineRGB_PureC(Blue, Green, Red, Dest.Scan0, Dest.Width, Dest.Height, Dest.Stride, Width);if (Status != IM_STATUS_OK) gotoFreeMemory;
FreeMemory:
if (Blue !=NULL) free(Blue);if (Green !=NULL) free(Green);if (Red !=NULL) free(Red);returnStatus;
}

我们分享下前面说的CDCT算法以及HamiltonAdams算法的结果:


简单的CDCT结果                        HamiltonAdams结果

从上面右图的结果可以看到,相比于双线性,水平栅栏处似乎已经看不到色彩异常了,垂直栅栏处的异常点也大为减少,但是还是存在些许瑕疵。

在速度方面,由于直接使用双线性时,可以直接把数据数据有规律的填充到目的图中,而且计算量很简单,而使用色差恒定理论后,由于顺序的要求以及一些编码方面的原因,需要使用一些中间内存,而且计算量相对来说大了很多,因此,速度也慢了不少,上述C++的代码处理1080P的图像,需要大概7ms,经过SSE优化后的代码可以达到4ms左右的速度,这个速度在某些实时性要求比较高的场景下还是具有实际价值的。

为了进一步提高效果,在HA算法的基础上,后续还是有不少人提出了更多的改进算法,在 IPOL上,可以找到一篇
A Mathematical Analysis and Implementation of Residual Interpolation Demosaicking Algorithms
的综述性文章,文章里列举了数种类型的处理方式,那个网站是个也提供了在线的DEMO和代码,可以看到各自的处理后的结果,如下图所示:

不过这些都是比较传统的算法方面的东西了,而且我看了代码和有关效果,感觉这些算法更偏重于理论,实际操作起来,可能效率完全不够,同时IPOL上还提供了一篇基于CNN深度学习方面的算法,效果我看了下,确实还是很不错的,不过就是感觉速度比较堪忧,有兴趣可以在
A Study of Two CNN Demosaicking Algorithms
这里浏览。

那么我后面关注到的是IPOL的另外一篇文章:
Zhang-Wu Directional LMMSE Image Demosaicking
,通过测试发现这个文章的效果还是非常不错和稳定的,而且分析其代码,感觉可以做很大的优化工作。

这个文章对R和B通道的处理方式是和HA算法一样的,都是先要获取G通道的全部数据,然后采用色差恒定原理计算R和B,在计算G通道时,从水平和垂直方向计算 (G-R) 和 (G-B) 的颜色差异开始。然后,这些计算被视为对实际色差的噪声估计,并使用线性最小均方误差框架将它们优化组合。

所谓的优化组合可以这样理解,传统的HA算法,在计算G通道时,就是根据水平和垂直方向的梯度,决定最后是使用水平还是垂直方向的计算值,而ZhangWu算法则不完全使用水平或垂直的信息,而且根据某些过程计算出水平和垂直方向各自的权重然后融合。

这里简单的描述了一下色差噪声估计的公式:


这几个公式可以看成是一个简单的去燥过程,其中f表示原始的噪音信息,s呢就是一个对f的简单的低通滤波,比如高斯滤波,然后呢公式9计算s的一定领域大小的平均值, 公式10计算s在这个领域内的方差, 公式11则计算f和s差异平方的均值。

利用以上信息则可以估算出去除噪音后的估算值 u(公式12)以及对一个的估算误差(公式13)。

分别对水平和垂直反向上的颜色差异进行这样的滤波,然后利用上面的结果按照下述公式进行融合:

上面的公式是一个一维的去燥过程,我有空将其改为2维的图形去燥看看效果如何。

这些公式在论文的配套代码里都有有关的实现,有兴趣的朋友可以有去看看代码,我将其过程进行过程化,这个函数大概有如下代码构成:

//Zhang–Wu Directional LMMSE Image Demosaicking
int IM_DecodeBayerRG8ToBGR_ZhangWu_SSE(unsigned char*Src, BitmapData Dest)
{
int Status =IM_STATUS_OK;int Width = Dest.Width, Height = Dest.Height, Stride =Dest.Stride;//宽度和高度都必须是偶数 if (((Width & 1) == 1) || ((Height & 1) == 1)) returnIM_STATUS_INVALIDPARAMETER;//宽度或者高度小于8有些领域会溢出 if ((Width < 8) || (Height < 8)) returnIM_STATUS_INVALIDPARAMETER;
unsigned
char* Blue = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Green = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));
unsigned
char* Red = (unsigned char*)malloc(Width * Height * sizeof(unsigned char));short* DiffH = (short *)malloc(Width * Height * sizeof(short));short* DiffV = (short *)malloc(Width * Height * sizeof(short));short* LowPassH = (short*)malloc(Width * Height * sizeof(short));short* LowPassV = (short *)malloc(Width * Height * sizeof(short));if ((Blue == NULL) || (Green == NULL) || (Red == NULL) || (DiffH == NULL) || (DiffV == NULL) || (LowPassH == NULL) || (LowPassV ==NULL))
{
Status
=IM_STATUS_OUTOFMEMORY;gotoFreeMemory;
}
//先直接复制数据,也无需把通道的值单独提取出来填充,因为后续会把无效的点填充的 memcpy(Blue, Src, Height * Width * sizeof(unsigned char));//memcpy(Green, Src, Height * Width * sizeof(unsigned char));//绿色通道因为其特殊性,后续会在代码里进行填充,不需要单独复制数据的memcpy(Red, Src, Height* Width * sizeof(unsigned char));//获取水平方向上准确信号和插值信号的差异 IM_GetHoriDiffSignal_SSE(Src, DiffH, Width, Height);//获取垂直方向上准确信号和插值信号的差异 IM_GetVertDiffSignal_SSE(Src, DiffV, Width, Height);//对水平差异进行1*9的高斯滤波 IM_HoriLowPass1X9_SSE(DiffH, LowPassH, Width, Height);//对垂直差异进行9*1的高斯滤波 IM_VertLowPass9X1_SSE(DiffV, LowPassV, Width, Height);//通过LMMSE算法计算完整的绿色通道 Status = IM_RGGB_CalcGreen_ZW_SSE(Src, Green, DiffH, DiffV, LowPassH, LowPassV, Width, Height, 4);if (Status != IM_STATUS_OK) gotoFreeMemory;//使用色差恒定原理计算出Red通道的数据 IM_RGGB_CalcRed_CDCT_SSE(Red, Green, Width, Height);//使用色差恒定原理计算出Blue通道的数据 IM_RGGB_CalcBlue_CDCT_SSE(Blue, Green, Width, Height);//把RGB单通道数据组合成RGB彩色图像 Status =IM_CombineRGB_SSE(Blue, Green, Red, Dest.Scan0, Dest.Width, Dest.Height, Dest.Stride, Width);if (Status != IM_STATUS_OK) gotoFreeMemory;
FreeMemory:
if (Blue !=NULL) free(Blue);if (Green !=NULL) free(Green);if (Red !=NULL) free(Red);if (DiffH !=NULL) free(DiffH);if (DiffV !=NULL) free(DiffV);if (LowPassH !=NULL) free(LowPassH);if (LowPassV !=NULL) free(LowPassV);returnStatus;
}

具体的实现就不做过多的探讨,原始作者的C代码效率非常的低下,简单的测试了下1080P的图大概需要1分钟左右,这完全没有实际的意义的,所以需要进行深度的优化,比如水平方向的滤波,原作者采用1*9的滤波器,我稍作改进并用SSE指令优化如下:

//不支持In-Place操作
void IM_HoriLowPass1X9_SSE(short* Src, short* Dest, int Width, intHeight)
{
for (int Y = 0; Y < Height; Y++)
{
int Index = Y *Width;//边缘采用镜像的关系 4 3 2 1 0 1 2 3 4 5 6 7 Dest[Index + 0] = ((Src[Index + 4] + Src[Index + 4]) * 4 + (Src[Index + 3] + Src[Index + 3]) * 8 + (Src[Index + 2] + Src[Index + 2]) * 16 + (Src[Index + 1] + Src[Index + 1]) * 23 + Src[Index + 0] * 26 + 64) / 128;
Dest[Index
+ 1] = ((Src[Index + 3] + Src[Index + 5]) * 4 + (Src[Index + 2] + Src[Index + 4]) * 8 + (Src[Index + 1] + Src[Index + 3]) * 16 + (Src[Index + 0] + Src[Index + 2]) * 23 + Src[Index + 1] * 26 + 64) / 128;
Dest[Index
+ 2] = ((Src[Index + 2] + Src[Index + 6]) * 4 + (Src[Index + 1] + Src[Index + 5]) * 8 + (Src[Index + 0] + Src[Index + 4]) * 16 + (Src[Index + 1] + Src[Index + 3]) * 23 + Src[Index + 2] * 26 + 64) / 128;
Dest[Index
+ 3] = ((Src[Index + 1] + Src[Index + 7]) * 4 + (Src[Index + 0] + Src[Index + 6]) * 8 + (Src[Index + 1] + Src[Index + 5]) * 16 + (Src[Index + 2] + Src[Index + 4]) * 23 + Src[Index + 3] * 26 + 64) / 128;//W-8 W-7 W-6 W-5 W-4 W-3 W-2 W-1 W-2 W-3 W-4 W-5 Dest[Index + Width - 4] = ((Src[Index + Width - 8] + Src[Index + Width - 2]) * 4 + (Src[Index + Width - 7] + Src[Index + Width - 1]) * 8 + (Src[Index + Width - 6] + Src[Index + Width - 2]) * 16 + (Src[Index + Width - 5] + Src[Index + Width - 3]) * 23 + Src[Index + Width - 4] * 26 + 64) / 128;
Dest[Index
+ Width - 3] = ((Src[Index + Width - 7] + Src[Index + Width - 3]) * 4 + (Src[Index + Width - 6] + Src[Index + Width - 2]) * 8 + (Src[Index + Width - 5] + Src[Index + Width - 2]) * 16 + (Src[Index + Width - 4] + Src[Index + Width - 2]) * 23 + Src[Index + Width - 3] * 26 + 64) / 128;
Dest[Index
+ Width - 2] = ((Src[Index + Width - 6] + Src[Index + Width - 4]) * 4 + (Src[Index + Width - 5] + Src[Index + Width - 3]) * 8 + (Src[Index + Width - 4] + Src[Index + Width - 3]) * 16 + (Src[Index + Width - 3] + Src[Index + Width - 1]) * 23 + Src[Index + Width - 2] * 26 + 64) / 128;
Dest[Index
+ Width - 1] = ((Src[Index + Width - 5] + Src[Index + Width - 5]) * 4 + (Src[Index + Width - 4] + Src[Index + Width - 4]) * 8 + (Src[Index + Width - 3] + Src[Index + Width - 4]) * 16 + (Src[Index + Width - 2] + Src[Index + Width - 2]) * 23 + Src[Index + Width - 1] * 26 + 64) / 128;int BlockSize = 8, Block = (Width - 8) /BlockSize;for (int X = 4; X < 4 + Block * BlockSize; X +=BlockSize)
{
//(V0 * 4 + V1 * 8 + V2 * 16 + V3 * 23 + V4 * 26 + V5 * 23 + V6 * 16 + V7 * 8 + V8 * 4 + 64) / 128 __m128i V0 = _mm_loadu_si128((__m128i*)(Src + Index + X - 4));
__m128i V1
= _mm_loadu_si128((__m128i*)(Src + Index + X - 3));
__m128i V2
= _mm_loadu_si128((__m128i*)(Src + Index + X - 2));
__m128i V3
= _mm_loadu_si128((__m128i*)(Src + Index + X - 1));
__m128i V4
= _mm_loadu_si128((__m128i*)(Src + Index + X + 0));
__m128i V5
= _mm_loadu_si128((__m128i*)(Src + Index + X + 1));
__m128i V6
= _mm_loadu_si128((__m128i*)(Src + Index + X + 2));
__m128i V7
= _mm_loadu_si128((__m128i*)(Src + Index + X + 3));
__m128i V8
= _mm_loadu_si128((__m128i*)(Src + Index + X + 4));

__m128i V08
= _mm_slli_epi16(_mm_add_epi16(V0, V8), 2); //(V0 + V8) * 4 __m128i V17 = _mm_slli_epi16(_mm_add_epi16(V1, V7), 3); //(V1 + V7) * 8 __m128i V26 = _mm_slli_epi16(_mm_add_epi16(V2, V6), 4); //(V2 + V6) * 16 __m128i V35 = _mm_mullo_epi16(_mm_add_epi16(V3, V5), _mm_set1_epi16(23)); //(V3 + V5) * 23 __m128i V44 = _mm_mullo_epi16(V4, _mm_set1_epi16(26)); //V4 * 26 __m128i Sum=_mm_add_epi16(_mm_add_epi16(_mm_add_epi16(V08, V17), _mm_add_epi16(V26, V35)), V44);
__m128i Avg
= _mm_srai_epi16(_mm_add_epi16(Sum, _mm_set1_epi16(64)), 7);
_mm_storeu_si128((__m128i
*)(Dest + Index +X), Avg);
}
for (int X = 4 + Block * BlockSize; X < Width - 4; X++)
{
Dest[Index
+ X] = ((Src[Index + X - 4] + Src[Index + X + 4]) * 4 + (Src[Index + X - 3] + Src[Index + X + 3]) * 8 + (Src[Index + X - 2] + Src[Index + X + 2]) * 16 + (Src[Index + X - 1] + Src[Index + X + 1]) * 23 + Src[Index + X] * 26 + 64) / 128;
}
}
}

这极大的提高了运行速度。

在LMMSE的计算过程中,作者的M大小取的是4,即涉及到的领域大小也是9个像素,作者在代码里使用硬循环实现,实际上这个也就是普通一进一出的统计,完全可以做点优化的,特别是垂直方向的循环,每次都要跳一行像素,cachemiss很大,所以通过适当的改动结构,能极大的提高速度。经过优化,我们测试这个算法处理处理一副1080P的图像,SSE版本大概耗时12ms,我自己优化后的C++代码耗时约27ms(使用了编译器自己的向量化方式编译,而且非原始作者的C++代码),这个速度应该说在实时系统中还是可以接受的,而且这个过程还是可以进行多线程并行化的,如果开两个线程,SSE版本有望做到8ms一帧。

同HA算法相比,这个算法得到的结果更加完美,而且有瑕疵的地方更少,如下所示:


Zhang Wu算法结果                              原始图像

我们再分享一组测试及图像:


Mosaic                                          Bilinear


HA                                            Zhang Wu

整体来看,效果最好是的Zhang Wu算法, 其次是HA, 最一般的就是Bilinear了,但是,也不要一板子拍死Bilinear, 在很多不是很复杂的场景的情况下,使用他得到的效果依旧是可以满足需求的,关键是他真的快啊。

当然,如果对算法执行效率和效果都要做一个均衡的话,应该说HA算法就比较适中了。

为了比较方便,我编写了一个简易的DEMO,供大家做算法验证。

https://files.cnblogs.com/files/Imageshop/DeMosaic.rar?t=1725238639&download=true

如果想时刻关注本人的最新文章,也可关注公众号:

参考资料:

// https://blog.csdn.net/OdellSwan/article/details/136887148 ISP算法 | Demosaic(一)
// https://blog.csdn.net/OdellSwan/article/details/137246117 ISP算法 | Demosaic(二)
// https://cloud.tencent.com/developer/article/1934157 ISP图像处理之Demosaic算法及相关
// https://zhuanlan.zhihu.com/p/594341024 Demosaic(二)Hamilton & Adams插值算法
// https://zhuanlan.zhihu.com/p/144651850 Understanding ISP Pipeline - Demosaicking
//
https://blog.csdn.net/feiyanjia/article/details/124366793