范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

手把手教你用最少的代码模拟gRPC四种消息交换模式

  一、定义ProtoBuf消息
  我们选择简单的"Hello World"场景进行演示:客户端请求指定一个或者多个名字,回复以"Hello, {Name}!"。为此我们在一个ASP.NET Core应用中定义了如下两个ProtoBuf消息HelloRequest和HelloReply,生成两个同名的消息类型。 syntax = "proto3";  message HelloRequest {   string names = 1; }  message HelloReply {   string message = 1; }二、请求/响应的读写
  gRPC框架的核心莫过于在服务端针对请求消息的读取和对响应消息的写入;以及在客户端针对请求消息的写入和对相应消息的读取。这四个核心功能被实现在如下这两个扩展方法中。如下面的代码片段所示,扩展方法WriteMessageAsync将指定的ProtoBuf消息写入PipeWriter对象中。为了确保消息能够被准确地读取,我们利用前置的四个字节存储了消息的字节数。 public static class ReadWriteExtensions {     public static ValueTask WriteMessageAsync(this PipeWriter writer, IMessage message)     {         var length = message.CalculateSize();         var span = writer.GetSpan(4+length);         BitConverter.GetBytes(length).CopyTo(span);         message.WriteTo(span.Slice(4, length));         writer.Advance(4 + length);         return writer.FlushAsync();     }      public static async Task ReadAndProcessAsync(this PipeReader reader, MessageParser parser, Func handler) where TMessage:IMessage     {         while(true)         {             var result = await reader.ReadAsync();             var buffer = result.Buffer;             while (TryReadMessage(ref buffer, out var message))             {                 await handler(message!);             }             reader.AdvanceTo(buffer.Start, buffer.End);             if(result.IsCompleted)             {                 break;             }         }           bool TryReadMessage(ref ReadOnlySequence buffer, out TMessage? message)         {             if(buffer.Length < 4)             {                 message = default;                 return false;             }              Span lengthBytes = stackalloc byte[4];             buffer.Slice(0,4).CopyTo(lengthBytes);             var length = BinaryPrimitives.ReadInt32LittleEndian(lengthBytes);             if (buffer.Length < length + 4)             {                 message = default;                 return false;             }              message = parser.ParseFrom(buffer.Slice(4, length));             buffer = buffer.Slice(length + 4);             return true;         }     } }
  ReadAndProcessAsync扩展方法从指定的PipeReader对象中读取指定类型的ProtoBuf消息,并利用指定处理器(一个Func委托)对它进行处理。由于写入时指定了消息的字节数,所以我们可以将承载消息的字节"精准地"读出来,并利用指定的MessageParser对其进行序列化。 三、Unary
  我们知道正常的gRPC开发需要将包含一个或者多个操作的服务定义在ProtoBuf文件中,并利用它生成一个基类,我们通过继承这个基类并重写操作对应方法。对于ASP.NET Core gRPC来说,服务操作对应的方法最终会转换成对应的终结点并以路由的形式进行注册。这个过程其实并不复杂,但不是本篇文章关注的终结点。本文会直接注册四个对应的路由终结点来演示四个基本的消息交换模式。
  Unary调用最为简单,就是简单的Request/Reply模式。在如下的代码中,我们注册了一个针对请求路径"/unary"的路由,对应的处理方法为如下所示的HandleUnaryCallAsync。该方法直接调用上面定义的ReadAndProcessAsync扩展方法将请求消息(HelloRequest)从请求的BodyReader中读取出来,并生成一个对应的HelloReply消息予以应答。后者利用上面的WriteMessageAsync扩展方法写入响应的BodyWriter。 using GrpcService; using System.IO.Pipelines; using System.Net;  var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); await app.StartAsync();  await UnaryCallAsync();  static async Task HandleUnaryCallAsync(HttpContext httpContext) {     var reader = httpContext.Request.BodyReader;     var write = httpContext.Response.BodyWriter;     await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello =>     {         var reply = new HelloReply { Message = #34;Hello, {hello.Names}!" };         await write.WriteMessageAsync(reply);     }); }  static async Task UnaryCallAsync() {     using (var httpClient = new HttpClient())     {         var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/unary")         {             Version = HttpVersion.Version20,             VersionPolicy = HttpVersionPolicy.RequestVersionExact,             Content = new MessageContent(new HelloRequest { Names = "foobar" })         };         var reply = await httpClient.SendAsync(request);         await PipeReader.Create(await reply.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply =>         {             Console.WriteLine(reply.Message);             return Task.CompletedTask;         });     } }
  UnaryCallAsync模拟了客户端针对Unary服务操作的调用,具体的调用由我们熟悉的HttpClient对象完成。如代码片段所示,我们针对路由地址创建了一个HttpRequestMessage对象,并对其HTTP版本进行了设置(2.0),代表请求主体内容的HttpContent是一个MessageContent对象,具体的定义如下。MessageContent将代表ProtoBuf消息的IMessage对象作为主体内容,在重写的SerializeToStreamAsync,我们调用上面定义的WriteMessageAsync扩展方法将指定的IMessage对象写入输出流中。 public class MessageContent : HttpContent {     private readonly IMessage _message;     public MessageContent(IMessage message) => _message = message;     protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)     =>await PipeWriter.Create(stream).WriteMessageAsync(_message);     protected override bool TryComputeLength(out long length)     {         length = -1;         return false;     } }
  创建的HttpRequestMessage对象利用HttpClient发送出去后,我们得到对应的HttpResponseMessage对象,并调用ReadAndProcessAsync扩展方法将主体内容读取出来并反序列化成HelloReply对象,其承载的问候消息将以如下的形式输出到控制台上。
  四、Server Stream
  Server Stream这种消息交换模式意味着服务端可以将内容以流的形式响应给客户端。作为模拟,客户端会携带一个名字列表("foo,bar,baz,qux"),服务端以流的形式针对每个名字回复一个问候消息,具体的实现体现在针对请求路径"/serverstream"的路由处理方法HandleServerStreamCallAsync上。和上面一样,HandleServerStreamCallAsync方法利用我们定义的ReadAndProcessAsync方法读取作为请求的HelloRequest对象,并针对其携带的每一个名气生成一个HelloReply对象,后者最终通过我们定义的WriteMessageAsync方法予以响应。为了体验"流"的效果,我们添加了1秒的时间间隔。 using GrpcService; using System.IO.Pipelines; using System.Net;  var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); app.MapPost("/serverstream", HandleServerStreamCallAsync); app.MapPost("/clientstream", HandleClientStreamCallAsync); await app.StartAsync();  static async Task HandleServerStreamCallAsync(HttpContext httpContext) {     var reader = httpContext.Request.BodyReader;     var write = httpContext.Response.BodyWriter;     await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello =>     {         var names = hello.Names.Split(",");         foreach (var name in names)         {             var reply = new HelloReply { Message = #34;Hello, {name}!" };             await write.WriteMessageAsync(reply);             await Task.Delay(1000);         }     }); }  static async Task ServerStreamCallAsync() {     using (var httpClient = new HttpClient())     {         var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/serverstream")         {             Version = HttpVersion.Version20,             VersionPolicy = HttpVersionPolicy.RequestVersionExact,             Content = new MessageContent(new HelloRequest { Names = "foo,bar,baz,qux" })         };         var reply = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);         await PipeReader.Create(await reply.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply =>         {             Console.WriteLine(#34;[{DateTimeOffset.Now}]{reply.Message}");             return Task.CompletedTask;         });     } }
  模拟客户端调用的ServerStreamCallAsync方法在生成一个携带多个名字的HttpRequestMessage对象,并利用HttpClient将其发送出去。由于服务端是以流的形式对请求进行响应的,所以我们在调用SendAsync方法是将HttpCompletionOption.ResponseHeadersRead枚举作为第二个参数,这样我们才能在收到响应头部之后得到代表响应消息的HttpResponseMessage对象。这样的响应将会携带4个问候消息,我们同样利用ReadAndProcessAsync方法将读取并以如下的形式输出到控制台上。
  五、Client Stream
  Client Stream与Server Stream正好相反,客户端会以流的形式将请求内容提交给服务端进行处理。由于我们以HttpClient来模拟客户端,所以我们只能从HttpRequestMessage上作文章。
  具体来说,我们需要自定义一个HttpContent类型,让它以"客户端流"的形式相对方发送内容。这个自定义的HttpContent就是如下这个ClientStreamContent类型。如代码片段所示,ClientStreamContent是对一个ClientStreamWriter对象的封装,客户端程序利用后者以流的形式向服务端输出TMessage对象承载的内容。对于ClientStreamWriter方法来说,作为输出流的Stream对象是在ClientStreamContent重写的SerializeToStreamAsync方法中指定的。WriteAsync方法利用我们定义的WriteMessageAsync扩展方法实现了针对ProtoBuf消息的输出。客户端通过调用Complete方法决定客户端流是否终结,ClientStreamContent重写的SerializeToStreamAsync通过WaitAsync进行等待。 public class ClientStreamContent : HttpContent where TMessage:IMessage {     private readonly ClientStreamWriter _writer;     public ClientStreamContent(ClientStreamWriter writer)=> _writer = writer;     protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) => _writer.SetOutputStream(stream).WaitAsync();     protected override bool TryComputeLength(out long length) => (length = -1) != -1; }  public class ClientStreamWriter where TMessage: IMessage {     private readonly TaskCompletionSource _streamSetSource = new();     private readonly TaskCompletionSource _streamEndSuource = new();      public ClientStreamWriter SetOutputStream(Stream outputStream)     {         _streamSetSource.SetResult(outputStream);         return this;     }      public async Task WriteAsync(TMessage message)     {         var stream = await _streamSetSource.Task;         await PipeWriter.Create(stream).WriteMessageAsync(message);     }      public void Complete()=> _streamEndSuource.SetResult();     public Task WaitAsync() => _streamEndSuource.Task; }
  针对Client Stream的模拟体现在针对路径"/clientstream"的路由处理方法HandleClientStreamCallAsync。这个方法没有什么特别之处,它进行时调用ReadAndProcessAsync方法将HelloRequest消息读取出来,并将生成的问候语直接输出到本地(服务端)控制台上而已。 using GrpcService; using System.IO.Pipelines; using System.Net;  var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); app.MapPost("/serverstream", HandleServerStreamCallAsync); app.MapPost("/clientstream", HandleClientStreamCallAsync); await app.StartAsync();  await ClientStreamCallAsync();  static async Task HandleClientStreamCallAsync(HttpContext httpContext) {     var reader = httpContext.Request.BodyReader;     var write = httpContext.Response.BodyWriter;     await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello =>     {         var names = hello.Names.Split(",");         foreach (var name in names)         {             Console.WriteLine(#34;[{DateTimeOffset.Now}]Hello, {name}!");             await Task.Delay(1000);         }     }); }  static async Task ClientStreamCallAsync() {     using (var httpClient = new HttpClient())     {         var writer = new ClientStreamWriter();         var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/clientstream")         {             Version = HttpVersion.Version20,             VersionPolicy = HttpVersionPolicy.RequestVersionExact,             Content = new ClientStreamContent(writer)         };         _ =  httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);         foreach (var name in new string[] {"foo","bar","baz","qux" })         {             await writer.WriteAsync(new HelloRequest { Names = name});             await Task.Delay(1000);         }         writer.Complete();     } }
  在用于模拟Client Stream调用的ClientStreamCallAsync方法中,我们首先创建了一个ClientStreamWriter对象,并利用它创建了对应的ClientStreamContent对象,后者将作为HttpRequestMessage消息的主体内容。在调用HttpClient的SendAsync方法后,我们并没有作任何等待(否则程序将卡在这里),而是利用ClientStreamWriter对象以流的形式发送了四个请求。服务端在接收到每个请求后,会将对应的问候语以如下的形式输出到控制台上。
  六、Bidirectional Stream
  Bidirectional Stream将连接作为真正的"双工通道"。这次我们不再注册额外的路由,而是直接利用前面模拟Unary的路由终结点来演示双向通信。在如下所示的客户端模拟方法BidirectionalStreamCallAsync中,我们采用上面的方式以流的形式发送了4个HelloRequest。 using GrpcService; using System.IO.Pipelines; using System.Net;  var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); app.MapPost("/serverstream", HandleServerStreamCallAsync); app.MapPost("/clientstream", HandleClientStreamCallAsync); await app.StartAsync();  await BidirectionalStreamCallAsync();  static async Task BidirectionalStreamCallAsync() {     using (var httpClient = new HttpClient())     {         var writer = new ClientStreamWriter();         var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/unary")         {             Version = HttpVersion.Version20,             VersionPolicy = HttpVersionPolicy.RequestVersionExact,             Content = new ClientStreamContent(writer)         };         var task = httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);         _ = Task.Run(async () =>         {             var response = await task;             await PipeReader.Create(await response.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply =>             {                 Console.WriteLine(#34;[{DateTimeOffset.Now}]{reply.Message}");                 return Task.CompletedTask;             });         });          foreach (var name in new string[] { "foo", "bar", "baz", "qux" })         {             await writer.WriteAsync(new HelloRequest { Names = name });             await Task.Delay(1000);         }         writer.Complete();     } }
  于此同时,我们在得到表示响应消息的HttpResponseMessage后,调用ReadAndProcessAsync方法将作为响应的问候语以如下的方式输出到控制台上。

最有包容度的中国美食水城烙锅第99次来凉都,第101次吃烙锅。对于凉都人来说朋友团聚得吃烙锅,伤心失恋得吃烙锅,连政府接待也得吃烙锅走,吃烙锅去,已成了凉都人约饭的惯用语。凉都人外出返乡后的回魂餐不是羊肉粉,网易逆水寒手游推出游戏版ChatGPT交互式电子出版物我在故宫修文物来了ElonMusk需要密切关注人工智能Meta元宇宙指北播报太长不看版网易逆水寒手游推出首个游戏版ChatGPT合肥高新区发布元宇宙产业规划江西三清山与花脸数字深度合作,探讨文旅元宇宙方案故宫出版社将推出首套元宇宙场景交互式电子出版物我在故贵州以村晚为新的文旅支撑点中国文化报记者王彬今年春节,家住上海的徐江想来点不一样的年味体验。经过一番研究,他最后选择了去贵州黔东南州雷山西江千户苗寨。面对我非常陌生的西南地区,去这个少数民族集聚区看村晚赏民时隔四年马蜂窝完成新一轮融资,贵州国资押宝旅游复苏南都记者从马蜂窝方面获悉,2月15日,马蜂窝宣布完成新一轮融资。此轮融资由贵州省创新赋能大数据投资基金贵阳创投贵阳观山湖现代服务业投资基金等机构联合投资。马蜂窝创始人CEO陈罡表示来看花啦!贵州各地繁花盛开春意浓春回大地,贵州各地百花齐放,春意撩人,处处生机盎然美不胜收。在铜仁市思南县塘头镇,油菜花汇聚成了一片金色的海洋,吸引众多游客前来享受阳光,拍照留念。2月中下旬正是塘头油菜花的最佳观怎么回事?在普陀这里,孩子们竟成了书中的主人公!墨洗心灵趣味足,籍涤稚童茁壮长。拾书品籍,是有效亦是有味道的成长填充剂。近日,曹杨新村街道武宁片区组织开展了萤火虫读书会活动。本次活动由曹杨新村街道未保站普陀区乐为社区文化服务中心冥想能帮助孩子专注么?新智注意力的注意力课堂里会有冥想的环节,家长们经常会问,孩子这么小,就开始学习冥想有什么好处呢?冥想,或者说沉思和放松,对缓解焦虑症疼痛治疗高血压哮喘失眠糖尿病抑郁症甚至一些精神性订单已超百万台!德州造智能手机畅销28个一带一路国家视频加载中齐鲁网闪电新闻2月16日讯英望科技是德州一家自主研发自主运营品牌的手机整机制造商,新春伊始,企业进行了产品创新升级,进一步拓宽海外市场。目前,企业正开足马力抓订单拓销路忙音响声音不好听怎么调节?音响系统是我们日常生活中经常使用的设备之一,但有时候我们可能会遇到一些问题,比如声音不好听。在这种情况下,一些简单的调节和优化可以帮助改善音响效果,让我们来了解一下。一检查连接如果MFi认证放当年不可谓不好,时过境迁了,沃尔玛onn。双口墙充拆解前言充电头网淘到了沃尔玛旗下onn。品牌的一款双口充电器,这款充电器为超薄设计,配备折叠插脚,还内置了电源指示灯,便于了解供电状态。充电器为固定5V输出,不支持快充。具备4。8A输这年头胸口没扎几颗钉子,都不好意思自称潮人鼻环眉骨环唇环,试问Rapper潮人的脸上,谁还没打几个洞呢?之前上流君冲浪看到了一条新闻,说这位说唱歌手因为太喜欢这颗粉钻,通过在额头打孔的方式,成功把钻石戴在了额头上。没错,在
vivoS16即将发布!颜值自拍新亮点?vivo近日官宣,S16系列将于12月22日1900发布,S系列定位依旧是主打人像轻薄颜值的拍照手机。四种配色颜如玉烟花玄黑风信紫其中主打配色为颜如玉,有种古色生香的轻盈感,后盖上油改电大对决!豪华纯电SUV捷尼赛思GV70宝马iX3怎么选?随着汽车电动化时代的推进,以及消费者接受程度的日益提高,越来越多经典燃油车型都推出了纯电版本,在豪华品牌这种现象尤为普遍。在豪华中型SUV这个细分市场上,纯电车型的选择越来越多,日体验过小米11的售后,确实比得上国际一线大厂之前一次买小米手机的体验很震撼。小米体验店买了个新手机,用了一个月发现手机边缘缝隙有点大,然后去找售后。结果工作人员二话没说就给换了新手机。这还没完,用了几天总觉得屏幕饱和度不够,为智能机器人打造一款类人的感知皮肤如何烹饪有益健康餐厨垃圾短期存放时的细菌风险跟踪科研成果,掌握最新动态!1为智能机器人打造一款类人的感知皮肤智能机器人如何实现类人的高灵巧操作?触觉传感器在当中起到至关重要的作用。这就好比皮肤之于人,智能机器人的皮肤触觉传感华润三九仁和药业江中药业,你看好谁作为国宝,中药是我国医药产业的重要组成部分,担负着维护人民健康提高民族素质的重要作用,产品也畅销东南亚各国。我国中药企业繁多,在A股上市的就有75家有些中药产品,在中国家喻户晓,比年销7亿件!全球最大羊毛衫集散中心,在这里!(央视财经天下财经)本周,由中央广播电视总台财经节目中心主办的寻百强看中国大型融媒体活动走进浙江桐乡湖北鹤峰福建平潭等地。视频加载中央视财经天下财经栏目视频浙江桐乡年销7亿件小小羊美国将30多家中国企业列入实体清单丨GoingGlobalGoingGlobal出海周报是创业邦推出的出海系列栏目,旨在为出海领域的创业者和投资人精选出海大事件海外大公司投融资消息,本篇为栏目第164篇报道。整理丨赵晓晓编辑丨及轶嵘本周(澳洲人,快撑不住了!对很多澳洲人来说,最艰难的日子,将要来临。一物价2022年澳洲的物价,真的涨疯了,让所有澳洲人,都陷入到生活成本危机,让家庭预算捉襟见肘。未来的支出,将会出现两位数的增长,预计下一搞钱逻辑变了!了解明年经济走向,关乎普通人的3大趋势近期的管理层会议,虽然释放了一系列信号,但核心目标只有一个,就是重塑信心。困境时信心远比黄金更重要,当下的经济遭受供给冲击,需求收缩,预期变弱的影响,再加上外部环境的动荡起伏,不知崩了!2230亿灰飞烟灭!本文转自作者张生所谓去中心化,终究沦为了一场笑话。21世纪人类最大骗局,雪崩已进入倒计时!一hr90后千亿富豪,一夜间彻底凉凉!12月13日,据外媒报道,加密货币交易所FTX创始人大跌418元吨,国际尿素价格持续下滑哈喽,大家好!这里是化肥价格行情!关注我每天看最新尿素复合肥磷铵钾肥价格行情!今天(2022年12月18日)星期天,今天我们说说本周国际方面尿素行情动态本周,国际尿素价格继续下滑,