2024年2月

我想很多人已经体验过GRPC提供的三种流式消息交换(Client Stream、Server Stream和Duplex Stream)模式,在.NET Core上构建的GRPC应用本质上是采用HTTP2/HTTP3协议的ASP.NET Core应用,我们当然也可以在一个普通的ASP.NET Core应用中实现这种流模式。不仅如此,HttpClient也提供了响应的支持,这篇文章通过一个简单的实例提供了相应的实现, 源代码从 这里 下载。

一、双向流的效果
二、[服务端]流式请求/响应的读写
三、[客户端]流式响应/请求的读写

一、双向流的效果

在提供具体实现之前,我们不妨先来演示一下最终的效果。我们通过下面这段代码构建了一个简单的ASP.NET Core应用。如代码片段所示,在调用WebApplication的静态方法CreateBuilder将WebApplicationBuilder创建出来后,我们调用其扩展方法UseKestrel将默认终结点的监听协议设置为
Http1AndHttp2AndHttp3
,这样我们的应用将提供针对不同HTTP协议的全面支持。

var url = "http://localhost:9999";
var builder = WebApplication.CreateBuilder(args);
builder.WebHost
    .UseKestrel(kestrel=> kestrel.ConfigureEndpointDefaults(listen=>listen.Protocols = HttpProtocols.Http1AndHttp2AndHttp3))
    .UseUrls(url);
var app = builder.Build();
app.MapPost("/", httpContext=> HandleRequestAsync(httpContext, async (request, writer) => {
    Console.WriteLine($"[Server]Receive request message: {request}");
    await writer.WriteStringAsync(request);
}));
await app.StartAsync();

await SendStreamRequestAsync(url, ["foo", "bar", "baz", "qux"], reply => {
    Console.WriteLine($"[Client]Receive reply message: {reply}\n");
    return Task.CompletedTask;
});

我们针对根路径(/)注册了一个HTTP方法为POST的路由终结点,终结点处理器调用
HandleRequestAsync
来处理请求。这个方法提供一个Func<string, PipeWriter, Task>类型的参数作为处理器,该委托的第一个参数表示接收到的单条请求消息,PipeWriter用来写入响应内容。在这里我们将接收到的消息进行简单格式化后将其输出到控制台上,随之将其作为响应内容进行回写。

在应用启动之后,我们调用
SendStreamRequestAsync 方法以流的方式发送请求,并处理接收到的响应内容。该方法的第一个参数为请求发送的目标URL,第二个参数是一个字符串数组,我们将以流的方式逐个发送每个字符串。最后的参数是一个Func<string,Task>类型的委托,用来处理接收到的响应内容(字符串),在这里我们依然是将格式化的响应内容直接打印在控制台上。

image

程序启动后控制台上将出现如上图所示的输出,客户端/服务端接收内容的交错输出体现了我们希望的“双向流式”消息交换模式。我们将在后续介绍HandleRequestAsync和
SendStreamRequestAsync
方法的实现逻辑。

二、[服务端]流式请求/响应的读写

HandleRequestAsync方法定义如下。如代码片段所示,我们利用请求的BodyReader和响应的BodyWriter来对请求和响应内容进行读写,它们的类型分别是PipeReader和PipeWriter。在一个循环中,在利用BodyReader将请求缓冲区内容读取出来后,我们将得到的ReadOnlySequence<byte>对象作为参数调用辅助方法TryReadMessage读取单条请求消息,并调用handler参数表示的处理器进行处理。当请求内容接收完毕后,循环终止。

static async Task HandleRequestAsync(HttpContext httpContext, Func<string, PipeWriter, Task> handler)
{
    var reader = httpContext.Request.BodyReader;
    var writer = httpContext.Response.BodyWriter;
    while (true)
    {
        var result = await reader.ReadAsync();
        var buffer = result.Buffer;
        while (TryReadMessage(ref buffer, out var message))
        {
            await handler(message, writer);
        }
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted)
        {
            break;
        }
    }
}

由于客户端发送的单条字符串消息长度不限,为了精准地将其读出来,我们需要在输出编码后的消息内容前添加4个字节的整数来表示消息的长度。所以在如下所示的TryReadMessage方法中,我们会先将字节长度读取出来,再据此将消息自身内容读取出来,最终通过解码得到消息字符串。

static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, [NotNullWhen(true)]out string? message)
{
    var reader = new SequenceReader<byte>(buffer);
    if (!reader.TryReadLittleEndian(out int length))
    {
        message = default;
        return false;
    }

    message = Encoding.UTF8.GetString(buffer.Slice(4, length));
    buffer = buffer.Slice(length + 4);
    return true;
}

响应消息的写入是通过如下针对PipeWriter的WriteStringAsync扩展方法实现的,这里的PipeWriter就是响应的BodyWriter,针对“Length + Payload“的消息写入也体现在这里。

public static class Extensions
{
    public static ValueTask<FlushResult> WriteStringAsync(this PipeWriter writer, string content)
    {
        var length = Encoding.UTF8.GetByteCount(content);
        var span = writer.GetSpan(4 + length);
        BitConverter.TryWriteBytes(span, length);
        Encoding.UTF8.GetBytes(content, span.Slice(4));
        writer.Advance(4 + length);
        return writer.FlushAsync();
    }
}

三、[客户端]流式响应/请求的读写

客户端利用HttpClient发送请求。针对HttpClient的请求通过一个HttpRequestMessage对象表示,其主体内容体现为一个HttpContent。流式请求的发送是通过如下这个
StreamContent
类型实现的,它派生于HttpContent。我们重写了SerializeToStreamAsync方法,利用自定义的
StreamContentWriter
将内容写入请求输出流。

public class StreamContent(StreamContentWriter writer) : HttpContent
{
    private readonly StreamContentWriter _writer = writer;
    protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context)
=> _writer.SetOutputStream(stream).WaitAsync(); protected override bool TryComputeLength(out long length) => (length = -1) != -1; } public class StreamContentWriter { private readonly TaskCompletionSource<Stream> _streamSetSource = new(); private readonly TaskCompletionSource _streamEndSource = new(); public StreamContentWriter SetOutputStream(Stream outputStream) { _streamSetSource.SetResult(outputStream); return this; } public async Task WriteAsync(string content) { var stream = await _streamSetSource.Task; await PipeWriter.Create(stream).WriteStringAsync(content); } public void Complete() => _streamEndSource.SetResult(); public Task WaitAsync() => _streamEndSource.Task; }

StreamContentWriter提供了四个方法,SetOutputStream方法用来设置请求输出流,上面重写的SerializeToStreamAsync调用了此方法。单条字符串消息的写入实现在WriteAsync方法中,它最终调用的依然是上面提供的WriteStringAsync扩展方法。整个流式请求的过程通过一个TaskCompletionSource对象提供的Task来表示,当客户端完成所有输出后,会调用Complete方法,该方法进一步调用这个TaskCompletionSource对象的SetResult方法。由于WaitAsync方法返回TaskCompletionSource对象提供的Task,SerializeToStreamAsync方法会调用此方法等待”客户端输出流“的终结。

如下的代码片段体现了SendStreamRequestAsync方法的实现。在这里我们创建了一个表示流式请求的HttpRequestMessage对象,我们将协议版本设置为HTTP2,作为主体内容的HttpContent正式根据StreamContentWriter对象创建的StreamContent对象。

static async Task SendStreamRequestAsync(string url,string[] lines, Func<string, Task> handler)
{
    using var httpClient = new HttpClient();
    var writer = new StreamContentWriter();
    var request = new HttpRequestMessage(HttpMethod.Post, url)
    {
        Version = HttpVersion.Version20,
        VersionPolicy = HttpVersionPolicy.RequestVersionExact,
        Content = new StreamingWeb.StreamContent(writer)
    };
    var task = httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
    _ = Task.Run(async () =>
    {
        var response = await task;
        var reader = PipeReader.Create(await response.Content.ReadAsStreamAsync());
        while (true)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;
            while (TryReadMessage(ref buffer, out var message))
            {
                await handler(message);
            }
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted)
            {
                break;
            }
        }
    });

    foreach (string line in lines)
    {
        await writer.WriteAsync($"{line} ({DateTimeOffset.UtcNow})");
        await Task.Delay(1000);
    }
    writer.Complete();
}

我们将这个HttpRequestMessage作为请求利用HttpClient发送出去,实际上发送的内容最终是通过调用StreamContentWriter对象的WriteAsync方法输出的,我们每隔1秒发送一条消息。HttpClient将请求发出去之后会得到一个通过HttpResponseMessage对象表示的响应,在一个异步执行的Task中,我们根据响应流创建一个PipeReader对象,并在一个循环中调用上面定义的TryReadMessage方法逐条读取接收到的单条消息进行处理。

写在开头

Java的集合世界中主要由List,Set,Queue,Map构成,我们在之前的博文中已经学习了List,接下来我们继续学习Set集合。
Set特点:存取无序,不可以存放重复的元素,不可以用下标对元素进行操作

HashSet

作为Set容器的代表子类,HashSet经常被用到,我们通过源码去分析它

【源码查看】

public class HashSet<E>
    extends AbstractSet<E>
    implements Set<E>, Cloneable, java.io.Serializable
{
    private transient HashMap<E,Object> map;

    // Dummy value to associate with an Object in the backing Map
    private static final Object PRESENT = new Object();

    public HashSet() {
        map = new HashMap<>();
    }

    public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }

    public boolean remove(Object o) {
        return map.remove(o)==PRESENT;
    }
}

虽然HashSet实现了Set接口,但通过源码我们能够看到,它的底层逻辑实现其实依据的是HashMap,通过操作map的key值来实现元素的增删改查,下面通过一个小测试类去用下HashSet。

【代码示例1】

public class Test {
    public static void main(String[] args) throws FileNotFoundException {
        // 创建一个新的HashSet
        HashSet<Integer> set = new HashSet<>();
        // 添加元素
        set.add(3);
        set.add(4);
        set.add(0);
        set.add(1);
        set.add(4);

        // 输出HashSet的元素个数
        System.out.println("HashSet size: " + set.size());

        // 判断元素是否存在于HashSet中
        boolean containsWanger = set.contains(2);
        System.out.println(containsWanger);

        // 删除元素
        boolean removeWanger = set.remove(1);
        System.out.println(set);

        // 修改元素,需要先删除后添加
        boolean removeChenmo = set.remove(3);
        boolean addBuChenmo = set.add(4);
        System.out.println(removeChenmo && addBuChenmo);

        // 输出修改后的HashSet
        System.out.println(set);
    }
}

输出:

HashSet size: 4
false
[0, 3, 4]
false
[0, 4]

由代码结果进一步证明了我们的结论:
1、存储数据不重复,但add重复数据并不报错,原因是第一个数据会被第二次重复数据覆盖掉;
2,无序,很多人发现输出了一个有序的数字集合,这个其实与我们所说的有序是有区别的,在
Set中的有序无序是指输入的顺序与输出的顺序是否一致
当然,想要实现有序可以通过LinkedHashSet,底层通过链表记录元素插入顺序。

面试考点
这里面其实包含着一个小小的Java面试考点,曾经有面试官问过这样的一个问题:

集合中的无序性和不可能重复性的什么意思?

  • 无序性:所谓无序性不等于随机性,也不等于输出无序,就如同上面我们看到的向HashSet中随机添加数字,输出是从大到小,看似有序,实际此序非彼序!真正的无序性是指存储的数据在底层数组中并非按照数组索引的顺序添加 ,而是根据数据的哈希值进行判断。
  • 不可重复性:指添加的元素按照 equals() 判断时 ,返回 false,因此,实现不可重复性,必须要同时重写 equals() 方法和 hashCode() 方法。

LinkedHashSet

那么有的小伙伴会问了:“我就想存一个不重复的数据集合,同时又想要他们有序怎么办呢?”,Java的开发人员已经早就为你想到了,这个办法就是用
LinkedHashSet

LinkedHashSet 是基于 LinkedHashMap 实现的,并且使用链表维护了元素的插入顺序,具有快速查找、插入和删除操作的优点,又可以维护元素的插入顺序!源码就不带大家看了,咱们直接上测试案例。

【代码示例2】

LinkedHashSet<String> set = new LinkedHashSet<>();
// 添加元素
set.add("Hello");
set.add("Java");
set.add("Build");
set.add("Java");
System.out.println(set);
// 删除元素
set.remove("Hello");

// 修改元素
set.remove("Java");
set.add("java");

// 查找元素
boolean bool = set.contains("Build");
System.out.println("Build哥:" + bool);

//输出
System.out.println(set);

输出:

[Hello, Java, Build]
Build哥:true
[Build, java]

通过输出结果我们可以得出结论:LinkedHashSet中的元素不可重复,有序。

TreeSet

通过上面两个集合类我们大概能够猜到,几乎所有的Set集合的底层都是通过Map去实现,TreeSet同样是基于TreeMap实现,TreeMap 基于红黑树实现,所以TreeSet也就自带了排序功能。

 public TreeSet() {
        this(new TreeMap<E,Object>());
    }

【代码示例3】

public class Test {
    public static void main(String[] args) {
        // 创建一个 TreeSet 对象
        TreeSet<Integer> set = new TreeSet<>();
        set.add(3);
        set.add(6);
        set.add(2);
        set.add(1);
        set.add(0);
        set.add(9);
        System.out.println(set);
    }
}

输出:

[0, 1, 2, 3, 6, 9]

总结

  1. HashSet、LinkedHashSet 和 TreeSet 都是 Set 接口的实现类,都能保证元素唯一,并且都不是线程安全的。
  2. HashSet、LinkedHashSet 和 TreeSet 的主要区别在于底层数据结构不同。HashSet 的底层数据结构是哈希表(基于 HashMap 实现)。LinkedHashSet 的底层数据结构是链表和哈希表,元素的插入和取出顺序满足 FIFO。TreeSet 底层数据结构是红黑树,元素是有序的,排序的方式有自然排序和定制排序。
  3. 底层数据结构不同又导致这三者的应用场景不同。HashSet 用于不需要保证元素插入和取出顺序的场景,LinkedHashSet 用于保证元素的插入和取出顺序满足 FIFO 的场景,TreeSet 用于支持对元素自定义排序规则的场景。
  4. 此外,HashSet、LinkedHashSet允许有 null 值,TreeSet不允许有null值,当向 TreeSet 插入 null 元素时,TreeSet 使用 compareTo 方法与 null 元素进行比较,报错:java.lang.NullPointerException。

结尾彩蛋

如果本篇博客对您有一定的帮助,大家记得
留言+点赞+收藏
呀。原创不易,转载请联系Build哥!