How to use @Transactional with @KafkaListener?(如何将@Transaction与@KafkaListener一起使用?)
问题描述
有没有可能将声明性TX管理(通过@Transaction)与@KafkaListener注释方法一起使用? 例如,我想使用它来为每个监听器定义单独的发送超时。 我的设置如下:
TransactionManager:
@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
                                                                                 org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
  return new ChainedKafkaTransactionManager<>(
    kafkaTransactionManager, 
    hibernateTransactionManager);
}
KafkaListener:
@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}
问题是-KafkaMessageListenerContainer在调用此类方法之前创建自己的事务-它使用自己的TransactionTemplate:
@Nullable
private TransactionTemplate determineTransactionTemplate() {
  return this.transactionManager != null
    ? new TransactionTemplate(this.transactionManager)
    : null;
}
未使用TransactionInterceptor。那么具体的@KafkaListener方法如何设置具体的TX超时时间呢?
推荐答案
可以这样做,但有点复杂,因为您必须将消耗的偏移量发送到Kafka交易。
不使用ChainedKafkaTransactionManager,您可以为容器使用KafkaTransactionManager,为HibernateTransactionManager使用@Transactional。
这将产生类似的结果,因为Hibernate Tx将在Kafka事务之前提交(如果Hibernate提交失败,则Kafka Tx将回滚)。
编辑
若要将不同的链式TM配置到每个侦听器容器中,可以执行以下操作。
@组件 类ContainerFactoryCustomizer{ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
        ChainedKafkaTransactionManager<?, ?> chainedOne,
        ChainedKafkaTransactionManager<?, ?> chainedTwo) {
    factory.setContainerCustomizer(
            container -> {
                String groupId = container.getContainerProperties().getGroupId();
                if (groupId.equals("foo")) {
                    container.getContainerProperties().setTransactionManager(chainedOne);
                }
                else {
                    container.getContainerProperties().setTransactionManager(chainedTwo);
                }
            });
}
}
Where each chained TM has a Hibernate TM with a different default timeout.
The `groupid` is populated from the `@KafkaListener` `id` or `groupId` property.
                        这篇关于如何将@Transaction与@KafkaListener一起使用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:如何将@Transaction与@KafkaListener一起使用?
				
        
 
            
        基础教程推荐
- Java 实例变量在两个语句中声明和初始化 2022-01-01
 - 多个组件的复杂布局 2022-01-01
 - 大摇大摆的枚举 2022-01-01
 - 在 Java 中创建日期的正确方法是什么? 2022-01-01
 - 从 python 访问 JVM 2022-01-01
 - Java Swing计时器未清除 2022-01-01
 - 如何在 Spring @Value 注解中正确指定默认值? 2022-01-01
 - 如何在 JFrame 中覆盖 windowsClosing 事件 2022-01-01
 - 验证是否调用了所有 getter 方法 2022-01-01
 - 不推荐使用 Api 注释的描述 2022-01-01
 
    	
    	
    	
    	
    	
    	
    	
    	
						
						
						
						
						
				
				
				
				