EDA implementation — Processing and integrating event streams

Chathura Ekanayake
May 20, 2021

We have discussed a generic architecture for event-driven systems and some use cases of integrating business systems into event-driven architecture (EDA) in previous posts [1], [2]. In this article, we consider event streams and how we can capture, filter, summarize, and integrate various event streams into event-driven systems.

When handling event streams, it is necessary to receive, process, and output continuous flows of data items (or events). These events may occur at very high rates, and it may be necessary to produce outputs with very low latency (e.g. generating alerts when unusual behavior is detected). Therefore, dedicated stream processing software with these capabilities is used for such stream processing/integration applications. In this article, we use WSO2 Streaming Integrator (SI) as the stream processing software to illustrate the implementation of some EDA scenarios.

Scenario 1 — Exposing data streams over websockets

Events and data streams are usually transmitted over messaging protocols such as AMQP, Kafka, and MQTT. These protocols are well suited to transmitting high volumes of frequent events with low bandwidth overhead, and many message brokers support them natively.

However, there are scenarios where we need to expose event streams to external parties or to display real-time event streams in web pages (e.g. rendering a real-time graph of sales at each outlet). In such use cases, it is usually not possible to use messaging protocols, as they may be blocked by firewalls or unsupported by web browsers. One option in such situations is to convert event streams to websockets and expose a websocket endpoint to external parties. We can illustrate this with the below scenario:

A retail company’s PoS systems publish details about each sale to a Kafka topic. The company’s sales management wants a mobile app with a real-time dashboard displaying sales of different items. In addition, the mobile app should display a notification whenever a high-volume sale occurs.

In order to implement this use case, we first need to define an SI stream to capture events from the Kafka topic containing sales details. The Kafka bootstrap server, topic name, and message format are specified in the SI stream definition as below:
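A minimal sketch of such a source stream definition (the topic name, server address, group id, and event attributes are assumptions, since the original snippet is not reproduced here):

@source(type='kafka', topic.list='sales', bootstrap.servers='localhost:9092',
    group.id='hmart.dashboard', threading.option='single.thread',
    @map(type='json'))
define stream HMartOrdersStream (itemId string, quantity int, price double);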

Similarly, we need to define the websocket output stream by specifying the port and the message format as below:
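A corresponding sketch, assuming the websocket-server sink type of WSO2 SI and the same event attributes as above:

@sink(type='websocket-server', host='0.0.0.0', port='9095', @map(type='json'))
define stream HMartOrdersWSStream (itemId string, quantity int, price double);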

Now we can write a simple streaming query to select all events from the Kafka stream (i.e. HMartOrdersStream) and insert those into the websocket stream (i.e. HMartOrdersWSStream):
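Such a passthrough query could be written as:

from HMartOrdersStream
select *
insert into HMartOrdersWSStream;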

However, sales management may not be interested in all sales events. Instead, it would be more efficient to calculate the total sales per item within some time period (e.g. 30 minutes) and publish this summarized data to the mobile apps. We can define another websocket stream to represent the summarized sales data and modify the streaming query to compute total sales per item as below:
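A sketch of this, assuming a 30-minute tumbling window and a hypothetical SummarizedSalesWSStream as the output:

@sink(type='websocket-server', host='0.0.0.0', port='9095', @map(type='json'))
define stream SummarizedSalesWSStream (itemId string, totalQuantity long);

from HMartOrdersStream#window.timeBatch(30 min)
select itemId, sum(quantity) as totalQuantity
group by itemId
insert into SummarizedSalesWSStream;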

Now we can implement the second part of the use case, where we need to identify each high-volume sale (e.g. quantity greater than 100) and publish those sales events to another websocket stream. Again, we can start by defining a stream to represent the websocket output:
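For example (the stream name is again an assumption; port 9096 follows the complete-script description below):

@sink(type='websocket-server', host='0.0.0.0', port='9096', @map(type='json'))
define stream HighVolumeOrdersWSStream (itemId string, quantity int, price double);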

Then we can use the filtering capabilities of WSO2 SI to apply filtering criteria on sales events and output only high-volume orders (e.g. where quantity is greater than 100) to the websocket stream, as shown below:
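A filter in Siddhi is expressed as a condition in square brackets on the input stream, e.g.:

from HMartOrdersStream[quantity > 100]
select *
insert into HighVolumeOrdersWSStream;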

The complete WSO2 SI script for this scenario is shown below. Once this script is deployed in WSO2 SI, it will consume events from the Kafka server hosted at localhost:9092 and output summarized sales events via websocket port 9095 and high-volume orders via websocket port 9096. Therefore, it is possible to develop a mobile application that connects to those websocket ports and displays sales events in a dashboard.
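Since the original script is not reproduced here, the following consolidated sketch combines the snippets above under the same assumptions (app and stream names are hypothetical):

@App:name('HMartSalesApp')

@source(type='kafka', topic.list='sales', bootstrap.servers='localhost:9092',
    group.id='hmart.dashboard', threading.option='single.thread',
    @map(type='json'))
define stream HMartOrdersStream (itemId string, quantity int, price double);

@sink(type='websocket-server', host='0.0.0.0', port='9095', @map(type='json'))
define stream SummarizedSalesWSStream (itemId string, totalQuantity long);

@sink(type='websocket-server', host='0.0.0.0', port='9096', @map(type='json'))
define stream HighVolumeOrdersWSStream (itemId string, quantity int, price double);

-- Summarize sales per item every 30 minutes
from HMartOrdersStream#window.timeBatch(30 min)
select itemId, sum(quantity) as totalQuantity
group by itemId
insert into SummarizedSalesWSStream;

-- Forward high-volume orders as they occur
from HMartOrdersStream[quantity > 100]
select *
insert into HighVolumeOrdersWSStream;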

WSO2 SI also provides a drag-and-drop based graphical editor to create stream processing applications. The same script in the graphical editor is shown below:

Scenario 2 — Capturing database updates as event streams

Another possible use case is to capture database updates as events (e.g. inserting records into a given table) and inject them into a Kafka topic. This is particularly useful if we want to integrate into EDA existing systems that do not natively support publishing updates to external parties. Let’s consider the below scenario as an example:

An organization has a CRM system to maintain all customer information. It is introducing new systems that need to work with customer data maintained in the CRM system. Therefore, in addition to importing all current customer data, it is necessary to push any new customer records to these new systems.

Furthermore, it is also required to validate each customer record before making it available to the new systems (e.g. by ensuring that the customer name, phone, and address are in the correct format and do not exceed a given length).

We can start implementing this scenario by tapping into the CRM database and listening for any customer record updates, using the following stream definition:
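A sketch using the CDC source of WSO2 SI (connection details, credentials, and table columns are assumptions):

@source(type='cdc', url='jdbc:mysql://localhost:3306/crm_db',
    username='crmuser', password='crmpassword',
    table.name='customers', operation='insert',
    @map(type='keyvalue'))
define stream customerRecordStream (id string, name string, phone string, address string);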

Here we have specified the database connection details and the fields of the database table in the stream definition. This listens to updates on the “customers” table and injects all updated records into “customerRecordStream”.

Then we need to define another stream to represent the output Kafka topic named “new-customers” as below:
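For example (the stream name is an assumption):

@sink(type='kafka', topic='new-customers', bootstrap.servers='localhost:9092', @map(type='json'))
define stream newCustomersStream (id string, name string, phone string, address string);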

Finally, we have to write a query to select customer record events from the database stream that match the validation criteria and insert those into the Kafka stream:
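A sketch of such a validation query, assuming the Siddhi regex and string extensions and hypothetical validation patterns:

from customerRecordStream[regex:matches('[a-zA-Z ]{1,100}', name)
    and regex:matches('[0-9+ -]{7,15}', phone)
    and str:length(address) < 200]
select id, name, phone, address
insert into newCustomersStream;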

Note that we have used regular expressions to validate the name and phone fields against the required formats. Once this script is deployed in WSO2 SI, it will listen to updates on the “customers” table in the given MySQL database, validate each new record against the given criteria, and insert valid records into the Kafka topic named “new-customers”. Then all other systems that are interested in customer records can subscribe to this Kafka topic and consume customer data as shown in the below diagram:

Scenario 3 — Real-time pattern matching

When working with event streams, it is often necessary to identify and act on certain event patterns as soon as possible. Furthermore, as some details of individual events are lost in summarization, it may not be possible to detect such patterns later. Consider the below scenario as an example:

A retail company wants to identify possible suspicious buying patterns from various available data streams. One such suspicious pattern is a credit card failure followed by 3 or more high value purchases made by a customer within a week.

For this scenario, let’s assume that the retail company has a Kafka server with two topics named “sales” and “card-failures” to capture sales events and card failure events respectively. We can start by defining two WSO2 SI streams to represent these Kafka topics:
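A sketch of the two source streams (field names, group ids, and Kafka details are assumptions):

@source(type='kafka', topic.list='sales', bootstrap.servers='localhost:9092',
    group.id='fraud.detection.sales', threading.option='single.thread', @map(type='json'))
define stream SalesStream (customerId string, itemId string, totalValue double);

@source(type='kafka', topic.list='card-failures', bootstrap.servers='localhost:9092',
    group.id='fraud.detection.failures', threading.option='single.thread', @map(type='json'))
define stream CardFailureStream (customerId string, cardNumber string, reason string);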

Let’s also assume that we want to publish the IDs of customers who make suspicious purchases to another Kafka topic named “suspicious-purchases”. We can define another WSO2 SI stream to represent this:
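For example:

@sink(type='kafka', topic='suspicious-purchases', bootstrap.servers='localhost:9092', @map(type='json'))
define stream SuspiciousPurchasesStream (customerId string);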

Now we can write a stream processing query to identify patterns where a card failure event is followed by 3 or more high-value purchases (greater than $100) made by the same customer within a week:
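A sketch using Siddhi’s pattern syntax, under the stream and field-name assumptions above; the count pattern <3> fires at the third qualifying purchase, which covers the “3 or more” condition:

from every f=CardFailureStream
    -> s=SalesStream[customerId == f.customerId and totalValue > 100]<3>
    within 1 week
select f.customerId
insert into SuspiciousPurchasesStream;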

Scenario 4 — Hierarchically processing large numbers of events

Organizations may have multiple locations that generate events. If we try to collect all these individual events into a central location and process them there, stream processing systems may get overloaded and delay reactions to certain events.

A more scalable solution in such situations is to pre-process and summarize events closer to the event sources and publish only important events and summaries to a central location. Consider the following scenario as an example:

A supermarket chain maintains multiple shopping outlets across the country, and PoS machines at each outlet generate a sales event for each customer checkout. The company’s HQ wants insight into how each shopping outlet is performing and the demand for different items.

Stream processing at shopping outlets

First, we can look at the stream processing script to be deployed in the WSO2 SI cluster running at each shopping outlet. Let’s assume that PoS machines send checkout messages as HTTP/REST calls. We can define a stream to capture such sales events as below:
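A sketch using the HTTP source of WSO2 SI (the receiver URL and event attributes are assumptions):

@source(type='http', receiver.url='http://0.0.0.0:8006/checkouts', @map(type='json'))
define stream CheckoutStream (itemId string, quantity int, price double);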

Furthermore, let’s assume that HQ has a Kafka server to handle all events, and that it contains a topic named “daily-sales” to receive summarized daily sales from each outlet. We need to define another stream to represent this Kafka topic:
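For example (the HQ Kafka address is an assumption):

@sink(type='kafka', topic='daily-sales', bootstrap.servers='hq.example.com:9092', @map(type='json'))
define stream DailySalesStream (shopId string, itemId string, totalQuantity long);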

Then it is possible to summarize all sales events within a day by calculating the total sales of each item, and publish the summarized events to the Kafka server hosted by HQ as follows:
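A sketch with a one-day tumbling window; the shop identifier is a hypothetical constant configured per outlet:

from CheckoutStream#window.timeBatch(1 day)
select 'shop-042' as shopId, itemId, sum(quantity) as totalQuantity
group by itemId
insert into DailySalesStream;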

Here we are adding a shopId to each summarized event, as HQ needs a way to differentiate among events generated by different outlets.

Stream processing at company’s HQ

Now we can focus on the streaming scripts to be deployed in WSO2 SI at HQ. The Kafka server deployed at HQ hosts the “daily-sales” topic with summarized data from each outlet. First, we need to define a stream to capture events from the “daily-sales” topic.
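For example, mirroring the outlet-side definition:

@source(type='kafka', topic.list='daily-sales', bootstrap.servers='localhost:9092',
    group.id='hq.analytics', threading.option='single.thread', @map(type='json'))
define stream DailySalesStream (shopId string, itemId string, totalQuantity long);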

Then we can summarize this data by calculating the total sales of each item and the total sales of each shopping outlet within the required time duration (e.g. 1 day). Such summarized data can be exposed as websocket streams, so that the company’s sales dashboard can display it in real-time graphs.
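A sketch of the two HQ-side aggregations and their websocket sinks (stream names and ports are assumptions):

@sink(type='websocket-server', host='0.0.0.0', port='9095', @map(type='json'))
define stream ItemSalesWSStream (itemId string, totalQuantity long);

@sink(type='websocket-server', host='0.0.0.0', port='9096', @map(type='json'))
define stream OutletSalesWSStream (shopId string, totalQuantity long);

-- Total sales per item across all outlets
from DailySalesStream#window.timeBatch(1 day)
select itemId, sum(totalQuantity) as totalQuantity
group by itemId
insert into ItemSalesWSStream;

-- Total sales per outlet
from DailySalesStream#window.timeBatch(1 day)
select shopId, sum(totalQuantity) as totalQuantity
group by shopId
insert into OutletSalesWSStream;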

This complete scenario is depicted below:

As we have seen in this article, continuous event sources (i.e. event streams) can be integrated with EDA according to various requirements. However, as discussed in some of the scenarios, there are situations where it is necessary to expose events to external systems (e.g. mobile apps). In such use cases, security and other relevant policies must be enforced on external event consumption. We will discuss this in the next article.
