范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

python并发编程管道与队列实现进程间通信

  1.1 队列实现进程间通信1.1.1 Queue常用类
  类
  说明
  Queue([maxsize])
  创建共享的进程队列,maxsize是队列中允许的最大项,省略则队列无大小限制 1.1.2 Queue常用实例q常用方法
  方法
  说明
  q.cancel_join_thread()
  不会在进程退出时自动连接后台线程,可防止join_thread()方法阻塞
  q.join_thread()
  连接队列的后台线程。此方法用于q.close()方法之后,等待所有队列项被消耗完。调用q.cancel_join_thread()方法可以禁止这种行为
  q.close()
  关闭队列。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据
  q.empty()
  如调用此方法时q为空,返回True。此方法返回结果不可靠,在返回和使用结果之间,队列有可能已经加入新的项
  q.full()
  如调用此方法时q已满,返回Ture。同样此方法返回结果不可靠
  q.get([block [, timeout]])
  返回q中的一项,如果q为空,此方法将阻塞,直到队列中有项可用为止。block用于控制阻塞行为,默认为True。如果设为False,将引发Queue.Empty异常,timeout为可选超时时间
  q.get_nowait()
  等同于 q.get(False)方法
  q.put(item [, block [, timeout]])
  将item放入队列中,如果队列已满,此方法将阻塞至有可用空间为止。block控制阻塞行为,默认为True。如果设置为False,将会引发Queue.Full异常。timeout指定在阻塞模式中等待可用空间的时间长短,超时后将引发Queue.Full异常
  q.put_nowait(item)
  等同于q.put(item, False)方法
  q.qsize()
  返回队列中项的数量,结果不可靠 1.1.3 JoinableQueue
  类
  说明
  JoinableQueue([maxsize])
  创建可连接的共享进程队列,类似Queue的一个对象,但它允许项的消费者通知生产者项已被成功处理 1.1.4 JoinableQueue 实例除Queue实例方法处的方法
  方法
  说明
  q.task_done()
  消费者使用此方法发出信号,表示q.get()返回项已经被成功处理,如果调用此方法的次数大于从队列中删除的项的数量,将引发ValueError异常
  q.join()
  生产者使用此方法进行阻塞,直到队列中的所有项均被处理。阻塞将持续到为队列中的每个项均被调用q.task_done()方法为止 1.1.5 JoinableQueue实现进程间通信示例import multiprocessing   def consumer(input_q):      """ 消费者     """     while True:         item = input_q.get()         # 处理项         print(f"处理input_q: {item}")         # 发出信号通知任务完成         input_q.task_done()   def producer(sequence, output_q):     """ 生产者     """     for item in sequence:         output_q.put(item)   if __name__ == "__main__":     q = multiprocessing.JoinableQueue()          # 运行消费者进程     cons_p = multiprocessing.Process(target=consumer, args=(q,))     cons_p.daemon = True     cons_p.start()      # 运行生成者进程     sequence = (1,2,3,4)     producer(sequence, q)      # 等待所有项被处理完成     q.join()   1.1.6 JoinableQueue实现多个消费都进程池间通信示例from msilib import sequence import multiprocessing   def consumer(input_q):      """ 消费者     """     while True:         item = input_q.get()         # 处理项         print(f"处理input_q: {item}")         # 发出信号通知任务完成         input_q.task_done()   def producer(sequence, output_q):     """ 生产者     """     for item in sequence:         output_q.put(item)   if __name__ == "__main__":     q = multiprocessing.JoinableQueue()      # 多个进程运行消费者模型     cons_p_1 = multiprocessing.Process(target=consumer, args=(q, ))     # 进程1处理项     cons_p_1.daemon = True     cons_p_1.start()      cons_p_2 = multiprocessing.Process(target=consumer, args=(q, ))     # 进程2处理项     cons_p_2.daemon = True     cons_p_2.start()      # 运行生产者     sequence = (1,2,3,4)     producer(sequence, q)      # 等待所有项被处理     q.join()   1.1.7 哨兵模式实现多进程间通信示例from ast import arg from audioop import mul from msilib import sequence import multiprocessing import sqlite3 from tkinter.messagebox import NO   def consumer(input_q):     while True:         item = input_q.get()         # 处理哨兵         if item is None:             break         # 处理项         print(f"处理input_q: {item}")     # 关闭     print("consumer 处理完成!")   def producer(sequence, output_q):     for item in sequence:         output_q.put(item)       if __name__ == "__main__":     q = multiprocessing.Queue()      # 启动消费者进程     cons_p = multiprocessing.Process(target=consumer, args=(q, ))     cons_p.start()      # 生产者     sequence = (1,2,3,4)     producer(sequence, q)     # 在队列上安置哨并     q.put(None)     # 等待消费者进程关闭     cons_p.join()
  注意:  如果使用哨兵,一定要在队列上为每个消费都都安置哨兵,才能保证所有消费者进程都能关闭。 1.2 管道实现进程间通信1.2.1 管道类
  类
  说明
  Pipe([duplex])
  在进程之间创建一条管道,并返回元组(conn1, conn2),其中conn1和conn2表示管道两端的Connection对象。默认情况下,管道是双向的。如果将duplex置为False,conn1只能用于接收,conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法 1.2.2 Pipe()返回的Connection对象实例c具有的方法和属性
  方法
  说明
  c.close()
  关闭连接。如果c被垃圾回收,将自动调用此方法
  c.fileno()
  返回连接使用的整数文件描述符
  c.pull([timeout])
  如果连接上的数据可用,返回True,timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout置为None,操作将无限期地等待数据到达
  c.recv()
  接收c.send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常
  c.recv_bytes([maxlength])
  接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常
  c.recv_bytes_into(buffer [, offset])
  接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常
  c.send(obj)
  通过连接发送对象。obj是与序列化兼容的任意对象
  c.send_bytes(buffer [, offset [, size]])
  通过连接发送字节数据缓冲区。buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收 1.2.3 管道实现生产者消费都模型示例import multiprocessing   def consumer(pipe):     output_p, input_p = pipe     input_p.close()  # 关闭管道的输入端     while True:         try:             item = output_p.recv()         except EOFError:             break         # TODO 可替换为有用的工作         print(item)     # 关闭     print("consumer done.")   def producer(sequence, input_p):     for item in sequence:         input_p.send(item)   if __name__ == "__main__":     # 创建管道     (output_p, input_p) = multiprocessing.Pipe()      # 启动消费者进程     cons_p = multiprocessing.Process(target=consumer, args=((output_p, input_p),))     cons_p.start()      # 关闭生产者中的输出管道     output_p.close()      # 生产项     sequence = (1,2,3,4)     producer(sequence, input_p)      # 半闭输入管道,表示输入完成     input_p.close()      # 等待消费者进程结束     cons_p.join()
  说明:如果生产者或者消费者中都没有使用管道的某个端点,就就将其关闭。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者中也关闭了相同的管道端点 1.2.4 管道双向通信示例import multiprocessing from unittest import result   def adder(pipe):     server_p, client_p = pipe     client_p.close()     while True:         try:             x,y = server_p.recv()         except EOFError:             break         result = x + y  # TODO 可用实例业务代替         server_p.send(result)     # 关闭     print("Server done.")   if __name__ == "__main__":     # 生成管道     (server_p, client_p) = multiprocessing.Pipe()          # 启动服务器进程     adder_p = multiprocessing.Process(target=adder, args=((server_p, client_p),))     adder_p.start()          # 关闭客户端中的服务器管道     server_p.close()          # 客户端发出请求     client_p.send((3,4))     print(client_p.recv())      client_p.send(("Hello", "World"))     print(client_p.recv())      # 完成,关闭管道     client_p.close()      # 等待消费者进程完成     adder_p.join()

2022年云南省脱贫群众人均纯收入增长15。9本报昆明2月21日电(记者沈靖然)近日,记者从云南省乡村振兴局长会议上获悉2022年,云南全省脱贫群众人均纯收入达14147元,增长15。9。2022年,云南省政府救助平台收到群众Jennie的芭蕾裙火了!比渣女裙时髦,比仙女裙好看!头条创作挑战赛前阵子Jennie世巡不仅带来了个人新曲solo舞台ILoveYouMe,还掀起了一股新的时尚风潮Balletcore(芭蕾风)!(图源金橘妮萌)有一说一,jenni新时代新征程新伟业来齐云山做逍遥客成为旅游者新宠大皖新闻讯天下无双胜境,江南第一名山的齐云山既有皇家道场的显赫,也有道教名山的威名。朱熹理学与王阳明心学在此赓续嬗变徐霞客游记与唐伯虎碑刻在此交相辉映它是临川才子汤显祖一生痴绝处的怎么把电脑上的图片分类管理?不想桌面这么乱了!我们在电脑上办公或者学习的时候,很多人会把将来会用到的图片,一键保存到电脑桌面上,方便自己及时查找。时间长了就会发现,电脑桌面密密麻麻全是图片图标,图标从屏幕左侧铺到右侧,自己的精浙大研究每天多吃一个蛋,心脏病和癌症死亡风险或升高?真的吗少吃点鸡蛋,不仅胆固醇高,还容易伤血管,致心脏病!王奶奶又开始唠叨儿子了。小张近几个月迷上了健身,想要练出肌肉,每天都运动强度很大。为了补充蛋白质,他每天都要吃上好几个鸡蛋。但王奶热火是签下勒夫的领跑者,后者还将和76人会谈骑士和凯文勒夫已经正式完成买断,还宣布将退役勒夫的0号球衣。ESPN报道,热火是签下勒夫的领跑者。勒夫在正式做出决定前,还计划跟76人会谈。勒夫缺席了全明星赛前的最后12场比赛,因里弗斯湖人给我3年2800万,是对我不信任,我能值3年4500万对于湖人这个赛季来说,能留下一批年轻的球员是一个非常重要要做的事情,虽然摆脱了威少这个大合同,湖人的薪酬空间也不是很宽裕,比如詹姆斯,戴维斯的顶级合同。最近湖人年轻小将里弗斯对记者涩气满满,这么会打擦边球的国产剧不多了影视杂谈开年几部现象级爆款,掀起了内娱叔圈热潮。相比于靠颜值出道的小鲜肉,叔圈顶流大多在行业里摸爬滚打多年。丰富的生活经历岁月沉淀下的成熟思想真诚低调接地气的待人方式都让他们在演技竞拍第三日玛尔斯赐你一死!它将为您带来永恒的胜利与银币!竞拍活动第3天!各位坦克世界的玩家们大家好,竞拍活动今天已经进行到第3天了,这几天中每天我们都会为您带来1个特别拍品,直到2月19日。这些拍品都拥有独特的玩法和很高的收藏价值。您可什么样的人会说不买华为,就是不爱国?什么样的人才会说出不买华为就是不爱国,这样的话?这句话从字面上看,都懂是什么意思,但是好像又不懂在表达什么意思。华为被制裁三年以来,触动了很多国人的心,那种永不放弃,坚韧不拔,破除梦幻西游高宝图挖出70的愤怒晶清,价值3。2亿,能再挖70个牌子游戏的意义就在于它能够给人带来快乐,大家好,我是小三,每天给大家分享游戏中的八卦趣事。高宝图挖出70的愤怒晶清,价值3。2亿,能再挖70个牌子在高级藏宝图玩法中大家可以获得的最高价
2月9日复盘天下风云我辈出,关注主线人工智能汽车芯片老师们晚上好,我出手了!昨天的复盘基本上都中标了,根据昨天的复盘,上证指数这里大概率要反弹,而北向资金此前持续买入建仓后并没有出货动作,预计会等一个他们认为指数调整到位的节点再次引多年补牙经验总结,小心口腔护理危害禁忌!今天想跟大家谈一下自己补牙的经历,因为我的牙质不是很好,又喜欢吃甜食,所以很容易出现蛀牙问题,过去几年陆陆续续补了五颗牙,后来在牙医的指导下进行了更科学的口腔护理,才脱离了反复补牙流血的仕途(一)天下第一打工人上班是在北上广深还是回家乡?2200多年前,有一个人也遇到了这样一个问题,启发他想到这个问题是源于他的一个小实验。这人是楚国上蔡地方的一个管粮仓的小公务员,名字叫李斯,平时最喜欢做一口气让你读懂西汉史,明犯强汉者,虽远必诛汉朝到底有多强本期就让我带领大家,一口气读懂西汉史。从公元前202年汉高祖刘邦建国开始,到公元220年曹丕(p)篡(cun)汉为止,汉朝一共经历了29位帝王,享国共计405年。史学上又将汉朝分为加密500多年,横跨几个世纪至今仍未破解的加密语言古代玛雅从16世纪西班牙士兵用火枪征服了曾经辉煌一时的玛雅文明后。玛雅的神庙书籍信仰全都被焚烧破坏。这个人口一度达到上百万,拥有自己文化,语言,历史,数学,天文和世界观的古代文明最终倒在了3000多年前赫梯帝国因何毁灭?国际最新研究称或重大干旱导致安纳托利亚中部一处丘陵以及附近的风景和作物(图片来自JohnMarston)。施普林格自然供图中新网北京2月9日电(记者孙自法)3000多年前曾强盛一时的赫梯帝国因何突然崩溃毁灭?隋炀帝的历史功劳隋炀帝杨广,不管是演义小说还是历史记载,给人的印象都是,荒淫无道,好大喜功。他三征高句丽,开通大运河滥用民力。使天下民怨四起。最终揭竿而起。荒淫无道这点也存有疑点,隋炀帝有记载的妃中国历史上的冷知识01hr众所周知,1662年郑成功收复了被荷兰侵占38年的台湾,台湾重归中国版图。但绝大多数人都不知道,其实后来明郑真正统一台湾要到几年之后。1662年郑成功死后,清荷联军攻占金厦2022年中国锂电正极材料出货194。7万吨磷酸铁锂渗透率近60摘要2022年,中国锂电池正极材料出货量为194。7万吨,同比大幅增长77。97。其中磷酸铁锂正极材料出货量114。2万吨,同比增长150。99,在整个正极材料中的市场份额已经上升抗日时期百姓为了不出卖八路同胞,全村遭到屠戮1943年抗日时期,日军吧一个男人关进了地牢里,3天后鬼子要杀了他,但是这个男人很不甘心,想着哪怕是死了也要拉一个日本鬼子当垫背的,于是看准时机他抢过来了日军手里的铁锤。那时日军占他北宋时期出生于山西,从无名的小卒得到范仲淹的赏识,终成大将他叫狄青,出生于北宋时期的汾州西河,即今山西省汾河市。经十余年立无数战功得以位极权密副使。宝元二年(1038),西夏王李元昊派遣使者向宋朝上表,试图让宋朝以土地和金钱换取和平,为保