2024年10月

作者:来自 vivo 互联网存储团队- Yuan Jianwei

本文介绍了 TiKV 磁盘空间问题的排查思路与解决方案。

一、背景介绍

在业务快速扩张的年代,vivo 内部的很多业务为了可以快速上线,给现网功能提供支撑,在 KV 类型的选型下许多场景都选用了轻量快速的 Redis 集群。但是随着业务的不断发展与稳定,当数据量级达到一定程度的时候,数据性质开始发生变化:有的历史 Redis 集群热度下沉,但是依然基于 Redis 集群作为载体进行 KV 存储。这种类型的数据不仅量大,而且访问频次不高。

业务的发展阶段变化也会对数据载体的诉求也会相应发生变化。对于大规模而热度不算高的 KV 存储场景而言,业务对降低成本的诉求日益增多。为了满足这种类型的业务诉求,vivo 基于 TiKV 自研一套 KV 系统供业务使用。为了让业务可以更加便捷的接入,我们基于计算存储分离架构进行设计,把 TiKV 作为存储层开发 Redis 协议兼容的 KV 存储组件。架构如下:

图片

为了让机器资源可以得到更加高效的利用,我们使用不同业务混合部署的模式在相同的服务器上部署 TiKV 实例。随着集群使用的场景日益增多,使用过程中面临的问题也会各种各样,TiKV 集群会出现诸如磁盘空间占用与预期不符,读写时延抖动,负载(存储占用,region 个数,或者热读热写)不均衡等等的问题。

这次先从磁盘空间占用问题着手,从几个比较常见的排查点进行着手和排查。其他的问题排查和修复会通过其他的文章后续进行分享。

二、问题与排查

磁盘空间的占用空间如果较高,例如磁盘剩余空间占比低于 pd 的 low-space-ratio,会导致 pd 的 scheduler 判断节点所在机器磁盘资源不足,无法执行调度作业,导致集群的各种负载会越发不均衡;更加严重的情况可能会直接影响当前物理机器上的所有 TiKV 实例的可用性。

当前我们使用 TiKV 的 4.X,5.X 以及 6.X 版本,其中有的特性在更新的版本中会得到修复。

从我们运维的经验来看,磁盘空间占用构成主要分以下几块:

  • 日志

  • buffer (占位文件)

  • 数据

后续将根据这几种维度在 TiKV 的日常运维工作中进行排查和分析。总的来说,组成如下:

(1)日志

① Tikv 日志

② RocksDB 日志

(2)buffer

① 占位符文件

(3)数据

① RocksDB 数据

  • raft 数据(老版本)

  • KV 数据

② titan 数据

  • raft 数据(老版本)

  • KV 数据

下面基于我们在日常运维事项中出现频次较多的一些维度进行排查和分析,供大家线上运维工作一些参考。

2.1 日志文件占用较多

磁盘如果空间占用较大,可以优先对日志文件进行删除减缓磁盘空间资源使用。

在一些老版本 TiKV 配置是没有 RocksDB 和 raftdb 日志的滚动保留配置,这种情况下会导致一些较老的日志长期保存,长期运行会导致日志文件占据磁盘空间较大。

通常这种日志文件会在 Data 路径下:

图片

2.1.1 排查

可以使用以下指令快速统计当前日志的占用总量:

#排查raftdb.info日志
du -sh -c raftdb.info.*
 
#排查rocksdb.info日志
du -sh -c rocksdb.info.*
 
#其他按照匹配规则进行匹配

输出如下:

...
37M     raftdb.info.2022-08-12-18:17:22.338660642
36M     raftdb.info.2022-08-13-18:17:35.358329064
36M     raftdb.info.2022-08-14-18:17:47.024173873
36M     raftdb.info.2022-08-15-18:17:56.130172134
6.5G    total

如果在上述排查过程中,发现日志占用的比较多(例如 100G 粒度的日志),可以认为对磁盘空间资源的使用是比较影响的。应该对该集群进行操作。

2.1.2. 修复

原则上,日志文件其实是需要长期保存,因为后续进行问题定位等场景是需要的。这里的修复方式我们提供几种思路:

  1. 如果大家有一些类似 graylog 等日志搜集的组件,可以在确保日志上传完成之后,及时进行日志文件释放;

  2. 允许情况下,日志文件与数据日志文件存放在不同的磁盘或者载体中,区分日志文件对数据文件在容量上以及磁盘 io 的影响;

  3. 对日志文件进行转移(例如传到其他更低成本的服务器之后进行压缩等),再进行删除;

  4. 对较新的版本开启日志 rotation,根据需求自动删除。

情况允许的话
尽量保留较多的日志
,对日志进行转移,方便后续问题排查。

2.1.3. 成因

日志文件较多基本是以下几个方面造成:

  1. 原生的日志文件没有配置 rotation;

  2. 在目前的运维过程中我们发现,如果在 TiKV 的 API 层面有存在调用 unsafeDestroyRange 接口会导致 info 级别日志在 rocksdb.info.xx 日志产生(可以针对性排查)。

2.2. 占位文件占用空间较大

通常占位文件名字是 space_placeholder_file ,可以基本在部署路径 bin 或者 Data 下。通常用于 TiKV 节点重启的时候可以有更多的预留空间提供回放使用。

但是如果基于混部的方式进行部署,各个节点会重复使用这些占位文件,这种场景下随着节点数的增多,占位文件占用的总空间是比较可观的。

可以先使用 ll 指令查看占位文件大小,看是否对磁盘空间资源占用较大。

2.2.1. 排查

可以在 Tikv 的部署路径下通过以下指令进行排查。

ll|grep space_place

输出如下:

-rw-r--r-- 1 root root 199941579980 Aug  5  2021 space_placeholder_file

如果发现 space_placeholder_file 的占用空间比较大(对比磁盘 KV 原有的空间资源),可以认为占用的磁盘空间资源对当前 TiKV 节点的影响较大,需要处理。

2.2.2. 修复

如果发现当前磁盘空间确实比较低(尤其在一些可能影响到 pd 调度的场景下),可以进行临时删除。

注意:space_palceholder_file 会在 TiKV 启动的时候建立,在重启的时候会作为一个缓冲区使用。如果对磁盘空间利用率要求不至于极致,或者作业现场并没有对磁盘的空间资源要求很高,可以优先考虑其他维度的排查和操作。

另外,如果希望 TiKV 启动的时候不另外占用 space_palceholder_file,需要更改 TiKV 配置:

[storage]
reserve-space= "0MB"

否则后续启动持续生效。

建议在磁盘空间中,尤其混部场景下,可以使用统一的占位文件,在需要重启的场景下进行释放,节点恢复之后再把占位文件进行占据,保留缓冲区域。

2.2.3. 成因

在前期的一些部署中,默认保留了占位文件,防止磁盘 KV 在没有充足的空间,可以用于防御性的磁盘释放。

但是对于一些
同磁盘混布
的集群,这种过多的资源占用可能对机器磁盘资源带来额外负担。这种情况下可以通过删除占位文件进行资源释放。

2.3. TiKV 的 GC 过慢导致失效版本数据堆积较多

存在一种情况是因为 TiKV 的 compaction filter 开启的话, GC 速度太慢,导致大部分数据都还没 GC。

2.3.1. 排查

排查项有两个:

1. 排查监控:Tikv details.GC

如果发现 GC speed 监控中基本没有多少速度,但是业务侧是存在频繁的对同一个 key刷数据,或者删除数据,并没有对应的速冻的话,TiKV 层面的 GC 速度不够快。

图片

同时,如果因为 compaction filter 开启的话,TiKV AutoGC Working 曲线为0。

图片

2. 排查配置: tikv.toml

可以查看 TiKV 的配置,查看 TiKV 部署路径下的 tikv.toml,查找项目 gc.enable-compaction-filter。

如果上述两项都得以验证,可以认为 compaction-filter 选项影响到了当前的 GC 进度。

2.3.2. 修复

操作步骤如下:

1. 对 TiKV 的配置 tikv.toml 进行更改:gc.enable-compaction-filter 修改成 true 。

[gc]
# Enable GC by compaction filter or not.
enable-compaction-filter = true

2. 对 TiKV 的配置 tikv.toml 进行更改:rocksdb.rate-bytes-per-sec 修改成建议配置。

[rocksdb]
rate-bytes-per-sec = "500MB"

注意配置的大小:

  • 这里的“500MB”根据实际的磁盘测试写入上限来配置;

  • 因为开启了 gc.enable-compaction-filter 之后,TiKV 的 GC 必然会增加,最终会带来磁盘 IO 资源消耗,为了可以减少 GC 对磁盘的 IO 资源,使用 rocksdb.rate-bytes-per-sec 对节点的磁盘 IO 资源进行控制;

  • 对于磁盘独占的 TiKV 节点,可以配置成“500MB”(具体参考磁盘写入上限),如果后续发现节点性能下降,需要适当调低;

  • 对于磁盘混布的 TiKV 节点,可以配置成“500MB”(具体参考磁盘写入上限)/<节点数>,如果后续发现节点性能下降,需要适当调低。

3. 重启&观察

操作成功后,应该会看到 Tikv details.GC.GC speed 监控项目以及 Tikv details.GC.TiKV AutoGC Working 监控项目发生变化。

图片

图片

之后,可以观察磁盘的使用情况,应该会存在长期的缓慢下降。

TiKV 开启 gc.enable-compaction-filter 使用注意:

  • 因为开启了 gc.enable-compaction-filter 之后, TiKV 的 GC 必然会增加,最终会带来磁盘的负载,此处需要考虑好扫描的时候可能对 TiKV 带来的负载;

  • 由于对集群存在长期影响,建议
    提前与业务进行沟通
    ,并且关闭阶段前期
    观察对业务的影响

  • 如果发现开启效果对现网业务存在一定影响,可以
    先尝试调低
    rocksdb.rate-bytes-per-sec ,先进行观察。

2.3.3. 成因

最初的部署为了保障性能优先,部署都默认关闭了 gc.enable-compaction-filter。但是具体的业务场景和资源消耗需要根据业务运行的状态而定。

此处需要在比较充分了解好业务的使用场景后最好在部署最初就决定好是否机器 gc.enable-compaction-filter。

注意

和这里情况比较相近的是,如果一些使用情况是
直接调用 TiKV 的事务接口
,需要额外配置线程按照数据的安全范围推进 gc-safepoint,否则这里的 gc 配置就算开启了,也会因为不在 gc-safepoint 的安全范围内,不进行数据删除。

2.4. TiKV 的 titan 数据占用磁盘空间大

后续的 TiKV 版本为了减缓写放大的问题,开发了 Titan 组件,可以供一些大于设定值的数据写入到 blob 文件中, sst 文件通过索引的方式指向 blob 文件,通过减少 sst 中的体积来降低写放大的问题。

但是我们在使用 Titan 的场景中也会遇到一些场景,如果业务对一些 key 删除较多,或者多版本覆盖写的场景下, Titan 的数据会明显较多。这种场景下需要对具体问题进行排查。

2.4.1 排查

主要从下面三个角度去排查和验证验证:

1. 查看监控面板

排查 Tikv detail 这个 dashboard,如果发现面板 Server.CF size 中的 default 较高,可以开始怀疑 titan 占用的空间比较高。

图片

如果发现上述面板中的情况,可以进一步通过在面板 TitanDB-kv.Live blob size,以及 TitanDB-raft.Live blob size 中查看。

图片

2. 是否大量数据写入了 titan 中。

如果上述监控面板发现异常,可以基于排查部署路径中的具体路径,确认是否 titan 占据太多资源。

进入 Data 路径后,存放数据的路径主要是 DB 和 raft。可以使用 du 工具进行查看。

以下是一个示例:

[xxx@xxx:/deploy_pathv/data]
$ du -h -d 1 db/titandb
937G    db/titandb
 
[xxx@xxx:/deploy_path/data]
$ du -h -d 1 raft/titandb/
1.1T    raft/titandb/

如果使用 du 工具发现占用的空间明显比较大,则可以认为这个 TiKV 节点的 titan 存放较多的数据。

3. 是否 discardable-ratio 的占比有问题。

在 Titan-kv 中的 Blob file discardable ratio distribution 中,可以观察 blob 文件的 discardable ratio 分布。

图片

如果发现 le50 的比较高,并且查看 tikv.toml 发现没有配置 rocksdb.defaultcf.titan.discardable-ratio,大概可以推测:titan 中有大约一半的数据是可以进行清空。

2.4.2. 修复

操作过程主要是配置 rocksdb.defaultcf.titan.discardable-ratio,步骤如下:

  1. 添加rocksdb.defaultcf.titan.discardable-ratio,实例如下

[rocksdb.defaultcf.titan]
discardable-ratio = 0.2

rocksdb.defaultcf.titan.discardable-ratio 调整注意:

  • 默认参数项目是0.5,表示 titan 中需要超过一半的失效数据才可以触发 titan 数据的 gc 机制。这样的设定是牺牲空间换取性能。但是这样的设定对一些预算敏感的业务可能不适用。但是调低参数之后必然导致 titan 数据的读写会存在一定的性能下降。

  • 参数调整的依据可以参考以下:

    ① 如果业务对性能不太敏感,可以调整到0.1;

    ② 如果业务对性能比较敏感但是有磁盘空间资源释放需求,可以调整到0.2。

2. 滚动重启调整后的节点,需要把 leader region 先迁移后再进行重启,恢复。

rocksdb.defaultcf.titan.discardable-ratio 调整以及重启注意:

  • 因为调整之后必然对 TiKV 的读写性能存在一定影响。建议重启之前提前与业务沟通,并且调整之后可以对单个 TiKV 节点进行观察,观察 Tikv detail 这个 dashboard 的以下 pannel:

    ① gRPC.gRPC QPS

    ② gRPC.99% gRPC message duration

    ③ gRPC.Average gRPC message duration

  • 如果发现调整后影响业务严重,建议回退操作,并且重新评估以及调整参数rocksdb.defaultcf.titan.discardable-ratio,尝试找到适合业务的比率。

2.4.3. 成因

为了降低写放大 Titan 默认开启,如果业务写入的数据是超过 Titan 的默认大小(可以额外配置,目前默认配置是1KB大小作为写入 Titan 的判断依据),此处的逻辑会导致数据写入 Titan 中。

因为写入到 Titan,而目前现网中所有的磁盘 KV 的配置都是使用默认的配置,导致 Titan 中的数据会默认翻倍存储(保留一倍的失效数据)。Titan 这样的配置本意在于空间换取时间,可以更加高效地把磁盘 IO 资源进利用,减少因为 GC 带来的负担。但是对于一些业务写入较低但是对磁盘空间资源比较敏感的场景下,应该
根据实际的情况调整 ratio
确保满足业务的诉求。

三、总结

通过日志,占位文件,数据几个维度,我们对常见的 TiKV 占用磁盘较大的问题进行介绍和给出修复方案,以及其中的注意事项。

如果大家有其他的一些磁盘占用空间较大的问题,欢迎和我们进行交流和探讨。

后续我们会从性能,调度等方面入手,继续对TiKV的一些线上问题进行解析和讲解。请大家多多期待。

书接上回,我们继续来聊散列表的代码实现。

相信通过前面两章对散列表的学习,大家应该已经掌握了散列表的基础知识,今天我们就选用简单的取模方式构建散列函数,分别实现链式法和开放寻址法中的线性探测法来解决碰撞问题,而再散列法则以方法的形式分别在两种实现方法中实现。

01
、链式法实现

1、元素定义

通过前面链式法的详细讲解,我们知道链式法需要构建散列桶,每个桶又指向一个链表,所以首先需要定义一个链表节点对象用来存储散列表的记,而记录中包括key、value以及指向下个节点的指针,代码如下:

//存储散列表的记录
private class Entry
{
    //键
    public TKey Key;
    //值
    public TValue Value;
    //下一个节点
    public Entry Next;
    public Entry(TKey key, TValue value)
    {
        Key = key;
        Value = value;
        Next = null;
    }
}

2、初始化 Init

定义好链表,我们还需要定义散列桶,其实就是定义一个数组,同时我们在定义两个私有变量分别维护桶的数量和散列表总的元素个数。

而初始化方法主要就是根据指定初始容量来初始化这些变量,如果不指定初始容量则默认为16,具体代码如下:

//散列桶数组
private Entry[] _buckets;
//桶的数量
private int _size;
//元素数量
private int _count;
//初始化指定容量的散列表
public MyselfHashChaining<TKey, TValue> Init(int capacity = 16)
{
    //桶数量
    _size = capacity;
    //初始化桶数组
    _buckets = new Entry[capacity];
    _count = 0;
    return this;
}

3、获取散列元素数量 Count

获取散列表元素数量只需返回维护元素数量的私有字段即可,实现如下:

//元素数量
public int Count
{
    get
    {
        return _count;
    }
}

4、插入 Insert

插入方法相对比较复杂,我们可以大致分为以下几步:

(1)检测负载因子是否达到阈值,超过则触发再散列动作;

(2)构建好新的键值对象;

(3)检测新的键所在的桶是否有元素,没有元素则直接插入新对象;

(4)如果键所在桶有元素,则遍历桶中链表,已存在相同key则更新value,否则插入新对象;

(5)维护元素数量;

具体代码实现如下:

//插入键值
public void Insert(TKey key, TValue value)
{
    //负载因子达到 0.75 触发重新散列
    if (_count >= _size * 0.75)
    {
        Rehash();
    }
    //计算key的散列桶索引
    var index = CalcBucketIndex(key);
    //新建一条散列表记录
    var newEntry = new Entry(key, value);
    //判断key所在桶索引位置是否为空
    if (_buckets[index] == null)
    {
        //如果为空,则直接存储再此桶索引位置
        _buckets[index] = newEntry;
    }
    else
    {
        //如果不为空,则存储在此桶里的链表上
        //取出此桶中的记录即链表的头节点
        var current = _buckets[index];
        //遍历链表
        while (true)
        {
            //如果链表中存在相同的key,则更新其value
            if (current.Key.Equals(key))
            {
                //更新值
                current.Value = value;
                return;
            }
            //如果当前节点没有后续节点,则停止遍历链表
            if (current.Next == null)
            {
                break;
            }
            //如果当前节点有后续节点,则继续遍历链表后续节点
            current = current.Next;
        }
        //如果链表中不存在相同的key
        //则把新的散列表记录添加到链表尾部
        current.Next = newEntry;
    }
    //元素数量加1
    _count++;
}
//计算key的散列桶索引
private int CalcBucketIndex(TKey key)
{
    //使用取模法计算索引,使用绝对值防止负数索引
    return Math.Abs(key.GetHashCode() % _size);
}

5、删除 Remove

删除逻辑和插入逻辑类似,都需要先计算key所在的散列桶,然后再处理桶中链表,只需要把链表上相应的节点删除即可,具体代码如下:

//根据key删除记录
public void Remove(TKey key)
{
    //计算key的散列桶索引
    var index = CalcBucketIndex(key);
    //取出key所在桶索引位置的记录即链表的头节点
    var current = _buckets[index];
    //用于暂存上一个节点
    Entry previous = null;
    //遍历链表
    while (current != null)
    {
        //如果链表中存在相同的key,则删除
        if (current.Key.Equals(key))
        {
            if (previous == null)
            {
                //删除头节点
                _buckets[index] = current.Next;
            }
            else
            {
                //删除中间节点
                previous.Next = current.Next;
            }
            //元素数量减1
            _count--;
            return;
        }
        //当前节点赋值给上一个节点变量
        previous = current;
        //继续遍历链表后续节点
        current = current.Next;
    }
    //如果未找到key则报错
    throw new KeyNotFoundException($"未找到key");
}

6、查找 Find

查找逻辑和插入、删除逻辑类似,都是先计算key所在桶位置,然后处理桶中链表,直至找到相应的元素,代码如下:

//根据key查找value
public TValue Find(TKey key)
{
    //计算key的散列桶索引
    var index = CalcBucketIndex(key);
    //取出key所在桶索引位置的记录即链表的头节点
    var current = _buckets[index];
    //遍历链表
    while (current != null)
    {
        //如果链表中存在相同的key,则返回value
        if (current.Key.Equals(key))
        {
            return current.Value;
        }
        //如果当前节点有后续节点,则继续遍历链表后续节点
        current = current.Next;
    }
    //如果未找到key则报错
    throw new KeyNotFoundException($"未找到key");
}

7、获取所有键 GetKeys

获取所有键,是遍历所有散列桶即桶中链表上的所有元素,最后取出所有key。

//获取所有键
public TKey[] GetKeys()
{
    //初始化所有key数组
    var keys = new TKey[_count];
    var index = 0;
    //遍历散列桶
    for (var i = 0; i < _size; i++)
    {
        //获取每个桶链表头节点
        var current = _buckets[i];
        //遍历链表
        while (current != null)
        {
            //收集键
            keys[index++] = current.Key;
            //继续遍历链表后续节点
            current = current.Next;
        }
    }
    //返回所有键的数组
    return keys;
}

8、获取所有值 GetValues

获取所有值,是遍历所有散列桶即桶中链表上的所有元素,最后取出所有value。

//获取所有值
public TValue[] GetValues()
{
    //初始化所有value数组
    var values = new TValue[_count];
    var index = 0;
    //遍历散列桶
    for (var i = 0; i < _size; i++)
    {
        //获取每个桶链表头节点
        var current = _buckets[i];
        //遍历链表
        while (current != null)
        {
            //收集值
            values[index++] = current.Value;
            //继续遍历链表后续节点
            current = current.Next;
        }
    }
    //返回所有值的数组
    return values;
}

9、再散列 Rehash

再散列也是比较有挑战的一个方法,这里并没有像上一篇文章中说的去实现分批次迁移老数据,而是一次性迁移,对分批次迁移感兴趣的可用自己实现试试。

这里的实现是非常简单的,就是遍历所有老数据,然后对每个老数据重新执行一次插入操作,具体代码如下:

//再散列
public void Rehash()
{
    //扩展2倍大小
    var newSize = _size * 2;
    //更新桶数量
    _size = newSize;
    //初始化元素个数
    _count = 0;
    //暂存老的散列表数组
    var oldBuckets = _buckets;
    //初始化新的散列表数组
    _buckets = new Entry[newSize];
    //遍历老的散列桶
    for (var i = 0; i < oldBuckets.Length; i++)
    {
        //获取老的散列桶的每个桶链表头节点
        var current = oldBuckets[i];
        //遍历链表
        while (current != null)
        {
            //调用插入方法
            Insert(current.Key, current.Value);
            //暂存下一个节点
            var next = current.Next;
            if (next == null)
            {
                break;
            }
            //继续处理下一个节点
            current = next;
        }
    }
}

02
、开放寻址法实现

1、元素定义

该元素的定义和链式法实现的元素定义略有不同,首先不需要指向下一个节点的指针,其次需要一个标记位用来标记空位或被删除。因为如果删除后直接置空则可能会导致后续查找过程中出现误判,因为如果置空,而后面还有相同散列值元素,但是探测方法探测到空值后会停止探测后续元素,从而引发错误,具体实现代码如下:

//存储散列表
private struct Entry
{
    //键
    public TKey Key;
    //值
    public TValue Value;
    //用于标记该位置是否被占用
    public bool IsActive;
}

2、初始化 Init

初始化方法主要就是根据指定初始容量来初始化散列表以及其大小和总的元素数量,如果不指定初始容量则默认为16,具体代码如下:

//散列表数组
private Entry[] _array;
//散列表的大小
private int _size;
//元素数量
private int _count;
//初始化指定容量的散列表
public MyselfHashOpenAddressing<TKey, TValue> Init(int capacity = 16)
{
    //散列表的大小
    _size = capacity;
    //初始化散列表数组
    _array = new Entry[capacity];
    _count = 0;
    return this;
}

3、获取散列元素数量 Count

获取散列表元素数量只需返回维护元素数量的私有字段即可,实现如下:

//元素数量
public int Count
{
    get
    {
        return _count;
    }
}

4、插入 Insert

此插入方法和链式法实现整体思路相差不大具体实现上略有差别,我们可以大致分为以下几步:

(1)检测负载因子是否达到阈值,超过则触发再散列动作;

(2)检测新的键所在的位置是否有元素,没有元素或位置非被占用则直接插入新对象;

(4)如果键所在位置有元素并且位置被占用,则线性探测后续位置,已存在相同key则更新value,否则插入新对象;

(5)维护元素数量;

具体代码实现如下:

//插入键值
public void Insert(TKey key, TValue value)
{
    //负载因子达到 0.75 触发重新散列
    if (_count >= _size * 0.75)
    {
        Rehash();
    }
    //计算key的散列表索引
    var index = CalcIndex(key);
    //遍历散列表,当位置为非占用状态则结束探测
    while (_array[index].IsActive)
    {
        //如果散列表中存在相同的key,则更新其value
        if (_array[index].Key.Equals(key))
        {
            _array[index].Value = value;
            return;
        }
        //否则,使用线性探测法,继续探测下一个元素
        index = (index + 1) % _size;
    }
    //在非占用位置处添加新元素
    _array[index] = new Entry
    {
        Key = key,
        Value = value,
        IsActive = true
    };
    //元素数量加1
    _count++;
}
//计算key的散列表索引
private int CalcIndex(TKey key)
{
    //使用取模法计算索引,使用绝对值防止负数索引
    return Math.Abs(key.GetHashCode() % _size);
}

5、删除 Remove

删除逻辑和插入逻辑类似,都需要先计算key所在的散列表中的索引,循环探测后续位置元素如果发现相同的key,则标记元素为非占用状态,具体代码如下:

//根据key删除元素
public void Remove(TKey key)
{
    //计算key的散列表索引
    var index = CalcIndex(key);
    //遍历散列表,当位置为非占用状态则结束探测
    while (_array[index].IsActive)
    {
        //如果散列表中存在相同的key,则标记为非占用状态
        if (_array[index].Key.Equals(key))
        {
            _array[index].IsActive = false;
            //元素数量减1
            _count--;
            return;
        }
        //否则,使用线性探测法,继续探测下一个元素
        index = (index + 1) % _size;
    }
    //如果未找到key则报错
    throw new KeyNotFoundException($"未找到key");
}

6、查找 Find

查找逻辑和插入、删除逻辑类似,都是先计算key所在索引,如果有元素并且位置标记为被占用且key相同则返回此元素,否则线性探测后续元素,如果最后未找到则报错,代码如下:

//根据key查找value
public TValue Find(TKey key)
{
    //计算key的散列表索引
    int index = CalcIndex(key);
    while (_array[index].IsActive)
    {
        //如果散列表中存在相同的key,则返回value
        if (_array[index].Key.Equals(key))
        {
            return _array[index].Value;
        }
        //否则,使用线性探测法,继续探测下一个元素
        index = (index + 1) % _size;
    }
    //如果未找到key则报错
    throw new KeyNotFoundException($"未找到key");
}

7、获取所有键 GetKeys

获取所有键,是遍历所有散列表所有元素,最后取出标记为被占用状态的所有key。

//获取所有键
public IEnumerable<TKey> GetKeys()
{
    //遍历散列表
    for (var i = 0; i < _size; i++)
    {
        //收集所有占用状态的键
        if (_array[i].IsActive)
        {
            yield return _array[i].Key;
        }
    }
}

8、获取所有值 GetValues

获取所有值,是遍历所有散列表所有元素,最后取出标记为被占用状态的所有value。

//获取所有值
public IEnumerable<TValue> GetValues()
{
    //遍历散列表
    for (var i = 0; i < _size; i++)
    {
        //收集所有占用状态的值
        if (_array[i].IsActive)
        {
            yield return _array[i].Value;
        }
    }
}

9、再散列 Rehash

这里的实现和链式法实现思路一样,就是遍历所有老数据,然后对每个老数据重新执行一次插入操作,具体代码如下:

//再散列
public void Rehash()
{
    //扩展2倍大小
    var newSize = _size * 2;
    //暂存老的散列表数组
    var oldArray = _array;
    //初始化新的散列表数组
    _array = new Entry[newSize];
    //更新散列表大小
    _size = newSize;
    //初始化元素个数
    _count = 0;
    //遍历老的散列表数组
    foreach (var entry in oldArray)
    {
        if (entry.IsActive)
        {
            //如果是占用状态
            //则重新插入到新的散列表数组中
            Insert(entry.Key, entry.Value);
        }
    }
}


:测试方法代码以及示例源码都已经上传至代码库,有兴趣的可以看看。
https://gitee.com/hugogoos/Planner

别再忽视!PostgreSQL  Public 模式的风险以及安全迁移

作者:桦仔

10余年DBA工作经验

微信:debolop

QQ交流群:740052625

公众号:数据库实战派

问题起因

前几天有群友在群里面咨询

PG12,13,14,public模式是否可以删除或改名?


因为这位群友的公司的PG规范做了修改,不让使用public模式存放数据,但是遗留问题没办法。

另外一位群友说到

你还真不好动public。扩展的插件的函数大多默认都在public 下。

PG中默认的public模式带来的问题

  • 安全性问题

public 模式默认对所有数据库用户都开放访问权限。换句话说,所有连接到数据库的用户默认都可以访问 public 模式中的对象(除非你手动修改权限)。

  • 命名冲突

public 模式是所有用户和所有扩展默认使用的模式,容易发生命名冲突。

  • 可维护性和隔离性

使用 public 模式进行业务操作会使数据库的架构设计显得杂乱无章,随着时间推移,尤其是在大型项目或多个项目共享数据库时,public模式中的对象数量会急剧增加

  • 版本和扩展的兼容性问题

许多 PostgreSQL 扩展默认使用 public 模式,如果修改 public 模式或删除它,可能会导致扩展无法正常工作


能否重命名 public 模式

我们能不能通过下面命令对public 模式名重命名 ?

ALTER SCHEMA public RENAME TO you_schema;

实际上重命名 public 模式是不推荐的做法,原因如下

  1. 依赖性问题
    :许多扩展、插件和默认的 PostgreSQL 设置都假定 public 模式存在。如果直接修改 public 的名称,会导致这些依赖出现问题。
  2. 升级问题
    :未来如果 PostgreSQL 版本升级,系统或新安装的扩展可能仍然依赖于 public 模式存在。

因此,最好的做法是保留 public 模式,但不在业务中使用它。


如何解决这个问题

实际上,我们可以使用迁移的方式,新建一个模式,然后把public模式下的所有业务对象迁移到新建模式下

具体步骤

第一步
:创建新的模式

CREATE SCHEMA employee;


第二步
:迁移所有对象:对表、视图、函数、存储过程等对象分别执行 SET SCHEMA 操作,将它们从 public 模式迁移到 employee 模式。

迁移对象时小心依赖关系,如外键、索引、函数依赖等,迁移时需要确保这些依赖关系不被破坏

使用以下命令逐个迁移:

-- 迁移所有表
ALTER TABLE public.table_name SET SCHEMA employee;

-- 迁移所有视图
ALTER VIEW public.view_name SET SCHEMA employee;

-- 迁移所有函数
ALTER FUNCTION public.function_name SET SCHEMA employee;

-- 迁移所有存储过程
ALTER PROCEDURE public.procedure_name SET SCHEMA employee;

使用 SQL 动态语句和 PL/pgSQL 编写一个循环来批量迁移 public 模式中的所有表、视图、函数和存储过程到 employee 模式。

DO $$ 
DECLARE
    obj record;
BEGIN
    -- 迁移所有表
    FOR obj IN
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = 'public'
    LOOP
        EXECUTE format('ALTER TABLE public.%I SET SCHEMA employee;', obj.tablename);
    END LOOP;

    -- 迁移所有视图
    FOR obj IN
        SELECT viewname
        FROM pg_views
        WHERE schemaname = 'public'
    LOOP
        EXECUTE format('ALTER VIEW public.%I SET SCHEMA employee;', obj.viewname);
    END LOOP;

    -- 迁移所有函数
    FOR obj IN
        SELECT routine_name, routine_schema
        FROM information_schema.routines
        WHERE specific_schema = 'public'
    LOOP
        EXECUTE format('ALTER FUNCTION public.%I() SET SCHEMA employee;', obj.routine_name);
    END LOOP;

    -- 迁移所有存储过程
    FOR obj IN
        SELECT routine_name, routine_schema
        FROM information_schema.routines
        WHERE specific_schema = 'public' AND routine_type = 'PROCEDURE'
    LOOP
        EXECUTE format('ALTER PROCEDURE public.%I() SET SCHEMA employee;', obj.routine_name);
    END LOOP;

END $$;


第三步
:设置 search_path 通过调整 search_path 让数据库默认使用 employee 模式。

search_path 的设置顺序非常重要。

将 employee 模式放在前面,确保在业务操作时优先查找 employee 模式的对象,而 public 作为备选模式保留(方便扩展和插件的使用)。

可以修改 PostgreSQL 的 postgresql.conf 文件,或者在会话级别设置 search_path:

SET search_path TO employee, public;


第四步
:考虑扩展和插件

许多扩展和插件默认使用 public 模式,例如 PostGIS、pgcrypto 等。

为了避免问题,最好不要修改 public 模式,而是保持其作为扩展使用的默认模式。


为什么SQL Server 没有这个问题

SQL Server 没有像 PostgreSQL 那样对 public 模式的强烈依赖,并且其设计理念与 PostgreSQL 的 public 模式存在一些关键区别。

  1. 权限管理的不同

在 SQL Server 中,dbo 是默认的 schema,所有数据库用户默认情况下并不会拥有对 dbo 这个 schema 中对象的完全访问权限。只有拥有 db_owner 角色的用户才可以完全控制 dbo 这个 schema。

也就是说,除非用户显式授予对 dbo 中对象的访问或修改权限,否则,普通用户是不能随意访问或修改 dbo 这个 schema 下的对象的。

相比之下,PostgreSQL 的 public 这个 schema 在默认情况下是对所有用户开放的。这意味着所有用户都可以在 public 这个 schema 中创建对象,除非手动限制权限。

PostgreSQL的设计会增加意外权限授予和数据泄露的风险,因此在 PostgreSQL 中有时需要避免使用 public schema。


  1. 模式设计理念的不同

在 PostgreSQL 中,public schema 设计为一个所有用户共享的默认命名空间,因此经常发生命名冲突、权限管理不严等问题。

在 SQL Server 中,dbo 是为拥有数据库完全控制权的用户预留的默认命名空间,通常普通用户和 DBA 可以自行创建自定义 schema 来组织和隔离各自的数据库对象。


参考文章

https://sdwh.dev/posts/2021/03/SQL-Server-What-Is-dbo/

https://www.ibm.com/support/pages/microsoft-sql-server-tables-get-generated-dbo-schema

https://www.postgresql.org/docs/current/ddl-schemas.html

https://www.crunchydata.com/blog/be-ready-public-schema-changes-in-postgres-15

本文版权归作者所有,未经作者同意不得转载。

我们在给数据库用户赋予权限时,有时候不想让该用户看到太多过程表和过程视图,这时就需要限定用户的访问权限

第一步:创建用户

创建数据库连接后,进入安全性——登录名,单击右键,新建登录名,并设置默认数据库

第二步:设置用户映射

点击用户映射,勾选指定要访问的数据库,数据库成员身份默认为public,无需更改,然后单击确定

这时候,我们可以看到指定的数据库用户中增加了刚刚新增的用户

第三步:设置只能访问指定的数据表或视图

指定数据库——安全性——用户——需要指定的用户,单击右键——属性——安全对象——搜索——特定对象——对象类型(选择自己需要指定用户访问的对象类型,我这里选择的是视图),然后点击浏览,勾选需要给用户授权访问的视图,确定即可

第四步:给指定表或视图赋予具体权限

如果是只读,则只需要勾选“选择”权限,设置好后点击确定即可

注意:这里需要每张表或视图逐一设置

第五步:检查权限

用刚刚新增的用户登录数据库,我们发现只能看到改数据库下刚刚赋予权限的视图,其他的表和视图都不可见,达到目标

看很多其他的文章,都说要做这步设置

USE [JTDataPatform]
GO
EXEC dbo.sp_changedbowner N'JT' 

实践证明,执行了这条语句之后,后面如果想删除该用户会遇到各种奇葩的错误提示导致用户无法删除,这时候只需要再重新将owner的权限赋值给sa,然后就可以顺利删除自己新建的用户啦

USE [JTDataPatform]
GO
EXEC dbo.sp_changedbowner N'sa'

实现 .NET 4.0 下的 Task 类相似功能:TaskExCum 组件详解

引言

随着 .NET 技术的发展,异步编程模型逐渐成为现代应用程序开发中的标准实践之一。.NET 4.5 引入了
Task
类,极大地简化了异步编程的过程。然而,许多遗留系统仍在使用 .NET 4.0 或更低版本,这些版本并未直接支持
Task
类的全部功能。为此,我们开发了
TaskExCum
组件,旨在为 .NET 4.0 提供与 .NET 4.5 相似的
Task
功能,包括
Task.Run()

Task.WhenAll()
方法。

组件概述

TaskExCum
是一个静态类,提供了以下主要功能:

  • Run
    方法:用于异步执行任务,并获取任务的结果。
  • WhenAll
    方法:用于等待多个任务完成,并收集所有任务的结果。

实现步骤

接下来,我们将详细讲解
TaskExCum
组件的实现步骤,以便读者能够更好地理解其工作原理,并将其应用于自己的项目中。

步骤 1: 创建
TaskExCum

首先,我们需要创建一个静态类
TaskExCum
,并在其中定义静态方法。

public static class TaskExCum
{
    // 方法定义将在后续步骤中添加
}

步骤 2: 实现
Run
方法

Run
方法允许开发者异步执行任务,并获取任务的结果。我们为
Run
方法提供了两种重载形式,分别用于执行无返回值的操作(
Action
)和有返回值的操作(
Func<TResult>
)。

public static Task<TResult> Run<TResult>(Func<TResult> function)
{
#if NET45
    // 如果目标框架是 .NET 4.5 或更高版本,使用 Task.Run 方法
    return Task.Run(function);
#else
    // 如果目标框架是 .NET 4.0,使用 Task.Factory.StartNew 方法
    return Task.Factory.StartNew(
        function, 
        CancellationToken.None, 
        TaskCreationOptions.None, 
        TaskScheduler.Default
    );
#endif
}

public static Task Run(Action action)
{
#if NET45
    // 如果目标框架是 .NET 4.5 或更高版本,使用 Task.Run 方法
    return Task.Run(action);
#else
    // 如果目标框架是 .NET 4.0,使用 Task.Factory.StartNew 方法
    return Task.Factory.StartNew(
        action, 
        CancellationToken.None, 
        TaskCreationOptions.None, 
        TaskScheduler.Default
    );
#endif
}

详细解释

  • 条件编译
    :使用
    #if NET45
    编译符号,当项目目标框架为 .NET 4.5 及更高版本时,使用
    Task.Run
    方法。
  • Task.Factory.StartNew
    :当项目目标框架为 .NET 4.0 时,使用
    Task.Factory.StartNew
    方法来启动任务。
    • CancellationToken.None
      :表示没有取消令牌,任务不会被外部取消。
    • TaskCreationOptions.None
      :表示没有特殊的任务创建选项。
    • TaskScheduler.Default
      :这是关键点之一。
      TaskScheduler.Default
      表示使用默认的线程池调度器,这意味着任务会在线程池中的一个线程上执行,而不是每次都启动一个新的线程。这有助于提高性能和资源利用率。

步骤 3: 实现
WhenAll
方法

WhenAll
方法用于等待多个任务完成,并收集所有任务的结果。我们为
WhenAll
方法提供了多种重载形式,以支持不同类型的任务集合。

public static Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks)
{
#if NET45
    // 如果目标框架是 .NET 4.5 或更高版本,使用 Task.WhenAll 方法
    return Task.WhenAll(tasks);
#else
    // 如果目标框架是 .NET 4.0,调用 WhenAllCore 方法
    return WhenAllCore(tasks);
#endif
}

public static Task<TResult[]> WhenAll<TResult>(params Task<TResult>[] tasks)
{
#if NET45
    // 如果目标框架是 .NET 4.5 或更高版本,使用 Task.WhenAll 方法
    return Task.WhenAll(tasks);
#else
    // 如果目标框架是 .NET 4.0,调用 WhenAllCore 方法
    return WhenAllCore(tasks);
#endif
}

public static Task WhenAll(IEnumerable<Task> tasks)
{
#if NET45
    // 如果目标框架是 .NET 4.5 或更高版本,使用 Task.WhenAll 方法
    return Task.WhenAll(tasks);
#else
    // 如果目标框架是 .NET 4.0,调用 WhenAllCore 方法
    return WhenAllCore(tasks);
#endif
}

详细解释

  • 条件编译
    :使用
    #if NET45
    编译符号,当项目目标框架为 .NET 4.5 及更高版本时,使用
    Task.WhenAll
    方法。
  • WhenAllCore
    :当项目目标框架为 .NET 4.0 时,调用
    WhenAllCore
    方法来实现相同的功能。

步骤 4: 实现
WhenAllCore
方法

WhenAllCore
方法是
WhenAll
方法的核心实现,负责处理任务集合,等待所有任务完成,并收集结果或异常信息。

private static Task WhenAllCore(IEnumerable<Task> tasks)
{
    return WhenAllCore(tasks, (completedTasks, tcs) => tcs.TrySetResult(null));
}

private static Task<TResult[]> WhenAllCore<TResult>(IEnumerable<Task<TResult>> tasks)
{
    return WhenAllCore(tasks.Cast<Task>(), (completedTasks, tcs) =>
    {
        tcs.TrySetResult(completedTasks.Select(t => ((Task<TResult>)t).Result).ToArray());
    });
}

private static Task<TResult> WhenAllCore<TResult>(IEnumerable<Task> tasks, Action<Task[], TaskCompletionSource<TResult>> setResultAction)
{
    if (tasks == null)
    {
        throw new ArgumentNullException("tasks");
    }

    Contract.EndContractBlock();
    Contract.Assert(setResultAction != null);

    var tcs = new TaskCompletionSource<TResult>();
    var array = (tasks as Task[]) ?? tasks.ToArray();

    if (array.Length == 0)
    {
        // 如果任务集合为空,直接设置结果
        setResultAction(array, tcs);
    }
    else
    {
        Task.Factory.ContinueWhenAll(array, completedTasks =>
        {
            var exceptions = new List<Exception>();
            bool hasCanceled = false;

            foreach (var task in completedTasks)
            {
                if (task.IsFaulted)
                {
                    // 收集所有失败任务的异常信息
                    exceptions.AddRange(task.Exception.InnerExceptions);
                }
                else if (task.IsCanceled)
                {
                    // 检查是否有任务被取消
                    hasCanceled = true;
                }
            }

            if (exceptions.Count > 0)
            {
                // 如果有异常,设置异常结果
                tcs.TrySetException(exceptions);
            }
            else if (hasCanceled)
            {
                // 如果有任务被取消,设置取消结果
                tcs.TrySetCanceled();
            }
            else
            {
                // 如果没有异常且没有任务被取消,设置成功结果
                setResultAction(completedTasks, tcs);
            }
        }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    return tcs.Task;
}

详细解释

  • 参数验证
    :检查传入的任务集合是否为
    null
    ,如果是,则抛出
    ArgumentNullException
  • TaskCompletionSource
    :创建一个
    TaskCompletionSource
    对象,用于管理任务的完成状态。
  • 任务转换
    :将任务集合转换为数组,以便于后续处理。
  • 任务数量检查
    :如果任务集合为空,直接调用
    setResultAction
    设置结果。
  • 等待所有任务完成
    :使用
    Task.Factory.ContinueWhenAll
    方法等待所有任务完成。
    • 异常处理
      :遍历已完成的任务,收集所有失败任务的异常信息。
    • 取消处理
      :检查是否有任务被取消。
    • 设置结果
      :如果没有异常且没有任务被取消,调用
      setResultAction
      设置结果。
    • TaskScheduler.Default
      :这里再次使用
      TaskScheduler.Default
      ,确保任务在默认的线程池中执行,而不是每次都启动新的线程。

步骤 5: 添加异常处理逻辑

为了确保组件的健壮性,我们还需要在
WhenAllCore
方法中添加异常处理逻辑,确保所有异常都能被捕获并正确处理。

private static void AddPotentiallyUnwrappedExceptions(ref List<Exception> targetList, Exception exception)
{
    var ex = exception as AggregateException;
    Contract.Assert(exception != null);
    Contract.Assert(ex == null || ex.InnerExceptions.Count > 0);

    if (targetList == null)
    {
        targetList = new List<Exception>();
    }

    if (ex != null)
    {
        // 如果异常是 AggregateException,添加其内部异常
        targetList.Add(ex.InnerExceptions.Count == 1 ? ex.InnerExceptions[0] : ex);
    }
    else
    {
        // 否则,直接添加异常
        targetList.Add(exception);
    }
}

详细解释

  • 异常类型检查
    :检查传入的异常是否为
    AggregateException
  • 异常列表初始化
    :如果
    targetList

    null
    ,则初始化一个新的列表。
  • 异常添加
    :根据异常的类型,将异常或其内部异常添加到列表中。

示例代码

为了帮助读者更好地理解如何使用
TaskExCum
组件,下面是一些示例代码。

示例 1: 使用
Run
方法

using System;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        try
        {
            // 异步执行任务并等待结果
            string result = TaskExCum.Run(() => "Hello from Task!").Result;
            Console.WriteLine(result);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error: {ex.Message}");
        }
    }
}

示例 2: 使用
WhenAll
方法

using System;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        try
        {
            // 创建多个任务
            var tasks = Enumerable.Range(1, 5).Select(i => TaskExCum.Run(() => i * i)).ToArray();
            
            // 等待所有任务完成并获取结果
            int[] results = TaskExCum.WhenAll(tasks).Result;
            
            // 输出结果
            foreach (var result in results)
            {
                Console.WriteLine(result);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error: {ex.Message}");
        }
    }
}

结论

通过
TaskExCum
组件,即使是在 .NET 4.0 这样的老框架版本中,我们也能够享受到现代异步编程模型带来的便利。希望这个组件能够帮助那些需要在旧版 .NET 框架中实现异步操作的开发者们,提高他们的开发效率和代码质量。如果你有任何建议或改进意见,欢迎留言交流!


详情请看:
https://www.cnblogs.com/Bob-luo/p/18515670
以上就是关于
TaskExCum
组件的详细介绍。希望通过这篇文章,读者能够更好地理解和使用这个组件,从而在自己的项目中实现高效的异步编程。如果有任何问题或需要进一步的帮助,请随时留言!