Skip to content

flowerpower mqtt Commands

This section details the commands available under flowerpower mqtt.

start_listener

Start an MQTT client to listen to messages on a topic

The connection to the MQTT broker is established using the provided configuration o a MQTT event broker defined in the project configuration file conf/project.yml. If not configuration is found, you have to provide the connection parameters, such as host, port, username, and password.

The on_message module should contain a function on_message that will be called with the message payload as argument.

Usage

flowerpower mqtt start_listener [options]

Arguments

Name Type Description Default
on_message str Name of the module containing the on_message function Required
topic str MQTT topic to listen to Required
base_dir str Base directory for the module Required
host str MQTT broker host Required
port str MQTT broker port Required
username str MQTT broker username Required
password str MQTT broker password Required

Examples

$ flowerpower mqtt start_listener --on-message my_module --topic my_topic --base-dir /path/to/module

run_pipeline_on_message

Run a pipeline on a message

This command sets up an MQTT listener that executes a pipeline whenever a message is received on the specified topic. The pipeline can be configured to retry on failure using exponential backoff with jitter for better resilience.

Usage

flowerpower mqtt run_pipeline_on_message [options]

Arguments

Name Type Description Default
name str Name of the pipeline Required
topic str MQTT topic to listen to Required
executor str Name of the executor Required
base_dir str Base directory for the pipeline Required
inputs str Inputs as JSON or key=value pairs or dict string Required
final_vars str Final variables as JSON or list Required
config str Config for the hamilton pipeline executor Required
with_tracker str Enable tracking with hamilton ui Required
with_opentelemetry str Enable OpenTelemetry tracing Required
with_progressbar str Enable progress bar Required
storage_options str Storage options as JSON, dict string or key=value pairs Required
as_job str Run as a job in the scheduler Required
host str MQTT broker host Required
port str MQTT broker port Required
username str MQTT broker username Required
password str MQTT broker password Required
clean_session str Whether to start a clean session with the broker Required
qos str MQTT Quality of Service level (0, 1, or 2) Required
client_id str Custom MQTT client identifier Required
client_id_suffix str Optional suffix to append to client_id Required
config_hook str Function to process incoming messages into pipeline config Required
max_retries str Maximum number of retry attempts if pipeline execution fails Required
retry_delay str Base delay between retries in seconds Required
jitter_factor str Random factor (0-1) applied to delay for jitter Required

Examples

1
2
3
$ flowerpower mqtt run-pipeline-on-message my_pipeline --topic sensors/data

# Configure retries for resilience
1
2
3
$ flowerpower mqtt run-pipeline-on-message my_pipeline --topic sensors/data --max-retries 5 --retry-delay 2.0

# Run as a job with custom MQTT settings
1
2
3
$ flowerpower mqtt run-pipeline-on-message my_pipeline --topic events/process --as-job --qos 2 --host mqtt.example.com

# Use a config hook to process messages
$ flowerpower mqtt run-pipeline-on-message my_pipeline --topic data/incoming --config-hook process_message