kafka sink

The kafka sink streams log events to Apache Kafka via the Kafka protocol.

Config File

vector.toml (example)
[sinks.my_sink_id]
# REQUIRED - General
type = "kafka" # must be: "kafka"
inputs = ["my-source-id"]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
key_field = "user_id"
topic = "topic-1234"
# OPTIONAL - General
encoding = "json" # no default, enum: "json" or "text"
healthcheck = true # default
# OPTIONAL - Buffer
[sinks.my_sink_id.buffer]
type = "memory" # default, enum: "memory" or "disk"
when_full = "block" # default, enum: "block" or "drop_newest"
max_size = 104900000 # no default, bytes, relevant when type = "disk"
num_items = 500 # default, events, relevant when type = "memory"

Options

REQUIRED - General

type (string)
  The component type. Must be: "kafka".

inputs ([string])
  A list of upstream source or transform IDs. See Config Composition for more
  info. Example: ["my-source-id"]

bootstrap_servers (string)
  A comma-separated list of host and port pairs. These are the addresses of the
  brokers in a "bootstrap" Kafka cluster that a Kafka client connects to
  initially to bootstrap itself.
  Example: "10.14.22.123:9092,10.14.23.332:9092"

key_field (string)
  The field name to use for the topic key. If the field does not exist on the
  event, a blank value will be used. Example: "user_id"

topic (string)
  The Kafka topic name to write events to. Example: "topic-1234"

OPTIONAL - General

encoding (string)
  The encoding format used to serialize the events before flushing. The default
  is dynamic, based on whether or not the event is structured. See Encodings
  for more info. No default. Enum: "json" or "text"

healthcheck (bool)
  Enables/disables the sink health check upon start. See Health Checks for more
  info. Default: true

OPTIONAL - Buffer

buffer.type (string)
  The buffer's type / location. "disk" buffers are persistent and will be
  retained between restarts. Default: "memory". Enum: "memory" or "disk"

buffer.when_full (string)
  The behavior when the buffer becomes full. Default: "block".
  Enum: "block" or "drop_newest"

buffer.max_size (int)
  The maximum size of the buffer on disk, in bytes. Only relevant when
  type = "disk". No default. Example: 104900000

buffer.num_items (int)
  The maximum number of events allowed in the buffer. Only relevant when
  type = "memory". Default: 500. Unit: events
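For example, here is a minimal sketch of this sink configured with a persistent disk buffer, reusing the example values from above and only the options documented here:

[sinks.my_sink_id]
type = "kafka"
inputs = ["my-source-id"]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
key_field = "user_id"
topic = "topic-1234"

[sinks.my_sink_id.buffer]
type = "disk"        # persist buffered events across restarts
when_full = "block"  # apply back-pressure instead of dropping events
max_size = 104900000 # bytes

With type = "disk", buffered events survive a Vector restart at the cost of disk I/O; the memory default favors throughput instead.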

How It Works

Delivery Guarantee

This component offers an at-least-once delivery guarantee if your pipeline is configured to achieve this.

Encodings

The kafka sink encodes events before writing them downstream. This is controlled via the encoding option, which accepts the following values:

json
  The event will be encoded as a single JSON payload.

text
  The event will be encoded as newline-delimited text, with each line
  representing the value of the event's "message" key.
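For example, given an event with "timestamp", "host", and "message" fields (the field values below are purely illustrative), each encoding would produce output along these lines:

# encoding = "json"
{"timestamp":"2019-05-02T10:11:12Z","host":"my.host.com","message":"GET /index.html 200"}

# encoding = "text"
GET /index.html 200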

Dynamic encoding

By default, the encoding is chosen dynamically based on the explicit/implicit nature of the event's structure. For example, if an event was parsed (explicit structuring), Vector will use json to encode the structured data. If the event was not explicitly structured, the text encoding will be used.

To further explain why Vector adopts this default, take the simple example of accepting data over the tcp source and connecting it directly to the kafka sink, as sketched below. It is least surprising for the outgoing data to reflect the incoming data exactly, since it was never explicitly structured.
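A minimal sketch of such a pipeline (the tcp source's address option is an assumption here for illustration; check the tcp source docs for its exact options):

[sources.my_tcp_source]
type = "tcp"
address = "0.0.0.0:9000" # assumed option name, for illustration

[sinks.my_kafka_sink]
type = "kafka"
inputs = ["my_tcp_source"]
bootstrap_servers = "10.14.22.123:9092"
key_field = "user_id"
topic = "topic-1234"

Because the tcp source does not structure its data, events flowing through this pipeline would be written to Kafka with the text encoding, preserving the raw incoming lines.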

Environment Variables

Environment variables are supported through all of Vector's configuration. Simply add ${MY_ENV_VAR} in your Vector configuration file and the variable will be replaced before being evaluated.
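For example, assuming a hypothetical KAFKA_BOOTSTRAP_SERVERS variable is set in Vector's environment:

[sinks.my_sink_id]
type = "kafka"
inputs = ["my-source-id"]
bootstrap_servers = "${KAFKA_BOOTSTRAP_SERVERS}" # substituted at load time
key_field = "user_id"
topic = "topic-1234"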

You can learn more in the Environment Variables section.

Health Checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization.

If the health check fails, an error will be logged and Vector will proceed to start. If you'd like to exit immediately upon health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

And finally, if you'd like to disable health checks entirely for this sink, you can set the healthcheck option to false.
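For example (a minimal sketch, reusing the sink from the example above):

[sinks.my_sink_id]
type = "kafka"
inputs = ["my-source-id"]
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
key_field = "user_id"
topic = "topic-1234"
healthcheck = false # skip the startup health check for this sink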

Streaming

The kafka sink streams data on a real-time event-by-event basis. It does not batch data.

Troubleshooting

The best place to start troubleshooting is the Vector logs, typically located at /var/log/vector.log. From there, proceed to follow the Troubleshooting Guide.

If the Troubleshooting Guide does not resolve your issue, please:

  1. Check for any open kafka_sink issues.

  2. If you encountered a bug, please file a bug report.

  3. If you are missing a feature, please file a feature request.

  4. If you need help, join our chat/forum community. You can post a question and search previous questions.
