Message Sequencing and Grouping

For many message processing scenarios, especially FIX processing, it is important to correlate different messages. Messages may be received in the wrong order: a message received first may depend on another message for its processing, and a later message may even make the processing of a previously received message redundant (e.g. an order cancellation). The earlier in the processing chain messages are sequenced, the less downstream resources (systems and people) are affected.

A related aspect is the requirement to group certain messages and process them as a whole.

In both scenarios the receiving system needs to wait and cache a set of received messages until either

Sequencing and Correlation

FIX engines take care of processing messages in order. However, unless both endpoints (sender and receiver) are FIX engines, message ordering has to be handled by the messaging platform in between.

There are two ways to ensure that messages are processed in sequential order:

  1. Unit of Order Processing
  2. Single Thread Message Processor
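The Single Thread Message Processor approach can be pictured with a minimal plain-Java sketch. It is not the Stardust or Camel API; the class SingleThreadMessageProcessor is hypothetical, and adding to a list stands in for real processing. Every message is handed to one worker thread, so messages are processed strictly in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: all messages go to a single worker thread,
// so they are processed strictly in the order they were submitted.
public class SingleThreadMessageProcessor {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Synchronized so the result can be read safely from the caller's thread.
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());

    public void submit(String message) {
        // Stand-in for real processing: record the message in processing order.
        worker.submit(() -> processed.add(message));
    }

    /** Stops accepting work, waits for queued messages, and returns the processing order. */
    public List<String> shutdownAndGet() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }
}
```

Ordering comes for free here because there is only one consumer; the price is that all processing is serialized, which is exactly what Unit of Order Processing tries to avoid.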

Unit of Order Processing

In distributed multi-threaded environments, it is difficult to guarantee that messages which must be processed in sequence are actually processed in sequence. Depending on the application, a complete serialization of message processing is not always possible or desirable. To avoid moving the logic that guarantees the correct processing order into the application itself, the underlying layer should handle it.

Unit of Order Processing can be applied at the Process Definition layer. Each message that triggers a given process instance is routed through a messaging provider using a group ID in the message header. The group ID can be any value in the original message that is suitable for grouping messages. If it is not the default indicator (groupID in the header), an additional pre-processor needs to be defined on the process trigger to prepare the message header accordingly. All messages with the same group ID are processed by only one thread at a time, which guarantees that this thread has exclusive access to any resources (data structures) of its group. Another header field should contain a sequence number. Messages that arrive out of order are not processed until all of their predecessors have been processed successfully. Pending messages are rejected after a certain time (a few milliseconds) and stored in a distributed in-memory cache for failover purposes.
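The sequence-number rule can be sketched in plain Java. This is a hypothetical, single-JVM illustration (GroupResequencer is not part of Stardust or Camel). It assumes that sequence numbers start at 1 within each group and that the caller supplies the current time. It does not model the distributed in-memory cache: expired messages are simply returned to the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-group resequencer: a message is released only when all of
// its predecessors in the same group (by sequence number) have been released.
public class GroupResequencer {
    private static final class Pending {
        final String payload;
        final long arrivedAtMillis;

        Pending(String payload, long arrivedAtMillis) {
            this.payload = payload;
            this.arrivedAtMillis = arrivedAtMillis;
        }
    }

    // Next sequence number expected per group (assumed to start at 1).
    private final Map<String, Long> nextExpected = new HashMap<>();
    // Out-of-order messages per group, sorted by sequence number.
    private final Map<String, TreeMap<Long, Pending>> pending = new HashMap<>();

    /** Accepts one message and returns the payloads that are now releasable, in order. */
    public synchronized List<String> offer(String groupId, long seq, String payload, long nowMillis) {
        long expected = nextExpected.getOrDefault(groupId, 1L);
        List<String> released = new ArrayList<>();
        if (seq < expected) {
            return released; // duplicate or already processed: ignore
        }
        TreeMap<Long, Pending> buffer = pending.computeIfAbsent(groupId, g -> new TreeMap<>());
        buffer.put(seq, new Pending(payload, nowMillis));
        // Drain while the next expected sequence number is buffered.
        while (!buffer.isEmpty() && buffer.firstKey() == expected) {
            released.add(buffer.pollFirstEntry().getValue().payload);
            expected++;
        }
        nextExpected.put(groupId, expected);
        return released;
    }

    /** Removes and returns pending messages that waited longer than timeoutMillis. */
    public synchronized List<String> expire(long nowMillis, long timeoutMillis) {
        List<String> rejected = new ArrayList<>();
        for (TreeMap<Long, Pending> buffer : pending.values()) {
            Iterator<Pending> it = buffer.values().iterator();
            while (it.hasNext()) {
                Pending p = it.next();
                if (nowMillis - p.arrivedAtMillis > timeoutMillis) {
                    rejected.add(p.payload);
                    it.remove();
                }
            }
        }
        return rejected;
    }
}
```

For example, offering A/2 returns nothing, and offering A/1 afterwards returns both A/1 and A/2 in order. In this sketch, expiring a pending message does not advance the expected sequence number, so the group keeps waiting for the missing predecessor.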

An arbitrary Camel from endpoint puts the message on a JMS queue with a group ID. In a cluster this is a distributed queue, and processing becomes single-threaded per group ID.
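The effect of "single thread per group ID" can be illustrated without a broker. The sketch below assumes N single-threaded workers and pins each group ID to one of them by hash. This roughly corresponds to what JMS message groups (in ActiveMQ, the JMSXGroupID property) achieve across consumers. GroupDispatcher is a hypothetical name, and appending to a per-group list stands in for real processing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical in-process analogue of JMS message groups: each group ID is
// pinned to one single-threaded worker, so messages of one group are never
// processed concurrently, while different groups can run in parallel.
public class GroupDispatcher {
    private final ExecutorService[] workers;
    // Processing order per group (stand-in for real processing).
    private final Map<String, List<String>> log = new ConcurrentHashMap<>();

    public GroupDispatcher(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void dispatch(String groupId, String message) {
        // floorMod keeps the index non-negative for negative hash codes.
        int index = Math.floorMod(groupId.hashCode(), workers.length);
        workers[index].submit(() -> log
                .computeIfAbsent(groupId, g -> Collections.synchronizedList(new ArrayList<>()))
                .add(message));
    }

    /** Stops all workers, waits for queued messages, and returns the per-group order. */
    public Map<String, List<String>> shutdownAndGet() {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        try {
            for (ExecutorService w : workers) {
                w.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```

Hash pinning keeps per-group order without any locking in the processing code. The trade-off is that a slow group delays the other groups that share its worker.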

A Stardust step (e.g. a transformation) may be placed in between.

Message Preprocessing

Goal of the Message Preprocessing step is

At the end of preprocessing, the normalized message is sent to a JMS message queue via a Camel Application Type with the following Route Definition:

<to uri="activemq:queue:sequencerQueue"/>
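The following sketch illustrates a pre-processor that prepares the header. It derives a group ID from a raw FIX tag=value message and uses the header key groupID, the default indicator named in the Unit of Order description. Two points are assumptions. First, the group ID is OrigClOrdID (tag 41), falling back to ClOrdID (tag 11), so that a cancel request lands in the same group as the order it refers to. Second, with chained replaces, OrigClOrdID only points to the immediately preceding ClOrdID. The class FixGroupIdPreprocessor is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical header preparation for a raw FIX tag=value message.
// Assumption: group ID = OrigClOrdID (41) if present, otherwise ClOrdID (11).
public final class FixGroupIdPreprocessor {
    private static final char SOH = '\u0001'; // standard FIX field delimiter

    /** Splits a FIX message into a tag -> value map. */
    public static Map<String, String> parse(String fixMessage) {
        Map<String, String> fields = new HashMap<>();
        for (String field : fixMessage.split(String.valueOf(SOH))) {
            int eq = field.indexOf('=');
            if (eq > 0) {
                fields.put(field.substring(0, eq), field.substring(eq + 1));
            }
        }
        return fields;
    }

    /** Returns the headers to set before the message is sent to the sequencer queue. */
    public static Map<String, String> headersFor(String fixMessage) {
        Map<String, String> fields = parse(fixMessage);
        Map<String, String> headers = new HashMap<>();
        String groupId = fields.containsKey("41") ? fields.get("41") : fields.get("11");
        if (groupId != null) {
            headers.put("groupID", groupId);
        }
        return headers;
    }
}
```

Messages without either tag (e.g. a heartbeat) get no group header and would need to be handled separately.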

Sequencing

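<from uri="activemq:queue:sequencerQueue?concurrentConsumers=10&amp;maxConcurrentConsumers=50"/>
<!-- assumption: the processor sets the body to the list of messages that are now in sequence -->
<process ref="messageSequencePreprocessor"/>
<split streaming="true"><simple>${body}</simple><to uri="ipp:direct"/></split>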

with Additional Bean Definitions set to

<bean id="messageSequencePreprocessor" class="com.infinity.integration.message.MessageSequenceProcessor"/>

Using Camel Aggregators

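<aggregate strategyRef="myAggregationStrategy" completionSize="10">
<!-- strategyRef and completionSize are placeholders: Camel requires an aggregation strategy and a completion condition -->
<correlationExpression>
<simple>${header.cheese}</simple>
</correlationExpression>
<to uri="..."/>
</aggregate>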
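Conceptually, an aggregator collects messages that share a correlation key and releases them as one group once a completion condition holds. The plain-Java sketch below assumes a fixed completion size as that condition and a header map as the message type. CorrelatingAggregator is hypothetical and is not Camel's AggregationStrategy API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical analogue of an aggregator: messages are correlated by a key
// and released as one group once completionSize messages with that key arrived.
public class CorrelatingAggregator<M> {
    private final Function<M, String> correlationKey;
    private final int completionSize;
    // Groups that are still waiting for more messages.
    private final Map<String, List<M>> open = new HashMap<>();

    public CorrelatingAggregator(Function<M, String> correlationKey, int completionSize) {
        this.correlationKey = correlationKey;
        this.completionSize = completionSize;
    }

    /** Adds a message; returns the completed group if this message completed it. */
    public synchronized Optional<List<M>> add(M message) {
        String key = correlationKey.apply(message);
        List<M> group = open.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (group.size() < completionSize) {
            return Optional.empty();
        }
        open.remove(key); // the next message with this key starts a new group
        return Optional.of(group);
    }
}
```

A real deployment would usually combine the size with a timeout, so that incomplete groups are not held forever.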

You