Spring AMQP 集成 - 消费者手册 致谢

Spring AMQP Integration - Consumer Manual Acknowledgement(Spring AMQP 集成 - 消费者手册 致谢)
本文介绍了Spring AMQP 集成 - 消费者手册 致谢的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我正在测试 Spring-AMQPSpring-Integration 支持,我有以下配置和测试:

I am testing Spring-AMQP with Spring-Integration support, I've following configuration and test:

<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
    <int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>

<int-amqp:inbound-channel-adapter 
    channel="consumingChannel"
    queue-names="durableQ" 
    connection-factory="connectionFactory"
    concurrent-consumers="1"
    acknowledge-mode="AUTO"
    />


public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");

    PollableChannel consumingChannel = context.getBean("consumingChannel",   
                                                          PollableChannel.class);           
        int count = 0;
        while (true) {
            Message<?> msg = consumingChannel.receive(1000);
            System.out.println((count++) + " 	 -> " + msg);

            try { //sleep to check number of messages in queue
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

在此配置中,很明显,一旦消息到达 sumptionChannel,它们就会被确认并因此从队列中删除.我通过在 receive 之后放置一个高 sleep 并检查 queue-size 来验证这一点.没有进一步的控制.

In this configuration it was evident that as soon as message arrives at consumingChannel they are Acked and hence removed from queue. I validated this by placing a high sleep after receive and check queue-size. There are no further control on it.

现在,如果我设置 acknowledge-mode=MANUAL,似乎没有办法通过 spring 集成手动执行 ack.

Now if I set acknowledge-mode=MANUAL, there are no ways seems to do manual ack via spring integration.

我需要处理消息,并在处理后执行 manual-ack,以便 ack 消息保持在 durableQ 处.

My need is to process message and after processing do a manual-ack so till ack message remains persisted at durableQ.

有没有办法用 spring-amqp-integration 处理 MANUAL ack?我想避免将 ChannelAwareMessageListener 传递给 inbound-channel-adapter,因为我想控制消费者的 receive.

Is there any way to handle MANUAL ack with spring-amqp-integration? I want to avoid passing ChannelAwareMessageListener to inbound-channel-adapter since I want to have control of consumer's receive.

更新:

当使用自己的 listener-containerinbound-channel-adapter 时,这似乎是不可能的:

It even doesn't seems to be possible when using own listener-container with inbound-channel-adapter:

// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 

<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="durableQ" />
    <property name="acknowledgeMode" value="MANUAL" />

// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
    <property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>

上述配置抛出错误,因为 messageListener 属性是不允许的,请参阅标记的内联注释.所以使用 listner-container 的目的被打败了(通过 ChannelAwareMessageListener 暴露 channel).

Above configuration throws error as messageListener property is not allowed, see inline comment on tag. So purpose of using listner-container got defeated (for exposing channel via ChannelAwareMessageListener).

对我来说 spring-integration 不能用于 manual-acknowledgement (我知道,这很难说!),任何人都可以帮我验证这个或者是我缺少任何特定的方法/配置吗?

To me spring-integration cannot be used for manual-acknowledgement (I know, this is a hard saying!), Can anyone help me in validating this or Is there any specific approach/configuration required for this which I am missing?

推荐答案

问题是因为您使用的是使用 QueueChannel 的异步切换.通常最好控制容器中的并发性(concurrent-consumers="2"),并且不要在流程中进行任何异步切换(使用 DirectChannels).这样,AUTO ack 就可以正常工作了.而不是从 PollableChannel 接收 new MessageHandler()SubscribableChannel.

The problem is because you are using async handoff using a QueueChannel. It is generally better to control the concurrency in the container (concurrent-consumers="2") and don't do any async handoffs in your flow (use DirectChannels). That way, AUTO ack will work just fine. Instead of receiving from the PollableChannel subscribe a new MessageHandler() to a SubscribableChannel.

更新:

您通常不需要在 SI 应用程序中处理消息,但您使用 DirectChannel 进行的测试相当于...

You normally don't need to deal with Messages in an SI application, but the equivalent of your test with a DirectChannel would be...

    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);

    channel.subscribe(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });

这篇关于Spring AMQP 集成 - 消费者手册 致谢的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

本站部分内容来源互联网,如果有图片或者内容侵犯了您的权益,请联系我们,我们会在确认后第一时间进行删除!

相关文档推荐

How to send data to COM PORT using JAVA?(如何使用 JAVA 向 COM PORT 发送数据?)
How to make a report page direction to change to quot;rtlquot;?(如何使报表页面方向更改为“rtl?)
Use cyrillic .properties file in eclipse project(在 Eclipse 项目中使用西里尔文 .properties 文件)
Is there any way to detect an RTL language in Java?(有没有办法在 Java 中检测 RTL 语言?)
How to load resource bundle messages from DB in Java?(如何在 Java 中从 DB 加载资源包消息?)
How do I change the default locale settings in Java to make them consistent?(如何更改 Java 中的默认语言环境设置以使其保持一致?)