Translating between a synchronous and an asynchronous API

One important aspect of machine-to-machine (API) integration is the decision whether the communication should be synchronous or asynchronous. The difference can be explained easily using a human-to-human communication analogy: a synchronous exchange is like a phone call, where you stay on the line until you get the answer, while an asynchronous exchange is like an email, which gets answered whenever the other party gets to it.

It also happens that you need to connect two systems where one wants to communicate synchronously (typically over HTTP, nowadays frequently called a "REST interface") while the other communicates asynchronously (via some messaging system). Let's take a look at what a solution to this problem can look like.

Starting and finishing asynchronously

Communication that starts and ends with asynchronous messages is the easy option. In the human communication analogy: I read an email, call someone, ask a question, and get the answer immediately, so I write the answer in a response email and send it. Task done, no need to remember anything.

Actually, one of the main reasons why synchronous APIs are used is their simplicity. We just call the synchronous API within the asynchronous consumer (event handler) and after getting the response, we forward it in an asynchronous message as in the following pseudocode:

        AsyncAPIConsumer:

            onMessage(msg)
                output = syncAPI.call(msg)
                sendAsync(msg.responseDestination, output)
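
For illustration, a hedged Java sketch of the same pattern could look as follows; the endpoint URL, the onMessage wiring, and the sendAsync helper are assumptions standing in for whatever HTTP and messaging libraries you actually use:

        import java.net.URI;
        import java.net.http.HttpClient;
        import java.net.http.HttpRequest;
        import java.net.http.HttpResponse;

        // Sketch only: the message listener calls the synchronous API and forwards
        // the result as an asynchronous response message.
        class AsyncApiConsumer {

            private final HttpClient http = HttpClient.newHttpClient();

            // Assumed to be invoked by the messaging library for each request message.
            void onMessage(String payload, String responseDestination) throws Exception {
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create("https://sync-api.example.com/process")) // hypothetical endpoint
                        .POST(HttpRequest.BodyPublishers.ofString(payload))
                        .build();
                // Blocking here is fine: the consumer thread has nothing else to do
                // until the synchronous API answers.
                HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
                sendAsync(responseDestination, response.body());
            }

            // Hypothetical placeholder for the actual message producer.
            private void sendAsync(String destination, String payload) {
                /* publish the response message here */
            }
        }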
    

Starting and finishing synchronously

Here it starts to get complicated: I am on the phone with a client, I send a request in an email, and I am waiting for the reply. Other, unrelated emails may arrive in the meantime, so I need to be able to identify the message I am waiting for, e.g. by its subject. Someone with access to the same mailbox may read the response I am waiting for and, not knowing what to do with it, may even decide to delete it. The email response may take so long that I have to say "sorry" to the client and end the phone call. The human analogy may seem a little weird, but it illustrates that the technical solution will be complex, too.

Correlation ID

In the human example above the "email subject" was used to identify a reply. In general, if you have to handle an asynchronous response, you need a way to match it to the request you sent. This is done using a unique value that you send in the request and that has to be echoed by the asynchronous API in the response. This value is called a correlation ID and, if possible, we try to put it in some message header so that we can quickly route the message without having to parse its content (payload).

Single-instance solution

Smaller non-critical systems without a lot of traffic and without a requirement to be highly available can be implemented so that there is only one instance of the synchronous API deployed. This makes the routing of the asynchronous responses much easier because all of them are consumed by the one and only instance.

We still need to solve the problem that different requests to the API will be processed concurrently (in parallel) by different execution threads. The thread that receives an asynchronous response has to look up the correlation ID in a thread-safe collection (typically a concurrent hash map) to find out which other thread is waiting for that response. The waiting thread is then notified and handed the response payload.

        global waitingMap = new ConcurrentHashMap

        SyncAPI:

            process(request)
                futureResponse = new CompletableFuture
                correlationID = generateUUID()
                waitingMap[correlationID] = futureResponse
                request.correlationID = correlationID
                sendAsync(asyncAPI.requestDestination, request)
                try {
                    response = futureResponse.get(timeout)
                    return response
                } catch(e: TimeoutException | InterruptedException) {
                    return new ErrorResponse
                } finally {
                    waitingMap.remove(correlationID)
                }

            onMessage(asyncResponse)
                waitingFuture = waitingMap[asyncResponse.correlationID]
                // the entry may be gone if the request has already timed out
                if (waitingFuture != null)
                    waitingFuture.complete(asyncResponse)
    

In the above pseudocode the process method receives the synchronous request and waits for the asynchronous response. The onMessage method receiving the asynchronous response is executed by a different thread and notifies the waiting thread to continue. In case the response does not arrive within a reasonable time, the synchronous API responds with an error.
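
Translated into Java, a minimal sketch of this single-instance solution could look like the following; the messaging calls (sendAsync and the onMessage wiring) and the 30-second timeout are assumptions:

        import java.util.UUID;
        import java.util.concurrent.CompletableFuture;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.ExecutionException;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.TimeoutException;

        class SyncOverAsyncBridge {

            // Maps the correlation ID of each pending request to the future its
            // request thread is blocked on.
            private final ConcurrentHashMap<String, CompletableFuture<String>> waiting =
                    new ConcurrentHashMap<>();

            // Called by the synchronous (HTTP) layer for each incoming request.
            String process(String requestPayload) {
                String correlationId = UUID.randomUUID().toString();
                CompletableFuture<String> future = new CompletableFuture<>();
                waiting.put(correlationId, future);
                try {
                    sendAsync("async-api-requests", correlationId, requestPayload);
                    return future.get(30, TimeUnit.SECONDS); // blocks the request thread
                } catch (TimeoutException | InterruptedException | ExecutionException e) {
                    return "ERROR"; // map to an HTTP error response in practice
                } finally {
                    waiting.remove(correlationId); // always clean up the map
                }
            }

            // Called by the messaging library on a consumer thread.
            void onMessage(String correlationId, String responsePayload) {
                CompletableFuture<String> future = waiting.get(correlationId);
                if (future != null) { // null means the request already timed out
                    future.complete(responsePayload);
                }
            }

            // Hypothetical placeholder for the actual message producer.
            private void sendAsync(String destination, String correlationId, String payload) {
                /* publish the request message with the correlation ID in a header */
            }
        }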

Persistence?

Even in the simple case above we have already introduced a timeout. This is because synchronous interfaces are expected to respond within a limited time (we don't want to block our phone line for the whole day trying to finish just one call). On the other hand, asynchronous interfaces by their nature can take as long as they need to respond. Once the timeout has expired, it does not make sense to continue waiting for the asynchronous API, because the client of the synchronous API will most probably time out as well.

This leads us to the question of whether we should persist the map of waiting synchronous requests (e.g. in a database) instead of just keeping it in memory. The answer is almost always "no", because if our synchronous API dies, its client will usually get an error response automatically, so we would not be able to send it the response from the asynchronous API anyway. It is conventional for synchronous clients to retry in such cases.

Distributed (clustered) solutions

When there are multiple instances of the synchronous API, we need to solve the problem of inter-process coordination to route the response messages properly. Here we have to be more concrete and tailor the solution to the particular asynchronous messaging setup. Let's go through some of the most common ones.

JMS temporary queues

JMS message brokers support creating queues dynamically. The asynchronous API can be designed so that it creates a temporary queue whose name contains the correlation ID and then sends the response message to it. The synchronous API, when processing a request, consumes the response from this uniquely named queue, so each response message is routed to the correct instance.

You should check if your message broker has an automatic clean-up and re-use feature for the temporary queues (and it is configured properly) to ensure that this solution does not cause performance problems.

If the message broker can handle the large number of dynamically created queues, this solution can perform very well, because each queue holds only a single message and there is no need to search for messages within a queue.

Solution pseudocode:

        SyncAPI:

            process(request)
                correlationID = generateUUID()
                request.correlationID = correlationID
                sendAsync(asyncAPI.requestDestination, request)
                try {
                    return consumeAsync(queueName = correlationID, timeout)
                } catch(e: TimeoutException) {
                    return new ErrorResponse
                }
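
A concrete JMS 2.0 version might look roughly like this; the queue names, the timeout, and the assumption that the broker creates the uniquely named reply queue on demand are illustrative only:

        import java.util.UUID;
        import javax.jms.ConnectionFactory;
        import javax.jms.JMSContext;
        import javax.jms.JMSException;
        import javax.jms.Message;
        import javax.jms.TextMessage;

        class TempQueueSyncApi {

            private final ConnectionFactory factory; // provided by your broker's client library

            TempQueueSyncApi(ConnectionFactory factory) {
                this.factory = factory;
            }

            String process(String requestPayload) {
                String correlationId = UUID.randomUUID().toString();
                try (JMSContext ctx = factory.createContext()) {
                    TextMessage request = ctx.createTextMessage(requestPayload);
                    request.setJMSCorrelationID(correlationId);
                    ctx.createProducer().send(ctx.createQueue("async-api-requests"), request);

                    // Consume from the reply queue whose name contains the correlation ID;
                    // the asynchronous API is expected to send its response there.
                    Message reply = ctx.createConsumer(ctx.createQueue("responses." + correlationId))
                                       .receive(30_000); // returns null on timeout
                    return reply == null ? "ERROR" : ((TextMessage) reply).getText();
                } catch (JMSException e) {
                    return "ERROR";
                }
            }
        }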
    

JMS message selector

This solution does not require the asynchronous API to send the responses to temporary queues. It can send all messages to one response queue; it is enough to put the correlation ID in a message header. The message broker can filter the messages using a message selector, so the JMS client (the synchronous API implementation) will get exactly the message it is waiting for:

        SyncAPI:

            process(request)
                correlationID = generateUUID()
                request.correlationID = correlationID
                sendAsync(asyncAPI.requestDestination, request)
                try {
                    return consumeAsync(messageSelector = "JMSCorrelationID = '" + correlationID + "'", timeout)
                } catch(e: TimeoutException) {
                    return new ErrorResponse
                }
    

This solution is quite elegant, but you should make sure that not too many messages build up in the response queue because it will require the broker to regularly scan through all of them to check which ones it should return to the JMS client.
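
In JMS 2.0 terms, the selector variant differs from the previous sketch only in how the reply is consumed: a shared reply queue plus a selector instead of a per-request queue. Reusing the ConnectionFactory field from the previous sketch, with queue names and the timeout again being illustrative assumptions:

        String process(String requestPayload) {
            String correlationId = UUID.randomUUID().toString();
            try (JMSContext ctx = factory.createContext()) {
                TextMessage request = ctx.createTextMessage(requestPayload);
                request.setJMSCorrelationID(correlationId);
                ctx.createProducer().send(ctx.createQueue("async-api-requests"), request);

                // All instances share one reply queue; the selector makes the broker
                // deliver only the message carrying our correlation ID.
                String selector = "JMSCorrelationID = '" + correlationId + "'";
                Message reply = ctx.createConsumer(ctx.createQueue("async-api-responses"), selector)
                                   .receive(30_000); // returns null on timeout
                return reply == null ? "ERROR" : ((TextMessage) reply).getText();
            } catch (JMSException e) {
                return "ERROR";
            }
        }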

Kafka partitioning by client

Kafka is more lightweight compared to a JMS broker and does not provide the features mentioned above (temporary queues and message selectors), so we have to consider other approaches.

One possibility is to try to use the Kafka topic partitioning so that the reply message is consumed by the instance of the synchronous API implementation waiting for it. This means the correlation ID would have to be constructed so that it contains the identification of the node that should receive the response. Then the asynchronous API can use the node identifier to specify the Kafka partition to write the response to. Each consumer of the asynchronous responses will consume from exactly one Kafka partition and receive only the messages that belong to it.
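
To make the mechanism concrete, a rough sketch of both sides could look like this; the topic name, the node-id scheme embedded in the correlation ID, and the surrounding configuration are assumptions:

        import java.util.List;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.TopicPartition;

        class PartitionRouting {

            // Asynchronous API side: the correlation ID is assumed to start with the
            // numeric node id of the waiting instance, e.g. "3:<uuid>".
            void sendResponse(KafkaProducer<String, String> producer,
                              String correlationId, String responsePayload) {
                int partition = Integer.parseInt(correlationId.split(":")[0]);
                producer.send(new ProducerRecord<>("async-api-responses", partition,
                                                   correlationId, responsePayload));
            }

            // Synchronous API side: each instance is manually assigned exactly one
            // partition, so it only ever sees the responses addressed to it.
            void startConsumer(KafkaConsumer<String, String> consumer, int myNodeId) {
                consumer.assign(List.of(new TopicPartition("async-api-responses", myNodeId)));
                // ... then poll() and complete the waiting futures as in the
                //     single-instance example above
            }
        }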

I personally do not recommend this solution because there are multiple things that can break: the number of partitions has to be kept in line with the number of instances, each instance needs a stable identifier that maps to its partition even across restarts and rescaling, and the asynchronous API becomes coupled to the deployment details of its client.

Kafka message filtering on the client

A much more robust solution is to let all the consumers consume all the response messages and just ignore those that are not for them. All the consumers have to be configured with unique consumer group names to make Kafka deliver each message to all of them.

The consumer can read the correlation ID from a message header and ignore the message if the ID is not among those it is waiting for.

The disadvantage of this solution is that unnecessary duplicate messages are delivered to nodes that are not interested in them.
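
A hedged sketch of such a filtering consumer with the plain Kafka client API; the topic name, the header name, and the waiting map carried over from the single-instance example are assumptions:

        import java.nio.charset.StandardCharsets;
        import java.time.Duration;
        import java.util.List;
        import java.util.Properties;
        import java.util.UUID;
        import java.util.concurrent.CompletableFuture;
        import java.util.concurrent.ConcurrentHashMap;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.header.Header;

        class FilteringResponseConsumer {

            void consumeResponses(ConcurrentHashMap<String, CompletableFuture<String>> waiting) {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                // A unique group id per instance makes Kafka deliver every response to every instance.
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "sync-api-" + UUID.randomUUID());
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                          "org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                          "org.apache.kafka.common.serialization.StringDeserializer");

                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(List.of("async-api-responses"));
                    while (true) {
                        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                            Header header = record.headers().lastHeader("correlationId");
                            if (header == null) {
                                continue; // not a response we understand
                            }
                            String correlationId = new String(header.value(), StandardCharsets.UTF_8);
                            CompletableFuture<String> future = waiting.get(correlationId);
                            if (future != null) { // null: this response belongs to another instance
                                future.complete(record.value());
                            }
                        }
                    }
                }
            }
        }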

Redis to the rescue?

In some circumstances you may want to combine the simplicity of Kafka with the ability to route the responses selectively to the right node without having to consume duplicates. You can use an additional distributed data store for that, especially if you already use one for other purposes in your infrastructure.

Redis could be one such possibility. It is fast because it stores the data in memory; that is acceptable, as explained above in the Persistence section. It also has a publish-subscribe feature that allows you to create a message listener for a unique "channel" bound to the correlation ID, which makes the code structure similar to using the JMS temporary queue or message selector.
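
A possible sketch of this approach with the Jedis client; the channel naming, the Redis host, the timeout, and the sendAsync placeholder are assumptions, and the error-path cleanup is simplified:

        import java.util.UUID;
        import java.util.concurrent.CompletableFuture;
        import java.util.concurrent.ExecutionException;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.TimeoutException;
        import redis.clients.jedis.Jedis;
        import redis.clients.jedis.JedisPubSub;

        class RedisResponseRouter {

            // Synchronous API side: each pending request listens on its own channel,
            // bound to the correlation ID.
            String process(String requestPayload) throws InterruptedException {
                String correlationId = UUID.randomUUID().toString();
                CompletableFuture<String> future = new CompletableFuture<>();

                JedisPubSub listener = new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        future.complete(message);
                        unsubscribe(); // one response per channel is enough
                    }
                };
                // subscribe() blocks, so it runs on its own thread.
                new Thread(() -> new Jedis("localhost", 6379)
                        .subscribe(listener, "responses." + correlationId)).start();

                sendAsync("async-api-requests", correlationId, requestPayload);
                try {
                    return future.get(30, TimeUnit.SECONDS);
                } catch (TimeoutException | ExecutionException e) {
                    return "ERROR"; // a real implementation would also unsubscribe here
                }
            }

            // The instance that consumes the asynchronous response simply publishes it
            // to the channel of whichever node is waiting for it.
            void onAsyncResponse(String correlationId, String responsePayload) {
                new Jedis("localhost", 6379).publish("responses." + correlationId, responsePayload);
            }

            // Hypothetical placeholder for the actual (e.g. Kafka) message producer.
            private void sendAsync(String destination, String correlationId, String payload) {
                /* publish the request message here */
            }
        }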

Do you find this overview useful? Let me know on LinkedIn.
