Forward workflow execution logs to one or more desired destinations.
The Log Shipper task extracts logs from the Kestra backend and loads them to desired destinations including Datadog, Elasticsearch, New Relic, OpenTelemetry, AWS CloudWatch, Google Operational Suite, and Azure Monitor.
The task works incrementally in batches:
- Determines the starting timestamp using either:
- The last successfully processed log's timestamp (persisted in KV Store using the
offsetKey
) - Current time minus
lookbackPeriod
duration if no previous state exists
- The last successfully processed log's timestamp (persisted in KV Store using the
- Sends retrieved logs through configured
logExporters
- Stores the timestamp of the last processed log to maintain state between executions
- Subsequent runs continue from the last stored timestamp
This incremental approach ensures reliable log forwarding without gaps or duplicates.
type: "io.kestra.plugin.ee.core.log.LogShipper"
Ship logs to multiple destinations
id: logShipper
namespace: system
tasks:
- id: shipLogs
type: io.kestra.plugin.ee.core.log.LogShipper
logLevelFilter: INFO
lookbackPeriod: P1D
offsetKey: logShipperOffset
delete: false
logExporters:
- id: file
type: io.kestra.plugin.ee.core.log.FileLogExporter
- id: awsCloudWatch
type: io.kestra.plugin.ee.aws.cloudwatch.LogExporter
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
region: us-east-1
logGroupName: kestra
logStreamName: production
- id: S3LogExporter
type: io.kestra.plugin.ee.aws.s3.LogExporter
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
region: "{{ vars.region }}"
format: JSON
bucket: logbucket
logFilePrefix: kestra-log-file
maxLinesPerFile: 1000000
- id: googleOperationalSuite
type: io.kestra.plugin.ee.gcp.operationalsuite.LogExporter
projectId: my-gcp-project
- id: gcs
type: io.kestra.plugin.ee.gcp.gcs.LogExporter
projectId: myProjectId
format: JSON
maxLinesPerFile: 10000
bucket: my-bucket
logFilePrefix: kestra-log-file
chunk: 1000
- id: azureMonitor
type: io.kestra.plugin.ee.azure.monitor.LogExporter
endpoint: https://endpoint-host.ingest.monitor.azure.com
tenantId: "{{ secret('AZURE_TENANT_ID') }}"
clientId: "{{ secret('AZURE_CLIENT_ID') }}"
clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
ruleId: dcr-69f0b123041d4d6e9f2bf72aad0b62cf
streamName: kestraLogs
- id: azureBlobStorage
type: io.kestra.plugin.ee.azure.storage.LogExporter
endpoint: https://myblob.blob.core.windows.net/
tenantId: "{{ secret('AZURE_TENANT_ID') }}"
clientId: "{{ secret('AZURE_CLIENT_ID') }}"
clientSecret: "{{ secret('AZURE_CLIENT_SECRET') }}"
containerName: logs
format: JSON
logFilePrefix: kestra-log-file
maxLinesPerFile: 1000000
chunk: 1000
- id: datadog
type: io.kestra.plugin.ee.datadog.LogExporter
basePath: https://http-intake.logs.datadoghq.eu
apiKey: "{{ secret('DATADOG_API_KEY') }}"
- id: elasticsearch
type: io.kestra.plugin.ee.elasticsearch.LogExporter
indexName: kestra-logs
connection:
basicAuth:
password: "{{ secret('ES_PASSWORD') }}"
username: kestra_user
hosts:
- https://elastic.example.com:9200
- id: opensearch
type: io.kestra.plugin.ee.opensearch.LogExporter
indexName: kestra-logs
connection:
basicAuth:
password: "{{ secret('ES_PASSWORD') }}"
username: kestra_user
hosts:
- https://elastic.example.com:9200
- id: newRelic
type: io.kestra.plugin.ee.newrelic.LogExporter
basePath: https://log-api.newrelic.com
apiKey: "{{ secret('NEWRELIC_API_KEY') }}"
- id: openTelemetry
type: io.kestra.plugin.ee.opentelemetry.LogExporter
otlpEndpoint: http://otel-collector:4318/v1/logs
authorizationHeaderName: Authorization
authorizationHeaderValue: "Bearer {{ secret('OTEL_TOKEN') }}"
triggers:
- id: dailySchedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 0 * * *"
disabled: true
YES
Deprecated
YES
Delete logs after export
The log shipper will delete the exported logs
YES
INFO
Log level to send
The minimum log level to send
YES
P1D
duration
Starting duration before now
If no previous execution or state exists, the fetch start date is set to the current time minus this duration
YES
Namespace to search
The namespace to use to filter logs
YES
Prefix of the KVStore key
The prefix of the KVStore key that contains the last execution's end fetched date
NO
^[a-zA-Z0-9][a-zA-Z0-9_-]*
1