kafka source

Ingests data through Kafka 0.9 or later and outputs `log` events.

The kafka source is in beta. See the current enhancements and bugs for known issues. If you find an issue that is not listed, please report it; doing so helps shape the roadmap of this component.


Config File

vector.toml (example)
```toml
[sources.my_source_id]
type = "kafka" # must be: "kafka"
bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
group_id = "consumer-group-name"
topics = ["topic-1", "topic-2", "^(prefix1|prefix2)-.+"]
auto_offset_reset = "smallest"
key_field = "user_id" # no default
session_timeout_ms = 5000 # milliseconds
```

Options

REQUIRED

type (string)
The component type. Must be "kafka".

bootstrap_servers (string)
A comma-separated list of host:port pairs giving the addresses of the Kafka brokers in a "bootstrap" Kafka cluster. The Kafka client connects to these brokers initially to bootstrap itself. Example: see the configuration above.

group_id (string)
The consumer group name used to consume events from Kafka. Example: "consumer-group-name"

topics ([string])
The names of the Kafka topics to read events from. A topic entry that begins with ^ is treated as a regular expression. Example: see the configuration above.

OPTIONAL

auto_offset_reset (string)
The strategy used to set offsets when no committed offsets exist for the consumer group. For example, "smallest" starts from the earliest available offset and "largest" from the latest. See the librdkafka documentation for the auto.offset.reset option for the full list of values. Default: "largest"

key_field (string)
The log field name in which to store the message key. If unspecified, the key is not added to the log event. If a message has a null key, the field is not added to that event. No default. Example: "user_id"

session_timeout_ms (int)
The Kafka consumer session timeout. Default: 10000. Unit: milliseconds.
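As a hedged sketch of how these options combine, the configuration below subscribes to every topic whose name starts with "logs-" through a regex entry and, for a consumer group that has no committed offsets yet, starts reading from the earliest available offset. The source ID kafka_logs, the broker address, the group name and the topic pattern are hypothetical placeholders, not values from this page.

```toml
# Hypothetical example: source ID, broker address, group and topic names are placeholders.
[sources.kafka_logs]
type = "kafka"
bootstrap_servers = "kafka-1.example.com:9092"
group_id = "vector-logs"
# An entry beginning with ^ is treated as a regex, so this subscribes
# to every topic whose name starts with "logs-".
topics = ["^logs-.+"]
# Only used when the consumer group has no committed offsets:
# "smallest" starts from the earliest available offset.
auto_offset_reset = "smallest"
```

Once the group has committed offsets, consumption resumes from those offsets and auto_offset_reset has no effect.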

Examples

Given the following message in a Kafka topic:

```
2019-02-13T19:48:34+00:00 [info] Started GET "/" for 127.0.0.1
```

A log event is emitted with the following structure:

```
{
  "timestamp": <timestamp>, # current time
  "message": "2019-02-13T19:48:34+00:00 [info] Started GET \"/\" for 127.0.0.1",
  "host": "10.2.22.122" # current hostname
}
```
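The event above contains no field for the message key. A minimal sketch, assuming the Kafka message carries a non-null key: setting key_field as below would store that key in a "message_key" field on each emitted event, while messages with a null key are emitted without the field. The field name message_key is a hypothetical choice; the other values are taken from the configuration above.

```toml
[sources.my_source_id]
type = "kafka"
bootstrap_servers = "10.14.22.123:9092"
group_id = "consumer-group-name"
topics = ["topic-1"]
# Store each message's key in the "message_key" log field (hypothetical name).
# Messages with a null key are emitted without this field.
key_field = "message_key"
```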

How It Works

Delivery Guarantee

This component offers an at-least-once delivery guarantee, provided your pipeline is configured to achieve it.

Environment Variables

Environment variables are supported throughout Vector's configuration. Add ${MY_ENV_VAR} anywhere in your Vector configuration file and the variable is replaced before the configuration is evaluated.
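For example, the broker list and consumer group can be supplied from the environment instead of being hard-coded. In this sketch, KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID are hypothetical variable names that must be set in the environment Vector runs in.

```toml
[sources.my_source_id]
type = "kafka"
# Both values are substituted from the environment before the config is evaluated.
# KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID are hypothetical variable names.
bootstrap_servers = "${KAFKA_BOOTSTRAP_SERVERS}"
group_id = "${KAFKA_GROUP_ID}"
topics = ["topic-1"]
```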

You can learn more in the Environment Variables section.

Troubleshooting

The best place to start troubleshooting is the Vector log, typically located at /var/log/vector.log. From there, follow the Troubleshooting Guide.

If the Troubleshooting Guide does not resolve your issue, please:

  1. If you encountered a bug, please file a bug report.

  2. If you encountered a missing feature, please file a feature request.

  3. If you need help, join our chat/forum community, where you can post a question and search previous questions.

Resources