Kafka Transactions – Integrating with Legacy Systems
The article covers setting up and using Kafka transactions, particularly in the context of legacy applications built on JPA/JMS frameworks. We look at various issues that can arise from using different TransactionManagers and how to use these different transactions correctly to achieve the desired results. Finally, we analyze how Kafka transactions can be integrated with JTA.
Many legacy applications were built on JMS clients with a JPA database, relying on transactions to ensure exactly-once delivery. These systems depend on the stability and guarantees of transactional protocols so that errors are prevented. The problem comes when we try to integrate such systems with newer ones built upon non-JMS/JPA solutions – things like Kafka, MongoDB, and so on.
Some of these systems, like MongoDB, actively work to make integration with legacy JMS/JPA easier. Others, like Kafka, introduce their own solutions to these problems. We will look more deeply into Kafka and the ways we can integrate it with our legacy system.
If you want an introduction to Kafka fundamentals, start with this article covering the basics.
Basic JMS/JPA setup
First, let us do a quick review of the most common setups for legacy systems. They usually use JMS to exchange messages between different applications, be it IBM MQ, RabbitMQ, ActiveMQ, Artemis, or other JMS providers – these are used with transactions to ensure exactly-once delivery. Messages are then processed in the application, often saving state in a database via the JPA API using Hibernate/Spring Data. Sometimes additional frameworks are used to make the processing easier to write and manage, but generally, the processing may look similar to this example:
@JmsListener(destination = "message.queue")
@Transactional(propagation = Propagation.REQUIRED)
public void processMessage(String message) {
    exampleService.processMessage(message);
    MessageEntity entity = MessageEntity.builder().content(message).build();
    messageDao.save(entity);                       // persist the initial state via JPA
    exampleService.postProcessMessage(entity);
    messageDao.save(entity);                       // persist the updated state
    jmsProducer.sendMessage(exampleService.createResponse(entity)); // respond on a JMS queue
}
Messages are read, processed, saved to the database, processed further, updated in the database, and the response is sent to another JMS queue. It is all done in a transactional context in one of two possible ways:
1) Using separate JMS and JPA transactions during processing, committing the JPA transaction right before committing JMS.
2) Using JTA to merge the JMS and JPA transactions so that both are committed or aborted at the same time.
Both solutions have their upsides and pitfalls; neither of them fully guarantees a lack of duplicates, though JTA definitely gives better guarantees than separate transactions. JTA also does not run into the problem of idempotent consumers; it does, however, come with an overhead. In either case, we may run into problems if we try to integrate this with Kafka. A sketch of the first approach follows below.
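For illustration only – the article does not prescribe a mechanism – the first approach can be wired in Spring by chaining the two transaction managers. ChainedTransactionManager (from spring-data-commons, used here as an assumption) starts transactions in registration order and commits them in reverse, so listing JMS first makes the JPA transaction commit right before the JMS one:

import org.springframework.context.annotation.Bean;
import org.springframework.data.transaction.ChainedTransactionManager;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

@Bean
public PlatformTransactionManager chainedTransactionManager(
        JmsTransactionManager jmsTransactionManager,
        JpaTransactionManager jpaTransactionManager) {
    // Started in order (JMS, then JPA), committed in reverse order:
    // the JPA transaction commits right before the JMS one.
    return new ChainedTransactionManager(jmsTransactionManager, jpaTransactionManager);
}

This is still best-effort – a crash between the two commits leaves the JMS message unacknowledged and redelivered, which is why the consumer has to be idempotent.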
What are Kafka transactions?
The Kafka broker is fast and scalable, but the default mode in which it runs does not hold to an exactly-once message delivery guarantee. We may see duplicates, or we may see some messages lost depending on circumstances – something that old legacy systems based on transactions cannot accept. As such, we need to switch Kafka to transactional mode, enabling the exactly-once guarantee.
Transactions in Kafka are designed so that they are primarily handled on the producer/message broker side, rather than the consumer side. The consumer is effectively an idempotent reader, while the producer and the transaction coordinator handle the transaction.
This reduces the performance overhead on the consumer side, though at the cost of the broker side. The flow looks roughly like this:

1) Determine which broker is the coordinator in the group
2) The producer sends a beginTransaction() request to the coordinator
3) The coordinator generates a transaction-id
4) The producer receives a response from the coordinator with the transaction-id
5) The producer sends its messages to the leader brokers of the data partitions together with the transaction-id
6) The producer sends a commitTransaction() request to the coordinator and awaits the response
7) The coordinator sends a commitTransaction() request to every leader broker and awaits their responses
8) The leader brokers set the transaction status to committed for the written data and send the response to the coordinator
9) The coordinator sends the transaction result to the producer
This does not contain all the details – explaining everything is beyond the scope of this article, and plenty of resources can be found on the topic. It does, however, give us a clear view of the transaction process: the main participant responsible is the transaction coordinator. It notifies leaders about the state of the transaction and is responsible for propagating the commit. There is some locking involved on the producer/coordinator side, which may affect performance negatively depending on the length of our transactions.
Readers, meanwhile, simply operate in read-committed mode, so they will be unable to read messages from transactions that have not been committed. The raw client API behind this flow is sketched below.
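For reference, here is a minimal sketch of this flow using the plain kafka-clients producer API (the topic name, key, and broker address are placeholders); the Spring setup used in the rest of this article hides these calls behind its transaction manager:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tran-id-1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();      // registers with the transaction coordinator
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("messages_2", "key", "Test 1"));
                producer.commitTransaction(); // the coordinator propagates the commit to the leaders
            } catch (KafkaException e) {
                producer.abortTransaction();  // the leaders mark the written batches as aborted
                throw e;
            }
        }
    }
}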
Kafka transactions – setup and pitfalls
We will take a look at a practical example of setting up and using Kafka transactions, together with potential pitfalls on the consumer and producer side, also looking at specific ways Kafka transactions work as we go through the examples. We will use Spring to set up our Kafka consumer/producer. To do that, we first need to import Kafka into our pom.xml:
<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
To enable transactional processing for the producer, we need to tell Kafka to explicitly enable idempotence, as well as give it a transaction-id:
producer:
  bootstrap-servers: localhost:9092
  transaction-id-prefix: tx-
  properties:
    enable.idempotence: true
    transactional.id: tran-id-1
Each producer needs its own unique transaction-id; otherwise, we will encounter errors if more than one producer attempts to perform a transaction at the same time. It is essential to make sure that each instance of an application in a cloud environment has its own unique prefix/transaction-id (one way to do this is sketched after the consumer setup below). Additional setup must also be done for the consumer:
consumer:
  bootstrap-servers: localhost:9092
  group-id: group_id
  auto-offset-reset: earliest
  enable-auto-commit: false
  isolation-level: read_committed
The properties that interest us set enable-auto-commit to false, so that Kafka does not periodically commit offsets on its own. Additionally, we set isolation-level to read_committed, so that we will only consume messages once the producer has fully committed them. Now both the consumer and the producer are set up for exactly-once delivery with transactions.
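As for the unique per-instance transaction-id mentioned above, one hedged option (the hostname-based naming and the configuration class are assumptions, not from the original setup) is to derive the prefix programmatically on the producer factory:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory(KafkaProperties properties)
            throws UnknownHostException {
        Map<String, Object> config = properties.buildProducerProperties();
        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(config);
        // Derive the transactional-id prefix from the hostname, so that each
        // instance running in the cloud gets its own id space.
        factory.setTransactionIdPrefix("tx-" + InetAddress.getLocalHost().getHostName() + "-");
        return factory;
    }
}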
We can run our producer and see what happens if an exception is thrown after writing to the topic but before the transaction is fully committed. For this purpose, we will create a very simple REST mapping that writes a couple of messages to the Kafka topic before throwing an exception:
@PostMapping(value = "/required")
@Transactional(propagation = Propagation.REQUIRED)
public void sendMessageRequired() {
    producer.sendMessageRequired("Test 1");
    producer.sendMessageRequired("Test 2");
    throw new RuntimeException("This is a test exception");
}
The result is exactly as expected – the messages are written to the topic but not committed when the exception is thrown. As such, the entire transaction is aborted, and each batch is failed as well. This can be seen in the logs:
2021-01-20 19:44:29.776 INFO 11032 --- [io-9001-exec-10] c.g.k.kafka.KafkaProducer : Producing message "Test 1"
2021-01-20 19:44:29.793 INFO 11032 --- [io-9001-exec-10] c.g.k.kafka.KafkaProducer : Producing message "Test 2"
2021-01-20 19:44:29.808 ERROR 11032 --- [producer-tx-1-0] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='key-1-Test 1' and payload='1) Test 1' to topic messages_2:
org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:422) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-2.5.1.jar:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-01-20 19:44:29.808 ERROR 11032 --- [producer-tx-1-0] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='key-1-Test 2' and payload='1) Test 2' to topic messages_2:
org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:422) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-2.5.1.jar:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
The LoggingProducerListener exception contains the key and contents of the message that failed to be sent. The exception tells us that the batch was failed because the transaction was aborted. Exactly as expected – the entire transaction is atomic, so failing it at the end causes the messages successfully written beforehand not to be processed.
We can do the same test for the consumer; the expectation is that the transaction will be rolled back if a message processing error occurs. For that, we will create a simple consumer that logs something and then throws:
@KafkaListener(topics = "messages_2", groupId = "group_id")
public void consumePartitioned(String message) {
    log.info(String.format("Consumed partitioned message \"%s\"", message));
    throw new RuntimeException("This is a test exception");
}
We can now use our REST endpoints to send some messages to the consumer. Sure enough, we see exactly the behavior we expect – the message is read, the log happens, and then the rollback occurs.
2021-01-20 19:48:33.420 INFO 14840 --- [ntainer#0-0-C-1] c.g.k.kafka.KafkaConsumer : Consumed partitioned message "1) Test 1"
2021-01-20 19:48:33.425 ERROR 14840 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.grapeup.kafkatransactions.kafka.KafkaConsumer.consumePartitioned(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: This is a test exception
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:350) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
Of course, because of the rollback, the message goes back to the topic. This results in the consumer reading it again, throwing and rolling back, creating an infinite loop that will lock other messages out for this partition. This is a potential issue we must take into account when using transactional Kafka messaging, the same way we would with JMS. The message will persist if we restart the application or the broker, so conscious handling of the exception is required – we need to identify the exceptions that require a rollback and those that do not. This is a very application-specific problem, so there is no way to give a clear-cut solution in this article, simply because such a solution does not exist. One way to at least bound the loop is sketched below.
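As a hedged sketch (assuming spring-kafka 2.5.x, as the log lines suggest; the retry counts and the dead-letter routing are arbitrary choices, not the article's), retries after a rollback can be bounded with an AfterRollbackProcessor that eventually publishes the poisoned record to a dead-letter topic:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.util.backoff.FixedBackOff;

@Bean
public AfterRollbackProcessor<Object, Object> afterRollbackProcessor(
        KafkaTemplate<Object, Object> template) {
    // After the initial delivery plus two retries, publish the record to the
    // "<topic>.DLT" dead-letter topic instead of replaying it forever.
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template),
            new FixedBackOff(1000L, 2));
}

The processor still has to be registered on the Kafka listener container factory (via its setAfterRollbackProcessor method) to take effect, and which exceptions deserve recovery rather than retry remains the application-specific decision described above.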
Last but not least, it is worth noting that propagation works as expected with Spring and Kafka transactions. If we start a new transaction via the @Transactional annotation with REQUIRES_NEW propagation, then Kafka will start a new transaction that commits separately from the original one and whose commit/abort result has no effect on the parent one.
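As a brief illustration (the method name and the kafkaTemplate field are assumptions), such a producer method would look like this:

@Transactional(transactionManager = "kafkaTransactionManager",
        propagation = Propagation.REQUIRES_NEW)
public void sendMessageRequiresNew(String message) {
    // Runs in its own Kafka transaction, independent of the caller's one;
    // its commit or abort does not affect the parent transaction.
    kafkaTemplate.send("messages_2", message);
}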
There are a few more things we have to keep in mind when working with Kafka transactions, some of them expected, others not so much. The first is the fact that producer transactions lock down the topic partition they write to. This can be seen if we run two servers and delay one transaction. In our case, we started a transaction on Server 1 that wrote messages to a topic and then waited 10 seconds to commit the transaction. Server 2 in the meantime wrote its own messages and committed immediately while Server 1 was waiting. The result can be seen in the logs:
Server 1:
2021-01-20 21:38:27.560 INFO 15812 --- [nio-9001-exec-1] c.g.k.kafka.KafkaProducer : Producing message "Test 1"
2021-01-20 21:38:27.578 INFO 15812 --- [nio-9001-exec-1] c.g.k.kafka.KafkaProducer : Producing message "Test 2"
Server 2:
2021-01-20 21:38:35.296 INFO 14864 --- [ntainer#0-0-C-1] c.g.k.kafka.KafkaConsumer : Consumed message "1) Test 1 Sleep"
2021-01-20 21:38:35.308 INFO 14864 --- [p_id.messages.0] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-tx-2-group_id.messages.0, transactionalId=tx-2-group_id.messages.0] Discovered group coordinator gu17.ad.grapeup.com:9092 (id: 0 rack: null)
2021-01-20 21:38:35.428 INFO 14864 --- [ntainer#0-0-C-1] c.g.k.kafka.KafkaConsumer : Consumed message "1) Test 2 Sleep"
2021-01-20 21:38:35.549 INFO 14864 --- [ntainer#0-0-C-1] c.g.k.kafka.KafkaConsumer : Consumed message "1) Test 1"
2021-01-20 21:38:35.676 INFO 14864 --- [ntainer#0-0-C-1] c.g.k.kafka.KafkaConsumer : Consumed message "1) Test 2"
Messages were consumed by Server 2 only after Server 1 had committed its long-running transaction. Only the partition is locked, not the entire topic – as such, depending on the partitions the producers send messages to, we may encounter full, partial, or no locking at all. The lock is held until the end of the transaction, be it via commit or abort.
Another interesting thing is the order of the messages – the messages from Server 1 appear before the messages from Server 2, even though Server 2 committed its transaction first. This is in contrast to what we would expect from JMS – there, the messages committed first would appear first, unlike in our example. It should not be a major problem, but it is something we must, once again, take into account while designing our applications.
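For completeness, the delayed transaction used in this test could be produced with an endpoint along these lines (a sketch; the mapping, method name, and message contents are inferred from the logs above, not taken from the article's code):

@PostMapping(value = "/sleep")
@Transactional(transactionManager = "kafkaTransactionManager",
        propagation = Propagation.REQUIRED)
public void sendMessageWithSleep() throws InterruptedException {
    producer.sendMessageRequired("Test 1 Sleep");
    producer.sendMessageRequired("Test 2 Sleep");
    // Hold the transaction open; the commit only happens when the method returns,
    // keeping the partition locked for other transactional producers.
    Thread.sleep(10_000);
}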
Putting it all together
Now that we have Kafka transactions running, we can try to add the JMS/JPA configuration to it. We can once again use the Spring setup to quickly integrate these. For the sake of the demo, we use an in-memory H2 database and ActiveMQ:
<!-- JPA setup -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>
<!-- Active MQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>
We can set up a simple JMS listener, which reads a message in a transaction, saves something to the database via JPA, and then publishes an additional Kafka message. This reflects a common way to try to integrate JMS/JPA with Kafka:
@JmsListener(destination = "message.queue")
@Transactional(propagation = Propagation.REQUIRED)
public void processMessage(String message) {
    log.info("Received JMS message: {}", message);
    messageDao.save(MessageEntity.builder().content(message).build());
    kafkaProducer.sendMessageRequired(message);
}
Now if we try running this code, we will run into issues – Spring will protest that it received two beans of the TransactionManager class. This is because JPA/JMS uses the base TransactionManager and Kafka uses its own KafkaTransactionManager. To properly run this code, we have to specify which transaction manager is to be used by which @Transactional annotation. These transaction managers are completely separate, and the transactions they start or commit do not affect one another. As such, one can be committed and one aborted if we throw an exception at the right time. Let's amend our listener for further analysis:
@JmsListener(destination = "message.queue")
@Transactional(transactionManager = "transactionManager", propagation = Propagation.REQUIRED)
public void processMessage(String message) {
    log.info("Received JMS message: {}", message);
    messageDao.save(MessageEntity.builder().content(message).build());
    kafkaProducer.sendMessageRequired(message);
    exampleService.processMessage(message);
}
In this example, we correctly mark the @Transactional annotation to use the bean named transactionManager, which is the JMS/JPA one. In a similar way, the @Transactional annotation in KafkaProducer is marked to use kafkaTransactionManager, so that the Kafka transaction is started and committed within that function. The issue with this code example is the situation in which ExampleService throws in its processMessage function at the very end of the listener.
If such a thing occurs, the JMS transaction is committed and the message is permanently removed from the queue. The JPA transaction is rolled back, and nothing is actually written to the database despite the messageDao.save call. The Kafka transaction is committed because no exception was thrown within its scope. We are left with a very peculiar state that will probably need manual fixing.
To minimize such situations, we should be very careful about when to start which transaction. Optimally, we would start the Kafka transaction right after starting the JMS and JPA transactions and commit it right before we commit JPA and JMS. This way we minimize the chance of such a situation occurring (though we still cannot fully get rid of it) – the only thing that could then cause one transaction to break and not the other is a connection failure between the commits.
Similar care must be taken on the consumer side. Suppose we start a Kafka transaction, do some processing, save to the database, send a JMS message, and send a Kafka response in a naive way:
@KafkaListener(topics = "messages_2", groupId = "group_id")
@Transactional(transactionManager = "kafkaTransactionManager", propagation = Propagation.REQUIRED)
public void processMessage(String message) {
    exampleService.processMessage(message);
    MessageEntity entity = MessageEntity.builder().content(message).build();
    messageDao.save(entity);
    exampleService.postProcessMessage(entity);
    messageDao.save(entity);
    jmsProducer.sendMessage(message);
    kafkaProducer.sendMessageRequired(exampleService.createResponse(entity));
}
Assuming MessageDAO/JmsProducer start their own transactions in their functions, what we will end up with if the final kafkaProducer.sendMessageRequired call throws is a duplicate entry in the database and a duplicate JMS message: the Kafka transaction will be properly rolled back and the message redelivered, but the JMS and JPA transactions were already committed, so we will now have to deal with the duplicates. What we should do in our case is start all the transactions immediately and do all of our logic within their scope. One of the ways to do so is to create a helper bean that accepts a function to perform within a @Transactional call:
@Service
public class TransactionalHelper {

    @Transactional(transactionManager = "transactionManager",
            propagation = Propagation.REQUIRED)
    public void executeInTransaction(Function f) {
        f.perform();
    }

    @Transactional(transactionManager = "kafkaTransactionManager",
            propagation = Propagation.REQUIRED)
    public void executeInKafkaTransaction(Function f) {
        f.perform();
    }

    public interface Function {
        void perform();
    }
}
This way, our call looks like this:
@KafkaListener(topics = "messages_2", groupId = "group_id")
@Transactional(transactionManager = "kafkaTransactionManager", propagation = Propagation.REQUIRED)
public void processMessage(String message) {
    transactionalHelper.executeInTransaction(() -> {
        exampleService.processMessage(message);
        MessageEntity entity = MessageEntity.builder().content(message).build();
        messageDao.save(entity);
        exampleService.postProcessMessage(entity);
        messageDao.save(entity);
        jmsProducer.sendMessage(message);
        kafkaProducer.sendMessageRequired(exampleService.createResponse(entity));
    });
}
Now we start the processing within the Kafka transaction and end it right before the Kafka transaction is committed. This is, of course, assuming no REQUIRES_NEW propagation is used throughout the inner functions. Once again, in an actual application, we would need to carefully consider the transactions in each subsequent function call to make sure no separate transactions are running without our explicit knowledge and consent.
We will run into a problem, however – the way Spring works, the JPA transaction will behave exactly as expected, but the JMS transaction will be started in JmsProducer anyway and committed on its own. The impact of this can be minimized by moving the exampleService.createResponse call before the jmsProducer.sendMessage call, so that the JMS send happens as late as possible, but it is still an issue we need to keep an eye on. It becomes especially important if we have to write to multiple different JMS queues as we process our message.
There is no easy way to force Spring to merge the JPA/JMS transactions; we would need to use JTA for that.
What can and cannot be done with JTA
JTA has been designed to merge multiple different transactions, effectively treating them as one. When the JTA transaction ends, each participant votes on whether to commit or abort it, with the result of the voting being broadcast so that the participants commit/abort at once. It is not 100% foolproof – we may encounter a connection dying during the voting process, which may cause one or more of the participants to perform a different action. The risk, however, is minimal because of the way the transactions are handled.
The main benefit of JTA is that we can effectively treat multiple different transactions as one – this is most often used with JMS and JPA transactions. So the question arises: can we merge Kafka transactions into JTA and treat them all as one? Well, the answer to that is unfortunately no – Kafka transactions do not follow the JTA API and do not define XA connection factories. We can, however, use JTA to fix the issue we encountered previously between the JMS and JPA transactions.
To set up JTA in our application, we do need a provider; base Java does not provide an implementation of JTA, only the API itself. There are various providers for this, sometimes coming with the server – WebSphere and its UOW transaction manager being a good example. Other times, like with Tomcat, nothing is provided out of the box and we have to use our own. An example of a library that does this is Atomikos – it does have a paid version, but for the use of simple JTA, the free one is good enough.
Spring makes importing Atomikos easy with a starter dependency:
<!-- JTA setup -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
Spring configures our JPA connection to use JTA on its own; to add JMS to it, however, we have to do some configuration. In one of our @Configuration classes, we should add the following beans:
@Configuration
public class JmsConfig {

    @Bean
    public ActiveMQXAConnectionFactory connectionFactory() {
        ActiveMQXAConnectionFactory connectionFactory = new ActiveMQXAConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61616");
        connectionFactory.setPassword("admin");
        connectionFactory.setUserName("admin");
        connectionFactory.setMaxThreadPoolSize(10);
        return connectionFactory;
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosConnectionFactoryBean atomikosConnectionFactory() {
        AtomikosConnectionFactoryBean atomikosConnectionFactory = new AtomikosConnectionFactoryBean();
        atomikosConnectionFactory.setUniqueResourceName("XA_JMS_ConnectionFactory");
        atomikosConnectionFactory.setXaConnectionFactory(connectionFactory());
        atomikosConnectionFactory.setMaxPoolSize(10);
        return atomikosConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(atomikosConnectionFactory());
        return template;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(PlatformTransactionManager transactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(atomikosConnectionFactory());
        factory.setConcurrency("1-1");
        factory.setTransactionManager(transactionManager);
        return factory;
    }
}
We define an ActiveMQXAConnectionFactory, which implements the JMS XAConnectionFactory interface needed for XA/JTA enlistment. We then define a separate AtomikosConnectionFactoryBean, which wraps the ActiveMQ one. For all intents and purposes, everything else uses the Atomikos connection factory – we set it for the JmsTemplate and the DefaultJmsListenerContainerFactory. We also set the transaction manager, which will now be the JTA transaction manager.
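With the Atomikos starter on the classpath, Spring Boot auto-configures that JTA transaction manager for us; purely as a hedged illustration (not code from the article), an explicit equivalent would be roughly:

import org.springframework.context.annotation.Bean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;

@Bean
public PlatformTransactionManager transactionManager() {
    // Wraps the Atomikos UserTransaction and TransactionManager so that
    // @Transactional("transactionManager") methods drive the JTA commit.
    return new JtaTransactionManager(new UserTransactionImp(), new UserTransactionManager());
}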
Having all of that set, we can run our application again and see if we still encounter issues with transactions not behaving as we want them to. Let's set up a JMS listener with additional logs for clarity:
@JmsListener(destination = "message.queue")
@Transactional(transactionManager = "transactionManager", propagation = Propagation.REQUIRED)
public void processMessage(final String message) {
    transactionalHelper.executeInKafkaTransaction(() -> {
        MessageEntity entity = MessageEntity.builder().content(message).build();
        messageDao.save(entity);
        log.info("Saved database entity");
        kafkaProducer.sendMessageRequired(message);
        log.info("Sent kafka message");
        jmsProducer.sendMessage("response.queue", "Response: " + message);
        log.info("Sent JMS response");
        throw new RuntimeException("This is a test exception");
    });
}
We expect that the JTA and Kafka transactions will both roll back: nothing will be written to the database, nothing will be written to response.queue, nothing will be written to the Kafka topic, and the message will not be consumed. When we run this, we get the following logs:
2021-01-20 21:56:00.904 INFO 9780 --- [enerContainer-1] c.g.kafkatransactions.jms.JmsConsumer : Saved database entity
2021-01-20 21:56:00.906 INFO 9780 --- [enerContainer-1] c.g.k.kafka.KafkaProducer : Producing message "This is a test message"
2021-01-20 21:56:00.917 INFO 9780 --- [enerContainer-1] c.g.kafkatransactions.jms.JmsConsumer : Sent kafka message
2021-01-20 21:56:00.918 INFO 9780 --- [enerContainer-1] c.g.kafkatransactions.jms.JmsProducer : Sending JMS message: Response: This is a test message
2021-01-20 21:56:00.922 INFO 9780 --- [enerContainer-1] c.g.kafkatransactions.jms.JmsConsumer : Sent JMS response
2021-01-20 21:56:00.935 WARN 9780 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Execution of JMS message listener failed, and no ErrorHandler has been set.
org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method 'public void com.grapeup.kafkatransactions.jms.JmsConsumer.processMessage(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: This is a test exception
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:122) ~[spring-jms-5.2.10.RELEASE.jar:5.2.10.RELEASE]
The exception thrown is followed by multiple errors about rolled-back transactions. After checking our H2 database and looking at the Kafka/JMS queues, we can indeed see that everything we expected has held. The original JMS message was not consumed either, starting an endless loop which, once again, we would have to manage in a running application. The key part, though, is that the transactions behaved exactly as we intended.
Is JTA worth it for that little bit of extra surety? It depends on the requirements – do we have to write to multiple JMS queues simultaneously while writing to the database and Kafka? Then we must use JTA. Can we get away with a single write at the end of the transaction? Then we might not need to. There is unfortunately no clear-cut answer; we must use the right tools for the job at hand.
Summary
We managed to successfully launch Kafka in transactional mode, enabling exactly-once delivery mechanics. This can be integrated with JMS/JPA transactions, although we may encounter problems in our listeners/consumers depending on circumstances. If needed, we may introduce JTA to allow easier control over the different transactions and whether they are committed or aborted. We used ActiveMQ/H2/Atomikos for this purpose, but this works with any JMS/JPA/JTA providers.