Kafka Bindings
This document defines how to describe Kafka-specific information on AsyncAPI.
Version
Current version is 0.5.0.
Server Binding Object
This object contains information about the server representation in Kafka.
Fixed Fields
Example
1servers:
2 production:
3 bindings:
4 kafka:
5 schemaRegistryUrl: 'https://my-schema-registry.com'
6 schemaRegistryVendor: 'confluent'
7 bindingVersion: '0.5.0'Channel Binding Object
This object contains information about the channel representation in Kafka (eg. a Kafka topic).
Fixed Fields
| Field Name | Type | Description | Applicability [default] | Constraints |
|---|---|---|---|---|
topic | string | Kafka topic name if different from channel name. | OPTIONAL | - |
partitions | integer | Number of partitions configured on this topic (useful to know how many parallel consumers you may run). | OPTIONAL | Must be positive |
replicas | integer | Number of replicas configured on this topic. | OPTIONAL | MUST be positive |
topicConfiguration | TopicConfiguration Object | Topic configuration properties that are relevant for the API. | OPTIONAL | - |
bindingVersion | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [latest] | - |
This object MUST contain only the properties defined above.
Example
1channels:
2 user-signedup:
3 bindings:
4 kafka:
5 topic: 'my-specific-topic-name'
6 partitions: 20
7 replicas: 3
8 topicConfiguration:
9 cleanup.policy: ["delete", "compact"]
10 retention.ms: 604800000
11 retention.bytes: 1000000000
12 delete.retention.ms: 86400000
13 max.message.bytes: 1048588
14 bindingVersion: '0.5.0'TopicConfiguration Object
This objects contains information about the API relevant topic configuration in Kafka.
| Field Name | Type | Description | Applicability [default] | Constraints |
|---|---|---|---|---|
cleanup.policy | array | The cleanup.policy configuration option. | OPTIONAL | array may only contain delete and/or compact |
retention.ms | long | The retention.ms configuration option. | OPTIONAL | see kafka documentation |
retention.bytes | long | The retention.bytes configuration option. | OPTIONAL | see kafka documentation |
delete.retention.ms | long | The delete.retention.ms configuration option. | OPTIONAL | see kafka documentation |
max.message.bytes | integer | The max.message.bytes configuration option. | OPTIONAL | see kafka documentation |
confluent.key.schema.validation | boolean | It shows whether the schema validation for the message key is enabled. Vendor specific config. | OPTIONAL | - |
confluent.key.subject.name.strategy | string | The name of the schema lookup strategy for the message key. Vendor specific config. | OPTIONAL | Clients should default to the vendor default if not supplied. |
confluent.value.schema.validation | boolean | It shows whether the schema validation for the message value is enabled. Vendor specific config. | OPTIONAL | - |
confluent.value.subject.name.strategy | string | The name of the schema lookup strategy for the message value. Vendor specific config. | OPTIONAL | Clients should default to the vendor default if not supplied. |
This object MAY contain the properties defined above including optional additional properties.
Example
1topicConfiguration:
2 cleanup.policy: ["delete", "compact"]
3 retention.ms: 604800000
4 retention.bytes: 1000000000
5 delete.retention.ms: 86400000
6 max.message.bytes: 1048588
7 confluent.key.schema.validation: true
8 confluent.key.subject.name.strategy: "TopicNameStrategy"
9 confluent.value.schema.validation: true
10 confluent.value.subject.name.strategy: "TopicNameStrategy"Operation Binding Object
This object contains information about the operation representation in Kafka (eg. the way to consume messages)
Fixed Fields
| Field Name | Type | Description | Applicability [default] | Constraints |
|---|---|---|---|---|
groupId | Schema Object | Reference Object | Id of the consumer group. | OPTIONAL | - |
clientId | Schema Object | Reference Object | Id of the consumer inside a consumer group. | OPTIONAL | - |
bindingVersion | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [latest] | - |
This object MUST contain only the properties defined above.
Example
1channels:
2 user-signedup:
3operations:
4 userSignup:
5 action: receive
6 bindings:
7 kafka:
8 groupId:
9 type: string
10 enum: ['myGroupId']
11 clientId:
12 type: string
13 enum: ['myClientId']
14 bindingVersion: '0.5.0'Message Binding Object
This object contains information about the message representation in Kafka.
Fixed Fields
| Field Name | Type | Description | Applicability [default] | Constraints |
|---|---|---|---|---|
key | Schema Object | Reference Object | AVRO Schema Object | The message key. NOTE: You can also use the reference object way. | OPTIONAL | - |
schemaIdLocation | string | If a Schema Registry is used when performing this operation, tells where the id of schema is stored (e.g. header or payload). | OPTIONAL | MUST NOT be specified if schemaRegistryUrl is not specified at the Server level |
schemaIdPayloadEncoding | string | Number of bytes or vendor specific values when schema id is encoded in payload (e.g confluent/ apicurio-legacy / apicurio-new). | OPTIONAL | MUST NOT be specified if schemaRegistryUrl is not specified at the Server level |
schemaLookupStrategy | string | Freeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied. | OPTIONAL | MUST NOT be specified if schemaRegistryUrl is not specified at the Server level |
bindingVersion | string | The version of this binding. If omitted, "latest" MUST be assumed. | OPTIONAL [latest] | - |
This object MUST contain only the properties defined above.
This example is valid for any Confluent compatible schema registry. Here we describe the implementation using the first 4 bytes in payload to store schema identifier.
1channels:
2 test:
3 address: test-topic
4 messages:
5 testMessage:
6 bindings:
7 kafka:
8 key:
9 type: string
10 enum: ['myKey']
11 schemaIdLocation: 'payload'
12 schemaIdPayloadEncoding: '4'
13 bindingVersion: '0.5.0'This is another example that describes the use if Apicurio schema registry. We describe the apicurio-new way of serializing without details on how it's implemented. We reference a specific lookup strategy that may be used to retrieve schema Id from registry during serialization.
1channels:
2 test:
3 address: test-topic
4 messages:
5 testMessage:
6 bindings:
7 kafka:
8 key:
9 type: string
10 enum: ['myKey']
11 schemaIdLocation: 'payload'
12 schemaIdPayloadEncoding: 'apicurio-new'
13 schemaLookupStrategy: 'TopicIdStrategy'
14 bindingVersion: '0.5.0'