Reduce
Collapse multiple log events into a single event based on a set of conditions and merge strategies
Configuration
Example configurations
{
"transforms": {
"my_transform_id": {
"type": "reduce",
"inputs": [
"my-source-or-transform-id"
],
"group_by": [
"request_id"
]
}
}
}
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "request_id" ]
---
transforms:
my_transform_id:
type: reduce
inputs:
- my-source-or-transform-id
group_by:
- request_id
{
"transforms": {
"my_transform_id": {
"type": "reduce",
"inputs": [
"my-source-or-transform-id"
],
"ends_when": ".status_code != 200 && !includes([\"info\", \"debug\"], .severity)",
"expire_after_ms": 30000,
"flush_period_ms": 1000,
"group_by": [
"request_id"
],
"merge_strategies": {
"method": "discard",
"path": "discard",
"duration_ms": "sum",
"query": "array"
},
"starts_when": null
}
}
}
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
ends_when = '.status_code != 200 && !includes(["info", "debug"], .severity)'
expire_after_ms = 30_000
flush_period_ms = 1_000
group_by = [ "request_id" ]
[transforms.my_transform_id.merge_strategies]
method = "discard"
path = "discard"
duration_ms = "sum"
query = "array"
---
transforms:
my_transform_id:
type: reduce
inputs:
- my-source-or-transform-id
ends_when: .status_code != 200 && !includes(["info", "debug"], .severity)
expire_after_ms: 30000
flush_period_ms: 1000
group_by:
- request_id
merge_strategies:
method: discard
path: discard
duration_ms: sum
query: array
starts_when: null
ends_when
optional string literaltrue
for an event, the current transaction is immediately flushed with this event.expire_after_ms
optional uint30000
(milliseconds)flush_period_ms
optional uint1000
(milliseconds)group_by
common optional [string]An ordered list of fields by which to group events. Each group with matching values for the specified keys is reduced independently, allowing you to keep independent event streams separate.
For example, if group_by = ["host", "region"]
, then all incoming events that have the same host and
region will be grouped together before being reduced.
When no fields are specified, all events will be combined in a single group.
inputs
required [string]A list of upstream source or transform
IDs. Wildcards (*
) are supported.
See configuration for more info.
merge_strategies
optional objectA map of field names to custom merge strategies. For each field specified this strategy will be used for combining events rather than the default behavior.
The default behavior is as follows:
- The first value of a string field is kept, subsequent values are discarded.
- For timestamp fields the first is kept and a new field
[field-name]_end
is added with the last received timestamp value. - Numeric values are summed.
merge_strategies.*
optional string literal enumOption | Description |
---|---|
array | Each value is appended to an array. |
concat | Concatenate each string value (delimited with a space). |
concat_newline | Concatenate each string value (delimited with a newline). |
concat_raw | Concatenate each string value (without any delimiter). |
discard | Discard all but the first value found. |
flat_unique | Create a flattened array of all the unique values. |
longest_array | Retains the longest array seen |
max | The maximum of all numeric values. |
min | The minimum of all numeric values. |
retain | Discard all but the last value found. Works as a coalesce by not retaining null. |
shortest_array | Retains the shortest array seen |
sum | Sum all numeric values. |
starts_when
optional conditiontrue
for an event, the previous transaction is flushed (without this event) and a new transaction is started.type
.Available syntaxes
Syntax | Description | Example |
---|---|---|
vrl | A Vector Remap Language (VRL) Boolean expression. | .status_code != 200 && !includes(["info", "debug"], .severity) |
datadog_search | A Datadog Search query string. | *stack |
Shorthand for VRL
If you opt for the vrl
syntax for this condition, you can set the condition
as a string via the condition
parameter, without needing to specify both a source
and a type
. The
table below shows some examples:
Config format | Example |
---|---|
TOML | condition = ".status == 200" |
YAML | condition: .status == 200 |
JSON | "condition": ".status == 200" |
Condition config examples
Standard VRL
starts_when = { type = "vrl", source = ".status == 500" }
starts_when:
type: "vrl"
source: ".status == 500"
"starts_when": {
"type": "vrl",
"source": ".status == 500"
}
Outputs
<component_id>
Telemetry
Metrics
linkcomponent_received_event_bytes_total
countercomponent_id
instead. The value is the same as component_id
.component_received_events_count
histogramcomponent_id
instead. The value is the same as component_id
.component_received_events_total
countercomponent_id
instead. The value is the same as component_id
.component_sent_event_bytes_total
countercomponent_id
instead. The value is the same as component_id
.component_sent_events_total
countercomponent_id
instead. The value is the same as component_id
.events_in_total
countercomponent_received_events_total
instead.component_id
instead. The value is the same as component_id
.events_out_total
countercomponent_sent_events_total
instead.component_id
instead. The value is the same as component_id
.processed_bytes_total
countercomponent_id
instead. The value is the same as component_id
.processed_events_total
countercomponent_received_events_total
and
component_sent_events_total
metrics.component_id
instead. The value is the same as component_id
.stale_events_flushed_total
countercomponent_id
instead. The value is the same as component_id
.utilization
gaugecomponent_id
instead. The value is the same as component_id
.Examples
Merge Ruby exceptions
Given this event...[{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:6:in `bar'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:2:in `foo'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":" from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}]
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
group_by = [ "host", "pid", "tid" ]
starts_when = "match(string!(.message), r'^[^\\s]')"
[transforms.my_transform_id.merge_strategies]
message = "concat_newline"
---
transforms:
my_transform_id:
type: reduce
inputs:
- my-source-or-transform-id
group_by:
- host
- pid
- tid
merge_strategies:
message: concat_newline
starts_when: match(string!(.message), r'^[^\s]')
{
"transforms": {
"my_transform_id": {
"type": "reduce",
"inputs": [
"my-source-or-transform-id"
],
"group_by": [
"host",
"pid",
"tid"
],
"merge_strategies": {
"message": "concat_newline"
},
"starts_when": "match(string!(.message), r'^[^\\s]')"
}
}
}
[{"log":{"host":"host-1.hostname.com","message":"foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n from foobar.rb:6:in `bar'\n from foobar.rb:2:in `foo'\n from foobar.rb:9:in `\u003cmain\u003e'","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"host":"host-1.hostname.com","message":"Hello world, I am a new log","pid":1234,"tid":5678,"timestamp":"2020-10-07T12:33:22.123528Z"}}]
Reduce Rails logs into a single transaction
Given this event...[{"log":{"message":"Received GET /path","request_id":"abcd1234","request_params":{"key":"val"},"request_path":"/path","timestamp":"2020-10-07T12:33:21.223543Z"}},{"log":{"message":"Executed query in 5.2ms","query":"SELECT * FROM table","query_duration_ms":5.2,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:21.832345Z"}},{"log":{"message":"Rendered partial _partial.erb in 2.3ms","render_duration_ms":2.3,"request_id":"abcd1234","template":"_partial.erb","timestamp":"2020-10-07T12:33:22.457423Z"}},{"log":{"message":"Executed query in 7.8ms","query":"SELECT * FROM table","query_duration_ms":7.8,"request_id":"abcd1234","timestamp":"2020-10-07T12:33:22.543323Z"}},{"log":{"message":"Sent 200 in 15.2ms","request_id":"abcd1234","response_duration_ms":5.2,"response_status":200,"timestamp":"2020-10-07T12:33:22.742322Z"}}]
[transforms.my_transform_id]
type = "reduce"
inputs = [ "my-source-or-transform-id" ]
---
transforms:
my_transform_id:
type: reduce
inputs:
- my-source-or-transform-id
{
"transforms": {
"my_transform_id": {
"type": "reduce",
"inputs": [
"my-source-or-transform-id"
]
}
}
}
{
"query_duration_ms": 13,
"render_duration_ms": 2.3,
"request_id": "abcd1234",
"request_params": {
"key": "val"
},
"request_path": "/path",
"response_duration_ms": 5.2,
"status": 200,
"timestamp": "2020-10-07T12:33:21.223543Z",
"timestamp_end": "2020-10-07T12:33:22.742322Z"
}