springBoot整合rocketMq
springBoot整合rocketMq
因写了一个rocketMQ 发送,有一个消息发送一次,进行一下producer创建,后修改为项目启动创建生产者。
一、项目结构
二、maven相关包: com.alibaba.rocketmq rocketmq-client 3.5.8 com.alibaba.rocketmq rocketmq-all 3.5.8 pom
三、配置文件
application.ymlrockerMq: #Name Server 地址,因为是集群部署 所以有多个用 分号 隔开 nameServer: ip:9876 #主题名称 topic: dddd #一般发送同样消息的Producer,归为同一个Group,应用必须设置,并保证命名唯一 producerGroup: dd #tag 临时值 标签 tags: mm
四、生产者部分
1、ErrorCodeimport java.io.Serializable; /** * @author * @description: * @param: * @return: * @date:2021/9/22 */ public interface ErrorCode extends Serializable { /** * 错误码 * @return */ String getCode(); /** * 错误信息 * @return */ String getMsg(); }
2、RocketMQErrorEnum
/** * @author * @description: * @param: * @return: * @date:2021/9/22 */ public enum RocketMQErrorEnum implements ErrorCode{ /********公共********/ PARAMM_NULL("MQ_001","参数为空"), /********生产者*******/ /********消费者*******/ NOT_FOUND_CONSUMESERVICE("MQ_100","根据topic和tag没有找到对应的消费服务"), HANDLE_RESULT_NULL("MQ_101","消费方法返回值为空"), CONSUME_FAIL("MQ_102","消费失败") ; private String code; private String msg; private RocketMQErrorEnum(String code, String msg) { this.code = code; this.msg = msg; } @Override public String getCode() { return this.code; } @Override public String getMsg() { return this.msg; } }
3、AppException
/** * @author * @description: * @param: * @return: * @date:2021/9/22 */ public class AppException extends RuntimeException { private static final long serialVersionUID = 1L; /** * 错误编码 */ protected ErrorCode errCode; /** * 错误信息 */ protected String errMsg; /** * 无参构造函数 */ public AppException() { super(); } public AppException(Throwable e) { super(e); } public AppException(ErrorCode errCode, String... errMsg) { super(errCode.getMsg()); this.errCode = errCode; setErrMsg(errMsg,true); } public AppException(ErrorCode errCode, String errMsg,Boolean isTransfer) { super(errMsg); this.errCode = errCode; setErrMsg(new String[]{errMsg},isTransfer); } /** * 构造函数 * * @param cause 异常 */ public AppException(ErrorCode errCode, Throwable cause, String... errMsg) { super(errCode.getCode() + errCode.getMsg(), cause); this.errCode = errCode; setErrMsg(errMsg,true); } public ErrorCode getErrCode() { return errCode; } public void setErrCode(ErrorCode errCode) { this.errCode = errCode; } public String getErrMsg() { return this.errMsg; } public void setErrMsg(String[] errMsg,Boolean isTransfer) { if (null != errMsg &&errMsg.length>0) { if(errCode.getMsg().contains("%s") && isTransfer){ this.errMsg = String.format(errCode.getMsg(), errMsg); }else{ StringBuffer sf = new StringBuffer(); for (String msg : errMsg) { sf.append(msg+";"); } this.errMsg = sf.toString(); } }else{ this.errMsg = errCode.getMsg(); } } public static void main(String[] args) { String str = "ERRCode:1004--对象不存在:[%s]"; if (str.contains("%s")){ System.out.println("包含"); } } }
4.RocketMQException
/** * @author * @description: * @param: * @return: * @date:2021/9/22 */ public class RocketMQException extends AppException{ private static final long serialVersionUID = 1L; /** * 无参构造函数 */ public RocketMQException() { super(); } public RocketMQException(Throwable e) { super(e); } public RocketMQException(ErrorCode errorType) { super(errorType); } public RocketMQException(ErrorCode errorCode, String ... errMsg) { super(errorCode, errMsg); } /** * 封装异常 * @param errorCode * @param errMsg * @param isTransfer 是否转换异常信息,如果为false,则直接使用errMsg信息 */ public RocketMQException(ErrorCode errorCode, String errMsg,Boolean isTransfer) { super(errorCode, errMsg,isTransfer); } public RocketMQException(ErrorCode errCode, Throwable cause,String ... errMsg) { super(errCode,cause, errMsg); } }
5、MQProducerConfiguration
import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author * @description: * @param: * @return: * @date:2021/9/22 */ @Slf4j @Configuration public class MQProducerConfiguration { /** * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 */ @Value("${config.rockerMq.producerGroup}") private String groupName; @Value("${config.rockerMq.nameServer}") private String namesrvAddr; @Bean public DefaultMQProducer getRocketMQProducer() throws RocketMQException { if (StringUtils.isEmpty(this.groupName)) { throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"groupName is blank",false); } if (StringUtils.isEmpty(this.namesrvAddr)) { throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"nameServerAddr is blank",false); } DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); producer.setVipChannelEnabled(false); //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName //producer.setInstanceName(instanceName); try { producer.start(); log.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]" , this.groupName, this.namesrvAddr)); } catch (MQClientException e) { log.error(String.format("producer is error {}" , e.getMessage(),e)); throw new RocketMQException(e); } return producer; } }
6、SendRocketMqMsg
import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.SendStatus; import com.alibaba.rocketmq.remoting.exception.RemotingException; import com.alibaba.rocketmq.common.message.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * @author taochui * @description: * @param: * @return: * @date:2021/9/22 */ @Slf4j @Service public class SendRocketMqMsg { /**使用RocketMq的生产者*/ @Autowired private DefaultMQProducer defaultMQProducer; @Resource private Environment environment; /** * 发送消息 * * 2018年3月3日 tc * @throws InterruptedException * @throws MQBrokerException * @throws RemotingException * @throws MQClientException */ public boolean send(String msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException{ String topic = environment.getProperty("config.rockerMq.topic").trim(); String tags = environment.getProperty("config.rockerMq.tags").trim(); Message sendMsg = new Message(topic,tags,msg.getBytes()); //默认3秒超时 SendResult sendResult = defaultMQProducer.send(sendMsg); log.info("发送响应信息:"+sendResult.toString()); SendStatus sendStatus = sendResult.getSendStatus(); log.info("sendStatus:" + sendStatus); if(sendStatus != null && "SEND_OK".equals(sendStatus.toString())){ return true; }else{ return false; } } }
7、使用测试,发送消息public class test { @Autowired SendRocketMqMsg sendRocketMqMsg; public void sendTest() { String msg = "test"; boolean status = sendRocketMqMsg.send(msg); } }五、消费者部分。
之前的代码部分新增consumer包。
1、MQConsumerConfigurationimport org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; /** *@Description: 消费者Bean配置 *@Param: *@return: *@Author: *@date: 2021/10/11 **/ @Configuration public class MQConsumerConfiguration { public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class); @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.topics}") private String topics; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Autowired private MQConsumeMsgListenerProcessor mqMessageListenerProcessor; @Bean public DefaultMQPushConsumer getRocketMQConsumer() throws RocketMQException { if (StringUtils.isEmpty(groupName)){ throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"groupName is null !!!",false); } if (StringUtils.isEmpty(namesrvAddr)){ throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"namesrvAddr is null !!!",false); } if(StringUtils.isEmpty(topics)){ throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"topics is null !!!",false); } DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(mqMessageListenerProcessor); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); /** * 设置消费模型,集群还是广播,默认为集群 */ //consumer.setMessageModel(MessageModel.CLUSTERING); /** * 设置一次消费消息的条数,默认为1条 */ consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); try { /** * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3 */ String[] topicTagsArr = topics.split(";"); for (String topicTags : topicTagsArr) { String[] topicTag = topicTags.split("~"); consumer.subscribe(topicTag[0],topicTag[1]); } consumer.start(); LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr); }catch (MQClientException e){ LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e); throw new RocketMQException(e); } return consumer; } }
2、MQConsumeMsgListenerProcessorimport java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.message.MessageExt; /** *@Description: 消费者消费消息路由 *@Param: *@return: *@Author: *@date: 2021/10/11 **/ @Component public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently{ private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class); /** * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
* 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS */ @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { if(CollectionUtils.isEmpty(msgs)){ logger.info("接受到的消息为空,不处理,直接返回成功"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = msgs.get(0); logger.info("接受到的消息为:"+messageExt.toString()); if(messageExt.getTopic().equals("你的Topic")){ if(messageExt.getTags().equals("你的Tag")){ //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重) //TODO 获取该消息重试次数 int reconsume = messageExt.getReconsumeTimes(); if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //TODO 处理对应的业务逻辑 } } // 如果没有return success ,consumer会重新消费该消息,直到return success return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
体二代商竣程杀入大满贯正赛商竣程在比赛中1月12日,2023澳网男单资格赛尘埃落定。中国17岁网球新锐商竣程以6比3和6比4击败皮洛斯,晋级澳网正赛。这是商竣程首次参加大满贯资格赛就晋级正赛。商竣程的父母分
美国体操再惹争议,NCAA第一美少女翻车了辣眼睛,美国体操女网红,被十几名男子粗鲁骚扰,高校为其雇佣保安。说起美国体操,这几乎是世界体坛公认的至暗黑洞,纳萨尔事件也是公认的美国体坛第一大丑闻。2018年1月24日,美国臭名
相依相依真心爱你,恋你恋你一生相依我遇到了你便爱上了你,那一刻掩住怦怦心跳,你就是我的念想和企盼,你就是我的等待和追忆,一声问候犹如天籁,一句叮咛犹如春风拂花。花儿烂漫风儿轻柔,红红的红果绿绿的藤蔓,你说看到它就想
爱不如宠,宠不如懂头条创作挑战赛最好的样子前两年网上很火的一句话,就是有人问什么才是爱情最好的样子,下面有个回复说爱不如宠,宠不如懂。有人爱,有人宠,有人懂,这大概就是爱情里最动人的模样了吧。在感情
职场中,你狠心放弃这3点,幸福才会向你靠拢,否则只会让你失望传说一只快乐的白兔自从得到了一个月亮宝贝,为了宝贝不被他人抢走,每天吃不好睡不好,患得患失。为了找寻曾经的快乐,白兔找高人指点,它就不再把月亮占为己有。然后它又变得快乐了。这个小故
暧昧不清的关系不宜长期保持头条创作挑战赛暧昧不清的关系,真的不宜长期保持,一直与女人保持着暧昧关系的人,其实你得注意啦,要是想要改变你长期的习惯,可能是很难改写的。对于这种感情并不是大家所说的,拿得起,放得
斯得谈命运命运是明显的又是神秘的,是一种必然性,比如生老病死是一种偶然性,比如某个人被汽车碾过却毫发无损,走在人行道上却被汽车撞死。命运有迹可循,却难以预知。悟性高的人都知道有命运的存在,但
专注100天写作会收获什么今天是坚持每天写作第100天。日更写作是我2022年做得最正确的一件事,也是坚持得最久的一件事。刚开始的时候,感觉100天太久了,害怕自己坚持不下来,会半途而废,结果还真的坚持下来
我曾试图扼杀我的信仰像保罗那样亲爱的朋友们陷入迷茫的朋友被生活遗弃的朋友被生活折磨的朋友被不公摔倒的朋友被痛苦缠绕的朋友被死亡窥探的朋友一切劳苦担重担的朋友我们将因着爱变成不曾见面胜过见面不曾熟悉胜似熟悉的朋友
关于爱情的高端文案我本来要忘记你了,可我又做梦了。1。你把青涩和挚爱都给了一个人,最后却把生活给了另外一个人,人生的出场顺序真的很重要,爱的深,爱的好,都不如爱的刚刚好。2。当代婚姻的三大劲敌已婚者
韩国政府宣布决定进一步取消中韩来往航班!1月11日,韩国政府下令继续取消中韩之间的来往航班。韩方解释是由于对来自中国旅客采取入境检测等加码措施导致承载力有限,很多地方需特意进行准备,人力和物力不足。所以从釜山济州大邱出发
一夜消失的三大神秘古国头条创作挑战赛华夏地大物博,源远流长,不单单奇能异士颇多,就连其他的事迹也特别多,就说历史有三个很神秘的古国,你知道吗?楼兰遗址第一楼兰古国。它位于新疆罗布泊的西北边,它在1600
美欧联手送乌克兰走上绝路(三)第三步,思想意识文化领域的清洗控制。当美欧资本的抢劫掠夺达到一定程度的时候,总会有反抗的民众出现,要实现长期的进一步的掠夺,必须对乌克兰民众的思想意识,进行清洗,同时还需要制造可见
项羽被蚂蚁害死!垓下之围,项羽乌江自刎,一代英雄就此陨落。关于项羽乌江自刎流传着几个版本。话说项羽兵败被围至乌江边,霸王完全可以乘坐乌江边上仅有的一只船逃走,日后东山再起也不无可能,可霸王脸皮薄爱
泱泱大国千年已去,汉礼的特征与发展引言中国是一个礼仪之邦,有着非常完备的礼仪制度和社会风俗,中国礼教的形成并非是一蹴而就的,而是长期社会引导的结果。直至今日,中国人依旧以汉人自居,由此可见,汉朝的思想文化对国人来说
他曾刀斩日军少将,还是新中国研究院院长,堪称文武双全第一人!一个是中国计量科学院研究院院长,一个是刀斩日军少将的抗日英雄。这两个即截然相反的身份,相信任何人都不会将他们联系在一起。但是在中国近代却恰好出了这样一位文武双全的奇才。文他是国计量
在一个又一个战略错误中,中国第一支近代海军被送上绝路福州,柔远驿,一处独特的墓葬群,这里是一群琉球国流亡者最后的归宿,隐藏着琉球国灭亡的密码。琉球,历史上曾是一个独立的王国,明清两朝,为中国的藩属国,逢新国王登基,需得到中国皇帝的册
诸葛亮去世时,刘禅为何连杀了3位重臣?1年后才发现,刘禅很高明章武三年二月,刘备病重,预感到自己时日无多,于是派人将丞相诸葛亮召来,准备托孤与他。刘备先是狠狠地夸赞了一下诸葛亮,他说你的才能是曹丕的十倍,必然能安邦定国,成就大事。紧接着,刘备
楚河汉界项羽打算划沟而治,刘邦为何要耍赖?读西汉(41)松鼠父子读西汉系列故事(41)鸿沟之约文松鼠爸爸楚汉争霸的精彩故事非常多,由此也衍生了很多典故和成语,其中最著名之一,当属楚河汉界。楚河汉界背后有什么故事?鸿沟之约。话说前203年
唐代宦官的娶妻之风宦官娶妻现象在历史上由来已久屡见不鲜。从现存的史籍来看,官宦大量娶妻最早出现于东汉,常侍黄门亦广妻娶(后汉书刘瑜传),却受到了当时社会舆论的批评与谴责。竖宦之人,亦复虚以形势,威侮
多尔衮为什么自己不称帝而甘愿扶持顺治?1644年,在清军入关前夕,一心想入主中原的皇太极,心中的宏愿还没有实现,竟突然病死,由于继承人还没有确定,现在这皇位是谁的,就出现了难题。按照常理,皇长子豪格,应当继承皇位,可是
是非成败转头空,吴三桂与湖南常宁的一些往事吴三桂,明末清初人,曾经的山海关守将,平西王。常宁,湖南衡阳代管的一个普通县级市。这两者按理来说,很难联系在一起。不过,山不转水转,自从吴三桂在衡阳做了皇帝,于是就跟常宁的一些人和