Trigger
Consume messages periodically from MQTT topics and create one execution per batch.
Note that you don't need an extra task to consume the message from the event trigger. The trigger will automatically consume messages and you can retrieve their content in your flow using the {{ trigger.uri }}
variable. If you would like to consume each message from MQTT topics in real-time and create one execution per message, you can use the io.kestra.plugin.mqtt.RealtimeTrigger instead.
type: "io.kestra.plugin.mqtt.Trigger"
id: mqtt_trigger
namespace: company.team
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.payload }}"
triggers:
- id: trigger
type: io.kestra.plugin.mqtt.Trigger
server: tcp://localhost:1883
clientId: kestraProducer
topic:
- kestra/sensors/cpu
- kestra/sensors/mem
serdeType: JSON
maxRecords: 10
YES
1
Sets the quality of service for this message.
- Quality of Service 0: indicates that a message should be delivered at most once (zero or one times). The message will not be persisted to disk, and will not be acknowledged across the network. This QoS is the fastest, but should only be used for messages which are not valuable - note that if the server cannot process the message (for example, there is an authorization problem). Also known as "fire and forget".
- Quality of Service 1: indicates that a message should be delivered at least once (one or more times). The message can only be delivered safely if it can be persisted, so the application must supply a means of persistence using MqttConnectOptions. If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. The message will be acknowledged across the network.
- Quality of Service 2: indicates that a message should be delivered once. The message will be persisted to disk, and will be subject to a two-phase acknowledgement across the network. The message can only be delivered safely if it can be persisted, so the application must supply a means of persistence using MqttConnectOptions. If a persistence mechanism is not specified, the message will not be delivered in the event of a client failure. If persistence is not configured, QoS 1 and 2 messages will still be delivered in the event of a network or server problem as the client will hold state in memory. If the MQTT client is shutdown or fails and persistence is not configured then delivery of QoS 1 and 2 messages can not be maintained as client-side state will be lost.
YES
Topic where to consume message
Can be a string or a List of string to consume from multiple topic
YES
YES
YES
duration
YES
YES
NO
60
duration
Interval between polling.
The interval between 2 different polls of schedule, this can avoid to overload the remote system with too many calls. For most of the triggers that depend on external systems, a minimal interval must be at least PT30S. See ISO_8601 Durations for more information of available interval values.
YES
duration
The max duration waiting for new rows
It's not an hard limit and is evaluated every second
YES
The max number of rows to fetch before stopping
It's not an hard limit and is evaluated every second
YES
V5
V3
V5
YES
YES
JSON
STRING
JSON
BYTES
Serializer / Deserializer used for the payload
YES
NO
CREATED
RUNNING
PAUSED
RESTARTED
KILLING
SUCCESS
WARNING
FAILED
KILLED
CANCELLED
QUEUED
RETRYING
RETRIED
SKIPPED
List of execution states after which a trigger should be stopped (a.k.a. disabled).
YES
Number of message produced
uri
URI of a kestra internal storage file