范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Vue3。0Tornado6。1发布订阅模式打造异步非阻塞实时通信聊天系统

  "表达欲"是人类成长史上的强大"源动力",恩格斯早就直截了当地指出,处在蒙昧时代即低级阶段的人类,"以果实、坚果、根作为食物;音节清晰的语言的产生是这一时期的主要成就"。而在网络时代人们的表达欲往往更容易被满足,因为有聊天软件的存在。通常意义上,聊天大抵都基于两种形式:群聊和单聊。群聊或者群组聊天我们可以理解为聊天室,可以有人数上限,而单聊则可以认为是上限为2个人的特殊聊天室。
  为了开发高质量的聊天系统,开发者应该具备客户机和服务器如何通信的基本知识。在聊天系统中,客户端可以是移动应用程序(C端)或web应用程序(B端)。客户端之间不直接通信。相反,每个客户端都连接到一个聊天服务,该服务支撑双方通信的功能。所以该服务在业务上必须支持的最基本功能:
  1.能够实时接收来自其他客户端的信息。
  2.能够将每条信息实时推送给收件人。
  当客户端打算启动聊天时,它会使用一个或多个网络协议连接聊天服务。对于聊天服务,网络协议的选择至关重要,这里,我们选择Tornado框架内置Websocket协议的接口,简单而又方便,安装tornado6.1 pip3 install tornado==6.1
  随后编写程序启动文件main.py: import tornado.httpserver import tornado.websocket  import tornado.ioloop  import tornado.web  import redis  import threading  import asyncio  # 用户列表 users = []  # websocket协议 class WB(tornado.websocket.WebSocketHandler):   	# 跨域支持 	def check_origin(self,origin):  		return True  	# 开启链接 	def open(self):                  users.append(self)   	# 接收消息 	def on_message(self,message):  		self.write_message(message["data"])  	# 断开 	def on_close(self):  		users.remove(self) # 建立torando实例  app = tornado.web.Application(  	[  	(r"/wb/",WB)  	],debug=True  )  if __name__ == "__main__":   	# 声明服务器 	http_server_1 = tornado.httpserver.HTTPServer(app)  	# 监听端口 	http_server_1.listen(8000)  	# 开启事件循环 	tornado.ioloop.IOLoop.instance().start()
  如此,就在短时间搭建起了一套websocket协议服务,每一次有客户端发起websocket连接请求,我们都会将它添加到用户列表中,等待用户的推送或者接收信息的动作。
  下面我们需要通过某种形式将消息的发送方和接收方联系起来,以达到"聊天"的目的,这里选择Redis的发布订阅模式(pubsub),以一个demo来实例说明,server.py import redis  r = redis.Redis() r.publish("test","hello")
  随后编写 client.py: import redis r = redis.Redis() ps = r.pubsub() ps.subscribe("test")   for item in ps.listen():      if item["type"] == "message":         print(item["data"])
  可以这么理解:订阅者(listener)负责订阅频道(channel);发送者(publisher)负责向频道(channel)发送二进制的字符串消息,然后频道收到消息时,推送给订阅者。
  频道不仅可以联系发布者和订阅者,同时,也可以利用频道进行"消息隔离",即不同频道的消息只会给订阅该频道的用户进行推送:
  根据发布者订阅者逻辑,改写main.py: import tornado.httpserver import tornado.websocket  import tornado.ioloop  import tornado.web  import redis  import threading  import asyncio  # 用户列表 users = []  # 频道列表 channels = ["channel_1","channel_2"]   # websocket协议 class WB(tornado.websocket.WebSocketHandler):   	# 跨域支持 	def check_origin(self,origin):  		return True  	# 开启链接 	def open(self):   		users.append(self)   	# 接收消息 	def on_message(self,message):  		self.write_message(message["data"])  	# 断开 	def on_close(self):  		users.remove(self)       # 基于redis监听发布者发布消息 def redis_listener(loop):  	asyncio.set_event_loop(loop)  	async def listen():   		r = redis.Redis(decode_responses=True)  		# 声明pubsb实例 		ps = r.pubsub()  		# 订阅聊天室频道  		ps.subscribe(["channel_1","channel_2"])   		# 监听消息 		for message in ps.listen():  			print(message)  			# 遍历链接上的用户 			for user in users:  				print(user)  				if message["type"] == "message" and message["channel"] == user.get_cookie("channel"):   					user.write_message(message["data"])  	future = asyncio.gather(listen()) 	loop.run_until_complete(future)    # 接口  发布信息 class Msg(tornado.web.RequestHandler):   	# 重写父类方法 	def set_default_headers(self):  		# 设置请求头信息 		print("开始设置") 		# 域名信息 		self.set_header("Access-Control-Allow-Origin","*") 		# 请求信息 		self.set_header("Access-Control-Allow-Headers","x-requested-with") 		# 请求方式 		self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")  	  	# 发布信息 	async def post(self):  		data = self.get_argument("data",None)  		channel = self.get_argument("channel","channel_1")  		print(data)  		# 发布 		r = redis.Redis()  		r.publish(channel,data)  		return self.write("ok")   # 建立torando实例  app = tornado.web.Application(  	[  	(r"/send/",Msg), 	(r"/wb/",WB)  	],debug=True  )  if __name__ == "__main__":   	loop = asyncio.new_event_loop()  	# 单线程启动订阅者服务 	threading.Thread(target=redis_listener,args=(loop,)).start()   	# 声明服务器 	http_server_1 = tornado.httpserver.HTTPServer(app)  	# 监听端口 	http_server_1.listen(8000)  	# 开启事件循环 	tornado.ioloop.IOLoop.instance().start()
  这里假设默认有两个频道,逻辑是这样的:由前端控制websocket链接用户选择将消息发布到那个频道上,同时每个用户通过前端cookie的设置具备频道属性,当具备频道属性的用户对该频道发布了一条消息之后,所有其他具备该频道属性的用户通过redis进行订阅后主动推送刚刚发布的消息,而频道的推送只匹配订阅该频道的用户,达到消息隔离的目的。
  需要注意的一点是,通过线程启动redis订阅服务时,需要将当前的loop实例传递给协程对象,否则在订阅方法内将会获取不到websocket实例,报这个错误: IOLoop.current() doesn"t work in non-main
  这是因为Tornado底层基于事件循环ioloop,而同步框架模式的Django或者Flask则没有这个问题。
  下面编写前端代码,这里我们使用时下最流行的vue3.0框架,编写chat.vue:      
  这里前端在线客户端定期向状态服务器发送心跳事件。如果服务端在特定时间内(例如x秒)从客户端接收到心跳事件,则认为用户处于联机状态。否则,它将处于脱机状态,脱机后在阈值时间内可以进行重新连接的动作。同时利用vant框架的标签页可以同步切换频道,切换后将频道标识写入cookie,便于后端服务识别后匹配推送。
  效果是这样的:
  诚然,功能业已实现,但是如果我们处在一个高并发场景之下呢?试想一下如果一个频道有10万人同时在线,每秒有100条新消息,那么后台tornado的websocket服务推送频率是100w*10/s = 1000w/s 。
  这样的系统架构如果不做负载均衡的话,很难抗住压力,那么瓶颈在哪里呢?没错,就是数据库redis,这里我们需要异步redis库aioredis的帮助: pip3 install aioredis
  aioredis通过协程异步操作redis读写,避免了io阻塞问题,使消息的发布和订阅操作非阻塞。
  此时,可以新建一个异步订阅服务文件main_with_aioredis.py: import asyncio import aioredis from tornado import web, websocket from tornado.ioloop import IOLoop import tornado.httpserver import async_timeout
  之后主要的修改逻辑是,通过aioredis异步建立redis链接,并且异步订阅多个频道,随后通过原生协程的asyncio.create_task方法(也可以使用asyncio.ensure_future)注册订阅消费的异步任务reader: async def setup():     r = await aioredis.from_url("redis://localhost", decode_responses=True)     pubsub = r.pubsub()      print(pubsub)     await pubsub.subscribe("channel_1","channel_2")      #asyncio.ensure_future(reader(pubsub))     asyncio.create_task(reader(pubsub))
  在订阅消费方法中,异步监听所订阅频道中的发布信息,同时和之前的同步方法一样,比对用户的频道属性并且进行按频道推送: async def reader(channel: aioredis.client.PubSub):     while True:         try:             async with async_timeout.timeout(1):                 message = await channel.get_message(ignore_subscribe_messages=True)                 if message is not None:                     print(f"(Reader) Message Received: {message}")                      for user in users:                          if user.get_cookie("channel") == message["channel"]:                              user.write_message(message["data"])                          await asyncio.sleep(0.01)         except asyncio.TimeoutError:             pass
  最后,利用tornado事件循环IOLoop传递中执行回调方法,将setup方法加入到事件回调中: if __name__ == "__main__":      # 监听端口     application.listen(8000)      loop = IOLoop.current()     loop.add_callback(setup)     loop.start()
  完整的异步消息发布、订阅、推送服务改造 main_aioredis.py: import asyncio import aioredis from tornado import web, websocket from tornado.ioloop import IOLoop import tornado.httpserver import async_timeout  users = []  # websocket协议 class WB(tornado.websocket.WebSocketHandler):       # 跨域支持     def check_origin(self,origin):          return True      # 开启链接     def open(self):           users.append(self)       # 接收消息     def on_message(self,message):          self.write_message(message["data"])      # 断开     def on_close(self):          users.remove(self)   class Msg(web.RequestHandler):       # 重写父类方法     def set_default_headers(self):          # 设置请求头信息         print("开始设置")         # 域名信息         self.set_header("Access-Control-Allow-Origin","*")         # 请求信息         self.set_header("Access-Control-Allow-Headers","x-requested-with")         # 请求方式         self.set_header("Access-Control-Allow-Methods","POST,GET,PUT,DELETE")       # 发布信息     async def post(self):          data = self.get_argument("data",None)          channel = self.get_argument("channel","channel_1")          print(data)          # 发布         r = await aioredis.from_url("redis://localhost", decode_responses=True)          await r.publish(channel,data)          return self.write("ok")   async def reader(channel: aioredis.client.PubSub):     while True:         try:             async with async_timeout.timeout(1):                 message = await channel.get_message(ignore_subscribe_messages=True)                 if message is not None:                     print(f"(Reader) Message Received: {message}")                      for user in users:                          if user.get_cookie("channel") == message["channel"]:                              user.write_message(message["data"])                          await asyncio.sleep(0.01)         except asyncio.TimeoutError:             pass   async def setup():     r = await aioredis.from_url("redis://localhost", decode_responses=True)     pubsub = r.pubsub()      print(pubsub)     await pubsub.subscribe("channel_1","channel_2")      #asyncio.ensure_future(reader(pubsub))     asyncio.create_task(reader(pubsub))   application = web.Application([     (r"/send/",Msg),     (r"/wb/", WB), ],debug=True)       if __name__ == "__main__":      # 监听端口     application.listen(8000)      loop = IOLoop.current()     loop.add_callback(setup)     loop.start()
  从程序设计角度上讲,充分利用了协程的异步执行思想,更加地丝滑流畅。
  结语:实践操作来看,Redis发布订阅模式,非常契合这种实时(websocket)通信聊天系统的场景,但是发布的消息如果没有对应的频道或者消费者,消息则会被丢弃,假如我们在生产环境在消费的时候,突然断网,导致其中一个订阅者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息也将不会存在,所以如果想要保证系统的健壮性,还需要其他服务来设计高可用的实时存储方案,不过那就是另外一个故事了,最后奉上项目地址,与众乡亲同飨:https://github.com/zcxey2911/tornado_redis_vue3_chatroom

张馨予与何捷过情人节,腹部隆起明显疑似有孕,撞脸赵丽颖引热议饿了吗?戳右边关注我们,每天给您送上最新出炉的娱乐硬核大餐!2月14日,演员张馨予在社交平台上晒出一段视频,并配文称今天要告诉你一件事。视频中,张馨予腹部隆起明显,疑似有孕,引起网最高法院院长周强加强法院文艺创作,讲好中国法治故事南都讯记者刘嫚发自北京2月14日上午,电视剧底线研讨会在京召开。本次研讨会由中国法官文学艺术联合会中国电视艺术委员会和中共湖南省委宣传部共同主办,最高人民法院党组书记院长周强出席研歌声起太行张健冬日清晨,河北阜平县龙泉关镇骆驼湾村迎来日出,村民在为游客讲解太行山独特的日照冰瀑景观。记者李舸摄地处太行山深处的河北省阜平县是革命老区,是当年晋察冀边区政府所在地。2012年闪电侠预告45个彩蛋解析超人死亡,闪电侠成为最终反派闪电侠发布了第二支正式预告,我会把预告中这些被打乱的片段,以电影的时间顺序重新排列,为大家带来一支清晰完整的闪电侠预告解析!首先,影片一开始是巴里和本蝙一起执行任务,闪电侠和海王2人民日报用奋斗点亮人生的春天原标题用奋斗点亮人生的春天(金台潮声)在农历兔年,希望全国人民特别是广大青年像动如脱兔般奋跃而上飞速奔跑,在各行各业竞展风流尽显风采。习近平总书记在二二三年春节团拜会上的殷殷嘱托,地评线中安时评唱好春耕备耕重头戏,让春天的约定更动人一年之计在于春。眼下,全国春耕工作从南到北陆续展开。在河南安徽等地,农技人员深入田间指导生产,农业机械助力高效田管作业,三农工作者在春耕备耕中把藏粮于技落实到实处。春耕夏耘秋收冬藏新书新媒体技术赋权与文化民主化的产物网红是如何炼成的网红是指在现实或者网络生活中,因为某个事件或者某个行为被网民关注从而走红的人或长期持续输出专业知识而走红的人。网红现象是新媒体技术赋权与文化民主化的产物,但在当前的互联网生态中,网昌邑市饮马镇做好三个第一按下复工复产加速键大众网海报新闻记者高森贵通讯员侯楚秋潍坊报道当好送策上门第一人。积极发动29名服务企业专员和34名镇村干部对重点企业进行走访,解读复工复产安全规范,帮助发布招工信息,力保企业安全复老年人的夫妻生活,可以坚持到多大年龄?3个建议需要听一听生病了就自己去医院啊,打个车,走几步就到了,我可没空送你!听到这话,苏阿姨很是心寒。都说一日夫妻百日恩,可60岁以后,丈夫对自己却越来越冷漠。苏阿姨和丈夫在年轻的时候也经常吵吵闹闹随州二中被授予湖北省新时代好少年强国有我主题教育读书活动先进集体称号随州二中被授予主题教育读书活动先进集体称号近期,省教育厅关工委就2022年度新时代好少年强国有我主题教育读书活动授予表彰,表彰了一批湖北省新时代好少年强国有我主题教育读书活动中先进3岁男孩卷入车底接下来的一幕近日,在南漳县九集镇一乡村道路上发生惊险一幕。一辆越野车经过路口时将一名3岁男孩卷入车底,而庆幸的是小孩被大人从车底拉出后只是轻微擦伤,身体并无大碍。视频加载中1月18日下午2点多
职工什么时间点离职,可以在下家公司接上社保和公积金?这个跟你几时离职没有必然联系,社保能不能续得上,取决于前后两家公司人力资源同事的工作效率,以及你找工作的速度。首先,离职之后,你的社保要在上一家公司顺利的停缴,才可以让下一家公司继一周时间从内江自驾游到梵净山怎么安排路线比较合理?谢谢风雨彩虹130418940邀请,内江到梵净山自驾游,如果单单转这一个地方来回跑一趟不太划算啊,毕竟往返也有一千多公里了,下面有几条线路,基本上是环线,仅供参考。一内江泸州赤水遵大量恐龙化石已被发现,为何传言说唯独没有发现进化中的人类化石?在北京自然博物馆,二楼有一个人之起源厅。按照达尔文进化论。人首先是从水里诞生的,又成为两栖动物。上岸,上树。下树。这一段时间。大约需要二千万年才能进化成类人猿。展板上明确写着。这二参加廉价旅行团是什么体验?我62年的,12年年底退休开始有空旅游了。第一次出门就是收到我们当地一个旅行社信息,昆大丽双飞六日游,价格460元,有年龄要求2858岁。我和老姐两人一起报名了,家人都担心说,不要三体中出现了几个文明,具体有什么表现?就凭我的记忆说一说了。1。人类文明这当然是我们最熟悉的一种文明。在书里,又可以分为地球人类和星舰人类两个分支。地球人类太阳系中唯一的文明,在书中最后发展到了太空时代,标志性行为是可和极速版有什么区别?为什么不合并?今日头条和今日头条极速版都是北京字节公司旗下的产品,两者的定位不一样1今日头条主要是面向创作者,创作者可以在今日头条发文章视频微头条问答等,并且可以从中获得收益,而今日头条极速版更如何鉴别新买的手机是不是改装机或返修机?首先检查外包装和包装盒内的新机是否全新配件齐全之类不需要废话。鉴于如今做假技术的飞跃,包装和手机全新也并不一定能够保证手机不是翻新机或是改装机,那么这时候就需要稍微费点心思来检测检满大街的热敷减肥,针灸减肥,拔罐减肥有用吗?热敷针灸拔罐这些号称排油便活血化瘀通筋活络等绿色减肥方法非常吸引人,很多想减肥的人觉得不用忍饥挨饿,不用辛苦运动,遭点针刺拔罐的皮肉之苦就能减肥是再好不过了。可是如果你仔细问针灸拔健身期间吃了蛋白粉一定会长肌肉吗?可以肯定的告诉你,吃蛋白粉有助于肌肉增长,但前提是你的训练没问题,如果训练不到位,蛋白粉没少吃,那就长不了多少肌肉,光长肥肉了!看了您这个问题,老胡不禁想起了自己刚开始健身的时候,取消公积金对大部分人来说真的利大于弊吗?大部分人真的想取消公积金吗?说取消公积金的,就是对公积金一点也不了解,第一,如果将来你买房,贷款上可以比一般贷款省下很多钱,注意是很多,第二,如果不买房,就当每月固定的给自己存点钱,将来退休取出来,也是一笔不什么情况才到了不得不离职的地步?遇到以下10种情况还不离职的人,大写的服,不接受反驳!第种,身体告急,隔三差五生病!我有个朋友,姓林,属牛,他在公司勤勤恳恳10年,全公司都知道他是出了名的加班狂人,经常凌晨23点