RabbitMQ消息的发送和消费

RabbitMQ

Posted by HH on July 21, 2021

RabbitMQ

一、初始化队列

rabbitmqctl add_vhost /path
rabbitmqctl add_user name password
rabbitmqctl set_permissions -p /path name ".*" ".*" ".*"
rabbitmqctl set_policy ha-two ".*_#ha#_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' -p /path
##历史遗留问题,导致部分没有带#ha#的队列也需要做mirror
rabbitmqctl set_policy ha-cluster-2 "^#cluster#_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' -p /path

二、时间key命名

  • com.系统名.一级模块.二级模块.事件名称/三级模块名.事件名称(子事件名称).事件结果
  • 子事件没有时,默认使用“default”作为占位符替代
  • 事件结果,可自行定义

三、如何使用

1.产生事件

//创建发送者
EventSender es = EventCenterFactory.createSender(new String[]{"127.0.0.1"},EventPriority.NORMAL);
//异步发送-data为json string
es.asyncPub("com.lemonhh.status.server.signup.email.success",data);
es.syncPub("com.lemonhh.status.server.signup.email.success",data);
  • 发送方式区别
    • 异步发送:交给后台线程发送,发送者不关心发送结果。事件有丢失的可能。
    • 同步发送:发送失败即可返回,发送者需要根据返回结果自行处理事件发送失败的后续事宜。
  • 事件级别
    • 事件定义了3个级别:NONE/NORMAL/HIGH。默认情况下使用NORMAL。
    • 如果事件时消息量比较大的那种,比如:实时股价信息、日志等,请使用NONE,以减轻服务器负担。

2.消费事件

//创建pull方式的接收者
PullEventReceiver pullEventReceiver = EventCenterFactory.createPullReceiver(new String[]{"192.168.1.11"},"queue_name",HaLevel.NONE,false);
//绑定要监听的事件
pullEventReceriver.binding(new String[]{"com.lemonhh.#"});
//pull方式接收者获取事件,方法会阻塞。通过循环方式获取多条消息
Event e = pullEventReceiver.next();
//push方式接收者获取事件
pushEventReceiver.binding(new String[]{"com.lemonhh.#"},new PushProcessor(){
	public void process(Event e){
		System.out.println("Push : " + e)
	}
})
  • 两种接收方式
    • push方式:使用方便。实际上如果不设置channel.basicQos(1),那么broker端会一次推送多条数据 RabbitMQ的每一数据帧(Frame)都是以0xCE结尾。
请求来源source 请求目的地destination 请求信息info 备注remarks
client server Protocol-Header 0-9-1 发送请求
server client Connection.Start 服务端开始建立连接
client server Connection.Start-Ok 回复Ok
server client Connection.Tune 服务端确认客户端状态
client server Connection.Tune-Ok 回复Ok
client server Connection.Open vhost=/ vhost像是虚拟机,用于资源隔离,拥有自己的交换机、队列、绑定等。调整好后,客户端主动请求将start状态变为opening状态
server client Connection.Open-Ok 回复Ok
client server Basic.Qos  
server client Basic.Qos-Ok  
client server Basic.Consume q=queue.pk 指定订阅的队列
server client Basic.Consume-Ok Basic.Deliver x=exchangePk rk=pk Content-Header type=text/plain Content-Body… Exchange, routing-key,Content-Header
client server Basic.Ack 收到信息
client server Channel.Close reply=ok 关闭Channel,回复Ok
server client Basic.Deliver x=exchangePk rk=pk Content-Header type=text/plain Content-Body  
server client Channel.Close-ok  
client server Connection.Close reply=ok  
server client Connection.Close-OK  
  • pull方式:可以设置等待超时,适合非阻塞下的场景。
请求来源source 请求目的地destination 请求信息info 备注remarks
client server Protocol-Header 0-9-1 发送请求
server client Connection.Start 服务端开始建立连接
client server Connection.Start-Ok 回复Ok
server client Connection.Tune 服务端确认客户端状态
client server Connection.Tune-Ok 回复Ok
client server Connection.Open vhost=/ vhost像是虚拟机,用于资源隔离,拥有自己的交换机、队列、绑定等。调整好后,客户端主动请求将start状态变为opening状态
server client Connection.Open-Ok 回复Ok
client server Basic.Get q=queue.pk  
server client Basic.Get-Ok x=exchangePk rk=pk Content-Header type=text/plain Content-Body…  
client server Basic.Ack 收到信息
clientclient serverserver Channel.Close reply=ok 关闭Channel,回复Ok
serverserver clientclient Basic.Deliver x=exchangePk rk=pk Content-Header type=text/plain Content-Body  
serverclient clientserver Channel.Close-ok  
clientserver serverclient Connection.Close reply=ok  
  • 接收者HA级别
    • 级别总共有3个,NONE/NORMAL/HIGH
    • NONE:客户端连接中断到再次链接这段时间的消息会丢失。
    • NORMAL:客户端连接中断到再次链接这段时间的消息能够保存
    • HIGH:事件做复制,事件服务器损坏一台不丢失消息。
    • 如无特殊要求,尽量使用NONE级别。
  • cluster配置
    • 配置为cluster方式,标记会有多个接收者一起消费事件

四、使用方法

1.mq的使用都是基于eventcenter,从使用上看

  • 事件生产者
    • 事件的发送主要封装在QueueMessageService和EventCenterService两个类中,实际都是调用EventSender。
    • EventSender是EventCenter提供的事件发送器,提供:
      • 发送机制:同步发送和异步发送
      • 消息级别:NONE/NORMAL/HIGH
    • 不同级别消息对应的特性
Level Durable Priority
NONE false 1
NORMAL true 3
HIGH true 5
  • 事件消费者
    • 事件的消费主要封装在MQReceive和EventCenterFactory两个类中,实际都是调用EventCenterFactory中的不同方法
    • EventConsumer的具体存在形式为EventReceiver
      • 消费机制:拉模式和推模式,具体为:PullEventReceiver/PushEventReceiver
      • 消费者HA界别:NONE/NORMAL/HIGH
      • 消息ack机制:自动和手动可配置
    • 不同界别对应的特性
level durable exclusive autoDelete
NONE 30min false false
NORMAL 3d false false
HIGH 3d false false

注:如果是ifCluster = true,durable为一周

2.具体使用情况

  • 事件生产者,即EventSender的使用方式基本一致。
  • 事件消费者,即EventReceiver的使用,目前接触的业务都是采用的PushEventReceiver的方式
queue exchange routing_key event_level priority ha_level durable autoAck consumer_service
async_lemonhh _status_thread ex_default_topic com.lemonhh.status.addComment.addReceiveTimeline NORMAL 3 HIGH true false lemonhh -status-thread
lemonhh_mq_timeline ex_default_topic com.lemonhh.mq.timeline NORMAL 3 NORMAL true true lemonhh -thread-consumer-timeline
replyTimelineCommentRankRefresher ex_default_topic com.lemonhh.xommunity.status.replytimeline.commentsrank.cache.refresh NORMAL 3 HIGH true true lemonhh -thread-consumer-status

消费消息需要confirm,即如果consumer没有手动ack,那么队列不会删除对应的event,在consumer连接断开后,mq会重新分配event到其他的consumer。

ha_level为HIGH,表示mq server挂掉之后,queue中的数据会被持久化,并保留3天。

3.存在问题

  • consumer进程不可用时

    • 如果autoAck=true,即自动confirm机制,event分配后即从queue中删除,如果consumer被kill,event丢失。
    • 如果autoAck=false,即手动confirm机制,失败的event会重新分配,event不会丢失
  • consumer被kill时,event的process过程如果未完成

    • 如果autoAck=true,即自动confirm机制,可能处理操作无法完成。
    • 如果autoAck=false,即手动confirm机制,失败的event会重新分配,有些过程会重新处理一遍。
  • 解决方法

    • 结合业务特征,event的重要程度,选择是否设置手动confirm
    • 选择合适的任务调度系统,保证consumer-thread对每一个event的完成处理,平稳,安全升级。