AMQP

Send events to AMQP 0.9.1 compatible brokers like RabbitMQ

status: beta delivery: at-least-once acknowledgements: no egress: dynamic state: stateless

Configuration

Example configurations

{
  "sinks": {
    "my_sink_id": {
      "type": "amqp",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "connection": null,
      "exchange": "message_exchange",
      "compression": "none",
      "encoding": {
        "codec": "json"
      },
      "healthcheck": null
    }
  }
}
[sinks.my_sink_id]
type = "amqp"
inputs = [ "my-source-or-transform-id" ]
exchange = "message_exchange"
compression = "none"

  [sinks.my_sink_id.encoding]
  codec = "json"
---
sinks:
  my_sink_id:
    type: amqp
    inputs:
      - my-source-or-transform-id
    connection: null
    exchange: message_exchange
    compression: none
    encoding:
      codec: json
    healthcheck: null
{
  "sinks": {
    "my_sink_id": {
      "type": "amqp",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "connection": null,
      "exchange": "message_exchange",
      "buffer": null,
      "compression": "none",
      "encoding": {
        "codec": "json"
      },
      "healthcheck": null,
      "tls": null,
      "routing_key": "{{ field_a }}-{{ field_b }}"
    }
  }
}
[sinks.my_sink_id]
type = "amqp"
inputs = [ "my-source-or-transform-id" ]
exchange = "message_exchange"
compression = "none"
routing_key = "{{ field_a }}-{{ field_b }}"

  [sinks.my_sink_id.encoding]
  codec = "json"
---
sinks:
  my_sink_id:
    type: amqp
    inputs:
      - my-source-or-transform-id
    connection: null
    exchange: message_exchange
    buffer: null
    compression: none
    encoding:
      codec: json
    healthcheck: null
    tls: null
    routing_key: "{{ field_a }}-{{ field_b }}"

buffer

optional object

Configures the sink specific buffer behavior.

More information about the individual buffer types, and buffer behavior, can be found in the Buffering Model section.

buffer.max_events

common optional uint
The maximum number of events allowed in the buffer.
Relevant when: type = "memory"
default: 500 (events)

buffer.max_size

required uint
The maximum size of the buffer on the disk. Must be at least ~256 megabytes (268435488 bytes).
Relevant when: type = "disk"
Examples
268435488

buffer.type

common optional string literal enum
The type of buffer to use.
Enum options
OptionDescription
disk

Events are buffered on disk.

This is less performant, but more durable. Data that has been synchronized to disk will not be lost if Vector is restarted forcefully or crashes.

Data is synchronized to disk every 500ms.

memory

Events are buffered in memory.

This is more performant, but less durable. Data will be lost if Vector is restarted forcefully or crashes.

default: memory

buffer.when_full

optional string literal enum
The behavior when the buffer becomes full.
Enum options
OptionDescription
block

Waits for capacity in the buffer.

This will cause backpressure to propagate to upstream components, which can cause data to pile up on the edge.

drop_newest

Drops the event without waiting for capacity in the buffer.

The data is lost. This should only be used when performance is the highest priority.

default: block

compression

common optional string literal enum

The compression strategy used to compress the encoded event data before transmission.

The default compression level of the chosen algorithm is used. Some cloud storage API clients and browsers will handle decompression transparently, so files may not always appear to be compressed depending how they are accessed.

Enum options string literal
OptionDescription
gzipGzip standard DEFLATE compression. Compression level is 6 unless otherwise specified.
lz4lz4 compression.
noneNo compression.
snappySnappy compression.
zstdzstd compression. Compression level is 3 unless otherwise specified. Dictionaries are not supported.
default: none

connection

required object
Connection options for the AMQP sink.

connection.connection_string

required string literal
Connection string to use when connecting to an AMQP server in the format of amqp://user:password@host:port/vhost?timeout=seconds. The default vhost can be represented as %2f.
Examples
"amqp://user:password@127.0.0.1:5672/%2f?timeout=10"

encoding

required object
Configures the encoding specific sink behavior.

encoding.codec

required string literal enum
The encoding codec used to serialize the events before outputting.
Enum options
OptionDescription
jsonJSON encoded event.
textThe message field from the event.
Examples
"json"
"text"

encoding.except_fields

optional [string]
Prevent the sink from encoding the specified fields.

encoding.only_fields

optional [string]
Makes the sink encode only the specified fields.

encoding.timestamp_format

optional string literal enum
How to format event timestamps.
Enum options
OptionDescription
rfc3339Formats as a RFC3339 string
unixFormats as a unix timestamp
default: rfc3339

exchange

required string literal
The exchange to publish messages to.
Examples
"message_exchange"

healthcheck

common optional object
Health check options for the sink.

healthcheck.enabled

common optional bool
Enables/disables the healthcheck upon Vector boot.
default: true

inputs

required [string]

A list of upstream source or transform IDs. Wildcards (*) are supported.

See configuration for more info.

Array string literal
Examples
[
  "my-source-or-transform-id",
  "prefix-*"
]

routing_key

optional string literal
Template use to generate a routing key which corresponds to a queue binding.
Examples
"{{ field_a }}-{{ field_b }}"

tls

optional object
Configures the TLS options for outgoing connections.

tls.alpn_protocols

optional [string]
Sets the list of supported ALPN protocols, which are used during negotiation with peer. Prioritized in the order they are defined.

tls.ca_file

optional string literal
Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.
Examples
"/path/to/certificate_authority.crt"

tls.crt_file

common optional string literal
Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.
Examples
"/path/to/host_certificate.crt"

tls.enabled

common optional bool
Enable TLS during connections to the remote.
default: false

tls.key_file

common optional string literal
Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.
Examples
"/path/to/host_certificate.key"

tls.key_pass

optional string literal
Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.
Examples
"${KEY_PASS_ENV_VAR}"
"PassWord1"

Telemetry

Metrics

link

buffer_byte_size

gauge
The number of bytes current in the buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_discarded_events_total

counter
The number of events dropped by this non-blocking buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_events

gauge
The number of events currently in the buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_received_event_bytes_total

counter
The number of bytes received by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_received_events_total

counter
The number of events received by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_sent_event_bytes_total

counter
The number of bytes sent by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

buffer_sent_events_total

counter
The number of events sent by this buffer.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

component_received_event_bytes_total

counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_count

histogram

A histogram of the number of events passed in each internal batch in Vector’s internal topology.

Note that this is separate than sink-level batching. It is mostly useful for low level debugging performance issues in Vector due to small internal batches.

component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

events_discarded_total

counter
The total number of events discarded by this component.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.
reason
The type of the error

events_in_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins. This metric is deprecated and will be removed in a future version. Use component_received_events_total instead.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
container_name optional
The name of the container from which the data originated.
file optional
The file from which the data originated.
host optional
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the data originated.
peer_path optional
The pathname from which the data originated.
pid optional
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the data originated.
uri optional
The sanitized URI from which the data originated.

processing_errors_total

counter
The total number of processing errors encountered by this component. This metric is deprecated in favor of component_errors_total.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
error_type
The type of the error
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

utilization

gauge
A ratio from 0 to 1 of the load on a component. A value of 0 would indicate a completely idle component that is simply waiting for input. A value of 1 would indicate a that is never idle. This value is updated every 5 seconds.
component_id
The Vector component ID.
component_kind
The Vector component kind.
component_name
Deprecated, use component_id instead. The value is the same as component_id.
component_type
The Vector component type.
host optional
The hostname of the system Vector is running on.
pid optional
The process ID of the Vector instance.

How it works

Health checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails an error will be logged and Vector will proceed to start.

Require health checks

If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable health checks

If you’d like to disable health checks for this sink you can set the healthcheck option to false.

Lapin

The amqp source and sink uses lapin under the hood. This is a reliable pure rust library that facilitates communication with AMPQ servers such as RabbitMQ.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols due to OpenSSL’s maturity. You can enable and adjust TLS behavior using the tls.* options.