请教activemq 消息持久化应用中消息进入DLQ的问题

ActiveMQ的消息重发与死信管理(DLQ)
DLQ-死信队列(Dead Letter
Queue)用来保存处理失败或者过期的消息。
出现以下情况时,消息会被redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is
当一个消息被redelivered超过maximumRedeliveries(缺省为6次,具体设置请参考后面的链接)次数时,会给broker发送一个"Poison
ack",这个消息被认为是a poison pill,这时broker会将这个消息发送到DLQ,以便后续处理。
缺省的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。
缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ
可以通过配置文件(activemq.xml)来调整死信发送策略。
1.& 不使用缺省的死信队列
&&&&&&& 缺省所有队列的死信消息都被发送到同一个缺省死信队列,不便于管理。可以通过individualDeadLetterStrategy或sharedDeadLetterStrategy策略来进行修改。如下:
&broker...&
& &destinationPolicy&
&&& &policyMap&
&&&&& &policyEntries&
&&&&&&& &!— 设置所有队列,使用 '&' ,否则用队列名称 --&
&&&&&&& &policyEntry queue="&"&
&&&&&&&&& &deadLetterStrategy&
&&&&&&&&&&& &!--
&&&&&&&&&&&&&&&&&&& queuePrefix:设置死信队列前缀
&&&&&&&&&&&&&&&&&&& useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信
&&&&&&&&&&& --&
&&&&&&&&&&& &individualDeadLetterStrategy&& queuePrefix="DLQ." useQueueForQueueMessages="true" /&
& &&&&&&&&&/deadLetterStrategy&
&&&&&&& &/policyEntry&
&&&&& &/policyEntries&
&&& &/policyMap&
& &/destinationPolicy&
2.& 非持久消息保存到死信队列
&&&&&& &policyEntry queue="&"&
&&&&&&&& &deadLetterStrategy&
&&&&&&&&&& &sharedDeadLetterStrategy processNonPersistent="true" /&
&&&&&&&& &/deadLetterStrategy&
&&&&&& &/policyEntry&
3.& 过期消息不保存到死信队列
&&&&&& &policyEntry queue="&"&
&&&&&&&& &deadLetterStrategy&
&&&&&&&&&& &sharedDeadLetterStrategy processExpired="false" /&
&&&&&&&& &/deadLetterStrategy&
&&&&&& &/policyEntry&
4.& 持久消息不保存到死信队列
&&&&&& 对于过期的,可以通过processExpired属性来控制,对于redelivered的失败的消息,需要通过插件来实现如下:
丢弃所有死信
&&&broker ...&
&&& &plugins&
&&&&& &discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" /&
&&& &/plugins&
& &/broker&
丢弃指定目的死信
&&&broker ...&
&&& &plugins&
&&&&& &discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000" /&
&&& &/plugins&
& &/broker&
注意,目的名称使用空格分隔
The reportInterval property is used to denote how frequently do we output how many messages we have dropped - use 0 to disable.
用正则表达式过滤丢弃消息:
&&&broker ...&
&&& &plugins&
&&&&& &discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}" reportInterval="3000" /&
&&& &/plugins&
& &/broker&
Notice that the destination names use regular expressions. These match the number 000..999 at the end of each destination name.
5.& 死信队列消息的属性
死信队列中的消息,会增加几个属性,比如原过期时间(originalExpiration),原originalDeliveryMode等
DLQ处理说明:
individualDeadLetterStrategy属性说明:
sharedDeadLetterStrategy属性说明:
redelivery属性说明:
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。推荐这篇日记的豆列
&&&&&&&&&&&&查看: 4303|回复: 2
? activemq5.1的怪问题,一批消息不能接收完全,感觉activemq问题很多
论坛徽章:14
spring2.54+activemq5.1+quartz1.6,必须定时接收一批消息(一批几百条,一批消息是一个整体),因此采用同步接收,消息接收超时时间30秒,现在问题是接收到几十条时开始出现: 21:20:20 DEBUG [InactivityMonitor WriteCheck] (InactivityMonitor.java:99) - 10000 ms elapsed since last write check.消息就接收不到了,应用程序重启仍然不能接收。重启activemq,又可以接收几十条,但仍然不完全。
& &以前用5.0时没有这样的问题。但是会出现 11:38:27&&WARN [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] (FailoverTransport.java:236)&&- Transport failed, attempting to automatically reconnect due to: java.net.SocketException: socket closed&&java.net.SocketException: socket closed
&&感觉activemq问题很多,5.0时安装windows service时会出现启动后立即停止,google后发现是其附带的配置文件有问题,是4.x的配置,缺少start参数。activemq据说是开源比较好的jms,但使用下来很失望,总有这样那样的问题,基本功能使用有时也不能保证
论坛徽章:14
∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞
力争成为中国最大的架构师群联盟,架构师技术交流群:正式开放!!!
& && &&&欢迎大家的加入!期待大家的加入!
已经上传的顶级软件产品的架构分析,本群资料仅供研究学习,不得商用!!!
技术文章包括:
《自己动手写操作系统》
《搜索引擎-原理、技术与系统》
《企业应用架构模式》
重要的RUP实例
设计模式精解
资料陆续上传中
∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞∽∝∞
论坛徽章:4
我最近开始学习,一个小的测试实例。同样也是接收不到message
itpub.net All Right Reserved. 北京皓辰网域网络信息技术有限公司版权所有    
 北京市公安局海淀分局网监中心备案编号: 广播电视节目制作经营许可证:编号(京)字第1149号&?xml version="1.0" encoding="utf-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd"&
配置与授权
&property name="locations"&
&value&file:${activemq.conf}/credentials.properties&/value&
&/property&
&!-- 审计日志 --&
lazy-init="false"
scope="singleton"
init-method="start"
destroy-method="stop"&
&bean destroy-method="close"&
&property name="driverClassName" value="com.mysql.jdbc.Driver"/&
&property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/&
&property name="username" value="activemq"/&
&property name="password" value="activemq"/&
&property name="maxActive" value="128"&&/property&
&property name="maxIdle" value="2"&&/property&
&property name="minIdle" value="1"&&/property&
&property name="maxWait" value="3000"&&/property&
&property name="defaultAutoCommit" value="true"&&/property&
&property name="poolPreparedStatements" value="true"/&
1、brokerName
每个broker必须持有唯一不同的名称,我们通常,以broker + {IP}方式
我们开启jmx,适用于组件监控,配合下文中的&managementContext/&
3、dataDirectory
数据目录,包括日志,cursor文件,数据文件等。数据文件可以在persistence配置中“重写”。
4、enableStatistics
开启统计,此后可以通过active ${status}等相关指令查看,开启有一定的性能损耗。
5、persistent
开启持久化功能,即数据将会写入Store.
如果为false,那么所有的消息都将以内存方式存储,请使用&memoryPersistenceAdapter&
6、schedulerSupport
开启调度,如果需要Broker执行,比如定期清理过期消息、检测磁盘和内容容量、清理离线订阅者等,此时必须开启。
7、useVirtualTopics
开启虚拟Topics功能
8、offlineDurableSubscriberTimeout
对于“持久订阅者”,如果长期离线,将导致Topic消息积压,验证影响Topic的转发效率。
我们应该将那些“长期离线”的订阅者删除。此值为7天,单位:毫秒
9、offlineDurableSubscriberTaskSchedule
用于“检测”离线订阅者的定时器调度间隔,此值为1个小时
10、schedulePeriodForDestinationPurge
如果一个空的Destination(没有消息积压)在一定时间内,没有Consumer消费时,将会被删除。
需要配合才能生效
&policyEntry
gcInactiveDestinati inactiveTimeoutBeforeGC="30000"/&
本实例中为7天有效期,每隔1小时检测一次
11、advisorySupport
开启通知,主要用于监控,当出现慢消费者、DLQ、容量不足等问题时,将会在“advisory”相关的Queue、Topic中发送内置的消息,
对于监控程序,可以通过消费advisory,实现组件监控机制。
有一定的性能开支
12、schedulePeriodForDiskUsageCheck
每个5分钟检测一次磁盘存储使用率。参见&systemUsage&
&broker xmlns="http://activemq.apache.org/schema/core"
brokerName="broker-01"
useJmx="true"
dataDirectory="${activemq.data}"
enableStatistics="true"
persistent="true"
useVirtualTopics="true"
schedulerSupport="true"
offlineDurableSubscriberTimeout=""
offlineDurableSubscriberTaskSchedule="3600000"
schedulePeriodForDestinati
advisorySupport="true"
schedulePeriodForDiskUsageCheck="300000"&
&destinationPolicy&
&policyMap&
&policyEntries&
关于持久化订阅者的相关配置
http://activemq.apache.org/manage-durable-subscribers.html
http://activemq.apache.org/per-destination-policies.html
删除不活跃通道
http://activemq.apache.org/delete-inactive-destinations.html
通用正则表达式,表示“全部topic”
2、expireMessagesPeriod
每个5分钟检测一次消息,对于TLL过期的消息将会被移除。(根据DLQ策略)
3、advisoryForSlowConsumers
如果“advisorySupport”开启时,当Broker判定某个消费者为慢速消费者(待确认消息 &= 2 * prefetch)
将会发送通知。
4、advisoryWhenFull
如果cursor、store溢满时,发送通知
5、maxPageSize
从store中pageIn消息列表的批量大小
6、producerFlowControl
是否开启“生产者流量控制”,如果开启,当内存溢满、“待发送消息达到阈值”将会阻塞producer。
因为我们采用store存储,所以不需要流量控制
7、durableTopicPrefetch
8、gcInactiveDestinations
不活跃的通道,是否允许被删除。
9、inactiveTimeoutBeforeGC
当一个通道中没有消息,且没有消费者时,此通道将会被认定为“不活跃”
&policyEntry topic="&" expireMessagesPeriod="300000"
advisoryForSlowC
advisoryWhenFull="true"
maxPageSize="512"
producerFlowC
durableTopicPrefetch="200"
gcInactiveDestinati
inactiveTimeoutBeforeGC=""
&!-- 转发策略 --&
&dispatchPolicy&
&!-- 对于Topic,我们通常采用轮训机制 --&
&roundRobinDispatchPolicy/&
&/dispatchPolicy&
对于non-durable Topic,积压的消息数量 ,如果超过限制,则剔除
http://activemq.apache.org/slow-consumer-handling.html
仅对非持久化Topic有效,目的是提高Topic的转发效率。
&pendingMessageLimitStrategy&
&constantPendingMessageLimitStrategy limit="256"/&
&/pendingMessageLimitStrategy&
&messageEvictionStrategy&
&oldestMessageEvictionStrategy/&
&/messageEvictionStrategy&
不支持"可回溯"订阅者,即新加入的订阅者只能获取订阅操作发生之后的消息
http://activemq.apache.org/subscription-recovery-policy.html
&subscriptionRecoveryPolicy&
&noSubscriptionRecoveryPolicy/&
&/subscriptionRecoveryPolicy&
对于TTL过期的、或者临时存储溢满被剔除的、重发次数超过限制的等等,都有可能进入DLQ
1、processExpired
TTL过期的消息,将直接移除,不会进入DLQ
2、processNonPersistent
对于非持久化消息,无论如何都进入DLQ
3、expiration
DLQ中消息的TTL,从进入DLQ开始。此值为“3天”
&deadLetterStrategy&
&sharedDeadLetterStrategy processExpired="false"
expiration=""/&
&/deadLetterStrategy&
积压消息的转发策略,cursor机制
当Producer发送小于大于Consumer消费效率时,这意味着Broker在转发层面需要对
“积压”的消息进行buffer或者临时存储。
1、对于非持久化订阅者,消息直接保存在内存中,存储量受限于systemUsage。
2、对于持久化订阅者,消息将使用store(内部基于VM + File)
http://activemq.apache.org/message-cursors.html
&pendingSubscriberPolicy&
&vmCursor/&
&/pendingSubscriberPolicy&
&pendingDurableSubscriberPolicy&
&storeDurableSubscriberCursor/&
&/pendingDurableSubscriberPolicy&
&/policyEntry&
因为Queue总是基于prefetch批量推送机制,所有当consumer有多个,且消息的密度不大时,如果使用
strictOrderDispatch将会导致总是转发给一个consumer的问题。
strictOrderDispatch + prefetch需要注意
&policyEntry queue="&" expireMessagesPeriod="300000"
maxPageSize="512"
producerFlowC
queuePrefetch="1000"
strictOrderDispatch="false"
sendAdvisoryIfNoC
advisoryForSlowC
advisoryWhenFull="true"
gcInactiveDestinati
inactiveTimoutBeforeGC=""&
&deadLetterStrategy&
私信队列,统一使用一个,避免不必要的维护成本,易于监控
&sharedDeadLetterStrategy processExpired="false"
expiration=""/&
&/deadLetterStrategy&
积压待发的消息,采用store
&pendingQueuePolicy&
&storeCursor/&
&/pendingQueuePolicy&
&/policyEntry&
&/policyEntries&
&/policyMap&
&/destinationPolicy&
虚拟Topic,我们让所有的Topic都支持虚拟化
http://activemq.apache.org/virtual-destinations.html
&destinationInterceptors&
&virtualDestinationInterceptor&
&virtualDestinations&
&virtualTopic name="&" prefix="VConsumers.*." selectorAware="false"/&
&/virtualDestinations&
&/virtualDestinationInterceptor&
&/destinationInterceptors&
http://activemq.apache.org/jmx.html
&managementContext&
&managementContext createC/&
&/managementContext&
PersistenceAdapter
http://activemq.apache.org/persistence.html
&persistenceAdapter&
&jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds" lockKeepAlivePeriod="5000"&
&lease-database-locker lockAcquireSleepInterval="10000"/&
&/jdbcPersistenceAdapter&
&/persistenceAdapter&
Memory Setting and Flow-Control
http://activemq.apache.org/producer-flow-control.html
&systemUsage&
&systemUsage&
&memoryUsage&
&memoryUsage percentOfJvmHeap="70"/&
&/memoryUsage&
&storeUsage&
&storeUsage limit="50 gb"/&
&/storeUsage&
&tempUsage&
&tempUsage limit="20 gb"/&
&/tempUsage&
&/systemUsage&
&/systemUsage&
TransportConnector and Protocol Setting
http://activemq.apache.org/configuring-transports.html
&transportConnectors&
&!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --&
&transportConnector name="openwire"
uri="tcp://0.0.0.0:61616?maximumC/&
&transportConnector name="amqp"
uri="amqp://0.0.0.0:5672?maximumC/&
&transportConnector name="stomp"
uri="stomp://0.0.0.0:61613?maximumC/&
&transportConnector name="mqtt"
uri="mqtt://0.0.0.0:1883?maximumC/&
&transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumC/&
&/transportConnectors&
&!-- destroy the spring context on shutdown to stop jetty --&
&shutdownHooks&
&bean xmlns="http://www.springframework.org/schema/beans"
&/shutdownHooks&
私信队列处理
http://activemq.apache.org/message-redelivery-and-dlq-handling.html
&simpleAuthenticationPlugin&
&authenticationUser username="amq_manager" password="012345"
groups="users,admins"/&
&authenticationUser username="amq_common" password="123456"
groups="users"/&
&/simpleAuthenticationPlugin&
&redeliveryPlugin fallbackToDeadLetter="true"
sendToDlqIfMaxRetriesExceeded="true"&
&redeliveryPolicyMap&
&!-- 重发策略,对于超过重发次数的消息将会被添加到DLQ --&
&redeliveryPolicyMap&
&redeliveryPolicyEntries&
重发机制,默认重发6,重发延迟基于backOff模式
&redeliveryPolicy maximumRedeliveries="6"
initialRedeliveryDelay="1000"
backOffMultiplier="5"
queue="&"/&
&/redeliveryPolicyEntries&
&defaultEntry&
&!-- 其他策略 --&
&redeliveryPolicy maximumRedeliveries="6"
initialRedeliveryDelay="1000"
backOffMultiplier="5"/&
&/defaultEntry&
&/redeliveryPolicyMap&
&/redeliveryPolicyMap&
&/redeliveryPlugin&
&/plugins&
Web Manager
&import resource="jetty.xml"/&
&/beans&六、Producer配置(基于Spring)
上一篇: 下一篇:
activemq相关图片
activemq相关文章MQ返回队列中获取消息,messageId、correlationId作用5C从MQ返回队列中获取消息,MQMessage为什么要带上messageId、correlationId两属性代码如下:方式
MQ返回队列中获取消息,messageId、correlationId作用
从MQ返回队列中获取消息,MQMessage为什么要带上messageId、correlationId两属性
代码如下:
MQMessage m = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = 3;
gmo.waitInterval = this.
if (messageId != null) {
gmo.matchOptions |= 1;
m.messageId = messageId;
if (correlationId != null) {
gmo.matchOptions |= 2;
m.correlationId = correlationId;
this.queue.get(m, gmo);
MQMessage m = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = 3;
gmo.waitInterval = this.
this.queue.get(m, gmo);
这两种方式有什么区别,是不是同一个队列会有多处代码往其中设值进去,所以需要用自己对应的id来获取数据?刚接触MQ接口,希望得到大使指导。
MessageID唯一地标识了一条消息。
CorrelationID提供了一个消息头,用于将当前的消息和先前的某些消息或应用程序特定的ID关联起来。
解决方案二:
【云栖快讯】首届阿里巴巴中间件技术峰会,揭秘阿里10年分布式技术沉淀!阿里高可用体系核心缔造者、全链路压测创始人,DRDS与TDDL负责人等大咖出场,干货分享,不可错过!&&
为您提供简单高效、处理能力可弹性伸缩的计算服务,帮助您快速构建更稳定、安全的应用,提升运维效率,降低 IT 成本...
RDS是一种稳定可靠、可弹性伸缩的在线数据库服务。支持MySQL、SQL Server、PostgreSQL、高...

我要回帖

更多关于 activemq消息队列 的文章

 

随机推荐