Automatically pipe events from AsyncAPI channels with TriggerMesh

Jonathan Michaux

·10 min read

This tutorial demonstrates how to use AsyncAPI with TriggerMesh. It shows how the two play nicely together because TriggerMesh can easily ingest, transform, filter, and route events from channels defined in an AsyncAPI definition. This is one of many possible ways to use the two technologies together. This post assumes you have basic knowledge of AsyncAPI already, but are potentially new to TriggerMesh.

If you want access to the source files for this tutorial, head to the dedicated GitHub TriggerMesh repo.

The scenario takes the perspective of a DevOps engineer who has been given an AsyncAPI definition describing an application (or set of applications) that produces order events over several channels: a Kafka topic, a Google Pub/Sub topic, and an HTTP service. The engineer's task is to ingest orders from these channels and route them to a single downstream Kafka topic called unified-orders.

The tutorial leverages the TriggerMesh open-source command-line interface called tmctl to create the TriggerMesh sources, brokers, triggers, and targets that make up the event flow. tmctl lets you run these components locally on any laptop that has Docker.

The main components of TriggerMesh

The project includes a prototype parser built with AsyncAPI's js-parser. It parses the provided AsyncAPI definition and generates tmctl CLI commands that create TriggerMesh event sources that ingest events from various channels into a TriggerMesh broker for further processing. You could write the tmctl commands manually too, but this parser shows how a well-documented AsyncAPI definition can be used alongside TriggerMesh.

The following schema illustrates what we'll build in this tutorial.

Schema shows the event flow we'll build during the tutorial

Please reach out to the TriggerMesh team on the TriggerMesh Slack workspace or on GitHub if you need help getting this to work or have any feedback.

Prerequisites

tmctl

Install tmctl with Homebrew or another supported method:

brew install triggermesh/cli/tmctl

AsyncAPI's js-parser

We also need to install the AsyncAPI parser. Make sure you have a recent enough version of npm and node:

npm install @asyncapi/parser

Quickstart a Kafka cluster

One of the order management systems is producing orders on a Kafka topic called orders. We also want to write all unified orders to a downstream unified-orders topic. We'll need a cluster to implement these topics.

Here we provide an easy way to start one using Redpanda and Docker Compose, but you could use any Kafka distribution you like (self-hosted or managed).

The provided docker-compose file starts a single-node Redpanda cluster and a handy console. It is configured to work with Docker Desktop, so the listeners and advertised listeners may need adjusting in other environments. Reach out to us on Slack or GitHub if you need help, or read this Kafka listeners blog post if you want to dive deep into this.

Run the following command in the same directory as the docker compose file to start Redpanda:

docker-compose up -d

Start a mock HTTP service

We'll start a mock HTTP service locally to simulate the order management system that provides orders through an HTTP API. Copy the order.json file to the current directory; it will be served as the mock event. Then run the mock HTTP server, ideally in a new terminal (requires Python 3):

python3 -m http.server 8000

(Optional) AsyncAPI CLI

You don't necessarily need it, but the AsyncAPI CLI is a great companion for working with AsyncAPI definitions during development, particularly the Studio that you can easily run in your browser to view or edit the provided AsyncAPI definition.

brew install asyncapi
asyncapi start studio &

Overview of the AsyncAPI definition

Servers

Let's see what is in our AsyncAPI definition. We've defined three servers that represent the three order management systems. These definitions contain information we'll need to create the TriggerMesh source components that will read from these servers.

servers:
  kafkaserver:
    url: host.docker.internal:9092
    protocol: kafka
  httpserver:
    url: http://host.docker.internal:8000
    protocol: http
  googlepubsub:
    url: https://cloud.google.com
    protocol: googlepubsub

Channels

I've defined one channel per order management system, each with its own reference to the server and bindings for its specific protocol. There may be ways to improve on this by creating a single channel that has different implementations depending on the server.

channels:
  orders:
    description: Kafka topic to which orders are produced
    servers:
      - kafkaserver
    subscribe:
      message:
        $ref: '#/components/messages/order'
  /orders/order.json:
    description: REST API endpoint that responds with an order
    servers:
      - httpserver
    subscribe:
      message:
        $ref: '#/components/messages/order'
      bindings:
        http:
          type: request
          method: GET
  orders-gcp:
    description: Google pub/sub topic to which orders are produced
    servers:
      - googlepubsub
    subscribe:
      message:
        $ref: '#/components/messages/order'
      bindings:
        googlepubsub:
          topic: projects/jmcx-asyncapi/topics/ordersgcp

There is nothing overly special about the order message and schema so we don't need to go into detail on it here (but you can see it at the end of the AsyncAPI definition).

We can see that by combining information from the servers and the channels, we have everything we need (except, in some cases, auth credentials) to create TriggerMesh source components that can subscribe to and read from these channels. Let's do that!

Generating the TriggerMesh source components

Let's use the parser to generate some TriggerMesh components that will consume events from the channels defined in the AsyncAPI definition.

Make sure you're in the same directory as the JS file and try running the following command:

node parser.js

It should print the generated tmctl commands to standard output and also write them to a file called tmctl.sh, which is overwritten on each run. Its contents should look something like this:

tmctl create broker TriggerMeshAsyncAPI
tmctl create source kafka --name orders-kafkasource --topic orders --bootstrapServers host.docker.internal:9092 --groupID orders-group
tmctl create source httppoller --name ordersorderjson-httppollersource --method GET --endpoint http://host.docker.internal:8000/order.json --interval 10s --eventType io.triggermesh.httppoller.event
tmctl create source googlecloudpubsub --name orders-gcp-pubsubsource --topic projects/jmcx-asyncapi/topics/ordersgcp --serviceAccountKey $(cat serviceaccountkey.json)

The first command creates a lightweight event broker, the central component that will decouple event producers and consumers, and provide pub/sub-style reliable delivery of events to their targets. The name of the broker is derived from the title of the AsyncAPI definition.

Next, one source component is created per channel that provides a subscribe operation and has a reference to a server with a supported protocol such as http, kafka, or googlepubsub.
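
To make that mapping concrete, here is a minimal sketch of how channels and servers could be turned into tmctl commands. It is not the tutorial's parser.js: it works on a plain object that mirrors the definition instead of using @asyncapi/parser, and the slug helper, the builders table, and the naming scheme are assumptions made for illustration. The flags are the ones that appear in the generated commands above.

```javascript
// Plain-object stand-in for the parsed AsyncAPI definition (values copied from the tutorial).
const definition = {
  servers: {
    kafkaserver: { url: "host.docker.internal:9092", protocol: "kafka" },
    httpserver: { url: "http://host.docker.internal:8000", protocol: "http" },
    googlepubsub: { url: "https://cloud.google.com", protocol: "googlepubsub" },
  },
  channels: {
    orders: { servers: ["kafkaserver"], subscribe: {} },
    "/orders/order.json": {
      servers: ["httpserver"],
      subscribe: { bindings: { http: { type: "request", method: "GET" } } },
    },
    "orders-gcp": {
      servers: ["googlepubsub"],
      subscribe: {
        bindings: { googlepubsub: { topic: "projects/jmcx-asyncapi/topics/ordersgcp" } },
      },
    },
  },
};

// Hypothetical helper: keep only letters, digits, and dashes for component names.
const slug = (channelName) => channelName.replace(/[^a-zA-Z0-9-]/g, "");

// One command builder per supported protocol (hypothetical structure).
const builders = {
  kafka: (name, channel, server) =>
    `tmctl create source kafka --name ${slug(name)}-kafkasource --topic ${name} ` +
    `--bootstrapServers ${server.url} --groupID ${slug(name)}-group`,
  http: (name, channel, server) =>
    `tmctl create source httppoller --name ${slug(name)}-httppollersource ` +
    `--method ${channel.subscribe.bindings.http.method} --endpoint ${server.url}${name} ` +
    `--interval 10s --eventType io.triggermesh.httppoller.event`,
  googlepubsub: (name, channel) =>
    `tmctl create source googlecloudpubsub --name ${slug(name)}-pubsubsource ` +
    `--topic ${channel.subscribe.bindings.googlepubsub.topic} ` +
    `--serviceAccountKey $(cat serviceaccountkey.json)`,
};

function generateSourceCommands(def) {
  const commands = [];
  for (const [name, channel] of Object.entries(def.channels)) {
    if (!channel.subscribe) continue; // only channels we can read from
    for (const serverName of channel.servers ?? []) {
      const server = def.servers[serverName];
      const build = server && builders[server.protocol];
      if (build) commands.push(build(name, channel, server)); // skip unsupported protocols
    }
  }
  return commands;
}

const commands = generateSourceCommands(definition);
console.log(commands.join("\n"));
```

In this sketch the HTTP endpoint is simply the server URL joined with the channel name, so it may differ from what the real parser emits.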

To get this working in your environment, you may need to adjust the bootstrapServers value for the Kafka source and the endpoint host for the HTTP poller source. You can change them in the AsyncAPI definition and then re-run the parser. If you want to get the Google Pub/Sub channel working, you'll also need to create a file called serviceaccountkey.json containing a GCP service account key. If not, you can delete the Pub/Sub channel from the AsyncAPI definition.

You can visit the TriggerMesh documentation for the Kafka source, HTTP Poller source, and Google Pub/Sub source for more information on parameters and usage.

We can now execute these tmctl commands to create the TriggerMesh components. You can either copy/paste them into your terminal or run the generated script:

sh tmctl.sh

Or you can pipe the output of the parser into the shell:

node parser.js | sh

We'll also open a second terminal and start tmctl watch to watch events flowing through the broker:

tmctl watch

Orders are already coming in from the HTTP service

The first thing you'll notice is that the HTTP poller starts producing events from the order management system that exposes the HTTP API. We can see the event showing up in the broker in the tmctl watch terminal:

A CloudEvent that originates from the HTTP poller

The poller is configured to fetch an event every 10 seconds. You can adjust the endpoint and other parameters depending on your environment and needs. I’m using host.docker.internal because I’m running on Docker Desktop.

Ingest orders from Kafka

Now we can send an order to the orders topic and watch it land in the broker too. To do this, open the Redpanda console that was started by Docker Compose; it should be available at http://localhost:8080/ by default.

Go to the orders topic and publish this:

{
  "orderid": 18,
  "ordertime": 1497014222380,
  "region": "eu",
  "category": "fashion",
  "item": {
    "itemid": "184",
    "brand": "Patagonia",
    "category": "Kids",
    "name": "Tribbles Hoody"
  }
}

You should see it show up in the terminal that is running tmctl watch:

A new order from the Kafka channel shows up in TriggerMesh

Pub/Sub

The same idea goes for the Google Pub/Sub topic. You can head to GCP and publish an event there, and it will also show up in the TriggerMesh broker:

Publishing an event to a Google Pub/Sub topic

Routing all orders to a single Kafka topic

We just showed how simple it was to capture order events from three different AsyncAPI channels with TriggerMesh. Now they are all arriving in our central broker, wrapped as CloudEvents so that they all have a uniform envelope that can be used to implement transformations and filters.
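
As a rough illustration of what a uniform envelope means, the sketch below wraps an order payload in the core CloudEvents 1.0 attributes. This is not TriggerMesh code: the toCloudEvent helper and the source value are hypothetical, and only the io.triggermesh.httppoller.event type comes from the commands generated earlier.

```javascript
const { randomUUID } = require("crypto");

// Hypothetical helper: wrap a payload in a CloudEvents 1.0 style envelope.
function toCloudEvent({ type, source, data }) {
  return {
    specversion: "1.0",             // required by the CloudEvents spec
    id: randomUUID(),               // required, unique per event
    source,                         // required, identifies the producer
    type,                           // required, the attribute triggers can filter on
    time: new Date().toISOString(), // optional timestamp
    datacontenttype: "application/json",
    data,                           // the original order payload, untouched
  };
}

const order = { orderid: 18, region: "eu", category: "fashion" };

const event = toCloudEvent({
  type: "io.triggermesh.httppoller.event",
  source: "example/ordersorderjson-httppollersource", // assumed value, for illustration only
  data: order,
});

console.log(JSON.stringify(event, null, 2));
```

Because every event carries the same attributes regardless of which channel it came from, filters and transformations can be written against the envelope rather than against each producer's format.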

Let's keep it simple here and route events from all sources to a new Kafka topic called unified-orders. To do that, we'll start by creating a new Kafka target:

tmctl create target kafka --name unified-orders-target --topic unified-orders --bootstrapServers <serverURL>

And then we can define a "catch-all" trigger that will send all events to that target:

tmctl create trigger --target unified-orders-target

That said, we generally recommend being more specific when creating triggers by listing the event types that should fire the trigger. For example, to send only the events from the HTTP service to the Kafka topic:

tmctl create trigger --target unified-orders-target --eventTypes io.triggermesh.httppoller.event
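
The difference between a catch-all trigger and a type-filtered one can be modeled in a few lines. This is a conceptual sketch only, not how the TriggerMesh broker is implemented; the triggerMatches function, the trigger objects, and the example.other.event type are all hypothetical.

```javascript
// Conceptual model: a trigger with no event types matches everything,
// otherwise it matches only events whose type is in its list.
function triggerMatches(trigger, event) {
  if (!trigger.eventTypes || trigger.eventTypes.length === 0) return true;
  return trigger.eventTypes.includes(event.type);
}

const catchAll = { target: "unified-orders-target" };
const httpOnly = {
  target: "unified-orders-target",
  eventTypes: ["io.triggermesh.httppoller.event"],
};

const events = [
  { type: "io.triggermesh.httppoller.event", data: { orderid: 1 } },
  { type: "example.other.event", data: { orderid: 2 } }, // hypothetical type
];

const routedByCatchAll = events.filter((e) => triggerMatches(catchAll, e));
const routedByHttpOnly = events.filter((e) => triggerMatches(httpOnly, e));

console.log(routedByCatchAll.length, routedByHttpOnly.length); // prints "2 1"
```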

Now take a look at the Redpanda console and you should see all orders arriving on the unified-orders topic. You can send more orders to the GCP Pub/Sub topic and the orders Kafka topic and see them get routed to the unified-orders topic.

Wrap-up

In this post, we piped events from multiple AsyncAPI channels into a single Kafka topic. By pairing AsyncAPI with TriggerMesh, we can generate the TriggerMesh source components that will ingest and centralize the events into a broker. From there, we can start creating routes to deliver filtered sets of events to different targets. We did this with a Kafka target, but there are many other targets available.

If you wanted to take this example further, you could implement some JSON transformations that would standardize legacy order formats coming from some of the sources or could customize the format for a specific consumer on a new Kafka topic (as shown in the initial diagram). You could also model other parts of the architecture with their own AsyncAPI definitions.

Oh, and one more thing: try the tmctl dump command. It produces Kubernetes manifests that you can deploy onto a Kubernetes cluster with TriggerMesh installed, so you can run these event flows as a Kubernetes-native application.

Head to AsyncAPI.com to learn more about AsyncAPI, and to the TriggerMesh quickstart if you want to try out tmctl for yourself. You can also reach the TriggerMesh community on Slack; we'd love to hear from you!

All graphics by Jonathan Michaux.