手把手教你用最少的代码模拟gRPC四种消息交换模式
一、定义ProtoBuf消息
我们选择简单的"Hello World"场景进行演示:客户端请求指定一个或者多个名字,回复以"Hello, {Name}!"。为此我们在一个ASP.NET Core应用中定义了如下两个ProtoBuf消息HelloRequest和HelloReply,生成两个同名的消息类型。 syntax = "proto3"; message HelloRequest { string names = 1; } message HelloReply { string message = 1; }二、请求/响应的读写
gRPC框架的核心莫过于在服务端针对请求消息的读取和对响应消息的写入;以及在客户端针对请求消息的写入和对相应消息的读取。这四个核心功能被实现在如下这两个扩展方法中。如下面的代码片段所示,扩展方法WriteMessageAsync将指定的ProtoBuf消息写入PipeWriter对象中。为了确保消息能够被准确地读取,我们利用前置的四个字节存储了消息的字节数。 public static class ReadWriteExtensions { public static ValueTask WriteMessageAsync(this PipeWriter writer, IMessage message) { var length = message.CalculateSize(); var span = writer.GetSpan(4+length); BitConverter.GetBytes(length).CopyTo(span); message.WriteTo(span.Slice(4, length)); writer.Advance(4 + length); return writer.FlushAsync(); } public static async Task ReadAndProcessAsync(this PipeReader reader, MessageParser parser, Func handler) where TMessage:IMessage { while(true) { var result = await reader.ReadAsync(); var buffer = result.Buffer; while (TryReadMessage(ref buffer, out var message)) { await handler(message!); } reader.AdvanceTo(buffer.Start, buffer.End); if(result.IsCompleted) { break; } } bool TryReadMessage(ref ReadOnlySequence buffer, out TMessage? message) { if(buffer.Length < 4) { message = default; return false; } Span lengthBytes = stackalloc byte[4]; buffer.Slice(0,4).CopyTo(lengthBytes); var length = BinaryPrimitives.ReadInt32LittleEndian(lengthBytes); if (buffer.Length < length + 4) { message = default; return false; } message = parser.ParseFrom(buffer.Slice(4, length)); buffer = buffer.Slice(length + 4); return true; } } }
ReadAndProcessAsync扩展方法从指定的PipeReader对象中读取指定类型的ProtoBuf消息,并利用指定处理器(一个Func委托)对它进行处理。由于写入时指定了消息的字节数,所以我们可以将承载消息的字节"精准地"读出来,并利用指定的MessageParser对其进行序列化。 三、Unary
我们知道正常的gRPC开发需要将包含一个或者多个操作的服务定义在ProtoBuf文件中,并利用它生成一个基类,我们通过继承这个基类并重写操作对应方法。对于ASP.NET Core gRPC来说,服务操作对应的方法最终会转换成对应的终结点并以路由的形式进行注册。这个过程其实并不复杂,但不是本篇文章关注的终结点。本文会直接注册四个对应的路由终结点来演示四个基本的消息交换模式。
Unary调用最为简单,就是简单的Request/Reply模式。在如下的代码中,我们注册了一个针对请求路径"/unary"的路由,对应的处理方法为如下所示的HandleUnaryCallAsync。该方法直接调用上面定义的ReadAndProcessAsync扩展方法将请求消息(HelloRequest)从请求的BodyReader中读取出来,并生成一个对应的HelloReply消息予以应答。后者利用上面的WriteMessageAsync扩展方法写入响应的BodyWriter。 using GrpcService; using System.IO.Pipelines; using System.Net; var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); await app.StartAsync(); await UnaryCallAsync(); static async Task HandleUnaryCallAsync(HttpContext httpContext) { var reader = httpContext.Request.BodyReader; var write = httpContext.Response.BodyWriter; await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello => { var reply = new HelloReply { Message = #34;Hello, {hello.Names}!" }; await write.WriteMessageAsync(reply); }); } static async Task UnaryCallAsync() { using (var httpClient = new HttpClient()) { var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/unary") { Version = HttpVersion.Version20, VersionPolicy = HttpVersionPolicy.RequestVersionExact, Content = new MessageContent(new HelloRequest { Names = "foobar" }) }; var reply = await httpClient.SendAsync(request); await PipeReader.Create(await reply.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply => { Console.WriteLine(reply.Message); return Task.CompletedTask; }); } }
UnaryCallAsync模拟了客户端针对Unary服务操作的调用,具体的调用由我们熟悉的HttpClient对象完成。如代码片段所示,我们针对路由地址创建了一个HttpRequestMessage对象,并对其HTTP版本进行了设置(2.0),代表请求主体内容的HttpContent是一个MessageContent对象,具体的定义如下。MessageContent将代表ProtoBuf消息的IMessage对象作为主体内容,在重写的SerializeToStreamAsync,我们调用上面定义的WriteMessageAsync扩展方法将指定的IMessage对象写入输出流中。 public class MessageContent : HttpContent { private readonly IMessage _message; public MessageContent(IMessage message) => _message = message; protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) =>await PipeWriter.Create(stream).WriteMessageAsync(_message); protected override bool TryComputeLength(out long length) { length = -1; return false; } }
创建的HttpRequestMessage对象利用HttpClient发送出去后,我们得到对应的HttpResponseMessage对象,并调用ReadAndProcessAsync扩展方法将主体内容读取出来并反序列化成HelloReply对象,其承载的问候消息将以如下的形式输出到控制台上。
四、Server Stream
Server Stream这种消息交换模式意味着服务端可以将内容以流的形式响应给客户端。作为模拟,客户端会携带一个名字列表("foo,bar,baz,qux"),服务端以流的形式针对每个名字回复一个问候消息,具体的实现体现在针对请求路径"/serverstream"的路由处理方法HandleServerStreamCallAsync上。和上面一样,HandleServerStreamCallAsync方法利用我们定义的ReadAndProcessAsync方法读取作为请求的HelloRequest对象,并针对其携带的每一个名气生成一个HelloReply对象,后者最终通过我们定义的WriteMessageAsync方法予以响应。为了体验"流"的效果,我们添加了1秒的时间间隔。 using GrpcService; using System.IO.Pipelines; using System.Net; var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); app.MapPost("/serverstream", HandleServerStreamCallAsync); app.MapPost("/clientstream", HandleClientStreamCallAsync); await app.StartAsync(); static async Task HandleServerStreamCallAsync(HttpContext httpContext) { var reader = httpContext.Request.BodyReader; var write = httpContext.Response.BodyWriter; await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello => { var names = hello.Names.Split(","); foreach (var name in names) { var reply = new HelloReply { Message = #34;Hello, {name}!" }; await write.WriteMessageAsync(reply); await Task.Delay(1000); } }); } static async Task ServerStreamCallAsync() { using (var httpClient = new HttpClient()) { var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/serverstream") { Version = HttpVersion.Version20, VersionPolicy = HttpVersionPolicy.RequestVersionExact, Content = new MessageContent(new HelloRequest { Names = "foo,bar,baz,qux" }) }; var reply = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); await PipeReader.Create(await reply.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply => { Console.WriteLine(#34;[{DateTimeOffset.Now}]{reply.Message}"); return Task.CompletedTask; }); } }
模拟客户端调用的ServerStreamCallAsync方法在生成一个携带多个名字的HttpRequestMessage对象,并利用HttpClient将其发送出去。由于服务端是以流的形式对请求进行响应的,所以我们在调用SendAsync方法是将HttpCompletionOption.ResponseHeadersRead枚举作为第二个参数,这样我们才能在收到响应头部之后得到代表响应消息的HttpResponseMessage对象。这样的响应将会携带4个问候消息,我们同样利用ReadAndProcessAsync方法将读取并以如下的形式输出到控制台上。
五、Client Stream
Client Stream与Server Stream正好相反,客户端会以流的形式将请求内容提交给服务端进行处理。由于我们以HttpClient来模拟客户端,所以我们只能从HttpRequestMessage上作文章。
具体来说,我们需要自定义一个HttpContent类型,让它以"客户端流"的形式相对方发送内容。这个自定义的HttpContent就是如下这个ClientStreamContent类型。如代码片段所示,ClientStreamContent是对一个ClientStreamWriter对象的封装,客户端程序利用后者以流的形式向服务端输出TMessage对象承载的内容。对于ClientStreamWriter方法来说,作为输出流的Stream对象是在ClientStreamContent重写的SerializeToStreamAsync方法中指定的。WriteAsync方法利用我们定义的WriteMessageAsync扩展方法实现了针对ProtoBuf消息的输出。客户端通过调用Complete方法决定客户端流是否终结,ClientStreamContent重写的SerializeToStreamAsync通过WaitAsync进行等待。 public class ClientStreamContent : HttpContent where TMessage:IMessage { private readonly ClientStreamWriter _writer; public ClientStreamContent(ClientStreamWriter writer)=> _writer = writer; protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) => _writer.SetOutputStream(stream).WaitAsync(); protected override bool TryComputeLength(out long length) => (length = -1) != -1; } public class ClientStreamWriter where TMessage: IMessage { private readonly TaskCompletionSource _streamSetSource = new(); private readonly TaskCompletionSource _streamEndSuource = new(); public ClientStreamWriter SetOutputStream(Stream outputStream) { _streamSetSource.SetResult(outputStream); return this; } public async Task WriteAsync(TMessage message) { var stream = await _streamSetSource.Task; await PipeWriter.Create(stream).WriteMessageAsync(message); } public void Complete()=> _streamEndSuource.SetResult(); public Task WaitAsync() => _streamEndSuource.Task; }
针对Client Stream的模拟体现在针对路径"/clientstream"的路由处理方法HandleClientStreamCallAsync。这个方法没有什么特别之处,它进行时调用ReadAndProcessAsync方法将HelloRequest消息读取出来,并将生成的问候语直接输出到本地(服务端)控制台上而已。 using GrpcService; using System.IO.Pipelines; using System.Net; var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); app.MapPost("/serverstream", HandleServerStreamCallAsync); app.MapPost("/clientstream", HandleClientStreamCallAsync); await app.StartAsync(); await ClientStreamCallAsync(); static async Task HandleClientStreamCallAsync(HttpContext httpContext) { var reader = httpContext.Request.BodyReader; var write = httpContext.Response.BodyWriter; await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello => { var names = hello.Names.Split(","); foreach (var name in names) { Console.WriteLine(#34;[{DateTimeOffset.Now}]Hello, {name}!"); await Task.Delay(1000); } }); } static async Task ClientStreamCallAsync() { using (var httpClient = new HttpClient()) { var writer = new ClientStreamWriter(); var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/clientstream") { Version = HttpVersion.Version20, VersionPolicy = HttpVersionPolicy.RequestVersionExact, Content = new ClientStreamContent(writer) }; _ = httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); foreach (var name in new string[] {"foo","bar","baz","qux" }) { await writer.WriteAsync(new HelloRequest { Names = name}); await Task.Delay(1000); } writer.Complete(); } }
在用于模拟Client Stream调用的ClientStreamCallAsync方法中,我们首先创建了一个ClientStreamWriter对象,并利用它创建了对应的ClientStreamContent对象,后者将作为HttpRequestMessage消息的主体内容。在调用HttpClient的SendAsync方法后,我们并没有作任何等待(否则程序将卡在这里),而是利用ClientStreamWriter对象以流的形式发送了四个请求。服务端在接收到每个请求后,会将对应的问候语以如下的形式输出到控制台上。
六、Bidirectional Stream
Bidirectional Stream将连接作为真正的"双工通道"。这次我们不再注册额外的路由,而是直接利用前面模拟Unary的路由终结点来演示双向通信。在如下所示的客户端模拟方法BidirectionalStreamCallAsync中,我们采用上面的方式以流的形式发送了4个HelloRequest。 using GrpcService; using System.IO.Pipelines; using System.Net; var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); app.MapPost("/serverstream", HandleServerStreamCallAsync); app.MapPost("/clientstream", HandleClientStreamCallAsync); await app.StartAsync(); await BidirectionalStreamCallAsync(); static async Task BidirectionalStreamCallAsync() { using (var httpClient = new HttpClient()) { var writer = new ClientStreamWriter(); var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/unary") { Version = HttpVersion.Version20, VersionPolicy = HttpVersionPolicy.RequestVersionExact, Content = new ClientStreamContent(writer) }; var task = httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); _ = Task.Run(async () => { var response = await task; await PipeReader.Create(await response.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply => { Console.WriteLine(#34;[{DateTimeOffset.Now}]{reply.Message}"); return Task.CompletedTask; }); }); foreach (var name in new string[] { "foo", "bar", "baz", "qux" }) { await writer.WriteAsync(new HelloRequest { Names = name }); await Task.Delay(1000); } writer.Complete(); } }
于此同时,我们在得到表示响应消息的HttpResponseMessage后,调用ReadAndProcessAsync方法将作为响应的问候语以如下的方式输出到控制台上。
杭州有一趟慢火车,直达内蒙古,全程有32站我们常说,坐上火车去拉萨。那么,有没有坐上火车去内蒙的?答案是真的有!话说,杭州就有一趟列车,从杭州站始发,终到呼市(呼和浩特),而且,票价也十分亲民!真的假的?当然是真的啦!不信
一份问卷知天下未来经济的主线是什么?都说群众的眼睛是雪亮的,那就看考验一下就业者们的眼光吧。下面是关于就业的一个问卷调查你以后择业选择去哪?任泽平的一份问卷调查这是一个很有意思的问卷,如果再增加一个选项躺平,那就更有
广东黑白矮的成长速度让人失望,本赛季可能无缘季后赛广东这赛季困难重重,外援重组,内战吃紧,锋线走了周鹏,替补还是那几个杜润旺张昊勉强能用,王薪凯出去转了转一圈没人要又回来了,新人张明池徐昕还难堪大用,这种情况就要三个后卫承担更多,
三星杯韩方抽签够硬气!首轮强强对碰,打算一举歼灭中方精锐?中方精锐绝无冷场场场硬战!充斥着虾兵蟹将的三星杯,头一轮对阵竟迎来中韩主力大对抗。第27届三星杯首轮对阵表中韩主力大对碰(棋手白字为世冠)韩方小觑了三星杯之男唐韦星?或是已无可战之
灵蛇手镯你说你怕蛇,黄金编织的蛇你怕吗?灵蛇造型的珠宝,早在古埃及时期就备受追捧,灵蛇自古以来就是智慧重生和活力的象征,更是永恒的符号。而到了19世纪,灵蛇元素则代表着爱意永恒,常常出现在
沈梦辰亮相晚会,穿一条蓝色灯笼袖连衣裙,恬静又减龄头条创作挑战赛沈梦辰不仅主持功力深厚,穿衣搭配也很前卫时髦呢,她经常在社交软件上晒出自己的穿搭look,都特别好看。尤其是出席活动主持晚会或者主持节目的时候,她更是会精心打扮一番,
三个简单的缓解堆积的黑色素方法!简单有效一通过口服VC片补充维生素C广泛存在于水果蔬菜等食物中,具有调节色素代谢的功能,当出现色素沉着性皮肤病,通常可给患者口服大剂量维生素C二涂抹氢醌乳膏氢醌乳膏是临床经常用于祛除黄褐斑
一直以为葫芦裤很显胖,看到赵雅芝的搭配才发现,我错了现如今,有很多人都以为所有的葫芦裤都很显胖,直到看了赵雅芝的搭配造型之后,许多上了年纪的女人都直呼原来我错了!赵雅芝作为无数人的童年女神,不仅颜值和身材都是娱乐圈之中数一数二的级别
杨幂太美了吧?很有气质杨幂真的是白皙的皮肤,窈窕好身材,秀出妩媚性感。长发飘飘,充满了贵气与魅力,气质太好了吧?显得很优雅纤细的身材,靓丽俊俏的脸蛋,气质优雅,容貌惊艳是我年少时期,无论遇见多少次都会怦
BellaHadid最新施华洛世奇广告BellaHadid与施华洛世奇一起迎接节日的到来。施华洛世奇以BellaHadid作为其2022年假日活动的代言人,其标语是。在这个节日里,与施华洛世奇一起开启奇迹。在广告活动的
首家全球波司登体验店空降上海,沉浸式开启羽宙之旅魔都猝不及防的以5G速度降温BUT,这个冬天不太冷!专注羽绒服46年的波司登迎来了魔都首家全球品牌体验店让我们一起走进羽绒服专家波司登近距离感受行业TOP的硬核产品共赴多元羽宙的奇