由于onMessage()
不允许抛出已检查的异常,因此可以将异常包装在中RuntimeException
并重新抛出。
try {
testService.process(message);
} catch (BusinessException e) {
throw new RuntimeException(e);
}
但是请注意,这可能导致消息无限期地重新发送。这是这样的:
RabbitMQ支持拒绝消息并要求代理重新排队。这显示在这里。但是RabbitMQ本身没有重试策略的机制,例如设置最大重试次数,延迟等。
使用Spring AMQP时,“拒绝时重新排队”是默认选项。SimpleMessageListenerContainer
如果有未处理的异常,Spring 将默认执行此操作。因此,在您的情况下,您只需要重新引发捕获的异常即可。但是请注意,如果您无法处理消息并且总是抛出异常,则它将无限期地重新发送,并导致无限循环。
您可以通过引发AmqpRejectAndDontRequeueException
异常来覆盖每个消息的此行为,在这种情况下,不会重新排队该消息。
您也可以SimpleMessageListenerContainer
通过设置完全关闭“拒绝时重新排队”行为
container.setDefaultRequeueRejected(false)
如果在RabbitMQ中设置了一条消息,则该消息被拒绝但不重新排队将丢失或转移到DLQ。
如果您需要具有最大尝试次数,延迟次数等的重试策略,最简单的方法是设置一个弹簧“无状态”RetryOperationsInterceptor
,它将在线程内进行所有重试(使用Thread.sleep()
),而不会拒绝每次重试的消息(因此无需为每个重试返回RabbitMQ重试)。重试用尽时,默认情况下将记录一条警告,并且该消息将被使用。如果要发送到DLQ,则需要一个RepublishMessageRecoverer
或MessageRecoverer
不接受消息而无需重新排队的自定义(在后一种情况下,您还应该在队列上设置 RabbitMQ DLQ)。默认消息恢复程序的示例:
container.setAdviceChain(new Advice[] {
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2, 5000)
.build()
});
显然,这样做的缺点是您将在重试的整个过程中占用线程。您还可以选择使用“有状态”RetryOperationsInterceptor
,该状态将在每次重试时将消息发送回RabbitMQ,但是延迟仍将Thread.sleep()
在应用程序内实现,此外,设置有状态拦截器会更加复杂。
因此,如果您希望在不占用任何延迟的情况下重试,Thread
则需要在RabbitMQ队列上使用TTL进行更复杂的自定义解决方案。如果您不希望出现指数退缩(因此每次重试不会增加延迟),则要简单一些。要实现这种解决方案,您基本上可以在RabbitMQ上创建另一个带有参数的队列:"x-message- ttl": <delay time in milliseconds>
和"x-dead-letter-exchange":"<name of the original queue>"
。然后在您设置的主队列上"x-dead-letter-exchange":"<name of the queue with the TTL>"
。因此,现在当您拒绝但不重新排队时,RabbitMQ会将其重定向到第二个队列。TTL过期时,它将被重定向到原始队列,然后重新传递到应用程序。因此,现在您需要一个重试拦截器,该拦截器在每次失败后拒绝发送给RabbitMQ的消息,并跟踪重试计数。为了避免需要在应用程序中保留状态(因为如果您的应用程序是集群的,则需要复制状态),则可以根据x-death
RabbitMQ设置的标头计算重试计数。在此处查看有关此标头的更多信息。因此,在这一点上,实现自定义拦截器比通过这种行为自定义Spring有状态拦截器要容易得多。