kafka

Found an error? Have a suggestion?Edit this page on GitHub

Kafka Bindings

This document defines how to describe Kafka-specific information on AsyncAPI.

Version

Current version is 0.5.0.

Server Binding Object

This object contains information about the server representation in Kafka.

Fixed Fields
Field NameTypeDescriptionApplicability [default]Constraints
schemaRegistryUrlstring (url)API URL for the Schema Registry used when producing Kafka messages (if a Schema Registry was used)OPTIONAL-
schemaRegistryVendorstringThe vendor of Schema Registry and Kafka serdes library that should be used (e.g. apicurio, confluent, ibm, or karapace)OPTIONALMUST NOT be specified if schemaRegistryUrl is not specified
bindingVersionstringThe version of this binding.OPTIONAL [latest]
Example
1servers:
2  production:
3    bindings:
4      kafka:
5        schemaRegistryUrl: 'https://my-schema-registry.com'
6        schemaRegistryVendor: 'confluent'
7        bindingVersion: '0.5.0'

Channel Binding Object

This object contains information about the channel representation in Kafka (eg. a Kafka topic).

Fixed Fields
Field NameTypeDescriptionApplicability [default]Constraints
topicstringKafka topic name if different from channel name.OPTIONAL-
partitionsintegerNumber of partitions configured on this topic (useful to know how many parallel consumers you may run).OPTIONALMust be positive
replicasintegerNumber of replicas configured on this topic.OPTIONALMUST be positive
topicConfigurationTopicConfiguration ObjectTopic configuration properties that are relevant for the API.OPTIONAL-
bindingVersionstringThe version of this binding. If omitted, "latest" MUST be assumed.OPTIONAL [latest]-

This object MUST contain only the properties defined above.

Example
1channels:
2  user-signedup:
3    bindings:
4      kafka:
5        topic: 'my-specific-topic-name'
6        partitions: 20
7        replicas: 3
8        topicConfiguration:
9          cleanup.policy: ["delete", "compact"]
10          retention.ms: 604800000
11          retention.bytes: 1000000000
12          delete.retention.ms: 86400000
13          max.message.bytes: 1048588
14        bindingVersion: '0.5.0'

TopicConfiguration Object

This objects contains information about the API relevant topic configuration in Kafka.

Field NameTypeDescriptionApplicability [default]Constraints
cleanup.policyarrayThe cleanup.policy configuration option.OPTIONALarray may only contain delete and/or compact
retention.mslongThe retention.ms configuration option.OPTIONALsee kafka documentation
retention.byteslongThe retention.bytes configuration option.OPTIONALsee kafka documentation
delete.retention.mslongThe delete.retention.ms configuration option.OPTIONALsee kafka documentation
max.message.bytesintegerThe max.message.bytes configuration option.OPTIONALsee kafka documentation
confluent.key.schema.validationbooleanIt shows whether the schema validation for the message key is enabled. Vendor specific config.OPTIONAL-
confluent.key.subject.name.strategystringThe name of the schema lookup strategy for the message key. Vendor specific config.OPTIONALClients should default to the vendor default if not supplied.
confluent.value.schema.validationbooleanIt shows whether the schema validation for the message value is enabled. Vendor specific config.OPTIONAL-
confluent.value.subject.name.strategystringThe name of the schema lookup strategy for the message value. Vendor specific config.OPTIONALClients should default to the vendor default if not supplied.

This object MAY contain the properties defined above including optional additional properties.

Example
1topicConfiguration:
2  cleanup.policy: ["delete", "compact"]
3  retention.ms: 604800000
4  retention.bytes: 1000000000
5  delete.retention.ms: 86400000
6  max.message.bytes: 1048588
7  confluent.key.schema.validation: true
8  confluent.key.subject.name.strategy: "TopicNameStrategy"
9  confluent.value.schema.validation: true
10  confluent.value.subject.name.strategy: "TopicNameStrategy"

Operation Binding Object

This object contains information about the operation representation in Kafka (eg. the way to consume messages)

Fixed Fields
Field NameTypeDescriptionApplicability [default]Constraints
groupIdSchema Object | Reference ObjectId of the consumer group.OPTIONAL-
clientIdSchema Object | Reference ObjectId of the consumer inside a consumer group.OPTIONAL-
bindingVersionstringThe version of this binding. If omitted, "latest" MUST be assumed.OPTIONAL [latest]-

This object MUST contain only the properties defined above.

Example
1channels:
2  user-signedup:
3operations:
4  userSignup:
5    action: receive
6    bindings:
7      kafka:
8        groupId:
9          type: string
10          enum: ['myGroupId']
11        clientId:
12          type: string
13          enum: ['myClientId']
14        bindingVersion: '0.5.0'

Message Binding Object

This object contains information about the message representation in Kafka.

Fixed Fields
Field NameTypeDescriptionApplicability [default]Constraints
keySchema Object | Reference Object | AVRO Schema ObjectThe message key. NOTE: You can also use the reference object way.OPTIONAL-
schemaIdLocationstringIf a Schema Registry is used when performing this operation, tells where the id of schema is stored (e.g. header or payload).OPTIONALMUST NOT be specified if schemaRegistryUrl is not specified at the Server level
schemaIdPayloadEncodingstringNumber of bytes or vendor specific values when schema id is encoded in payload (e.g confluent/ apicurio-legacy / apicurio-new).OPTIONALMUST NOT be specified if schemaRegistryUrl is not specified at the Server level
schemaLookupStrategystringFreeform string for any naming strategy class to use. Clients should default to the vendor default if not supplied.OPTIONALMUST NOT be specified if schemaRegistryUrl is not specified at the Server level
bindingVersionstringThe version of this binding. If omitted, "latest" MUST be assumed.OPTIONAL [latest]-

This object MUST contain only the properties defined above.

This example is valid for any Confluent compatible schema registry. Here we describe the implementation using the first 4 bytes in payload to store schema identifier.

1channels:
2  test:
3    address: test-topic
4    messages:
5      testMessage:
6        bindings:
7          kafka:
8            key:
9              type: string
10              enum: ['myKey']
11            schemaIdLocation: 'payload'
12            schemaIdPayloadEncoding: '4'
13            bindingVersion: '0.5.0'

This is another example that describes the use if Apicurio schema registry. We describe the apicurio-new way of serializing without details on how it's implemented. We reference a specific lookup strategy that may be used to retrieve schema Id from registry during serialization.

1channels:
2  test:
3    address: test-topic
4    messages:
5      testMessage:
6        bindings:
7          kafka:
8            key:
9              type: string
10              enum: ['myKey']
11            schemaIdLocation: 'payload'
12            schemaIdPayloadEncoding: 'apicurio-new'
13            schemaLookupStrategy: 'TopicIdStrategy'
14            bindingVersion: '0.5.0'
Was this helpful?
Help us improve the docs by adding your contribution.
OR
Github:AsyncAPICreate Issue on GitHub