kafka source

Ingests data through Kafka 0.9 or later and outputs `log` events.

The kafka source is in beta. See the current enhancements and bugs for known issues. If you run into an issue that is not listed, please report it; this helps shape the roadmap of this component.

Config File

vector.toml (simple)

[sources.my_source_id]
type = "kafka" # must be: "kafka"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
group_id = "consumer-group-name"
topics = ["topic-1", "topic-2", "^(prefix1|prefix2)-.+"]
# For a complete list of options see the "advanced" example below.

vector.toml (advanced)

[sources.kafka_source]
# The component type
#
# * required
# * no default
# * must be: "kafka"
type = "kafka"
# A comma-separated list of host and port pairs that are the addresses of the
# Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to
# initially to bootstrap itself.
#
# * required
# * no default
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
# The consumer group name to be used to consume events from Kafka.
#
# * required
# * no default
group_id = "consumer-group-name"
# The Kafka topic names to read events from. Regex is supported if the topic
# name begins with `^`.
#
# * required
# * no default
topics = ["topic-1", "topic-2", "^(prefix1|prefix2)-.+"]
# If offsets for the consumer group do not exist, set them using this strategy.
# See the librdkafka documentation for the `auto.offset.reset` option for an
# explanation.
#
# * optional
# * no default
auto_offset_reset = "smallest"
# Other example values (a key may be set only once per table):
# auto_offset_reset = "earliest"
# auto_offset_reset = "beginning"
# auto_offset_reset = "largest"
# auto_offset_reset = "latest"
# auto_offset_reset = "end"
# auto_offset_reset = "error"
# The log field name to use for the Kafka message key. If unspecified, the key
# is not added to the log event. If the message has a null key, this field is
# not added to the log event either.
#
# * optional
# * no default
key_field = "user_id"
# The Kafka session timeout in milliseconds.
#
# * optional
# * no default
# * unit: milliseconds
session_timeout_ms = 5000
# Another example value:
# session_timeout_ms = 10000

Examples

Given the following message in a Kafka topic:

message
2019-02-13T19:48:34+00:00 [info] Started GET "/" for 127.0.0.1

A log event will be emitted with the following structure:

log
{
  "timestamp": <timestamp>, # current time
  "message": "2019-02-13T19:48:34+00:00 [info] Started GET \"/\" for 127.0.0.1",
  "host": "10.2.22.122" # current hostname
}
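
The effect of the `key_field` option described in the configuration above can be sketched as follows. This is an illustrative fragment, not a tested setup: the key value `user-42` is a hypothetical example, and the comments only restate the documented behavior of `key_field`.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]

# Store the Kafka message key in the "user_id" field of each log event.
key_field = "user_id"

# Assumed example: a message with the key "user-42" (hypothetical) would
# yield a log event that additionally contains
#   "user_id": "user-42"
# A message with a null key yields an event without the "user_id" field.
```

Leaving `key_field` unset keeps events in the shape shown in the example above, without any key field.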

How It Works

Delivery Guarantee

This component offers an at-least-once delivery guarantee, provided your pipeline is configured to achieve this.

Environment Variables

Environment variables are supported throughout Vector's configuration. Add ${MY_ENV_VAR} anywhere in your Vector configuration file, and the variable is replaced with its value before the configuration is evaluated.

You can learn more in the Environment Variables section.
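
As a minimal sketch of how this applies to the kafka source, the fragment below reads the broker list and consumer group from the environment. The variable names `KAFKA_BOOTSTRAP_SERVERS` and `KAFKA_GROUP_ID` are hypothetical; any names work as long as they are set in the environment that starts Vector.

```toml
[sources.my_source_id]
type = "kafka"

# Hypothetical variable names. Each ${...} placeholder is replaced with the
# value of the corresponding environment variable before the configuration
# is evaluated.
bootstrap_servers = "${KAFKA_BOOTSTRAP_SERVERS}"
group_id = "${KAFKA_GROUP_ID}"

topics = ["topic-1", "topic-2"]
```

This keeps environment-specific values such as broker addresses out of the configuration file, so the same file can be reused across environments.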

Troubleshooting

Start troubleshooting by checking the Vector logs, typically located at /var/log/vector.log. Then follow the Troubleshooting Guide.

If the Troubleshooting Guide does not resolve your issue, please:

  1. If you encountered a bug, please file a bug report.

  2. If you are missing a feature, please file a feature request.

  3. If you need help, join our chat/forum community. You can post a question and search previous questions.

Resources