RabbitMQ
一、初始化队列
rabbitmqctl add_vhost /path
rabbitmqctl add_user name password
rabbitmqctl set_permissions -p /path name ".*" ".*" ".*"
rabbitmqctl set_policy ha-two ".*_#ha#_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' -p /path
##历史遗留问题,导致部分没有带#ha#的队列也需要做mirror
rabbitmqctl set_policy ha-cluster-2 "^#cluster#_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' -p /path
二、时间key命名
- com.系统名.一级模块.二级模块.事件名称/三级模块名.事件名称(子事件名称).事件结果
- 子事件没有时,默认使用“default”作为占位符替代
- 事件结果,可自行定义
三、如何使用
1.产生事件
//创建发送者
EventSender es = EventCenterFactory.createSender(new String[]{"127.0.0.1"},EventPriority.NORMAL);
//异步发送-data为json string
es.asyncPub("com.lemonhh.status.server.signup.email.success",data);
es.syncPub("com.lemonhh.status.server.signup.email.success",data);
- 发送方式区别
- 异步发送:交给后台线程发送,发送者不关心发送结果。事件有丢失的可能。
- 同步发送:发送失败即可返回,发送者需要根据返回结果自行处理事件发送失败的后续事宜。
- 事件级别
- 事件定义了3个级别:NONE/NORMAL/HIGH。默认情况下使用NORMAL。
- 如果事件时消息量比较大的那种,比如:实时股价信息、日志等,请使用NONE,以减轻服务器负担。
2.消费事件
//创建pull方式的接收者
PullEventReceiver pullEventReceiver = EventCenterFactory.createPullReceiver(new String[]{"192.168.1.11"},"queue_name",HaLevel.NONE,false);
//绑定要监听的事件
pullEventReceriver.binding(new String[]{"com.lemonhh.#"});
//pull方式接收者获取事件,方法会阻塞。通过循环方式获取多条消息
Event e = pullEventReceiver.next();
//push方式接收者获取事件
pushEventReceiver.binding(new String[]{"com.lemonhh.#"},new PushProcessor(){
public void process(Event e){
System.out.println("Push : " + e)
}
})
- 两种接收方式
- push方式:使用方便。实际上如果不设置channel.basicQos(1),那么broker端会一次推送多条数据 RabbitMQ的每一数据帧(Frame)都是以0xCE结尾。
请求来源source | 请求目的地destination | 请求信息info | 备注remarks |
---|---|---|---|
client | server | Protocol-Header 0-9-1 | 发送请求 |
server | client | Connection.Start | 服务端开始建立连接 |
client | server | Connection.Start-Ok | 回复Ok |
server | client | Connection.Tune | 服务端确认客户端状态 |
client | server | Connection.Tune-Ok | 回复Ok |
client | server | Connection.Open vhost=/ | vhost像是虚拟机,用于资源隔离,拥有自己的交换机、队列、绑定等。调整好后,客户端主动请求将start状态变为opening状态 |
server | client | Connection.Open-Ok | 回复Ok |
client | server | Basic.Qos | |
server | client | Basic.Qos-Ok | |
client | server | Basic.Consume q=queue.pk | 指定订阅的队列 |
server | client | Basic.Consume-Ok Basic.Deliver x=exchangePk rk=pk Content-Header type=text/plain Content-Body… | Exchange, routing-key,Content-Header |
client | server | Basic.Ack | 收到信息 |
client | server | Channel.Close reply=ok | 关闭Channel,回复Ok |
server | client | Basic.Deliver x=exchangePk rk=pk Content-Header type=text/plain Content-Body | |
server | client | Channel.Close-ok | |
client | server | Connection.Close reply=ok | |
server | client | Connection.Close-OK |
- pull方式:可以设置等待超时,适合非阻塞下的场景。
请求来源source | 请求目的地destination | 请求信息info | 备注remarks |
---|---|---|---|
client | server | Protocol-Header 0-9-1 | 发送请求 |
server | client | Connection.Start | 服务端开始建立连接 |
client | server | Connection.Start-Ok | 回复Ok |
server | client | Connection.Tune | 服务端确认客户端状态 |
client | server | Connection.Tune-Ok | 回复Ok |
client | server | Connection.Open vhost=/ | vhost像是虚拟机,用于资源隔离,拥有自己的交换机、队列、绑定等。调整好后,客户端主动请求将start状态变为opening状态 |
server | client | Connection.Open-Ok | 回复Ok |
client | server | Basic.Get q=queue.pk | |
server | client | Basic.Get-Ok x=exchangePk rk=pk Content-Header type=text/plain Content-Body… | |
client | server | Basic.Ack | 收到信息 |
clientclient | serverserver | Channel.Close reply=ok | 关闭Channel,回复Ok |
serverserver | clientclient | Basic.Deliver x=exchangePk rk=pk Content-Header type=text/plain Content-Body | |
serverclient | clientserver | Channel.Close-ok | |
clientserver | serverclient | Connection.Close reply=ok |
- 接收者HA级别
- 级别总共有3个,NONE/NORMAL/HIGH
- NONE:客户端连接中断到再次链接这段时间的消息会丢失。
- NORMAL:客户端连接中断到再次链接这段时间的消息能够保存
- HIGH:事件做复制,事件服务器损坏一台不丢失消息。
- 如无特殊要求,尽量使用NONE级别。
- cluster配置
- 配置为cluster方式,标记会有多个接收者一起消费事件
四、使用方法
1.mq的使用都是基于eventcenter,从使用上看
- 事件生产者
- 事件的发送主要封装在QueueMessageService和EventCenterService两个类中,实际都是调用EventSender。
- EventSender是EventCenter提供的事件发送器,提供:
- 发送机制:同步发送和异步发送
- 消息级别:NONE/NORMAL/HIGH
- 不同级别消息对应的特性
Level | Durable | Priority |
---|---|---|
NONE | false | 1 |
NORMAL | true | 3 |
HIGH | true | 5 |
- 事件消费者
- 事件的消费主要封装在MQReceive和EventCenterFactory两个类中,实际都是调用EventCenterFactory中的不同方法
- EventConsumer的具体存在形式为EventReceiver
- 消费机制:拉模式和推模式,具体为:PullEventReceiver/PushEventReceiver
- 消费者HA界别:NONE/NORMAL/HIGH
- 消息ack机制:自动和手动可配置
- 不同界别对应的特性
level | durable | exclusive | autoDelete |
---|---|---|---|
NONE | 30min | false | false |
NORMAL | 3d | false | false |
HIGH | 3d | false | false |
注:如果是ifCluster = true,durable为一周
2.具体使用情况
- 事件生产者,即EventSender的使用方式基本一致。
- 事件消费者,即EventReceiver的使用,目前接触的业务都是采用的PushEventReceiver的方式
queue | exchange | routing_key | event_level | priority | ha_level | durable | autoAck | consumer_service |
---|---|---|---|---|---|---|---|---|
async_lemonhh _status_thread | ex_default_topic | com.lemonhh.status.addComment.addReceiveTimeline | NORMAL | 3 | HIGH | true | false | lemonhh -status-thread |
lemonhh_mq_timeline | ex_default_topic | com.lemonhh.mq.timeline | NORMAL | 3 | NORMAL | true | true | lemonhh -thread-consumer-timeline |
replyTimelineCommentRankRefresher | ex_default_topic | com.lemonhh.xommunity.status.replytimeline.commentsrank.cache.refresh | NORMAL | 3 | HIGH | true | true | lemonhh -thread-consumer-status |
消费消息需要confirm,即如果consumer没有手动ack,那么队列不会删除对应的event,在consumer连接断开后,mq会重新分配event到其他的consumer。
ha_level为HIGH,表示mq server挂掉之后,queue中的数据会被持久化,并保留3天。
3.存在问题
-
consumer进程不可用时
- 如果autoAck=true,即自动confirm机制,event分配后即从queue中删除,如果consumer被kill,event丢失。
- 如果autoAck=false,即手动confirm机制,失败的event会重新分配,event不会丢失
-
consumer被kill时,event的process过程如果未完成
- 如果autoAck=true,即自动confirm机制,可能处理操作无法完成。
- 如果autoAck=false,即手动confirm机制,失败的event会重新分配,有些过程会重新处理一遍。
-
解决方法
- 结合业务特征,event的重要程度,选择是否设置手动confirm
- 选择合适的任务调度系统,保证consumer-thread对每一个event的完成处理,平稳,安全升级。