Spring AMQP Integration - Consumer Manual Acknowledgement(Spring AMQP 集成 - 消费者手册 致谢)
问题描述
我正在测试 Spring-AMQP 与 Spring-Integration 支持,我有以下配置和测试:
I am testing Spring-AMQP with Spring-Integration support, I've following configuration and test:
<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
    <int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>
<int-amqp:inbound-channel-adapter 
    channel="consumingChannel"
    queue-names="durableQ" 
    connection-factory="connectionFactory"
    concurrent-consumers="1"
    acknowledge-mode="AUTO"
    />
public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");
    PollableChannel consumingChannel = context.getBean("consumingChannel",   
                                                          PollableChannel.class);           
        int count = 0;
        while (true) {
            Message<?> msg = consumingChannel.receive(1000);
            System.out.println((count++) + " 	 -> " + msg);
            try { //sleep to check number of messages in queue
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
在此配置中,很明显,一旦消息到达 sumptionChannel,它们就会被确认并因此从队列中删除.我通过在 receive 之后放置一个高 sleep 并检查 queue-size 来验证这一点.没有进一步的控制.
In this configuration it was evident that as soon as message arrives at consumingChannel they are Acked and hence removed from queue. I validated this by placing a high sleep after receive and check queue-size. There are no further control on it.
现在,如果我设置 acknowledge-mode=MANUAL,似乎没有办法通过 spring 集成手动执行 ack.
Now if I set acknowledge-mode=MANUAL, there are no ways seems to do manual ack via spring integration. 
我需要处理消息,并在处理后执行 manual-ack,以便 ack 消息保持在 durableQ 处.
My need is to process message and after processing do a manual-ack so till ack message remains persisted at durableQ. 
有没有办法用 spring-amqp-integration 处理 MANUAL ack?我想避免将 ChannelAwareMessageListener 传递给 inbound-channel-adapter,因为我想控制消费者的 receive.
Is there any way to handle MANUAL ack with spring-amqp-integration? I want to avoid passing ChannelAwareMessageListener to inbound-channel-adapter since I want to have control of consumer's receive.
更新:
当使用自己的 listener-container 和 inbound-channel-adapter 时,这似乎是不可能的:
It even doesn't seems to be possible when using own listener-container with inbound-channel-adapter:
// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 
<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="durableQ" />
    <property name="acknowledgeMode" value="MANUAL" />
// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
    <property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>
上述配置抛出错误,因为 messageListener 属性是不允许的,请参阅标记的内联注释.所以使用 listner-container 的目的被打败了(通过 ChannelAwareMessageListener 暴露 channel).
Above configuration throws error as messageListener property is not allowed, see inline comment on tag. So purpose of using listner-container got defeated (for exposing channel via ChannelAwareMessageListener).
对我来说 spring-integration 不能用于 manual-acknowledgement (我知道,这很难说!),任何人都可以帮我验证这个或者是我缺少任何特定的方法/配置吗?
To me spring-integration cannot be used for manual-acknowledgement (I know, this is a hard saying!),  Can anyone help me in validating this or Is there any specific approach/configuration required for this which I am missing?
推荐答案
问题是因为您使用的是使用 QueueChannel 的异步切换.通常最好控制容器中的并发性(concurrent-consumers="2"),并且不要在流程中进行任何异步切换(使用 DirectChannels).这样,AUTO ack 就可以正常工作了.而不是从 PollableChannel 接收 new MessageHandler() 到 SubscribableChannel.
The problem is because you are using async handoff using a QueueChannel. It is generally better to control the concurrency in the container (concurrent-consumers="2") and don't do any async handoffs in your flow (use DirectChannels). That way, AUTO ack will work just fine. Instead of receiving from the PollableChannel subscribe a new MessageHandler() to a SubscribableChannel.
更新:
您通常不需要在 SI 应用程序中处理消息,但您使用 DirectChannel 进行的测试相当于...
You normally don't need to deal with Messages in an SI application, but the equivalent of your test with a DirectChannel would be...
    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);
    channel.subscribe(new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });
                        这篇关于Spring AMQP 集成 - 消费者手册 致谢的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Spring AMQP 集成 - 消费者手册 致谢
				
        
 
            
        基础教程推荐
- Java Swing计时器未清除 2022-01-01
 - 验证是否调用了所有 getter 方法 2022-01-01
 - 不推荐使用 Api 注释的描述 2022-01-01
 - 如何在 JFrame 中覆盖 windowsClosing 事件 2022-01-01
 - 如何在 Spring @Value 注解中正确指定默认值? 2022-01-01
 - 大摇大摆的枚举 2022-01-01
 - Java 实例变量在两个语句中声明和初始化 2022-01-01
 - 多个组件的复杂布局 2022-01-01
 - 从 python 访问 JVM 2022-01-01
 - 在 Java 中创建日期的正确方法是什么? 2022-01-01
 
    	
    	
    	
    	
    	
    	
    	
    	
						
						
						
						
						
				
				
				
				