2023年4月

简介

在一些实际的场景里,我们需要通过利用一些端口转发工具,比如系统自带的命令行工具或第三方小软件,来绕过网络访问限制触及目标系统。下文为大家总结了linux系统和windows系统端口转发常用的一些方法。

注:Linux实现端口转发需要内核支持,请确保内核参数
net.ipv4.ip_forward
值为
1

一、采用iptables实现

iptables是一个功能丰富的Linux防火墙工具,可以用于配置网络地址转换(NAT)规则,从而实现端口转发。

# 端口映射规则
iptables -t nat -A PREROUTING -p tcp --dport [目标端口] -j DNAT --to-destination [映射地址]:[映射端口]
iptables -t nat -A POSTROUTING -p tcp -d [映射地址] --dport [映射端口] -j SNAT --to-source [本地地址]

# 案例:将远程主机 192.168.1.101 的 80 端口映射到本地的 8080 端口,输入以下命令
iptables -t nat -A PREROUTING -p tcp --dport 8080 -j DNAT --to-destination 192.168.1.101:80
iptables -t nat -A POSTROUTING -p tcp -d 192.168.1.101 --dport 80 -j SNAT --to-source [本地IP]
# 注意要将 [本地IP] 替换为本地 IP 地址。

# 查看转发规则
iptables -t nat --list --line-number

参数说明

  • -t nat
    :指定转换表(nat 表);
  • -A PREROUTING
    :指定要添加到哪个链中;
  • -p tcp
    :指定协议为 TCP;
  • --dport [目标端口]
    :指定需要映射到的目标端口;
  • -j DNAT
    :指定使用目标地址转换;
  • --to-destination [映射地址]:[映射端口]
    :指定目标地址和端口;
  • -A POSTROUTING
    :指定要添加到哪个链中;
  • -d [映射地址]
    :指定目标地址;
  • --dport [映射端口]
    :指定目标端口;
  • -j SNAT
    :指定使用 源地址转换;
  • --to-source [本地地址]
    :指定本地地址。

二、采用firewalld实现

firewalld是另一个功能强大的Linux防火墙工具,也可以用于配置网络地址转换(NAT)规则来实现端口转发。

# 规则
firewall-cmd --zone=public --add-forward-port=port=[本机端口]:proto=[协议]:toaddr=[远程主机IP]:toport=[远程主机端口] --permanent

# 案例:将192.168.1.10的80端口转发到本机的8080
firewall-cmd --zone=public --add-forward-port=port=8080:proto=tcp:toaddr=192.168.1.10:toport=80 --permanent
firewall-cmd --reload

# 查看已设置的规则
firewall-cmd --list-all

参数说明

  • --zone=public
    :指定了这条规则的生效区域是public
  • --add-forward-port
    :指定了添加一条端口转发规则,具体规则参数解释如下:
    • port
      :需要转发的端口
    • proto
      :协议类型,支持tcp、udp、icmp
    • toport
      :目标端口
    • toaddr
      :可选参数,转发到指定IP地址

三、采用ssh隧道实现

利用SSH为TCP链接提供的隧道功能实现端口转发。
# 规则
ssh -N -L [远程端口]:[本地地址]:[本地端口] root@[远程地址]

# 案例:将服务器 172.20.150.199 上的 8000 端口映射到本地的 8888 端口
ssh -N -L 8000:localhost:8888 root@172.20.148.199

更多高级用法可访问
https://www.lixueduan.com/posts/linux/07-ssh-tunnel/

四、采用nc实现

NC(也称作Netcat)是一个类Unix操作系统中的网络工具,有着强大的端口转发功能。

# 格式
nc -lp [本地映射端口] -c "nc [远程主机地址] [远程主机端口]"

# 案例:将192.168.1.110的3306映射到本机的3000端口
nc -lp 3000 -c "nc 192.168.1.110 3306"

五、采用ncat实现

ncat是一个多功能网络工具,可用于端口转发。它是nc(netcat)的升级版本。

# 规则
ncat --sh-exec "ncat [远程主机地址] [远程主机端口]" -l [本地映射端口]  --keep-open

# 案例:将10.100.39.144的80端口转发到本地8000
ncat --sh-exec "ncat 10.100.39.144 80" -l 8000 --keep-open

六、采用socat实现

socat是一款非常强大的网络工具,在实现端口映射时,可以用它来建立TCP连接,将本地端口映射到远程机器上的一个端口。下面是使用socat实现端口映射的步骤:

# 规则
socat TCP-LISTEN:[本地映射端口],reuseaddr,fork TCP:[远程主机地址]:[远程主机端口]

# 案例: 将192.168.199.236的22端口,映射到本机的7777端口
socat TCP-LISTEN:7777,reuseaddr,fork TCP:192.168.199.236:22

七、Windows系统使用netsh实现

在 Windows 操作系统中,可以使用 netsh 命令实现网络端口转发。以下是使用 netsh 实现端口转发的步骤:

# 端口转发
netsh interface portproxy add v4tov4 listenaddress=0.0.0.0 listenport=80 connectaddress=10.100.38.14 connectport=80
netsh interface portproxy add v4tov4 listenaddress=0.0.0.0 listenport=443 connectaddress=10.100.38.14 connectport=443

# 展示
netsh interface portproxy show v4tov4
侦听 ipv4:                 连接到 ipv4:

地址            端口        地址            端口
--------------- ----------  --------------- ----------
0.0.0.0         80          10.100.38.14    80
0.0.0.0         443         10.100.38.14    443

# 删除
netsh interface portproxy delete v4tov4 listenaddress=0.0.0.0 listenport=80

话接上篇 [ASP.NET Core - 缓存之内存缓存(上)],所以这里的目录从 2.4 开始。

2.4 MemoryCacheEntryOptions

MemoryCacheEntryOptions 是内存缓存配置类,可以通过它配置缓存相关的策略。除了上面讲到的过期时间,我们还能够设置下面这些:

  • 设置缓存优先级。
  • 设置在从缓存中逐出条目后调用的 PostEvictionDelegate。
    回调在与从缓存中删除项的代码不同的线程上运行。
  • 限制缓存大小
var memoryCacheEntryOption = new MemoryCacheEntryOptions();
memoryCacheEntryOption.AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(3);
// 缓存优先级,Low, Normal, High, NeverRemove,Normal是默认值,与缓存删除时的策略有关
memoryCacheEntryOption.SetPriority(CacheItemPriority.Normal);
// 注册缓存项删除回调事件
memoryCacheEntryOption.RegisterPostEvictionCallback(PostEvictionDelegate);
_cache.Set(CacheKey, cacheValue, memoryCacheEntryOption);

public void PostEvictionDelegate(object cacheKey, object cacheValue, EvictionReason evictionReason, object state)
{
	var memoryCache = (IMemoryCache)state;

	Console.WriteLine($"Entry {cacheKey}:{cacheValue} was evicted: {evictionReason}.");
}

image

缓存大小限制要配合 MemoryCache 实例的配置来使用。MemoryCache 实例可以选择指定并强制实施大小限制。 缓存大小限制没有定义的度量单位,因为缓存没有度量条目大小的机制。 如果设置了缓存大小限制,则所有条目都必须指定大小。 ASP.NET Core 运行时不会根据内存压力限制缓存大小。 由开发人员限制缓存大小。 指定的大小采用开发人员选择的单位。

例如:

  • 如果 Web 应用主要缓存字符串,则每个缓存条目的大小可以是字符串长度。
  • 应用可以将所有条目的大小指定为 1,大小限制是条目计数。
    如果未设置
    SizeLimit
    ,则缓存会无限增长。 系统内存不足时,ASP.NET Core 运行时不会剪裁缓存。 应用必须构建为:
    • 限制缓存增长。
    • 在可用内存受限时调用
      Compact

      Remove

这里的意思是,缓存大小没有单位,我们可以设置一个总的大小,然后为每个缓存条目设置一个大小。如果没有设置大小的情况下,缓存可能会无限增长,直至用完服务器上的所有内存。

// 我们可以在进行内存缓存注册的时候设置缓存大小限制
services.AddMemoryCache(options =>
{
	options.SizeLimit = 1024;
});

// 之后设定每个缓存项的大小,可根据开发人员的判断设置,例如这里无论多大都设置为 1,则最多有 1024 个缓存项
memoryCacheEntryOption.SetSize(1);
_cache.Set(CacheKey, cacheValue, memoryCacheEntryOption);

2.5 缓存清理

缓存到期后不会在后台自动清理。 没有计时器可以主动扫描缓存中的到期项。 而缓存上的任何活动(Get、Set、Remove)都可触发在后台扫描到期项。 如果使用了CancellationTokenSource ,则其计时器 (
CancelAfter
) 也会删除条目并触发扫描到期项,这个是下面要讲到的。

除了在对缓存进行操作时,会触发对相应的缓存项进行过期检查之外,我们还可以通过手动调用 Remove 方法和 Compact 方法对缓存进行清理。

_cache.Remove(cacheKey);
_cache.Compact(.25);

Compact 方法会尝试按以下顺序删除指定百分比的缓存:

  • 所有到期项。
  • 按优先级排列的项。 首先删除最低优先级的项。
  • 最近最少使用的对象。
  • 绝对到期时间最短的项。
  • 可调到期时间最短的项。

永远不会删除优先级为
NeverRemove
的固定项。 上面的代码的作用就是删除cacheKey对于的缓存项,并调用 Compact 以删除 25% 的缓存条目。

2.6 缓存组

缓存项的过期策略除了上面讲到的过期时间的设置之外,还可以通过
CancellationChangeToken
来控制,通过它可以同时控制多个缓存对象的过期策略,实现相关的缓存同时过期,构成一个组的概念。

以下为示例代码:

首先先定义两个方法,将缓存设置和缓存读取分开:

public interface ICacheService
{
	public void PrintDateTimeNow();

	public void SetGroupDateTime();

	public void PrintGroupDateTime();
}

public class CacheService : ICacheService 
{
	public const string CacheKey = "CacheTime";
	public const string DependentCancellationTokenSourceCacheKey = "DependentCancellationTokenSource";
	public const string ParentCacheKey = "Parent";
	public const string ChildCacheKey = "Chlid";
	private readonly IMemoryCache _cache;
	public CacheService(IMemoryCache memoryCache)
	{
		_cache = memoryCache;
	}

	public void PrintDateTimeNow()
	{
		var time = DateTime.Now;
		if (!_cache.TryGetValue(CacheKey, out DateTime cacheValue))
		{
			cacheValue = time;
			// 设置绝对过期时间
			// 两种实现的功能是一样的,只是时间设置的方式不同而已
			// 传入的是 AbsoluteExpirationRelativeToNow, 相对当前的绝对过期时间,传入时间差,会根据当前时间算出绝对过期时间
			// _cache.Set(CacheKey, cacheValue, TimeSpan.FromSeconds(2));
			// 传入的是 AbsoluteExpiration,绝对过期时间,传入一个DateTimeOffset对象,需要明确的指定具体的时间
			// _cache.Set(CacheKey, cacheValue, DateTimeOffset.Now.AddSeconds(2));

			//var memoryCacheEntryOption = new MemoryCacheEntryOptions();
			//// 滑动过期时间是一个相对时间
			//memoryCacheEntryOption.SlidingExpiration = TimeSpan.FromSeconds(3);
			//_cache.Set(CacheKey, cacheValue, memoryCacheEntryOption);

			var memoryCacheEntryOption = new MemoryCacheEntryOptions();
			memoryCacheEntryOption.AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(3);
			// 缓存优先级,Low, Normal, High, NeverRemove,Normal是默认值,与缓存删除时的策略有关
			memoryCacheEntryOption.SetPriority(CacheItemPriority.Normal);
			memoryCacheEntryOption.RegisterPostEvictionCallback(PostEvictionDelegate);
			// 之后设定每个缓存项的大小,可根据开发人员的判断设置,例如这里无论多大都设置为 1,则最多有 1024 个缓存项
			memoryCacheEntryOption.SetSize(1);
			_cache.Set(CacheKey, cacheValue, memoryCacheEntryOption);
		}
		time = cacheValue;

		Console.WriteLine("缓存时间:" + time.ToString("yyyy-MM-dd HH:mm:ss"));
		Console.WriteLine("当前时间:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
	}

	public void PostEvictionDelegate(object cacheKey, object cacheValue, EvictionReason evictionReason, object state)
	{
		var memoryCache = (IMemoryCache)state;

		Console.WriteLine($"Entry {cacheKey}:{cacheValue} was evicted: {evictionReason}.");
	}

	public void SetGroupDateTime()
	{
		// 这里为了将 CancellationTokenSource 保存起来,以便在外部可以获取得到
		var cancellationTokenSource = new CancellationTokenSource();
		_cache.Set(
			DependentCancellationTokenSourceCacheKey,
			cancellationTokenSource);

		using var parentCacheEntry = _cache.CreateEntry(ParentCacheKey);

		parentCacheEntry.Value = DateTime.Now;

		Task.Delay(TimeSpan.FromSeconds(1)).Wait();

		_cache.Set(
			ChildCacheKey,
			DateTime.Now,
			new CancellationChangeToken(cancellationTokenSource.Token));
	}

	public void PrintGroupDateTime()
	{
	   if(_cache.TryGetValue(ParentCacheKey, out DateTime parentCacheDateTime))
		{
			Console.WriteLine("ParentDateTime:" + parentCacheDateTime.ToString("yyyy-MM-dd HH:mm:ss"));
		}
		else
		{
			Console.WriteLine("ParentDateTime is canceled");
		}

		if (_cache.TryGetValue(ChildCacheKey, out DateTime childCacheDateTime))
		{
			Console.WriteLine("ChildDateTime:" + childCacheDateTime.ToString("yyyy-MM-dd HH:mm:ss"));
		}
		else
		{
			Console.WriteLine("ChildDateTime is canceled");
		}
	}
}

之后改造一下入口文件中的测试代码:

var service = host.Services.GetRequiredService<ICacheService>();
service.SetGroupDateTime();

service.PrintGroupDateTime();

service.PrintGroupDateTime();

var cache = host.Services.GetRequiredService<IMemoryCache>();
var cancellationTokenSource = cache.Get<CancellationTokenSource>(CacheService.DependentCancellationTokenSourceCacheKey);
cancellationTokenSource.Cancel();

service.PrintGroupDateTime();

从控制台输出可以看得到前两次缓存获取正常,当调用 CancellationTokenSource.Cancel() 方法取消请求之后,缓存过期了。

image

如果使用
CancellationTokenSource
,则允许将多个缓存条目作为一个组逐出。 使用上述代码中的 using 模式,在 using 范围内创建的缓存条目会继承触发器和到期设置。不过这种方式只有 using 范围的缓存项和 using 范围内使用 CancellationTokenSource 的缓存项可以构成一个组,如果范围内还有其他的缓存项,是不算在一个组内的。

image

如果要把多个缓存项全部纳入一个组,还可以用以下的方式:

_cache.Set(ParentCacheKey,
	DateTime.Now,
	new CancellationChangeToken(cancellationTokenSource.Token));

_cache.Set(
	ChildCacheKey,
	DateTime.Now,
	new CancellationChangeToken(cancellationTokenSource.Token));

_cache.Set(
	ChildsCacheKey,
	DateTime.Now,
	new CancellationChangeToken(cancellationTokenSource.Token));

image

2.7 一些注意事项

  • 使用回调重新填充缓存项时:

    • 多个请求可以发现缓存的键值为空,因为回调没有完成。
    • 这可能会导致多个线程重新填充缓存项。

  • 当使用一个缓存条目创建另一个缓存条目时,子条目会复制父条目的到期令牌和基于时间的到期设置。 手动删除或更新父条目时,子条目不会过期。

官方文档上还有另外几条,但是我这边在上面有讲过了,这里就不再重复了。



参考文章:
ASP.NET Core 中的内存中缓存



ASP.NET Core 系列:

目录:
ASP.NET Core 系列总结
上一篇:
ASP.NET Core - 缓存之内存缓存(上)

在使用Kafka的过程中,消费者断掉之后,再次开始消费时,消费者会从断掉时的位置重新开始消费。

场景再现:比如昨天消费者晚上断掉了,今天上午我们会发现kafka消费的数据不是最新的,而是昨天晚上的数据,由于数据量比较多,也不会及时的消费到今天上午的数据,这个时候就需要我们对偏移量进行重置为最新的,以获取最新的数据。

前提,我们使用的AutoOffsetReset配置是Latest,即从连接到Kafka那一刻开始消费之后产生的消息,之前发布的消息不在消费,这也是默认的配置。

关于AutoOffsetReset这个枚举的配置项如下:

    • latest
      (default) which means consumers will read messages from the tail of the partition
      最新(默认) ,这意味着使用者将从分区的尾部读取消息,只消费最新的信息,即自从消费者上线后才开始推送来的消息。那么会导致忽略掉之前没有处理的消息。
    • earliest
      which means reading from the oldest offset in the partition
      这意味着从分区中最早的偏移量读取;自动从消费者上次开始消费的位置开始,进行消费。
    • none
      throw exception to the consumer if no previous offset is found for the consumer's group
      如果没有为使用者的组找到以前的偏移量,则不会向使用者抛出异常。

接下来,我们直接使用下面这一段代码即可:

使用Assign订阅指定的分区,注意最后还需要使用Subscribe方法订阅

consumer.Assign(new TopicPartitionOffset(new TopicPartition(topic, new Partition(1)),Offset.End));//从指定的Partition订阅消息使用Assign方法

consumer.Subscribe(topic);//订阅消息使用Subscribe方法

从指定的分区获取数据,并且指定了对应的偏移量

关于Offset这个枚举不同配置项的说明如下:

Offset 可以被设置为 Beginning、End、Stored 和 Unset。这些值的含义如下:

  1. Beginning:从 Kafka 分区的最早消息(Offset 为 0)开始消费。如果分区中有新消息产生,消费者会继续消费这些消息。

  2. End:从 Kafka 分区的最新消息开始消费。如果消费者在启动后到达了 Kafka 分区的末尾,它将停止消费,并等待新消息的到来。

  3. Stored:从消费者存储的 Offset 开始消费。这个 Offset 通常是消费者在上次停止消费时存储的 Offset。如果存储的 Offset 失效或者已过期,消费者会从最新的消息(End)开始消费。

  4. Unset:在消费者启动时,Offset 没有被设置。在这种情况下,消费者将根据 auto.offset.reset 配置项的值来决定从哪里开始消费。如果 auto.offset.reset 的值为 latest,则从最新的消息开始消费;如果 auto.offset.reset 的值为 earliest,则从最早的消息开始消费。

需要注意的是,如果设置了 Stored 的 Offset,但是在 Kafka 中找不到对应的消息,消费者将会从最新的消息(End)开始消费。

因此,存储的 Offset 必须要有效才能够被正确地使用

作者:京东物流 陈维

一、引言

G.J.Myers在《软件测试的艺术》中提出:从心理学角度来说,测试是一个为了寻找错误而运行程序的过程。

那么安全测试则是一个寻找系统潜在安全问题的过程,通过测试手段发现系统中可能存在的安全问题和风险,分析并进行优化,保障系统的安全质量。

从应用安全维度出发,展开系列安全测试工作,包括不限于:安全前置扫描、安全渗透测试、数据安全、SDL流程引入等。

本文我们将以围绕系统安全质量提升为目标,讲述在
安全前置扫描
上实践开展过程。

希望通过此篇文章,帮助大家更深入、透彻地了解安全测试,能快速开展安全测试。

二、安全前置实践

1.
工单分析-明确来源

在开展扫描前,首先对现有工单漏洞进行分析。

(1)漏洞来源分析

漏洞占比分布:

开源组件-版本问题、代码扫描 ,这两类占比91%;

这两类主要为
编译时,平台自动调用安全部代码扫描接口发起的扫描

安全部按照
规则
,则
形成漏洞工单下发研发

白盒漏洞分布:

检测分支:master分支、uat分支、test分支等。

即:所有在jdos上进行部署的分支都会进行扫描,扫描出的问题都是工单的产生来源。

JSRC类分析:

外部白帽在JSRC上提交的问题:
https://security.jd.com
,相关部门再下发形成工单。

(2)形成预防措施

通过上述分析,开展的具体措施:

1.
开展前置扫描。
在行云部署编译之前,主动发起安全前置扫描,避免遗漏到线上。并且统一代码安全扫描规则,避免内部扫描过代码仍存在代码扫描类漏洞。

2.
安全质量卡控
。研发测试落实
代码安全扫描
,安全扫描作为上线必备环节,触发形成自动扫描,漏出问题修复后才可进行上线编译。

3.
开展渗透测试。
针对外网系统和内网敏感系统已上线系统开展渗透测试,新需求接入安全SDL安全研发生命周期进行管理。

4.
前置扫描-解决存量

通过对应用代码白盒扫描,应用域名的黑盒扫描,前置识别问题,预防缺陷,减少漏洞。以及在扫描过程中进行工具提效,近一步提高前置识别预防的范围。

(1)代码白盒扫描

①基于流水线源代码安全审计原子的master分支扫描

在部门刚开始做扫描时,使用流水线方式,优先流水线方式,实现持续的集成扫描,流水线主要步骤为:

扫描分支:master分支

触发条件:码提交触发、定时触发

邮件通知:通过邮件进行扫描报告链接下发

问题跟进:人工查看报告-漏洞分类整理-下发任务至研发

总结:

能有效地覆盖master分支的扫描,但是存在的问题是:

覆盖分支有限,造成非master分支漏洞遗漏;

如需新增覆盖分支,则需新建流水线,耗时不变;

人工方式的问题梳理,效率低,易出错。

②活跃分支的预防扫描

部署平台上的编译分支,除master外,其他编译分支也会产生漏洞工单。

仅进行master分支扫描,不能完全预防白盒漏洞问题。

故:抓取活跃分支-提交活跃分支代码扫描-形成全分支扫描覆盖

识别活跃分支:

安全代码扫描平台:

活跃分支扫描结果。

总结:

基于以上,实现了master分支+活跃分支的扫描覆盖,完全覆盖,可完全前置识别白盒漏洞问题。

(2)应用黑盒扫描

Step1

获取域名基于域名、解析IP的黑盒扫描。

**Step2:**白盒漏洞扫描执行:

整理漏洞扫描结果:

(3)提效工具开发

问题
:白盒&黑盒扫描,包含【提交任务-获取结果-漏洞整理-问题下发】的实施步骤,过程中,纯手工操作:时

间长,问题收集、整理,易遗漏&出错 。白盒扫描覆盖率低,遗漏的问题形成工单。

方案
:基于开放接口实现批量提交任务-获取结果-报告整理工具

收益

效率提升:人工4小时->1小时,提效75%

覆盖率提升:master分支->近两周活跃分支+master分支,扫描覆盖率100%,发现更多问题,避免遗漏。

1.
漏洞修复-闭环跟踪

完成白盒和黑盒扫描之后,要将扫出的漏洞推送至研发解决,以及完成漏洞的闭环跟踪验证。

(1)基于行云缺陷跟踪处理

•以应用对应的代码库为维度,进行安全漏洞扫描;

•一个代码库一次扫描出一份报告,报告中展示工程代码当前存在的所有安全类问题;

•每次扫描出的结果会在行云上记录一个问题,反馈到研发接口人,由研发接口人分配到具体研发;

总结:

•基于行云缺陷录入管理,录入过程耗时耗力,未实现自动录入;

•过程不精细;

(2)基于任务批量管理平台进行下发

•扫描完成之后->对问题进行整理->
通过OE接口人(或OE接口)进行批量下发任务;

•研发修复解决;

(3)安全流程建设

•每周测试接口人、研发接口人,组织会议对本周安全工单、漏洞问题进行复盘;

•周二、周四上线日的黑白合扫描的常态化执行,发送安全测试报告邮件;

•每周安全测试周报;每月安全测试月报;

•研发安全自测意识建立,行云部署编译之前,使用平台进行自测;

1.浅析漏洞

(1)扫描原理-污点分析

使用污点分析检测程序漏洞的工作原理如下图所示:

•基于数据流的污点分析:

在不考虑隐式信息流的情况下,可以将污点分析看做针对污点数据的数据流分析。根据污点传播规则跟踪污点信息或者标记路径上的变量污染情况,进而检查污点信息是否影响敏感操作。

•基于依赖关系的污点分析:

考虑隐式信息流,在分析过程中,根据程序中的语句或者指令之间的依赖关系,检查 Sink 点处敏感操作是否依赖于 Source 点处接收污点信息的操作。

参考资料: https://firmianay.gitbooks.io/ctf-all-in-one/content/doc/5.5_taint_analysis.html#基本原理

三、总结

本文我们讲述了体验保障的安全质量提升过程。重点讲述黑盒、白盒的扫描过程。

首先对漏洞工单进行了分析,确定了漏洞的来源、种类、分布,摸清了漏洞的现阶段情况。

然后通过进行安全前置扫描,对工单中的白盒、黑盒问题前置识别。过程中通过开发工具来提升效率,最终形成一套可行的前置开展方案。

但需注意:除了解决存量漏洞问题,还需要新增类问题,需要持续不断地建设,需要实现安全测试的常态化运行。并且要利用更多自动化工具,去进行提效。

本文作者:李伟,社区里大家叫小伟,Apache RocketMQ Committer,RocketMQ Python客户端项目Owner ,Apache Doris Contributor,腾讯云RocketMQ开发工程师。

最近突然感觉:很多软件、硬件在设计上是有root reason的,不是by desgin如此,而是解决了那时、那个场景的那个需求。一旦了解后,就会感觉在和设计者对话,了解他们的思路,学习他们的方法,思维同屏:活到老学到老。

1大家思考

1.1 Consumer Queue Offset是连续的吗, 为什么?

1.2 Commit Log Offset是连续的吗, 为什么?

1.3 Java写的文件,默认是大端序还是小端序,为什么?

2Commit Log真实分布

在大家思考之际, 我们回想下commit log是怎么分布的呢?

在Broker配置的存储根目录下,通过查看Broker实际生成的commit log文件可以看到类似下面的数据文件分布:

图片

Broker真实数据文件存储分布

可以看到,真实的存储文件有多个, 每一个都是以一串类似数字的字符串作为文件名的,并且大小1G。

我们结合源码可以知道,实际的抽象模型如下:

图片

Commit Log存储文件分布抽象

由上图得知:

  • Commit Log是一类文件的称呼,实际上Commit Log文件有很多个, 每一个都可以称为Commit Log文件。

如图中表示了总共有T个Commit Log文件,他们按照由过去到现在的创建时间排列。

  • 每个Commit Log文件都保存消息, 并且是按照消息的写入顺序保存的,并且总是在写创建时间最大的文件,并且同一个时刻只能有一个线程在写。

如图中第1个文件,1,2,3,4...表示这个文件的第几个消息,可以看到第1234个消息是第1个Commit Log文件的最后一个消息,第1235个消息是第2个Commit Log的第1个消息。

说明1:每个Commit Log文件里的全部消息实际占用的存储空间大小<=1G。这个问题大家自行思考下原因。

说明2:每次写Commit Log时, RocketMQ都会加锁,代码片段见
https://github.com/apache/rocketmq/blob/7676cd9366a3297925deabcf27bb590e34648645/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L676-L722

图片

append加锁

我们看到Commit Log文件中有很多个消息,按照既定的协议存储的,那具体协议是什么呢, 你是怎么知道的呢?

3Commit Log存储协议

关于Commit Log存储协议,我们问了下ChatGPT, 它是这么回复我的,虽然不对,但是这个回复格式和说明已经非常接近答案了。

图片

ChatGPT回复

我们翻看源码,具体说明下:
https://github.com/apache/rocketmq/blob/rocketmq-all-4.9.3/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1547-L1587

图片

Commit Log存储协议

我整理后, 如下图;

图片

我理解的Commit Log存储协议

说明1:我整理后的消息协议编号和代码中不是一致的,代码中只是标明了顺序, 真实物理文件中的存储协议会更详细。

说明2:在我写的《RocketMQ分布式消息中间件:核心原理与最佳实践》中,这个图缺少了Body内容,这里加了,也更详细的补充了其他数据。

这里有几个问题需要说明下:

  • 二进制协议存在字节序,也就是常说的大端、小端。大小端这里不详细说明感兴趣的同学自己google或者问ChatGPT,回答肯定比我说的好。

  • 在java中, 一个byte占用1个字节,1个int占用4个字节,1个short占用2个字节,1个long占用8个字节。

  • Host的编码并不是简单的把IP:Port作为字符串直接转化为byte数组,而是每个数字当作byte依次编码。在下一节的Golang代码中会说明。

  • 扩展信息的编码中,使用了不可见字符作为分割,所以扩展字段key-value中不能包含那2个不可见字符。具体是哪2个,大家找找?

我们看到这个协议后,如何证明你的物理文件就是按照这个协议写的呢?

4用Golang解开RocketMQ Commit Log

RocketMQ是用java写的,根据上文描述的存储协议,我用Golang编写了一个工具,可以解开Commit Log和Cosumer Queue,代码地址:

https://github.com/rmq-plus-plus/rocketmq-decoder

这个工具目前支持2个功能:

  • 指定Commit Log位点,直接解析Commit Log中的消息,并且打印。
  • 指定消费位点,先解析Consumer Queue,得到Commit Log Offset后,再根据Commit Log Offset直接解析Commit Log,并且打印。

在Golang中没有依赖RocketMQ的任何代码,纯粹是依靠协议解码。

图片

golang-import

这里贴了一段golang中解析Commit Log Offset的例子:在java中这个offset是一个long类型,占用8个字节。

在golang中,读取8个字节长度的数据,并且按照大端序解码为int64,就可以得到正常的Commit Log Offset。

图片

Golang-demo

我跑了一个demo结果,大家参考:

图片

读取consumer-queue-commit-log

5回答最初的问题

以下为个人见解,大家参考:

1.1 Consumer Queue Offset是连续的吗, 为什么?

是连续的。

consumer queue offset,是指每个queue中索引消息的下标,下标当然是连续的。消费者也是利用了这个连续性,避免消费位点提交空洞的。

每个索引消息占用相同空间,都是20字节,结构如下:

图片

consumer-queue索引消息结构

这里物理位点也就是Commit Log Offset。

1.2 Commit Log Offset是连续的吗, 为什么?

不是连续的。

Commit Log Offset是指的每个消息在全部Commit Log文件中的字节偏移量, 每个消息的大小是不确定的,所以Commit Log Offset,也即是字节偏移量肯定是不一样的。

并且可以知道,每两个偏移量的差的绝对值就是前一个消息的消息字节数总长度。

并且上文中图 “Commit Log存储文件分布抽象”中的有误解,每个小方格的大小其实是不一样的。

1.3 Java写的文件,默认是大端序还是小端序,为什么?

大端序。字节序其实有数据存储顺序和网络传输顺序两种,java中默认用的大端序,保持和网络传输一样,这样方便编解码。

每段网络传输层的数据报文最前面的字节是表达后面的数据是用什么协议传输的,这样数据接收者在接受数据时, 按照字节顺序,先解析协议,再根据协议解码后面的字节序列,符合人类思考和解决问题的方式。

以上是我的理解,有任何问题,可以进社区群细聊。

讨论说明:由于RocketMQ一些版本可能有差异,本文在4.9.3版本下讨论。