AWS Kinesis Firehose

Collect logs from AWS Kinesis Firehose

status: beta
role: aggregator
delivery: at-least-once
egress: batch
state: stateless
output: log

Requirements

AWS Kinesis Firehose can only deliver data over HTTPS. You will need to handle TLS termination, either by fronting Vector with a load balancer or by configuring the tls.* options.

Configuration

access_key

common optional string
AWS Kinesis Firehose can be configured to pass along an access key to authenticate requests. If it is, access_key should be set to the same value. If access_key is not specified, Vector will treat all requests as authenticated.
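
The check described above can be modelled in a few lines. This is a minimal Python sketch for illustration only, not Vector's implementation: the function name is_authenticated is hypothetical, and it assumes the key arrives in the X-Amz-Firehose-Access-Key request header that Firehose uses for HTTP endpoint delivery.

```python
import hmac
from typing import Mapping, Optional

# Header in which Firehose sends the access key (assumption stated in the lead-in).
ACCESS_KEY_HEADER = "X-Amz-Firehose-Access-Key"


def is_authenticated(headers: Mapping[str, str], configured_key: Optional[str]) -> bool:
    """Hypothetical model of the access_key check; not Vector's actual code."""
    # No access_key configured: every request is treated as authenticated.
    if configured_key is None:
        return True
    # HTTP header names are case-insensitive, so normalize before the lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    presented = lowered.get(ACCESS_KEY_HEADER.lower())
    if presented is None:
        return False
    # Constant-time comparison avoids leaking key contents through timing.
    return hmac.compare_digest(presented.encode(), configured_key.encode())
```

With access_key unset, is_authenticated({}, None) accepts the request; with access_key set to "secret", only a request carrying that exact header value is accepted.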

address

required string
The address to listen for connections on.

record_compression

common optional string enum

The compression of records within the Firehose message.

Some services, like AWS CloudWatch Logs, compress events with gzip before sending them to AWS Kinesis Firehose. This option can be used to automatically decompress them before forwarding them to the next component.

Note that this is different from the Content encoding option of the Firehose HTTP endpoint destination. That option controls the content encoding of the entire HTTP request.

Enum options (string literal)

auto
Vector will try to determine the compression format of the record by looking at its file signature, also known as magic bytes.

Because magic bytes are not a perfect check, a record that fails to decompress with the detected format is forwarded as-is. If you know the records will always be gzip-encoded (for example, because they come from AWS CloudWatch Logs), prefer setting gzip here so that Vector rejects any record that is not gzipped.

gzip
GZIP format.

none
Uncompressed.
default: auto
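
To make the difference between auto and gzip concrete, here is a minimal Python sketch of the behavior described for each option. It is an illustration under stated assumptions, not Vector's implementation: the function name decode_record is hypothetical, and a return value of None stands in for "the record is rejected".

```python
import gzip
import zlib
from typing import Optional

# The two-byte file signature ("magic bytes") that starts every gzip stream.
GZIP_MAGIC = b"\x1f\x8b"


def decode_record(data: bytes, record_compression: str = "auto") -> Optional[bytes]:
    """Hypothetical model of record_compression; None means the record is rejected."""
    if record_compression == "none":
        # Records are passed through untouched.
        return data
    if record_compression == "gzip":
        # Strict mode: anything that is not valid gzip is rejected.
        try:
            return gzip.decompress(data)
        except (OSError, EOFError, zlib.error):
            return None
    if record_compression == "auto":
        # Best effort: sniff the magic bytes, fall back to the raw record on failure.
        if data.startswith(GZIP_MAGIC):
            try:
                return gzip.decompress(data)
            except (OSError, EOFError, zlib.error):
                return data
        return data
    raise ValueError(f"unknown record_compression: {record_compression!r}")
```

A record that merely starts with the gzip signature but is not valid gzip is forwarded unchanged under auto and rejected under gzip, which is why gzip is the safer choice when the upstream is known to compress.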

tls

optional object
Configures the TLS options for incoming connections.

tls.ca_file

optional string literal
Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an in-line CA certificate in PEM format.

tls.crt_file

optional string literal
Absolute path to a certificate file used to identify this server, in DER or PEM format (X.509) or PKCS#12, or an in-line certificate in PEM format. If this is set, and is not a PKCS#12 archive, key_file must also be set. This is required if enabled is set to true.

tls.enabled

optional bool
Require TLS for incoming connections. If this is set, an identity certificate is also required.
default: false

tls.key_file

optional string literal
Absolute path to a private key file used to identify this server, in DER or PEM format (PKCS#8), or an in-line private key in PEM format.

tls.key_pass

optional string literal
Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.

tls.verify_certificate

optional bool
If true, Vector will require a TLS certificate from the connecting host and terminate the connection if the certificate is not valid. If false (the default), Vector will not request a certificate from the client.
default: false

Output

Logs

line

One event will be published per incoming AWS Kinesis Firehose record.

Telemetry

Metrics

events_in_total

counter
The number of events accepted by this component, either from tagged origins like file and uri, or cumulatively from other origins.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
container_name optional
The name of the container from which the event originates.
file optional
The file from which the event originates.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the event originates.
peer_path optional
The pathname from which the event originates.
pod_name optional
The name of the pod from which the event originates.
uri optional
The sanitized URI from which the event originates.

events_out_total

counter
The total number of events emitted by this component.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.

processed_bytes_total

counter
The number of bytes processed by the component.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.
container_name optional
The name of the container from which the bytes originate.
file optional
The file from which the bytes originate.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the bytes originate.
peer_path optional
The pathname from which the bytes originate.
pod_name optional
The name of the pod from which the bytes originate.
uri optional
The sanitized URI from which the bytes originate.

request_automatic_decode_errors_total

counter
The total number of request errors for this component when it attempted to automatically discover and handle the content-encoding of incoming request data.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.

request_read_errors_total

counter
The total number of request read errors for this component.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.

requests_received_total

counter
The total number of requests received by this component.
component_kind required
The Vector component kind.
component_name required
The Vector component name.
component_type required
The Vector component type.

Examples

Given this event...
	{
		"requestId": "ed1d787c-b9e2-4631-92dc-8e7c9d26d804",
		"timestamp": 1600110760138,
		"records": [
			{
				"data": "H4sIABk1bV8AA52TzW7bMBCE734KQ2db/JdI3QzETS8FAtg91UGgyOuEqCQq5Mqua+TdS8lu0hYNUpQHAdoZDcn9tKfJdJo0EEL5AOtjB0kxTa4W68Xdp+VqtbheJrPB4A4t+EFiv6yzVLuHa+/6blARAr5UV+ihbH4vh/4+VN52aF37wdYIPkTDlyhF8SrabFsOWhIrtz+Dlnto8dV3Gp9RstshXKhMi0xpqk3GpNJccpFRKYw0WvCM5kIbzrVWipm4VK55rrSk44HGHLTx/lg2wxVYRiljVGWGCvPiuPRn2O60Se6P8UKbpOBZrulsk2xLhCEjljYJk2QFHeGU04KxQqpCsumcSko3SfQ+uoBnn8pTJmjKWZYyI0axAXx021G++bweS5136CpXj8WP6/UNYek5ycMOPPhReETsQkHI4XBIO2/bynZlXXkXwryrS9w536TWkab0XwED6e/tU2/R9eGS9NTD5VgEvnWwtQikcu0e/AO0FYyu4HpfwR3Gf2R0Btza9qxgiUNUISiLr30AP7fbyMzu7OWA803ynIzdfJ69B1EZpoVhsWMRZ8a5UVJoRoUyUlDNspxzZWiEnOXiXYiSvQOR5TnN/xsiNalmKZcy5Yr/yfB6+RZD/gbDC0IbOx8wQrMhxGGYx4lBW5X1wJBLkpO981jWf6EXogvIrm+rYYrKOn4Hgbg4b439/s8cFeVvcNwBtHBkOdWvQIdRnTxPfgCXvyEgSQQAAA=="
			}
		]
	}
...and this Vector configuration...
{
  "access_key": null,
  "address": "0.0.0.0:443",
  "record_compression": null,
  "tls": null,
  "type": "aws_kinesis_firehose"
}
...this Vector log event is produced:
{
  "message": "{\"messageType\":\"DATA_MESSAGE\",\"owner\":\"111111111111\",\"logGroup\":\"test\",\"logStream\":\"test\",\"subscriptionFilters\":[\"Destination\"],\"logEvents\":[{\"id\":\"35683658089614582423604394983260738922885519999578275840\",\"timestamp\":1600110569039,\"message\":\"{\\\"bytes\\\":26780,\\\"datetime\\\":\\\"14/Sep/2020:11:45:41 -0400\\\",\\\"host\\\":\\\"157.130.216.193\\\",\\\"method\\\":\\\"PUT\\\",\\\"protocol\\\":\\\"HTTP/1.0\\\",\\\"referer\\\":\\\"https://www.principalcross-platform.io/markets/ubiquitous\\\",\\\"request\\\":\\\"/expedite/convergence\\\",\\\"source_type\\\":\\\"stdin\\\",\\\"status\\\":301,\\\"user-identifier\\\":\\\"-\\\"}\"},{\"id\":\"35683658089659183914001456229543810359430816722590236673\",\"timestamp\":1600110569041,\"message\":\"{\\\"bytes\\\":17707,\\\"datetime\\\":\\\"14/Sep/2020:11:45:41 -0400\\\",\\\"host\\\":\\\"109.81.244.252\\\",\\\"method\\\":\\\"GET\\\",\\\"protocol\\\":\\\"HTTP/2.0\\\",\\\"referer\\\":\\\"http://www.investormission-critical.io/24/7/vortals\\\",\\\"request\\\":\\\"/scale/functionalities/optimize\\\",\\\"source_type\\\":\\\"stdin\\\",\\\"status\\\":502,\\\"user-identifier\\\":\\\"feeney1708\\\"}\"}]}",
  "request_id": "ed1d787c-b9e2-4631-92dc-8e7c9d26d804",
  "source_arn": "arn:aws:firehose:us-east-1:111111111111:deliverystream/test",
  "timestamp": "2020-09-14T19:12:40.138Z"
}
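
The mapping from the request above to the resulting log event can be sketched in Python. This is a rough illustration, not Vector's implementation: the function name firehose_request_to_events is hypothetical, gzip is detected by its magic bytes as with record_compression set to auto, and source_arn is passed in separately on the assumption that it arrives outside the JSON body (Firehose sends it in the X-Amz-Firehose-Source-Arn header).

```python
import base64
import gzip
import json
from datetime import datetime, timezone


def firehose_request_to_events(body: str, source_arn: str) -> list:
    """Hypothetical sketch: turn one Firehose request body into one event per record."""
    request = json.loads(body)

    # The request timestamp is in epoch milliseconds; use integer math to keep them exact.
    millis = request["timestamp"]
    moment = datetime.fromtimestamp(millis // 1000, tz=timezone.utc)
    moment = moment.replace(microsecond=(millis % 1000) * 1000)
    timestamp = moment.isoformat(timespec="milliseconds").replace("+00:00", "Z")

    events = []
    for record in request["records"]:
        # Each record's data field is base64-encoded.
        raw = base64.b64decode(record["data"])
        # Decompress when the payload starts with the gzip magic bytes.
        if raw[:2] == b"\x1f\x8b":
            raw = gzip.decompress(raw)
        events.append({
            "message": raw.decode("utf-8"),
            "request_id": request["requestId"],
            "source_arn": source_arn,
            "timestamp": timestamp,
        })
    return events
```

Each entry in records yields exactly one event, and every event from the same request shares the same request_id, source_arn, and timestamp.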

How it works

Context

By default, the aws_kinesis_firehose source augments events with helpful context keys, such as the request_id and source_arn fields shown in the example output above.

State

This component is stateless, meaning its behavior is consistent across each input.

Forwarding CloudWatch Log events

This source is the recommended way to ingest logs from AWS CloudWatch Logs via AWS CloudWatch Logs subscriptions. To set this up:

  1. Deploy Vector with a publicly exposed HTTP endpoint using this source. You will likely also want to use the aws_cloudwatch_logs_subscription_parser transform to extract the log events. Make sure to set the access_key to secure this endpoint. Your configuration might look something like:

     [sources.firehose]
     # General
     type = "aws_kinesis_firehose"
     address = "127.0.0.1:9000"
     access_key = "secret"
    
     [transforms.cloudwatch]
     type = "aws_cloudwatch_logs_subscription_parser"
     inputs = ["firehose"]
    
     [sinks.console]
     type = "console"
     inputs = ["cloudwatch"]
     encoding.codec = "json"
    
  2. Create a Kinesis Firehose delivery stream in the region where the CloudWatch Logs groups that you want to ingest exist.

  3. Set the stream to forward to your Vector instance via its HTTP Endpoint destination. Make sure to configure the same access_key you set earlier.

  4. Set up a CloudWatch Logs subscription to forward the events to your delivery stream.
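
Once this pipeline is running, each event emitted by the source carries a whole CloudWatch Logs subscription message, as in the example output earlier on this page. The Python sketch below illustrates the kind of extraction step that the aws_cloudwatch_logs_subscription_parser transform is used for in step 1. It is not the transform's implementation: the function name split_subscription_message and the output field names are assumptions chosen for illustration, and only the input field names (messageType, owner, logGroup, logStream, subscriptionFilters, logEvents) come from the example message.

```python
import json
from datetime import datetime, timezone


def split_subscription_message(message: str) -> list:
    """Hypothetical sketch: split one subscription message into one event per log event."""
    envelope = json.loads(message)
    # Assumption: only DATA_MESSAGE envelopes carry log events worth forwarding.
    if envelope.get("messageType") != "DATA_MESSAGE":
        return []

    events = []
    for log_event in envelope["logEvents"]:
        # Log event timestamps are epoch milliseconds; integer math keeps them exact.
        millis = log_event["timestamp"]
        moment = datetime.fromtimestamp(millis // 1000, tz=timezone.utc)
        moment = moment.replace(microsecond=(millis % 1000) * 1000)
        events.append({
            "id": log_event["id"],
            "message": log_event["message"],
            "timestamp": moment.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
            # Envelope-level context is copied onto every extracted event.
            "owner": envelope["owner"],
            "log_group": envelope["logGroup"],
            "log_stream": envelope["logStream"],
            "subscription_filters": envelope["subscriptionFilters"],
        })
    return events
```

Applied to the example message above, this would yield two events, one per entry in logEvents, each tagged with the shared log group and stream.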

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols. You can adjust TLS behavior via the tls.* options.