GCP PubSub

Publish observability events to GCP’s PubSub messaging system

status: beta delivery: at-least-once egress: batch state: stateless

Configuration

Example configurations

{
  "sinks": {
    "my_sink_id": {
      "type": "gcp_pubsub",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "credentials_path": "/path/to/credentials.json",
      "project": "vector-123456",
      "topic": "this-is-a-topic"
    }
  }
}
[sinks.my_sink_id]
type = "gcp_pubsub"
inputs = [ "my-source-or-transform-id" ]
credentials_path = "/path/to/credentials.json"
project = "vector-123456"
topic = "this-is-a-topic"
---
sinks:
  my_sink_id:
    type: gcp_pubsub
    inputs:
      - my-source-or-transform-id
    credentials_path: /path/to/credentials.json
    project: vector-123456
    encoding: null
    healthcheck: null
    topic: this-is-a-topic
{
  "sinks": {
    "my_sink_id": {
      "type": "gcp_pubsub",
      "inputs": [
        "my-source-or-transform-id"
      ],
      "api_key": "${GCP_API_KEY}",
      "credentials_path": "/path/to/credentials.json",
      "endpoint": "https://pubsub.googleapis.com",
      "project": "vector-123456",
      "topic": "this-is-a-topic"
    }
  }
}
[sinks.my_sink_id]
type = "gcp_pubsub"
inputs = [ "my-source-or-transform-id" ]
api_key = "${GCP_API_KEY}"
credentials_path = "/path/to/credentials.json"
endpoint = "https://pubsub.googleapis.com"
project = "vector-123456"
topic = "this-is-a-topic"
---
sinks:
  my_sink_id:
    type: gcp_pubsub
    inputs:
      - my-source-or-transform-id
    api_key: ${GCP_API_KEY}
    credentials_path: /path/to/credentials.json
    endpoint: https://pubsub.googleapis.com
    project: vector-123456
    buffer: null
    batch: null
    encoding: null
    healthcheck: null
    request: null
    tls: null
    proxy: null
    topic: this-is-a-topic

api_key

optional string literal
A Google Cloud API key used to authenticate access the pubsub project and topic. Either this or credentials_path must be set.
Examples
"${GCP_API_KEY}"
"ef8d5de700e7989468166c40fc8a0ccd"

batch

optional object
Configures the sink batching behavior.

batch.max_bytes

optional uint
The maximum size of a batch, in bytes, before it is flushed.
default: 1.048576e+07 (bytes)

batch.max_events

optional uint
The maximum size of a batch, in events, before it is flushed.
default: 1000 (events)

batch.timeout_secs

optional uint
The maximum age of a batch before it is flushed.
default: 1 (seconds)

buffer

optional object
Configures the sink specific buffer behavior.

buffer.max_events

optional uint
The maximum number of events allowed in the buffer.
Relevant when: type = "memory"
default: 500 (events)

buffer.max_size

optional uint
The maximum size of the buffer on the disk.
Relevant when: type = "disk"
Examples
104900000

buffer.type

optional string literal enum
The buffer’s type and storage mechanism.
Enum options
OptionDescription
diskStores the sink’s buffer on disk. This is less performant, but durable. Data will not be lost between restarts. Will also hold data in memory to enhance performance. WARNING: This may stall the sink if disk performance isn’t on par with the throughput. For comparison, AWS gp2 volumes are usually too slow for common cases.
memoryStores the sink’s buffer in memory. This is more performant, but less durable. Data will be lost if Vector is restarted forcefully.
default: memory

buffer.when_full

optional string literal enum
The behavior when the buffer becomes full.
Enum options
OptionDescription
blockApplies back pressure when the buffer is full. This prevents data loss, but will cause data to pile up on the edge.
drop_newestDrops new data as it’s received. This data is lost. This should be used when performance is the highest priority.
default: block

credentials_path

common optional string literal

The filename for a Google Cloud service account credentials JSON file used to authenticate access to the pubsub project and topic. If this is unset, Vector checks the GOOGLE_APPLICATION_CREDENTIALS environment variable for a filename.

If no filename is named, Vector will attempt to fetch an instance service account for the compute instance the program is running on. If Vector is not running on a GCE instance, you must define a credentials file as above.

Examples
"/path/to/credentials.json"

encoding

common optional object
Configures the encoding specific sink behavior.

encoding.except_fields

optional [string]
Prevent the sink from encoding the specified fields.

encoding.only_fields

optional [string]
Makes the sink encode only the specified fields.

encoding.timestamp_format

optional string literal enum
How to format event timestamps.
Enum options
OptionDescription
rfc3339Formats as a RFC3339 string
unixFormats as a unix timestamp
default: rfc3339

endpoint

optional string literal
The endpoint to which to send data.
Examples
"https://us-central1-pubsub.googleapis.com"
default: https://pubsub.googleapis.com

healthcheck

common optional object
Health check options for the sink.

healthcheck.enabled

optional bool
Enables/disables the healthcheck upon Vector boot.
default: true

inputs

required [string]

A list of upstream source or transform IDs. Wildcards (*) are supported.

See configuration for more info.

Array string literal
Examples
[
  "my-source-or-transform-id",
  "prefix-*"
]

project

required string literal
The project name to which to publish logs.
Examples
"vector-123456"

proxy

optional object
Configures an HTTP(S) proxy for Vector to use. By default, the globally configured proxy is used.

proxy.enabled

optional bool
If false the proxy will be disabled.
default: true

proxy.http

optional string literal
The URL to proxy HTTP requests through.
Examples
"http://foo.bar:3128"

proxy.https

optional string literal
The URL to proxy HTTPS requests through.
Examples
"http://foo.bar:3128"

proxy.no_proxy

optional [string]

A list of hosts to avoid proxying. Allowed patterns here include:

PatternExample match
Domain namesexample.com matches requests to example.com
Wildcard domains.example.com matches requests to example.com and its subdomains
IP addresses127.0.0.1 matches requests to 127.0.0.1
CIDR blocks192.168.0.0./16 matches requests to any IP addresses in this range
Splat* matches all hosts

request

optional object
Configures the sink request behavior.
Configure the adaptive concurrency algorithms. These values have been tuned by optimizing simulated results. In general you should not need to adjust these.
The fraction of the current value to set the new concurrency limit when decreasing the limit. Valid values are greater than 0 and less than 1. Smaller values cause the algorithm to scale back rapidly when latency increases. Note that the new limit is rounded down after applying this ratio.
default: 0.9
The adaptive concurrency algorithm uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with the current RTT. This value controls how heavily new measurements are weighted compared to older ones. Valid values are greater than 0 and less than 1. Smaller values cause this reference to adjust more slowly, which may be useful if a service has unusually high response variability.
default: 0.7
When calculating the past RTT average, we also compute a secondary "deviation" value that indicates how variable those values are. We use that deviation when comparing the past RTT average to the current measurements, so we can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to an appropriate range. Valid values are greater than or equal to 0, and we expect reasonable values to range from 1.0 to 3.0. Larger values cause the algorithm to ignore larger increases in the RTT.
default: 2

request.concurrency

optional uint
The maximum number of in-flight requests allowed at any given time, or “adaptive” to allow Vector to automatically set the limit based on current network and service conditions.
The time window, in seconds, used for the rate_limit_num option.
default: 1 (seconds)
The maximum number of requests allowed within the rate_limit_duration_secs time window.
default: 9.223372036854776e+18
The maximum number of retries to make for failed requests. The default, for all intents and purposes, represents an infinite number of retries.
default: 1.8446744073709552e+19
The amount of time to wait before attempting the first retry for a failed request. Once, the first retry has failed the fibonacci sequence will be used to select future backoffs.
default: 1 (seconds)
The maximum amount of time, in seconds, to wait between retries.
default: 3600 (seconds)
The maximum time a request can take before being aborted. It is highly recommended that you do not lower this value below the service’s internal timeout, as this could create orphaned requests, pile on retries, and result in duplicate data downstream.
default: 60 (seconds)

tls

optional object
Configures the TLS options for incoming connections.

tls.ca_file

optional string literal
Absolute path to an additional CA certificate file, in DER or PEM format (X.509), or an inline CA certificate in PEM format.
Examples
"/path/to/certificate_authority.crt"

tls.crt_file

optional string literal
Absolute path to a certificate file used to identify this connection, in DER or PEM format (X.509) or PKCS#12, or an inline certificate in PEM format. If this is set and is not a PKCS#12 archive, key_file must also be set.
Examples
"/path/to/host_certificate.crt"

tls.key_file

optional string literal
Absolute path to a private key file used to identify this connection, in DER or PEM format (PKCS#8), or an inline private key in PEM format. If this is set, crt_file must also be set.
Examples
"/path/to/host_certificate.key"

tls.key_pass

optional string literal
Pass phrase used to unlock the encrypted key file. This has no effect unless key_file is set.
Examples
"${KEY_PASS_ENV_VAR}"
"PassWord1"

tls.verify_hostname

optional bool
If true (the default), Vector will validate the configured remote host name against the remote host’s TLS certificate. Do NOT set this to false unless you understand the risks of not verifying the remote hostname.
default: true

topic

required string literal
The topic within the project to which to publish logs.
Examples
"this-is-a-topic"

Environment variables

GOOGLE_APPLICATION_CREDENTIALS

common optional string literal
The filename for a Google Cloud service account credentials JSON file used for authentication.
Examples
/path/to/credentials.json

Telemetry

Metrics

link

component_received_event_bytes_total

counter
The number of event bytes accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the event originates.
file optional
The file from which the event originates.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the event originates.
peer_path optional
The pathname from which the event originates.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the event originates.
uri optional
The sanitized URI from which the event originates.

component_received_events_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the event originates.
file optional
The file from which the event originates.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the event originates.
peer_path optional
The pathname from which the event originates.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the event originates.
uri optional
The sanitized URI from which the event originates.

component_sent_bytes_total

counter
The number of raw bytes sent by this component to destination sinks.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
endpoint optional
The endpoint that the bytes were sent to. For HTTP, this will be the host and path only, excluding the query string.
file optional
The absolute path of the destination file.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.
protocol required
The protocol used to send the bytes.

component_sent_event_bytes_total

counter
The total number of event bytes emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

component_sent_events_total

counter
The total number of events emitted by this component.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

events_in_total

counter
The number of events accepted by this component either from tagged origins like file and uri, or cumulatively from other origins. This metric is deprecated and will be removed in a future version. Use component_received_events_total instead.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
container_name optional
The name of the container from which the event originates.
file optional
The file from which the event originates.
host required
The hostname of the system Vector is running on.
mode optional
The connection mode used by the component.
peer_addr optional
The IP from which the event originates.
peer_path optional
The pathname from which the event originates.
pid required
The process ID of the Vector instance.
pod_name optional
The name of the pod from which the event originates.
uri optional
The sanitized URI from which the event originates.

events_out_total

counter
The total number of events emitted by this component. This metric is deprecated and will be removed in a future version. Use component_sent_events_total instead.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

utilization

gauge
A ratio from 0 to 1 of the load on a component. A value of 0 would indicate a completely idle component that is simply waiting for input. A value of 1 would indicate a that is never idle. This value is updated every 5 seconds.
component_id required
The Vector component ID.
component_kind required
The Vector component kind.
component_name required
Deprecated, use component_id instead. The value is the same as component_id.
component_type required
The Vector component type.
host required
The hostname of the system Vector is running on.
pid required
The process ID of the Vector instance.

Permissions

Platform: Google Cloud Platform
Relevant policies
PolicyRequired forRequired when
pubsub.topics.get
  • healthcheck
pubsub.topics.publish
  • operation

How it works

Buffers and batches

This component buffers & batches data as shown in the diagram above. You’ll notice that Vector treats these concepts differently, instead of treating them as global concepts, Vector treats them as sink specific concepts. This isolates sinks, ensuring services disruptions are contained and delivery guarantees are honored.

Batches are flushed when 1 of 2 conditions are met:

  1. The batch age meets or exceeds the configured timeout_secs.
  2. The batch size meets or exceeds the configured max_size or max_events.

Buffers are controlled via the buffer.* options.

GCP Authentication

GCP offers a variety of authentication methods and Vector is concerned with the server to server methods and will find credentials in the following order:

  1. If the credentials_path option is set.
  2. If the api_key option is set.
  3. If the GOOGLE_APPLICATION_CREDENTIALS environment variable is set.
  4. Finally, Vector will check for an instance service account.

If credentials aren’t found, Vector’s health checks fail and an error is logged.

Health checks

Health checks ensure that the downstream service is accessible and ready to accept data. This check is performed upon sink initialization. If the health check fails an error will be logged and Vector will proceed to start.

Require health checks

If you’d like to exit immediately upon a health check failure, you can pass the --require-healthy flag:

vector --config /etc/vector/vector.toml --require-healthy

Disable health checks

If you’d like to disable health checks for this sink you can set the healthcheck option to false.

Partitioning

Vector supports dynamic configuration values through a simple template syntax. If an option supports templating, it will be noted with a badge and you can use event fields to create dynamic values. For example:

[sinks.my-sink]
dynamic_option = "application={{ application_id }}"

In the above example, the application_id for each event will be used to partition outgoing data.

Rate limits & adapative concurrency

Adaptive Request Concurrency (ARC)

Adaptive Request Concurrency is a feature of Vector that does away with static concurrency limits and automatically optimizes HTTP concurrency based on downstream service responses. The underlying mechanism is a feedback loop inspired by TCP congestion control algorithms. Checkout the announcement blog post,

We highly recommend enabling this feature as it improves performance and reliability of Vector and the systems it communicates with. As such, we have made it the default, and no further configuration is required.

Static concurrency

If Adaptive Request Concurrency is not for you, you can manually set static concurrency limits by specifying an integer for request.concurrency:

[sinks.my-sink]
  request.concurrency = 10

Rate limits

In addition to limiting request concurrency, you can also limit the overall request throughput via the request.rate_limit_duration_secs and request.rate_limit_num options.

[sinks.my-sink]
  request.rate_limit_duration_secs = 1
  request.rate_limit_num = 10

These will apply to both adaptive and fixed request.concurrency values.

Retry policy

Vector will retry failed requests (status == 429, >= 500, and != 501). Other responses will not be retried. You can control the number of retry attempts and backoff rate with the request.retry_attempts and request.retry_backoff_secs options.

State

This component is stateless, meaning its behavior is consistent across each input.

Transport Layer Security (TLS)

Vector uses OpenSSL for TLS protocols due to OpenSSL’s maturity. You can enable and adjust TLS behavior using the tls.* options.