API-First with AsyncAPI and ZenWave SDK
With ZenWave's spring-cloud-streams3 and jsonschema2pojo plugins, you can generate the following from AsyncAPI definitions:
- Strongly typed business interfaces
- Payload DTOs
- Header objects
The generated code uses Spring Cloud Streams as the default implementation, so it can connect to many different brokers through the provided binders.
And because everything is hidden behind interfaces, we can encapsulate many Enterprise Integration Patterns:
- Transactional Outbox: with MongoDB ChangeStreams, Plain SQL, and Debezium SQL flavors
- Business Dead Letter Queues: lets you route different business exceptions to different dead letter queues for non-retryable errors.
- Enterprise Envelope: when your organization uses a common Envelope for messages, you can still express your AsyncAPI definition in terms of your business payload.
See AsyncAPI and Spring Cloud Streams 3 Configuration Options and ZenWave Maven Plugin for more details.
<plugin>
    <groupId>io.github.zenwave360.zenwave-sdk</groupId>
    <artifactId>zenwave-sdk-maven-plugin</artifactId>
    <version>${zenwave.version}</version>
    <configuration>
        <inputSpec>classpath:/model/asyncapi.yml</inputSpec>
        <addCompileSourceRoot>true</addCompileSourceRoot>
        <addTestCompileSourceRoot>true</addTestCompileSourceRoot>
    </configuration>
    <executions>
        <!-- DTOs -->
        <execution>
            <id>generate-asyncapi-dtos</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>generate</goal>
            </goals>
            <configuration>
                <generatorName>jsonschema2pojo</generatorName>
                <configOptions>
                    <modelPackage>io.zenwave360.example.core.domain.events</modelPackage>
                    <!-- <jsonschema2pojo.includeTypeInfo>true</jsonschema2pojo.includeTypeInfo> -->
                    <jsonschema2pojo.useLongIntegers>true</jsonschema2pojo.useLongIntegers>
                </configOptions>
            </configuration>
        </execution>
        <!-- Generate PROVIDER -->
        <execution>
            <id>generate-asyncapi</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>generate</goal>
            </goals>
            <configuration>
                <generatorName>spring-cloud-streams3</generatorName>
                <configOptions>
                    <role>provider</role>
                    <style>imperative</style>
                    <transactionalOutbox>mongodb</transactionalOutbox>
                    <modelPackage>io.zenwave360.example.core.domain.events</modelPackage>
                    <producerApiPackage>io.zenwave360.example.core.outbound.events</producerApiPackage>
                    <consumerApiPackage>io.zenwave360.example.adapters.commands</consumerApiPackage>
                </configOptions>
            </configuration>
        </execution>
    </executions>

    <dependencies>
        <dependency>
            <groupId>io.github.zenwave360.zenwave-sdk.plugins</groupId>
            <artifactId>asyncapi-spring-cloud-streams3</artifactId>
            <version>${zenwave.version}</version>
        </dependency>
        <dependency>
            <groupId>io.github.zenwave360.zenwave-sdk.plugins</groupId>
            <artifactId>asyncapi-jsonschema2pojo</artifactId>
            <version>${zenwave.version}</version>
        </dependency>
    </dependencies>
</plugin>
Provider vs Client
Because broker-based API definitions are inherently symmetrical, it's difficult to establish the roles of client and server. ZenWave generates code based on provider and client roles, where a provider "produces events" and "consumes commands". See the API-First with AsyncAPI page for more details on the "publish/subscribe", "producer/consumer", and "provider/client" roles.
Write your AsyncAPI definitions from the provider perspective, and then configure the code generator to generate either a provider or a client.
If you still find it confusing which is a provider and which is a client, use this rule: in a given messaging scenario, there can be only one provider of a message, while there can be multiple clients. If the provider is producing messages, use the publish section. If the provider is consuming messages, use the subscribe section.
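The rule above can be written down as a small truth table. The following is only an illustrative sketch in plain Java: the Role, Operation, GeneratedCode, and RoleRule types are hypothetical and are not part of ZenWave SDK. It assumes that a client generated from the same definition always takes the opposite side to the provider.

```java
// Hypothetical model of the provider/client rule; not ZenWave SDK API.
enum Role { PROVIDER, CLIENT }

enum Operation { PUBLISH, SUBSCRIBE }

enum GeneratedCode { PRODUCER, CONSUMER }

final class RoleRule {
    private RoleRule() {}

    // The definition is written from the provider's perspective:
    // publish means the provider produces, subscribe means the provider consumes.
    // A client generated from the same definition takes the opposite side.
    static GeneratedCode generatedFor(Role role, Operation operation) {
        boolean providerProduces = operation == Operation.PUBLISH;
        boolean thisSideProduces = (role == Role.PROVIDER) == providerProduces;
        return thisSideProduces ? GeneratedCode.PRODUCER : GeneratedCode.CONSUMER;
    }
}
```

For example, a provider generated from a publish operation gets producer code, while a client generated from that same operation gets consumer code.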
Spring Cloud Streams Producer: Using generated code to produce messages
On the producer side, the plugin generates:
- Interface ICustomerEventsProducer to produce typed messages using your domain names: onCustomerEvent, CustomerEventPayload, and CustomerEventPayloadHeaders.
- Producer @Component CustomerEventsProducer that you can autowire in your services.
To produce messages, all you need to do is autowire the generated producer (@Autowired) in your code.
Because it sits behind a business-oriented interface, this producer component can be implemented in different flavors and integration patterns, such as transactional outbox (for MongoDB and SQL) or enterprise envelope, depending on how you configure the ZenWave Maven generator.
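To make the "different flavors behind one interface" idea concrete, here is a minimal sketch in plain Java with no Spring dependencies. Every type in it (CustomerEvent, CustomerEvents, DirectProducer, OutboxProducer, CustomerService) is a hypothetical stand-in, and the in-memory lists only simulate a broker and an outbox store. It is not how ZenWave implements its producers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for a generated payload and producer interface.
record CustomerEvent(String customerId, String eventType) {}

interface CustomerEvents {
    boolean onCustomerEvent(CustomerEvent payload);
}

// Flavor 1: hand the message straight to the broker.
final class DirectProducer implements CustomerEvents {
    final List<CustomerEvent> broker = new ArrayList<>(); // simulates the broker

    @Override
    public boolean onCustomerEvent(CustomerEvent payload) {
        return broker.add(payload);
    }
}

// Flavor 2: transactional outbox. The message is first stored next to the
// business data; a separate relay forwards it to the broker later.
final class OutboxProducer implements CustomerEvents {
    final List<CustomerEvent> outbox = new ArrayList<>(); // simulates the outbox store
    final List<CustomerEvent> broker = new ArrayList<>(); // simulates the broker

    @Override
    public boolean onCustomerEvent(CustomerEvent payload) {
        return outbox.add(payload);
    }

    // In a real setup a change stream, a poller, or a CDC connector plays this role.
    int relay() {
        int forwarded = outbox.size();
        broker.addAll(outbox);
        outbox.clear();
        return forwarded;
    }
}

// Business code depends only on the interface, so the flavor can be swapped.
final class CustomerService {
    private final CustomerEvents events;

    CustomerService(CustomerEvents events) {
        this.events = events;
    }

    boolean createCustomer(String id) {
        return events.onCustomerEvent(new CustomerEvent(id, "CREATED"));
    }
}
```

CustomerService compiles and behaves the same against either implementation, which is the property that lets the generator switch integration patterns through configuration alone.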
// Autogenerated: you can autowire it in your code
public interface ICustomerEventsProducer {
    // headers object omitted for brevity
    /**
     * Customer Domain Events
     */
    boolean onCustomerEvent(CustomerEventPayload payload, CustomerEventPayloadHeaders headers);

}
// Autogenerated: add it to your autoscan packages
@Component
public class CustomerEventsProducer implements ICustomerEventsProducer {

    // details omitted for brevity

    /**
     * Customer Domain Events
     */
    public boolean onCustomerEvent(CustomerEventPayload payload, CustomerEventPayloadHeaders headers) {
        // this is one of the many flavors, you shouldn't need to worry about the details
        log.debug("Sending message to topic: {}", onCustomerEventBindingName);
        Message<CustomerEventPayload> message = MessageBuilder.createMessage(payload, new MessageHeaders(headers));
        return streamBridge.send(onCustomerEventBindingName, message);
    }
}
// Autowire this producer in your code
@Autowired
ICustomerEventsProducer customerEventsProducer;

// and use it to produce messages
var message = new CustomerEventPayload()
    .withCustomerId("123")
    // [...] set some more data
    .withEventType(CustomerEventPayload.EventType.CREATED);
// notice how headers are also strongly typed
var headers = new ICustomerEventsProducer.CustomerEventPayloadHeaders()
    .entityId("123")
    .commonHeader("value")
    .set("undocumented-header", "value");

customerEventsProducer.onCustomerEvent(message, headers);
Spring Cloud Streams Consumer: Using generated code to consume messages
On the consumer side, the plugin generates:
- Functional Consumer DoCustomerRequestConsumer for Spring Cloud Streams bindings.
- Business Interface IDoCustomerRequestConsumerService that you need to implement in order to receive strongly typed messages.
This Functional Consumer can abstract away different integration patterns, such as Business Dead Letter Queue, depending on how you configure the ZenWave Maven generator.
To consume messages, you need to implement the generated business interface and register it as a Spring bean.
// Autogenerated: you need to implement and provide this business interface
public interface IOnCustomerEventConsumerService {
    /**
     * Customer Domain Events
     */
    default void onCustomerEvent(CustomerEventPayload payload, CustomerEventPayloadHeaders headers) {};
}
// Autogenerated: add it to your autoscan packages and provide business interface implementation
@Component("on-customer-event")
public class OnCustomerEventConsumer implements Consumer<Message<CustomerEventPayload>> {

    // you need to implement this interface
    protected IOnCustomerEventConsumerService service;

    @Override
    public void accept(Message<CustomerEventPayload> message) {
        log.debug("Received message: {}", message);
        try {
            Object payload = message.getPayload();
            if (payload instanceof CustomerEventPayload) {
                var headers = new IOnCustomerEventConsumerService.CustomerEventPayloadHeaders();
                headers.putAll(message.getHeaders());
                service.onCustomerEvent((CustomerEventPayload) payload, headers);
                return;
            }
            log.error("Received message without any business handler: [payload: {}, message: {}]", payload.getClass().getName(), message);
        } catch (Exception e) {
            // error handling and dead-letter-queue routing omitted for brevity
        }
    }
}
// Implement the business interface and add it to your context
@Component
class DoCustomerRequestConsumerService implements IDoCustomerRequestConsumerService {

    @Override
    public void doCustomerRequest(CustomerRequestPayload payload, CustomerRequestPayloadHeaders headers) {
        log.info("Received '{}' message with payload: {}", payload.getClass(), payload);
        // [...] do something with this message
    }
}
Exception Handling with Business Dead Letter Queue
ZenWave SDK consumers can be configured to route exceptions to different error queues. This is useful to manage non-retryable business exceptions so the stream processing is not interrupted. If your code throws an exception not configured for error routing, it will be rethrown and follow the standard error handling mechanism for your particular Spring Cloud Stream binder.
spring.cloud.stream.bindings:
  on-customer-event-in-0:
    destination: customer.events
    content-type: application/json
    # configuring error routing for this consumer
    dead-letter-queue-error-map: >
      {
        'javax.validation.ValidationException': 'on-customer-event-validation-error-out-0',
        'java.lang.Exception': 'on-customer-event-error-out-0'
      }
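One way to picture how an error map like this could be applied is a lookup from exception type to output binding name. The sketch below is plain Java and purely illustrative: DeadLetterRouter is a hypothetical class, and the "first matching entry wins, subclasses included" strategy is an assumption, not a description of the generated consumer's actual resolution logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical router; not part of ZenWave SDK or Spring Cloud Stream.
final class DeadLetterRouter {
    // Insertion order is preserved, so more specific exception types should be registered first.
    private final Map<Class<? extends Throwable>, String> routes = new LinkedHashMap<>();

    DeadLetterRouter route(Class<? extends Throwable> type, String bindingName) {
        routes.put(type, bindingName);
        return this;
    }

    // Returns the binding of the first entry whose type matches the error (or a
    // supertype of it). Empty means "not configured": the caller would rethrow and
    // let the binder's standard error handling take over.
    Optional<String> resolve(Throwable error) {
        for (Map.Entry<Class<? extends Throwable>, String> entry : routes.entrySet()) {
            if (entry.getKey().isInstance(error)) {
                return Optional.of(entry.getValue());
            }
        }
        return Optional.empty();
    }
}
```

With IllegalArgumentException standing in for a validation exception, registering it before java.lang.Exception sends validation failures to the validation queue and every other Exception to the generic error queue, while an unmapped Throwable such as an Error resolves to empty.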
Populating Headers at Runtime Automatically
ZenWave SDK provides the x-runtime-expression extension property for automatic header population at runtime. Supported values are:
- $message.payload#/<json pointer fragment>: follows the same format as the AsyncAPI Correlation ID object.
- $tracingIdSupplier: uses the tracing id java.util.function.Supplier configured in your Spring context.
CustomerEventMessage:
  name: CustomerEventMessage
  # [...] other properties omitted for brevity
  headers:
    type: object
    properties:
      kafka_messageKey:
        type: string
        description: This one will be populated automatically at runtime
        x-runtime-expression: $message.payload#/customer/id
      tracingId:
        type: string
        description: This one will be populated automatically at runtime
        x-runtime-expression: $tracingIdSupplier
<configOptions>
    <tracingIdSupplierQualifier>myTracingIdSupplier</tracingIdSupplierQualifier><!-- default is "tracingIdSupplier" -->
    <runtimeHeadersProperty>x-custom-runtime-expression</runtimeHeadersProperty><!-- you can also override this extension property name -->
</configOptions>
@Bean("myTracingIdSupplier")
public Supplier<String> tracingIdSupplier() {
    return () -> "test-tracing-id";
}
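To illustrate what evaluating these two expression forms involves, here is a small plain-Java sketch. RuntimeExpressions is a hypothetical helper, the payload is modeled as nested maps rather than a generated DTO, and the pointer handling is deliberately simplified (no ~0/~1 escapes, no array indexes). It is not the code ZenWave generates.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper; not part of ZenWave SDK.
final class RuntimeExpressions {
    private static final String PAYLOAD_PREFIX = "$message.payload#";

    private RuntimeExpressions() {}

    static Object resolve(String expression, Map<String, Object> payload, Supplier<String> tracingIdSupplier) {
        if ("$tracingIdSupplier".equals(expression)) {
            return tracingIdSupplier.get();
        }
        if (expression.startsWith(PAYLOAD_PREFIX)) {
            return pointer(payload, expression.substring(PAYLOAD_PREFIX.length()));
        }
        throw new IllegalArgumentException("Unsupported expression: " + expression);
    }

    // Walks a JSON-pointer-like path such as /customer/id through nested maps.
    // Returns null when any segment is missing or the current node is not a map.
    private static Object pointer(Object node, String path) {
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue; // skips the empty segment before the leading slash
            }
            if (!(node instanceof Map)) {
                return null;
            }
            node = ((Map<?, ?>) node).get(segment);
        }
        return node;
    }
}
```

Given a payload whose customer object has an id of "123", the expression $message.payload#/customer/id resolves to "123", and $tracingIdSupplier resolves to whatever the supplied Supplier returns.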
InMemory Producers as TestDoubles
Alongside the generated producer, ZenWave SDK also generates an in-memory producer captor that can be used as a test double, and a singleton manual context, so you can easily include them in your unit and integration tests.
// generated class, you can use in your tests
public class ProducerInMemoryContext {

    public static final ProducerInMemoryContext INSTANCE = new ProducerInMemoryContext();


    private CustomerEventsProducerCaptor customerEventsProducerCaptor = new CustomerEventsProducerCaptor();

    public <T extends ICustomerEventsProducer> T customerEventsProducer() {
        return (T) customerEventsProducerCaptor;
    }
}
And you can use it in your tests to instantiate your service and perform assertions. You can find a working example here.
// example of how you can instantiate your service using the in-memory producer captor
public class InMemoryTestsManualContext extends InMemoryTestsConfig {

    // [...] other beans omitted for brevity

    public CustomerUseCasesImpl customerUseCases() {
        // instantiating a bean with in-memory dependencies
        return new CustomerUseCasesImpl(customerRepository(), ProducerInMemoryContext.INSTANCE.customerEventsProducer());
    }
}

// and using it in your tests to perform assertions
public class CustomerUseCasesTest {

    // this is the in-memory producer captor wired
    CustomerEventsProducerCaptor customerEventsProducer = ProducerInMemoryContext.INSTANCE.customerEventsProducer();

    @Test
    void testCustomerUseCase() {
        // [...] test your use case
        Assertions.assertEquals(3, customerEventsProducer.getCapturedMessages(customerEventsProducer.onCustomerEventBindingName).size());
    }
}
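If it helps to see what a captor of this kind boils down to, the following plain-Java sketch records messages per binding name instead of sending them. The CustomerEventsPort and CapturingCustomerEvents names, the String payload, and the binding name value are all hypothetical. The real generated captor may be structured differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a generated producer interface.
interface CustomerEventsPort {
    boolean onCustomerEvent(String payload);
}

// Test double: keeps every message in memory, grouped by binding name.
final class CapturingCustomerEvents implements CustomerEventsPort {
    final String onCustomerEventBindingName = "on-customer-event-out-0"; // assumed name

    private final Map<String, List<Object>> captured = new HashMap<>();

    @Override
    public boolean onCustomerEvent(String payload) {
        return captured
            .computeIfAbsent(onCustomerEventBindingName, key -> new ArrayList<>())
            .add(payload);
    }

    // Returns an empty list for bindings that never received a message.
    List<Object> getCapturedMessages(String bindingName) {
        return captured.getOrDefault(bindingName, List.of());
    }
}
```

A test can then pass the captor wherever the producer interface is expected, run the use case, and assert on the number and content of captured messages without starting a broker.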
Enterprise Integration Patterns
Because access to the underlying implementation is encapsulated behind the generated interfaces, it's possible to implement many Enterprise Integration Patterns (EIP) on top of them.
- Transactional Outbox: for MongoDB, plain JDBC, and Debezium SQL
- Business Dead Letter Queue
- Enterprise Envelope
- Automatic headers
Jump to the ZenWave SDK Spring-Cloud-Streams plugin documentation (https://zenwave360.github.io/zenwave-sdk/plugins/asyncapi-spring-cloud-streams3/) for more details on how to configure this.
Originally published at https://zenwave360.github.io.