Skip to content

API Reference

This section provides auto-generated API documentation for flowerpower-mqtt.

The documentation is generated from the source code docstrings using mkdocstrings.

flowerpower_mqtt

FlowerPower MQTT Plugin

A simple MQTT plugin for FlowerPower that triggers pipeline execution when messages arrive on subscribed topics, with configurable QoS levels and optional RQ job queue integration for asynchronous processing.

Classes

FlowerPowerMQTTConfig

Bases: Struct

Main configuration for FlowerPower MQTT plugin.

Source code in src/flowerpower_mqtt/config.py
class FlowerPowerMQTTConfig(Struct):
    """Main configuration for FlowerPower MQTT plugin."""
    mqtt: MQTTConfig = msgspec.field(default_factory=lambda: MQTTConfig())
    job_queue: JobQueueConfig = msgspec.field(default_factory=lambda: JobQueueConfig())
    subscriptions: List[SubscriptionConfig] = msgspec.field(default_factory=list)
    base_dir: str = "."
    log_level: str = "INFO"

    @classmethod
    def from_yaml(cls, file_path: Path) -> "FlowerPowerMQTTConfig":
        """Load configuration from YAML file."""
        if not file_path.exists():
            raise FileNotFoundError(f"Configuration file not found: {file_path}")

        with open(file_path, 'r') as f:
            return msgspec.yaml.decode(f.read(), type=cls)



    def to_yaml(self, file_path: Path) -> None:
        """Save configuration to YAML file."""
        # Convert struct to dictionary
        data = msgspec.to_builtins(self)

        with open(file_path, 'wb') as f:
            f.write(msgspec.yaml.encode(data))

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for compatibility."""
        return msgspec.to_builtins(self)
Functions
from_yaml classmethod
from_yaml(file_path: Path) -> FlowerPowerMQTTConfig

Load configuration from YAML file.

Source code in src/flowerpower_mqtt/config.py
@classmethod
def from_yaml(cls, file_path: Path) -> "FlowerPowerMQTTConfig":
    """Load configuration from YAML file."""
    if not file_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {file_path}")

    with open(file_path, 'r') as f:
        return msgspec.yaml.decode(f.read(), type=cls)
to_yaml
to_yaml(file_path: Path) -> None

Save configuration to YAML file.

Source code in src/flowerpower_mqtt/config.py
def to_yaml(self, file_path: Path) -> None:
    """Save configuration to YAML file."""
    # Convert struct to dictionary
    data = msgspec.to_builtins(self)

    with open(file_path, 'wb') as f:
        f.write(msgspec.yaml.encode(data))
to_dict
to_dict() -> Dict[str, Any]

Convert to dictionary for compatibility.

Source code in src/flowerpower_mqtt/config.py
def to_dict(self) -> Dict[str, Any]:
    """Convert to dictionary for compatibility."""
    return msgspec.to_builtins(self)

MQTTConfig

Bases: Struct

MQTT broker configuration.

Source code in src/flowerpower_mqtt/config.py
class MQTTConfig(Struct, frozen=True):
    """MQTT broker configuration."""
    broker: str = "localhost"
    port: int = 1883
    keepalive: int = 60
    client_id: Optional[str] = None
    clean_session: bool = True
    username: Optional[str] = None
    password: Optional[str] = None
    reconnect_retries: int = 5
    reconnect_delay: int = 5

JobQueueConfig

Bases: Struct

Job queue configuration.

Source code in src/flowerpower_mqtt/config.py
class JobQueueConfig(Struct, frozen=True):
    """Job queue configuration."""
    enabled: bool = False
    type: str = "rq"
    redis_url: str = "redis://localhost:6379"
    queue_name: str = "mqtt_pipelines"
    worker_count: int = 4
    max_retries: int = 3

SubscriptionConfig

Bases: Struct

Individual subscription configuration.

Source code in src/flowerpower_mqtt/config.py
class SubscriptionConfig(Struct):
    """Individual subscription configuration."""
    topic: str
    pipeline: str
    qos: int = 0
    execution_mode: str = "sync"
    deserialization_format: str = "auto"

    def __post_init__(self) -> None:
        """Validate fields after initialization."""
        _validate_qos(self.qos)
        _validate_execution_mode(self.execution_mode)
        _validate_deserialization_format(self.deserialization_format)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for compatibility."""
        return msgspec.to_builtins(self)
Functions
__post_init__
__post_init__() -> None

Validate fields after initialization.

Source code in src/flowerpower_mqtt/config.py
def __post_init__(self) -> None:
    """Validate fields after initialization."""
    _validate_qos(self.qos)
    _validate_execution_mode(self.execution_mode)
    _validate_deserialization_format(self.deserialization_format)
to_dict
to_dict() -> Dict[str, Any]

Convert to dictionary for compatibility.

Source code in src/flowerpower_mqtt/config.py
def to_dict(self) -> Dict[str, Any]:
    """Convert to dictionary for compatibility."""
    return msgspec.to_builtins(self)

FlowerPowerMQTTError

Bases: Exception

Base exception for FlowerPower MQTT plugin.

Source code in src/flowerpower_mqtt/exceptions.py
4
5
6
class FlowerPowerMQTTError(Exception):
    """Base exception for FlowerPower MQTT plugin."""
    pass

ConnectionError

Bases: FlowerPowerMQTTError

Raised when MQTT connection fails.

Source code in src/flowerpower_mqtt/exceptions.py
class ConnectionError(FlowerPowerMQTTError):
    """Raised when MQTT connection fails."""
    pass

SubscriptionError

Bases: FlowerPowerMQTTError

Raised when MQTT subscription fails.

Source code in src/flowerpower_mqtt/exceptions.py
class SubscriptionError(FlowerPowerMQTTError):
    """Raised when MQTT subscription fails."""
    pass

ConfigurationError

Bases: FlowerPowerMQTTError

Raised when configuration is invalid.

Source code in src/flowerpower_mqtt/exceptions.py
class ConfigurationError(FlowerPowerMQTTError):
    """Raised when configuration is invalid."""
    pass

MQTTPlugin

Main interface for FlowerPower MQTT plugin.

Provides simple API for connecting to MQTT brokers, subscribing to topics, and triggering FlowerPower pipeline execution with support for different QoS levels and execution modes.

Source code in src/flowerpower_mqtt/__init__.py
class MQTTPlugin:
    """
    Main interface for FlowerPower MQTT plugin.

    Provides simple API for connecting to MQTT brokers, subscribing to topics,
    and triggering FlowerPower pipeline execution with support for different
    QoS levels and execution modes.
    """

    def __init__(
        self,
        broker: str = "localhost",
        port: int = 1883,
        base_dir: str = ".",
        use_job_queue: bool = False,
        redis_url: str = "redis://localhost:6379",
        config: Optional[FlowerPowerMQTTConfig] = None,
        **mqtt_kwargs
    ):
        """
        Initialize MQTT plugin.

        Args:
            broker: MQTT broker hostname
            port: MQTT broker port
            base_dir: FlowerPower project base directory
            use_job_queue: Enable RQ job queue for async execution
            redis_url: Redis connection URL for job queue
            config: Complete configuration object (overrides other params)
            **mqtt_kwargs: Additional MQTT client configuration
        """
        # Handle Windows event loop policy for aiomqtt
        if sys.platform.lower() == "win32" or os.name.lower() == "nt":
            try:
                from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
                set_event_loop_policy(WindowsSelectorEventLoopPolicy())
            except ImportError:
                logger.warning("Could not set Windows event loop policy")

        # Initialize configuration
        if config is not None:
            self.config = config
        else:
            mqtt_config = MQTTConfig(
                broker=broker,
                port=port,
                **mqtt_kwargs
            )

            job_queue_config = JobQueueConfig(
                enabled=use_job_queue,
                redis_url=redis_url
            ) if use_job_queue else JobQueueConfig()

            self.config = FlowerPowerMQTTConfig(
                mqtt=mqtt_config,
                job_queue=job_queue_config,
                base_dir=base_dir
            )

        # Initialize components
        self.mqtt_client = MQTTClient(self.config.mqtt)
        self.listener: Optional[MQTTListener] = None
        self._connected = False

        # Configure logging level
        logging.getLogger().setLevel(getattr(logging, self.config.log_level.upper()))

    @classmethod
    def from_config(cls, config_path: Union[str, Path]) -> "MQTTPlugin":
        """
        Create plugin instance from configuration file.

        Args:
            config_path: Path to YAML configuration file

        Returns:
            Configured MQTTPlugin instance
        """
        config = FlowerPowerMQTTConfig.from_yaml(Path(config_path))
        return cls(config=config)

    async def connect(self) -> None:
        """Connect to MQTT broker."""
        if self._connected:
            logger.warning("Already connected to MQTT broker")
            return

        logger.info(f"Connecting to MQTT broker at {self.config.mqtt.broker}:{self.config.mqtt.port}")
        await self.mqtt_client.connect()

        # Initialize listener
        self.listener = MQTTListener(self.mqtt_client, self.config)
        self._connected = True

        logger.info("Successfully connected to MQTT broker")

    async def disconnect(self) -> None:
        """Disconnect from MQTT broker."""
        if not self._connected:
            logger.warning("Not connected to MQTT broker")
            return

        logger.info("Disconnecting from MQTT broker")

        # Stop listener if running
        if self.listener and self.listener.is_running:
            await self.listener.stop_listener()

        await self.mqtt_client.disconnect()
        self._connected = False

        logger.info("Successfully disconnected from MQTT broker")

    async def __aenter__(self):
        """Async context manager entry."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.disconnect()

    async def subscribe(
        self,
        topic: str,
        pipeline_name: str,
        qos: int = 0,
        execution_mode: str = "sync"
    ) -> None:
        """
        Subscribe to MQTT topic and link to FlowerPower pipeline.

        Args:
            topic: MQTT topic pattern to subscribe to
            pipeline_name: Name of FlowerPower pipeline to execute
            qos: QoS level (0=at most once, 1=at least once, 2=exactly once)
            execution_mode: Execution mode (sync, async, mixed)
        """
        if not self._connected:
            raise ConnectionError("Not connected to MQTT broker. Call connect() first.")

        if qos not in [0, 1, 2]:
            raise SubscriptionError(f"Invalid QoS level: {qos}. Must be 0, 1, or 2.")

        if execution_mode not in ["sync", "async", "mixed"]:
            raise SubscriptionError(
                f"Invalid execution mode: {execution_mode}. Must be 'sync', 'async', or 'mixed'."
            )

        # Add to configuration
        subscription = SubscriptionConfig(
            topic=topic,
            pipeline=pipeline_name,
            qos=qos,
            execution_mode=execution_mode
        )
        self.config.subscriptions.append(subscription)

        # Subscribe via MQTT client
        await self.mqtt_client.subscribe(topic, pipeline_name, qos, execution_mode)

        logger.info(
            f"Subscribed to '{topic}' -> pipeline '{pipeline_name}' "
            f"(QoS {qos}, {execution_mode} mode)"
        )

    async def subscribe_bulk(self, subscriptions: List[Dict[str, Any]]) -> None:
        """
        Subscribe to multiple topics at once.

        Args:
            subscriptions: List of subscription dictionaries
        """
        for sub in subscriptions:
            await self.subscribe(
                topic=sub["topic"],
                pipeline_name=sub["pipeline"],
                qos=sub.get("qos", 0),
                execution_mode=sub.get("execution_mode", "sync")
            )

    async def unsubscribe(self, topic: str) -> None:
        """
        Unsubscribe from MQTT topic.

        Args:
            topic: MQTT topic pattern to unsubscribe from
        """
        if not self._connected:
            raise ConnectionError("Not connected to MQTT broker")

        await self.mqtt_client.unsubscribe(topic)

        # Remove from configuration
        self.config.subscriptions = [
            sub for sub in self.config.subscriptions if sub.topic != topic
        ]

        logger.info(f"Unsubscribed from '{topic}'")

    async def start_listener(
        self,
        background: bool = False,
        execution_mode: Optional[str] = None
    ) -> None:
        """
        Start listening for MQTT messages.

        Args:
            background: If True, run listener in background task
            execution_mode: Override execution mode for all pipelines
        """
        if not self._connected:
            raise ConnectionError("Not connected to MQTT broker. Call connect() first.")

        if not self.listener:
            raise ConfigurationError("Listener not initialized")

        if not self.config.subscriptions:
            logger.warning("No subscriptions configured")

        # Override execution mode if specified
        if execution_mode:
            for sub in self.config.subscriptions:
                sub.execution_mode = execution_mode

        logger.info(
            f"Starting listener with {len(self.config.subscriptions)} subscriptions "
            f"(background={background})"
        )

        try:
            await self.listener.start_listener(background=background)
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt, stopping listener")
            await self.stop_listener()
        except Exception as e:
            logger.error(f"Error in listener: {e}")
            raise

    async def stop_listener(self, timeout: float = 10.0) -> None:
        """
        Stop MQTT message listener.

        Args:
            timeout: Maximum time to wait for graceful shutdown
        """
        if not self.listener:
            logger.warning("No listener to stop")
            return

        await self.listener.stop_listener(timeout=timeout)

    def get_subscriptions(self) -> List[Dict[str, Any]]:
        """
        Get current subscriptions.

        Returns:
            List of subscription information dictionaries
        """
        if not self._connected:
            return [sub.to_dict() for sub in self.config.subscriptions]

        subscriptions = []
        for topic, runtime_sub in self.mqtt_client.get_all_subscriptions().items():
            sub_info = {
                "topic": runtime_sub.topic,
                "pipeline": runtime_sub.pipeline,
                "qos": runtime_sub.qos,
                "execution_mode": runtime_sub.execution_mode,
                "message_count": runtime_sub.message_count,
                "last_message_time": runtime_sub.last_message_time,
                "error_count": runtime_sub.error_count
            }
            subscriptions.append(sub_info)

        return subscriptions

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get plugin statistics.

        Returns:
            Dictionary with current statistics
        """
        stats = {
            "connected": self._connected,
            "broker": f"{self.config.mqtt.broker}:{self.config.mqtt.port}",
            "subscriptions_count": len(self.config.subscriptions),
            "job_queue_enabled": self.config.job_queue.enabled
        }

        if self.listener:
            stats.update(self.listener.get_statistics())

        return stats

    def save_config(self, file_path: Union[str, Path]) -> None:
        """
        Save current configuration to YAML file.

        Args:
            file_path: Path where to save configuration
        """
        self.config.to_yaml(Path(file_path))
        logger.info(f"Configuration saved to {file_path}")

    @property
    def is_connected(self) -> bool:
        """Check if connected to MQTT broker."""
        return self._connected

    @property
    def is_listening(self) -> bool:
        """Check if listener is running."""
        return self.listener.is_running if self.listener else False
Attributes
is_connected property
is_connected: bool

Check if connected to MQTT broker.

is_listening property
is_listening: bool

Check if listener is running.

Functions
__init__
__init__(
    broker: str = "localhost",
    port: int = 1883,
    base_dir: str = ".",
    use_job_queue: bool = False,
    redis_url: str = "redis://localhost:6379",
    config: Optional[FlowerPowerMQTTConfig] = None,
    **mqtt_kwargs
)

Initialize MQTT plugin.

Parameters:

Name Type Description Default
broker str

MQTT broker hostname

'localhost'
port int

MQTT broker port

1883
base_dir str

FlowerPower project base directory

'.'
use_job_queue bool

Enable RQ job queue for async execution

False
redis_url str

Redis connection URL for job queue

'redis://localhost:6379'
config Optional[FlowerPowerMQTTConfig]

Complete configuration object (overrides other params)

None
**mqtt_kwargs

Additional MQTT client configuration

{}
Source code in src/flowerpower_mqtt/__init__.py
def __init__(
    self,
    broker: str = "localhost",
    port: int = 1883,
    base_dir: str = ".",
    use_job_queue: bool = False,
    redis_url: str = "redis://localhost:6379",
    config: Optional[FlowerPowerMQTTConfig] = None,
    **mqtt_kwargs
):
    """
    Initialize MQTT plugin.

    Args:
        broker: MQTT broker hostname
        port: MQTT broker port
        base_dir: FlowerPower project base directory
        use_job_queue: Enable RQ job queue for async execution
        redis_url: Redis connection URL for job queue
        config: Complete configuration object (overrides other params)
        **mqtt_kwargs: Additional MQTT client configuration
    """
    # Handle Windows event loop policy for aiomqtt
    if sys.platform.lower() == "win32" or os.name.lower() == "nt":
        try:
            from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
            set_event_loop_policy(WindowsSelectorEventLoopPolicy())
        except ImportError:
            logger.warning("Could not set Windows event loop policy")

    # Initialize configuration
    if config is not None:
        self.config = config
    else:
        mqtt_config = MQTTConfig(
            broker=broker,
            port=port,
            **mqtt_kwargs
        )

        job_queue_config = JobQueueConfig(
            enabled=use_job_queue,
            redis_url=redis_url
        ) if use_job_queue else JobQueueConfig()

        self.config = FlowerPowerMQTTConfig(
            mqtt=mqtt_config,
            job_queue=job_queue_config,
            base_dir=base_dir
        )

    # Initialize components
    self.mqtt_client = MQTTClient(self.config.mqtt)
    self.listener: Optional[MQTTListener] = None
    self._connected = False

    # Configure logging level
    logging.getLogger().setLevel(getattr(logging, self.config.log_level.upper()))
from_config classmethod
from_config(config_path: Union[str, Path]) -> MQTTPlugin

Create plugin instance from configuration file.

Parameters:

Name Type Description Default
config_path Union[str, Path]

Path to YAML configuration file

required

Returns:

Type Description
MQTTPlugin

Configured MQTTPlugin instance

Source code in src/flowerpower_mqtt/__init__.py
@classmethod
def from_config(cls, config_path: Union[str, Path]) -> "MQTTPlugin":
    """
    Create plugin instance from configuration file.

    Args:
        config_path: Path to YAML configuration file

    Returns:
        Configured MQTTPlugin instance
    """
    config = FlowerPowerMQTTConfig.from_yaml(Path(config_path))
    return cls(config=config)
connect async
connect() -> None

Connect to MQTT broker.

Source code in src/flowerpower_mqtt/__init__.py
async def connect(self) -> None:
    """Connect to MQTT broker."""
    if self._connected:
        logger.warning("Already connected to MQTT broker")
        return

    logger.info(f"Connecting to MQTT broker at {self.config.mqtt.broker}:{self.config.mqtt.port}")
    await self.mqtt_client.connect()

    # Initialize listener
    self.listener = MQTTListener(self.mqtt_client, self.config)
    self._connected = True

    logger.info("Successfully connected to MQTT broker")
disconnect async
disconnect() -> None

Disconnect from MQTT broker.

Source code in src/flowerpower_mqtt/__init__.py
async def disconnect(self) -> None:
    """Disconnect from MQTT broker."""
    if not self._connected:
        logger.warning("Not connected to MQTT broker")
        return

    logger.info("Disconnecting from MQTT broker")

    # Stop listener if running
    if self.listener and self.listener.is_running:
        await self.listener.stop_listener()

    await self.mqtt_client.disconnect()
    self._connected = False

    logger.info("Successfully disconnected from MQTT broker")
__aenter__ async
__aenter__()

Async context manager entry.

Source code in src/flowerpower_mqtt/__init__.py
async def __aenter__(self):
    """Async context manager entry."""
    await self.connect()
    return self
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in src/flowerpower_mqtt/__init__.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit."""
    await self.disconnect()
subscribe async
subscribe(
    topic: str, pipeline_name: str, qos: int = 0, execution_mode: str = "sync"
) -> None

Subscribe to MQTT topic and link to FlowerPower pipeline.

Parameters:

Name Type Description Default
topic str

MQTT topic pattern to subscribe to

required
pipeline_name str

Name of FlowerPower pipeline to execute

required
qos int

QoS level (0=at most once, 1=at least once, 2=exactly once)

0
execution_mode str

Execution mode (sync, async, mixed)

'sync'
Source code in src/flowerpower_mqtt/__init__.py
async def subscribe(
    self,
    topic: str,
    pipeline_name: str,
    qos: int = 0,
    execution_mode: str = "sync"
) -> None:
    """
    Subscribe to MQTT topic and link to FlowerPower pipeline.

    Args:
        topic: MQTT topic pattern to subscribe to
        pipeline_name: Name of FlowerPower pipeline to execute
        qos: QoS level (0=at most once, 1=at least once, 2=exactly once)
        execution_mode: Execution mode (sync, async, mixed)
    """
    if not self._connected:
        raise ConnectionError("Not connected to MQTT broker. Call connect() first.")

    if qos not in [0, 1, 2]:
        raise SubscriptionError(f"Invalid QoS level: {qos}. Must be 0, 1, or 2.")

    if execution_mode not in ["sync", "async", "mixed"]:
        raise SubscriptionError(
            f"Invalid execution mode: {execution_mode}. Must be 'sync', 'async', or 'mixed'."
        )

    # Add to configuration
    subscription = SubscriptionConfig(
        topic=topic,
        pipeline=pipeline_name,
        qos=qos,
        execution_mode=execution_mode
    )
    self.config.subscriptions.append(subscription)

    # Subscribe via MQTT client
    await self.mqtt_client.subscribe(topic, pipeline_name, qos, execution_mode)

    logger.info(
        f"Subscribed to '{topic}' -> pipeline '{pipeline_name}' "
        f"(QoS {qos}, {execution_mode} mode)"
    )
subscribe_bulk async
subscribe_bulk(subscriptions: List[Dict[str, Any]]) -> None

Subscribe to multiple topics at once.

Parameters:

Name Type Description Default
subscriptions List[Dict[str, Any]]

List of subscription dictionaries

required
Source code in src/flowerpower_mqtt/__init__.py
async def subscribe_bulk(self, subscriptions: List[Dict[str, Any]]) -> None:
    """
    Subscribe to multiple topics at once.

    Args:
        subscriptions: List of subscription dictionaries
    """
    for sub in subscriptions:
        await self.subscribe(
            topic=sub["topic"],
            pipeline_name=sub["pipeline"],
            qos=sub.get("qos", 0),
            execution_mode=sub.get("execution_mode", "sync")
        )
unsubscribe async
unsubscribe(topic: str) -> None

Unsubscribe from MQTT topic.

Parameters:

Name Type Description Default
topic str

MQTT topic pattern to unsubscribe from

required
Source code in src/flowerpower_mqtt/__init__.py
async def unsubscribe(self, topic: str) -> None:
    """
    Unsubscribe from MQTT topic.

    Args:
        topic: MQTT topic pattern to unsubscribe from
    """
    if not self._connected:
        raise ConnectionError("Not connected to MQTT broker")

    await self.mqtt_client.unsubscribe(topic)

    # Remove from configuration
    self.config.subscriptions = [
        sub for sub in self.config.subscriptions if sub.topic != topic
    ]

    logger.info(f"Unsubscribed from '{topic}'")
start_listener async
start_listener(
    background: bool = False, execution_mode: Optional[str] = None
) -> None

Start listening for MQTT messages.

Parameters:

Name Type Description Default
background bool

If True, run listener in background task

False
execution_mode Optional[str]

Override execution mode for all pipelines

None
Source code in src/flowerpower_mqtt/__init__.py
async def start_listener(
    self,
    background: bool = False,
    execution_mode: Optional[str] = None
) -> None:
    """
    Start listening for MQTT messages.

    Args:
        background: If True, run listener in background task
        execution_mode: Override execution mode for all pipelines
    """
    if not self._connected:
        raise ConnectionError("Not connected to MQTT broker. Call connect() first.")

    if not self.listener:
        raise ConfigurationError("Listener not initialized")

    if not self.config.subscriptions:
        logger.warning("No subscriptions configured")

    # Override execution mode if specified
    if execution_mode:
        for sub in self.config.subscriptions:
            sub.execution_mode = execution_mode

    logger.info(
        f"Starting listener with {len(self.config.subscriptions)} subscriptions "
        f"(background={background})"
    )

    try:
        await self.listener.start_listener(background=background)
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, stopping listener")
        await self.stop_listener()
    except Exception as e:
        logger.error(f"Error in listener: {e}")
        raise
stop_listener async
stop_listener(timeout: float = 10.0) -> None

Stop MQTT message listener.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for graceful shutdown

10.0
Source code in src/flowerpower_mqtt/__init__.py
async def stop_listener(self, timeout: float = 10.0) -> None:
    """
    Stop MQTT message listener.

    Args:
        timeout: Maximum time to wait for graceful shutdown
    """
    if not self.listener:
        logger.warning("No listener to stop")
        return

    await self.listener.stop_listener(timeout=timeout)
get_subscriptions
get_subscriptions() -> List[Dict[str, Any]]

Get current subscriptions.

Returns:

Type Description
List[Dict[str, Any]]

List of subscription information dictionaries

Source code in src/flowerpower_mqtt/__init__.py
def get_subscriptions(self) -> List[Dict[str, Any]]:
    """
    Get current subscriptions.

    Returns:
        List of subscription information dictionaries
    """
    if not self._connected:
        return [sub.to_dict() for sub in self.config.subscriptions]

    subscriptions = []
    for topic, runtime_sub in self.mqtt_client.get_all_subscriptions().items():
        sub_info = {
            "topic": runtime_sub.topic,
            "pipeline": runtime_sub.pipeline,
            "qos": runtime_sub.qos,
            "execution_mode": runtime_sub.execution_mode,
            "message_count": runtime_sub.message_count,
            "last_message_time": runtime_sub.last_message_time,
            "error_count": runtime_sub.error_count
        }
        subscriptions.append(sub_info)

    return subscriptions
get_statistics
get_statistics() -> Dict[str, Any]

Get plugin statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with current statistics

Source code in src/flowerpower_mqtt/__init__.py
def get_statistics(self) -> Dict[str, Any]:
    """
    Get plugin statistics.

    Returns:
        Dictionary with current statistics
    """
    stats = {
        "connected": self._connected,
        "broker": f"{self.config.mqtt.broker}:{self.config.mqtt.port}",
        "subscriptions_count": len(self.config.subscriptions),
        "job_queue_enabled": self.config.job_queue.enabled
    }

    if self.listener:
        stats.update(self.listener.get_statistics())

    return stats
save_config
save_config(file_path: Union[str, Path]) -> None

Save current configuration to YAML file.

Parameters:

Name Type Description Default
file_path Union[str, Path]

Path where to save configuration

required
Source code in src/flowerpower_mqtt/__init__.py
def save_config(self, file_path: Union[str, Path]) -> None:
    """
    Save current configuration to YAML file.

    Args:
        file_path: Path where to save configuration
    """
    self.config.to_yaml(Path(file_path))
    logger.info(f"Configuration saved to {file_path}")

Functions

create_simple_mqtt_plugin async

create_simple_mqtt_plugin(
    broker: str = "localhost",
    topic: str = "test/topic",
    pipeline: str = "test_pipeline",
    qos: int = 0,
    base_dir: str = ".",
) -> MQTTPlugin

Create and configure a simple MQTT plugin instance.

Parameters:

Name Type Description Default
broker str

MQTT broker hostname

'localhost'
topic str

MQTT topic to subscribe to

'test/topic'
pipeline str

FlowerPower pipeline name

'test_pipeline'
qos int

QoS level

0
base_dir str

FlowerPower project directory

'.'

Returns:

Type Description
MQTTPlugin

Configured and connected MQTTPlugin instance

Source code in src/flowerpower_mqtt/__init__.py
async def create_simple_mqtt_plugin(
    broker: str = "localhost",
    topic: str = "test/topic", 
    pipeline: str = "test_pipeline",
    qos: int = 0,
    base_dir: str = "."
) -> MQTTPlugin:
    """
    Create and configure a simple MQTT plugin instance.

    Args:
        broker: MQTT broker hostname
        topic: MQTT topic to subscribe to
        pipeline: FlowerPower pipeline name
        qos: QoS level
        base_dir: FlowerPower project directory

    Returns:
        Configured and connected MQTTPlugin instance
    """
    plugin = MQTTPlugin(broker=broker, base_dir=base_dir)
    await plugin.connect()
    await plugin.subscribe(topic, pipeline, qos)
    return plugin

Modules

cli

Command Line Interface for FlowerPower MQTT Plugin.

Provides a comprehensive CLI for managing MQTT connections, subscriptions, and pipeline execution with beautiful rich output.

Classes
Functions
get_config_path
get_config_path() -> Path

Get configuration file path.

Source code in src/flowerpower_mqtt/cli.py
def get_config_path() -> Path:
    """Get configuration file path."""
    global _config_file
    if _config_file and _config_file.exists():
        return _config_file

    # Check common locations
    candidates = [
        Path("mqtt_config.yml"),
        Path("config.yml"),
        Path(".flowerpower-mqtt.yml"),
        Path.home() / ".config" / "flowerpower-mqtt.yml"
    ]

    for candidate in candidates:
        if candidate.exists():
            return candidate

    # Return default
    return Path("mqtt_config.yml")
load_plugin
load_plugin(
    config: Optional[Path] = None,
    broker: Optional[str] = None,
    port: Optional[int] = None,
    base_dir: Optional[str] = None,
    use_job_queue: bool = False,
    redis_url: Optional[str] = None,
) -> MQTTPlugin

Load MQTTPlugin from config or parameters.

Source code in src/flowerpower_mqtt/cli.py
def load_plugin(
    config: Optional[Path] = None,
    broker: Optional[str] = None,
    port: Optional[int] = None,
    base_dir: Optional[str] = None,
    use_job_queue: bool = False,
    redis_url: Optional[str] = None
) -> MQTTPlugin:
    """Load MQTTPlugin from config or parameters."""
    global _current_plugin, _config_file

    if config and config.exists():
        _config_file = config
        _current_plugin = MQTTPlugin.from_config(config)
        return _current_plugin

    # Create from parameters
    kwargs = {}
    if broker:
        kwargs["broker"] = broker
    if port:
        kwargs["port"] = port
    if base_dir:
        kwargs["base_dir"] = base_dir
    if use_job_queue:
        kwargs["use_job_queue"] = use_job_queue
    if redis_url:
        kwargs["redis_url"] = redis_url

    _current_plugin = MQTTPlugin(**kwargs)
    return _current_plugin
handle_async
handle_async(coro)

Handle async functions in CLI commands.

Source code in src/flowerpower_mqtt/cli.py
def handle_async(coro):
    """Handle async functions in CLI commands."""
    try:
        return asyncio.run(coro)
    except KeyboardInterrupt:
        console.print("\n[yellow]Operation cancelled by user[/yellow]")
        raise typer.Abort()
    except Exception as e:
        console.print(f"[red]Error: {e}[/red]")
        raise typer.Exit(1)
connect
connect(
    broker: str = typer.Option(
        "localhost", "--broker", "-b", help="MQTT broker hostname"
    ),
    port: int = typer.Option(1883, "--port", "-p", help="MQTT broker port"),
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    base_dir: str = typer.Option(
        ".", "--base-dir", help="FlowerPower project directory"
    ),
    use_job_queue: bool = typer.Option(
        False, "--job-queue", help="Enable RQ job queue"
    ),
    redis_url: str = typer.Option(
        "redis://localhost:6379", "--redis-url", help="Redis URL for job queue"
    ),
    save_config: bool = typer.Option(
        False, "--save-config", help="Save configuration to file"
    ),
)

Connect to MQTT broker.

Source code in src/flowerpower_mqtt/cli.py
@app.command()
def connect(
    broker: str = typer.Option("localhost", "--broker", "-b", help="MQTT broker hostname"),
    port: int = typer.Option(1883, "--port", "-p", help="MQTT broker port"),
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    base_dir: str = typer.Option(".", "--base-dir", help="FlowerPower project directory"),
    use_job_queue: bool = typer.Option(False, "--job-queue", help="Enable RQ job queue"),
    redis_url: str = typer.Option("redis://localhost:6379", "--redis-url", help="Redis URL for job queue"),
    save_config: bool = typer.Option(False, "--save-config", help="Save configuration to file")
):
    """Connect to MQTT broker."""

    async def _connect():
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console
        ) as progress:
            task = progress.add_task(f"Connecting to {broker}:{port}...", total=None)

            try:
                plugin = load_plugin(
                    config=config,
                    broker=broker,
                    port=port,
                    base_dir=base_dir,
                    use_job_queue=use_job_queue,
                    redis_url=redis_url
                )

                await plugin.connect()

                progress.update(task, description=f"✅ Connected to {broker}:{port}")

                # Show connection info
                console.print()
                info_table = Table(title="Connection Information")
                info_table.add_column("Property", style="bold blue")
                info_table.add_column("Value", style="green")

                info_table.add_row("Broker", f"{broker}:{port}")
                info_table.add_row("Base Directory", base_dir)
                info_table.add_row("Job Queue", "Enabled" if use_job_queue else "Disabled")
                if use_job_queue:
                    info_table.add_row("Redis URL", redis_url)

                console.print(info_table)

                # Save config if requested
                if save_config:
                    config_path = get_config_path()
                    plugin.save_config(config_path)
                    console.print(f"\n[green]Configuration saved to {config_path}[/green]")

            except Exception as e:
                progress.update(task, description=f"❌ Connection failed: {e}")
                raise

    handle_async(_connect())
subscribe
subscribe(
    topic: str = typer.Argument(..., help="MQTT topic to subscribe to"),
    pipeline: str = typer.Argument(..., help="FlowerPower pipeline name"),
    qos: int = typer.Option(0, "--qos", "-q", help="QoS level (0, 1, or 2)"),
    execution_mode: str = typer.Option(
        "sync", "--mode", "-m", help="Execution mode (sync, async, mixed)"
    ),
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    save_config: bool = typer.Option(
        False, "--save-config", help="Save subscription to config file"
    ),
)

Subscribe to MQTT topic.

Source code in src/flowerpower_mqtt/cli.py
@app.command()
def subscribe(
    topic: str = typer.Argument(..., help="MQTT topic to subscribe to"),
    pipeline: str = typer.Argument(..., help="FlowerPower pipeline name"),
    qos: int = typer.Option(0, "--qos", "-q", help="QoS level (0, 1, or 2)"),
    execution_mode: str = typer.Option("sync", "--mode", "-m", help="Execution mode (sync, async, mixed)"),
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    save_config: bool = typer.Option(False, "--save-config", help="Save subscription to config file")
):
    """Subscribe to MQTT topic."""

    # Validate inputs
    if qos not in [0, 1, 2]:
        console.print("[red]QoS must be 0, 1, or 2[/red]")
        raise typer.Exit(1)

    if execution_mode not in ["sync", "async", "mixed"]:
        console.print("[red]Execution mode must be sync, async, or mixed[/red]")
        raise typer.Exit(1)

    async def _subscribe():
        try:
            # Load or use existing plugin
            plugin = _current_plugin or load_plugin(config=config)

            if not plugin.is_connected:
                console.print("[yellow]Plugin not connected. Connecting first...[/yellow]")
                await plugin.connect()

            await plugin.subscribe(topic, pipeline, qos, execution_mode)

            # Show subscription info
            console.print(f"\n[green]✅ Subscribed to '{topic}' -> '{pipeline}'[/green]")

            sub_table = Table(title="Subscription Details")
            sub_table.add_column("Property", style="bold blue")
            sub_table.add_column("Value", style="green")

            sub_table.add_row("Topic", topic)
            sub_table.add_row("Pipeline", pipeline)
            sub_table.add_row("QoS Level", str(qos))
            sub_table.add_row("Execution Mode", execution_mode)

            console.print(sub_table)

            # Save config if requested
            if save_config:
                config_path = get_config_path()
                plugin.save_config(config_path)
                console.print(f"\n[green]Subscription saved to {config_path}[/green]")

        except Exception as e:
            console.print(f"[red]Subscription failed: {e}[/red]")
            raise typer.Exit(1)

    handle_async(_subscribe())
listen
listen(
    background: bool = typer.Option(
        False, "--background", "-bg", help="Run listener in background"
    ),
    execution_mode: Optional[str] = typer.Option(
        None,
        "--override-mode",
        help="Override execution mode for all pipelines",
    ),
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    timeout: Optional[int] = typer.Option(
        None, "--timeout", help="Stop after specified seconds"
    ),
)

Start listening for MQTT messages.

Source code in src/flowerpower_mqtt/cli.py
@app.command()
def listen(
    background: bool = typer.Option(False, "--background", "-bg", help="Run listener in background"),
    execution_mode: Optional[str] = typer.Option(None, "--override-mode", help="Override execution mode for all pipelines"),
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    timeout: Optional[int] = typer.Option(None, "--timeout", help="Stop after specified seconds")
):
    """Start listening for MQTT messages."""

    async def _listen():
        try:
            # Load or use existing plugin
            plugin = _current_plugin or load_plugin(config=config)

            if not plugin.is_connected:
                console.print("[yellow]Plugin not connected. Connecting first...[/yellow]")
                await plugin.connect()

            subscriptions = plugin.get_subscriptions()
            if not subscriptions:
                console.print("[yellow]No subscriptions configured. Use 'subscribe' command first.[/yellow]")
                return

            # Show listener info
            console.print(Panel(
                f"[bold green]Starting MQTT Listener[/bold green]\n\n"
                f"Subscriptions: {len(subscriptions)}\n"
                f"Background mode: {'Yes' if background else 'No'}\n"
                f"Execution override: {execution_mode or 'None'}\n"
                f"Timeout: {timeout or 'None'}",
                title="Listener Configuration"
            ))

            # Show subscriptions table
            sub_table = Table(title="Active Subscriptions")
            sub_table.add_column("Topic", style="bold blue")
            sub_table.add_column("Pipeline", style="green")
            sub_table.add_column("QoS", style="yellow")
            sub_table.add_column("Mode", style="magenta")

            for sub in subscriptions:
                sub_table.add_row(
                    sub.get("topic", ""),
                    sub.get("pipeline", ""),
                    str(sub.get("qos", 0)),
                    sub.get("execution_mode", "sync")
                )

            console.print(sub_table)
            console.print("\n[bold yellow]Press Ctrl+C to stop listener[/bold yellow]\n")

            # Start listener
            if timeout:
                # Run with timeout
                try:
                    await asyncio.wait_for(
                        plugin.start_listener(background=background, execution_mode=execution_mode),
                        timeout=timeout
                    )
                except asyncio.TimeoutError:
                    console.print(f"\n[yellow]Listener stopped after {timeout} seconds[/yellow]")
            else:
                # Run indefinitely
                await plugin.start_listener(background=background, execution_mode=execution_mode)

        except KeyboardInterrupt:
            console.print("\n[yellow]Stopping listener...[/yellow]")
            if plugin:
                await plugin.stop_listener()
                console.print("[green]Listener stopped gracefully[/green]")
        except Exception as e:
            console.print(f"[red]Listener error: {e}[/red]")
            raise typer.Exit(1)

    handle_async(_listen())
status
status(
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    json_output: bool = typer.Option(False, "--json", help="Output as JSON"),
)

Show current plugin status and statistics.

Source code in src/flowerpower_mqtt/cli.py
@app.command()
def status(
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    json_output: bool = typer.Option(False, "--json", help="Output as JSON")
):
    """Show current plugin status and statistics."""

    async def _status():
        try:
            # Load or use existing plugin
            plugin = _current_plugin or load_plugin(config=config)

            # Get statistics
            stats = plugin.get_statistics()
            subscriptions = plugin.get_subscriptions()

            if json_output:
                # JSON output
                output = {
                    "status": stats,
                    "subscriptions": subscriptions
                }
                console.print(json.dumps(output, indent=2, default=str))
                return

            # Rich formatted output
            status_table = Table(title="Plugin Status")
            status_table.add_column("Property", style="bold blue")
            status_table.add_column("Value", style="green")

            status_table.add_row("Connected", "✅ Yes" if stats.get("connected", False) else "❌ No")
            status_table.add_row("Broker", stats.get("broker", "N/A"))
            status_table.add_row("Listening", "✅ Yes" if stats.get("running", False) else "❌ No")
            status_table.add_row("Runtime", f"{stats.get('runtime_seconds', 0):.1f}s")
            status_table.add_row("Messages", str(stats.get("message_count", 0)))
            status_table.add_row("Pipelines Executed", str(stats.get("pipeline_count", 0)))
            status_table.add_row("Errors", str(stats.get("error_count", 0)))
            status_table.add_row("Subscriptions", str(len(subscriptions)))
            status_table.add_row("Job Queue", "✅ Enabled" if stats.get("job_queue_enabled", False) else "❌ Disabled")

            console.print(status_table)

            if subscriptions:
                console.print("\n")
                sub_table = Table(title="Subscription Details")
                sub_table.add_column("Topic", style="bold blue")
                sub_table.add_column("Pipeline", style="green")
                sub_table.add_column("QoS", style="yellow")
                sub_table.add_column("Mode", style="magenta")
                sub_table.add_column("Messages", style="cyan")

                for sub in subscriptions:
                    sub_table.add_row(
                        sub.get("topic", ""),
                        sub.get("pipeline", ""),
                        str(sub.get("qos", 0)),
                        sub.get("execution_mode", "sync"),
                        str(sub.get("message_count", 0))
                    )

                console.print(sub_table)

        except Exception as e:
            console.print(f"[red]Status check failed: {e}[/red]")
            raise typer.Exit(1)

    handle_async(_status())
disconnect
disconnect(
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    )
)

Disconnect from MQTT broker.

Source code in src/flowerpower_mqtt/cli.py
@app.command()
def disconnect(
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file")
):
    """Disconnect from MQTT broker."""

    async def _disconnect():
        try:
            plugin = _current_plugin or load_plugin(config=config)

            if not plugin.is_connected:
                console.print("[yellow]Plugin not connected[/yellow]")
                return

            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                console=console
            ) as progress:
                task = progress.add_task("Disconnecting...", total=None)

                await plugin.disconnect()

                progress.update(task, description="✅ Disconnected successfully")

            console.print("[green]Disconnected from MQTT broker[/green]")

        except Exception as e:
            console.print(f"[red]Disconnect failed: {e}[/red]")
            raise typer.Exit(1)

    handle_async(_disconnect())
config_create
config_create(
    output: Path = typer.Option(
        "mqtt_config.yml", "--output", "-o", help="Output configuration file"
    ),
    interactive: bool = typer.Option(
        False, "--interactive", "-i", help="Interactive configuration"
    ),
    with_job_queue: bool = typer.Option(
        False, "--job-queue", help="Enable job queue configuration"
    ),
)

Create a new configuration file.

Source code in src/flowerpower_mqtt/cli.py
@config_app.command("create")
def config_create(
    output: Path = typer.Option("mqtt_config.yml", "--output", "-o", help="Output configuration file"),
    interactive: bool = typer.Option(False, "--interactive", "-i", help="Interactive configuration"),
    with_job_queue: bool = typer.Option(False, "--job-queue", help="Enable job queue configuration")
):
    """Create a new configuration file."""

    try:
        if output.exists() and not Confirm.ask(f"Configuration file {output} exists. Overwrite?"):
            console.print("[yellow]Configuration creation cancelled[/yellow]")
            return

        config = FlowerPowerMQTTConfig()

        if interactive:
            console.print(Panel("[bold blue]Interactive Configuration Setup[/bold blue]"))

            # MQTT Configuration
            console.print("\n[bold]MQTT Broker Configuration[/bold]")
            config.mqtt.broker = Prompt.ask("MQTT Broker hostname", default=config.mqtt.broker)
            config.mqtt.port = int(Prompt.ask("MQTT Broker port", default=str(config.mqtt.port)))
            config.mqtt.keepalive = int(Prompt.ask("Keep alive seconds", default=str(config.mqtt.keepalive)))
            config.mqtt.client_id = Prompt.ask("Client ID (optional)", default=config.mqtt.client_id or "")

            # Base directory
            config.base_dir = Prompt.ask("FlowerPower base directory", default=config.base_dir)

            # Job Queue
            if with_job_queue or Confirm.ask("Enable job queue for async processing?"):
                config.job_queue.enabled = True
                config.job_queue.redis_url = Prompt.ask("Redis URL", default=config.job_queue.redis_url)
                config.job_queue.queue_name = Prompt.ask("Queue name", default=config.job_queue.queue_name)
                config.job_queue.worker_count = int(Prompt.ask("Worker count", default=str(config.job_queue.worker_count)))

            # Log level
            log_levels = ["DEBUG", "INFO", "WARNING", "ERROR"]
            console.print(f"Log levels: {', '.join(log_levels)}")
            config.log_level = Prompt.ask("Log level", default=config.log_level, choices=log_levels)

        elif with_job_queue:
            config.job_queue.enabled = True

        # Save configuration
        config.to_yaml(output)

        console.print(f"\n[green]✅ Configuration created: {output}[/green]")

        # Show configuration preview
        with open(output) as f:
            config_content = f.read()

        syntax = Syntax(config_content, "yaml", theme="monokai", line_numbers=True)
        console.print(Panel(syntax, title="Configuration Preview"))

    except Exception as e:
        console.print(f"[red]Configuration creation failed: {e}[/red]")
        raise typer.Exit(1)
config_validate
config_validate(
    config_file: Path = typer.Argument(
        ..., help="Configuration file to validate"
    )
)

Validate configuration file.

Source code in src/flowerpower_mqtt/cli.py
@config_app.command("validate")
def config_validate(
    config_file: Path = typer.Argument(..., help="Configuration file to validate")
):
    """Validate configuration file."""

    try:
        if not config_file.exists():
            console.print(f"[red]Configuration file not found: {config_file}[/red]")
            raise typer.Exit(1)

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console
        ) as progress:
            task = progress.add_task("Validating configuration...", total=None)

            # Load and validate configuration
            config = FlowerPowerMQTTConfig.from_yaml(config_file)

            progress.update(task, description="✅ Configuration is valid")

        console.print(f"[green]✅ Configuration file {config_file} is valid[/green]")

        # Show summary
        summary_table = Table(title="Configuration Summary")
        summary_table.add_column("Section", style="bold blue")
        summary_table.add_column("Details", style="green")

        summary_table.add_row("MQTT Broker", f"{config.mqtt.broker}:{config.mqtt.port}")
        summary_table.add_row("Base Directory", config.base_dir)
        summary_table.add_row("Job Queue", "Enabled" if config.job_queue.enabled else "Disabled")
        summary_table.add_row("Subscriptions", str(len(config.subscriptions)))
        summary_table.add_row("Log Level", config.log_level)

        console.print(summary_table)

        if config.subscriptions:
            console.print("\n")
            sub_table = Table(title="Configured Subscriptions")
            sub_table.add_column("Topic", style="bold blue")
            sub_table.add_column("Pipeline", style="green")
            sub_table.add_column("QoS", style="yellow")
            sub_table.add_column("Mode", style="magenta")

            for sub in config.subscriptions:
                sub_table.add_row(sub.topic, sub.pipeline, str(sub.qos), sub.execution_mode)

            console.print(sub_table)

    except Exception as e:
        console.print(f"[red]Configuration validation failed: {e}[/red]")
        raise typer.Exit(1)
config_show
config_show(
    config_file: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    format_output: str = typer.Option(
        "yaml", "--format", help="Output format (yaml, json)"
    ),
)

Show current configuration.

Source code in src/flowerpower_mqtt/cli.py
@config_app.command("show")
def config_show(
    config_file: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    format_output: str = typer.Option("yaml", "--format", help="Output format (yaml, json)")
):
    """Show current configuration."""

    try:
        config_path = config_file or get_config_path()

        if not config_path.exists():
            console.print(f"[yellow]Configuration file not found: {config_path}[/yellow]")
            console.print("Use 'flowerpower-mqtt config create' to create a new configuration.")
            return

        config = FlowerPowerMQTTConfig.from_yaml(config_path)

        if format_output == "json":
            config_dict = config.to_dict()
            console.print(json.dumps(config_dict, indent=2, default=str))
        else:
            # Show YAML content with syntax highlighting
            with open(config_path) as f:
                content = f.read()

            syntax = Syntax(content, "yaml", theme="monokai", line_numbers=True)
            console.print(Panel(syntax, title=f"Configuration: {config_path}"))

    except Exception as e:
        console.print(f"[red]Failed to show configuration: {e}[/red]")
        raise typer.Exit(1)
config_edit
config_edit(
    config_file: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    editor: Optional[str] = typer.Option(
        None, "--editor", help="Editor to use (default: $EDITOR)"
    ),
)

Edit configuration file.

Source code in src/flowerpower_mqtt/cli.py
@config_app.command("edit")
def config_edit(
    config_file: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    editor: Optional[str] = typer.Option(None, "--editor", help="Editor to use (default: $EDITOR)")
):
    """Edit configuration file."""

    import os
    import subprocess

    try:
        config_path = config_file or get_config_path()

        if not config_path.exists():
            if Confirm.ask(f"Configuration file {config_path} does not exist. Create it?"):
                # Create basic config
                config = FlowerPowerMQTTConfig()
                config.to_yaml(config_path)
                console.print(f"[green]Created {config_path}[/green]")
            else:
                return

        # Determine editor
        editor_cmd = editor or os.environ.get("EDITOR", "nano")

        console.print(f"[blue]Opening {config_path} with {editor_cmd}...[/blue]")

        # Open editor
        result = subprocess.run([editor_cmd, str(config_path)])

        if result.returncode == 0:
            console.print(f"[green]Configuration file {config_path} updated[/green]")

            # Validate after editing
            try:
                FlowerPowerMQTTConfig.from_yaml(config_path)
                console.print("[green]✅ Configuration is valid[/green]")
            except Exception as e:
                console.print(f"[yellow]⚠️  Configuration validation warning: {e}[/yellow]")
        else:
            console.print(f"[yellow]Editor exited with code {result.returncode}[/yellow]")

    except Exception as e:
        console.print(f"[red]Failed to edit configuration: {e}[/red]")
        raise typer.Exit(1)
monitor
monitor(
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    interval: int = typer.Option(
        5, "--interval", "-i", help="Update interval in seconds"
    ),
    duration: Optional[int] = typer.Option(
        None, "--duration", "-d", help="Monitor duration in seconds"
    ),
    json_output: bool = typer.Option(False, "--json", help="Output as JSON"),
)

Monitor MQTT plugin in real-time.

Source code in src/flowerpower_mqtt/cli.py
@app.command()
def monitor(
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    interval: int = typer.Option(5, "--interval", "-i", help="Update interval in seconds"),
    duration: Optional[int] = typer.Option(None, "--duration", "-d", help="Monitor duration in seconds"),
    json_output: bool = typer.Option(False, "--json", help="Output as JSON")
):
    """Monitor MQTT plugin in real-time."""

    async def _monitor():
        try:
            plugin = _current_plugin or load_plugin(config=config)

            if not plugin.is_connected:
                console.print("[yellow]Plugin not connected. Connect first with 'flowerpower-mqtt connect'[/yellow]")
                return

            console.print(Panel(
                f"[bold green]Real-time Monitoring[/bold green]\n\n"
                f"Update interval: {interval}s\n"
                f"Duration: {'Unlimited' if not duration else f'{duration}s'}\n"
                f"Press Ctrl+C to stop",
                title="Monitor Configuration"
            ))

            start_time = asyncio.get_event_loop().time()
            iteration = 0

            try:
                while True:
                    # Check duration limit
                    if duration and (asyncio.get_event_loop().time() - start_time) >= duration:
                        console.print(f"\n[yellow]Monitoring stopped after {duration} seconds[/yellow]")
                        break

                    iteration += 1
                    stats = plugin.get_statistics()
                    subscriptions = plugin.get_subscriptions()

                    if json_output:
                        output = {
                            "timestamp": asyncio.get_event_loop().time(),
                            "iteration": iteration,
                            "stats": stats,
                            "subscriptions": subscriptions
                        }
                        console.print(json.dumps(output, indent=2, default=str))
                    else:
                        # Clear screen and show updated stats
                        console.clear()
                        console.print(f"[bold blue]Monitor #{iteration}[/bold blue] - {asyncio.get_event_loop().time():.1f}")

                        # Stats table
                        stats_table = Table(title="Real-time Statistics")
                        stats_table.add_column("Metric", style="bold blue")
                        stats_table.add_column("Value", style="green")

                        stats_table.add_row("Connected", "✅ Yes" if stats.get("connected", False) else "❌ No")
                        stats_table.add_row("Listening", "✅ Yes" if stats.get("running", False) else "❌ No")
                        stats_table.add_row("Runtime", f"{stats.get('runtime_seconds', 0):.1f}s")
                        stats_table.add_row("Messages", str(stats.get("message_count", 0)))
                        stats_table.add_row("Pipeline Executions", str(stats.get("pipeline_count", 0)))
                        stats_table.add_row("Errors", str(stats.get("error_count", 0)))

                        console.print(stats_table)

                        # Subscription activity
                        if subscriptions:
                            active_subs = [s for s in subscriptions if s.get('message_count', 0) > 0]
                            if active_subs:
                                console.print("\n")
                                activity_table = Table(title="Active Subscriptions")
                                activity_table.add_column("Topic", style="bold blue")
                                activity_table.add_column("Messages", style="yellow")
                                activity_table.add_column("Pipeline", style="green")

                                for sub in active_subs:
                                    activity_table.add_row(
                                        sub.get("topic", ""),
                                        str(sub.get("message_count", 0)),
                                        sub.get("pipeline", "")
                                    )

                                console.print(activity_table)

                        console.print(f"\n[dim]Press Ctrl+C to stop monitoring...[/dim]")

                    await asyncio.sleep(interval)

            except KeyboardInterrupt:
                console.print("\n[yellow]Monitoring stopped by user[/yellow]")

        except Exception as e:
            console.print(f"[red]Monitoring failed: {e}[/red]")
            raise typer.Exit(1)

    handle_async(_monitor())
list_subscriptions
list_subscriptions(
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    active_only: bool = typer.Option(
        False, "--active", help="Show only subscriptions with messages"
    ),
    json_output: bool = typer.Option(False, "--json", help="Output as JSON"),
)

List all MQTT subscriptions.

Source code in src/flowerpower_mqtt/cli.py
@app.command()
def list_subscriptions(
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    active_only: bool = typer.Option(False, "--active", help="Show only subscriptions with messages"),
    json_output: bool = typer.Option(False, "--json", help="Output as JSON")
):
    """List all MQTT subscriptions."""

    async def _list_subscriptions():
        try:
            plugin = _current_plugin or load_plugin(config=config)
            subscriptions = plugin.get_subscriptions()

            if not subscriptions:
                console.print("[yellow]No subscriptions found[/yellow]")
                return

            if active_only:
                subscriptions = [s for s in subscriptions if s.get('message_count', 0) > 0]
                if not subscriptions:
                    console.print("[yellow]No active subscriptions found[/yellow]")
                    return

            if json_output:
                console.print(json.dumps(subscriptions, indent=2, default=str))
                return

            # Rich formatted table
            sub_table = Table(title=f"MQTT Subscriptions ({'Active Only' if active_only else 'All'})")
            sub_table.add_column("Topic", style="bold blue")
            sub_table.add_column("Pipeline", style="green")
            sub_table.add_column("QoS", style="yellow")
            sub_table.add_column("Mode", style="magenta")
            sub_table.add_column("Messages", style="cyan")
            sub_table.add_column("Errors", style="red")

            for sub in subscriptions:
                sub_table.add_row(
                    sub.get("topic", ""),
                    sub.get("pipeline", ""),
                    str(sub.get("qos", 0)),
                    sub.get("execution_mode", "sync"),
                    str(sub.get("message_count", 0)),
                    str(sub.get("error_count", 0))
                )

            console.print(sub_table)
            console.print(f"\n[dim]Total subscriptions: {len(subscriptions)}[/dim]")

        except Exception as e:
            console.print(f"[red]Failed to list subscriptions: {e}[/red]")
            raise typer.Exit(1)

    handle_async(_list_subscriptions())
unsubscribe
unsubscribe(
    topic: str = typer.Argument(..., help="Topic to unsubscribe from"),
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    save_config: bool = typer.Option(
        False, "--save-config", help="Save changes to config file"
    ),
)

Unsubscribe from MQTT topic.

Source code in src/flowerpower_mqtt/cli.py
@app.command() 
def unsubscribe(
    topic: str = typer.Argument(..., help="Topic to unsubscribe from"),
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    save_config: bool = typer.Option(False, "--save-config", help="Save changes to config file")
):
    """Unsubscribe from MQTT topic."""

    async def _unsubscribe():
        try:
            plugin = _current_plugin or load_plugin(config=config)

            if not plugin.is_connected:
                console.print("[yellow]Plugin not connected. Connect first.[/yellow]")
                return

            # Check if topic exists
            subscriptions = plugin.get_subscriptions()
            topic_exists = any(sub.get("topic") == topic for sub in subscriptions)

            if not topic_exists:
                console.print(f"[yellow]Topic '{topic}' is not subscribed[/yellow]")
                return

            await plugin.unsubscribe(topic)
            console.print(f"[green]✅ Unsubscribed from '{topic}'[/green]")

            if save_config:
                config_path = get_config_path()
                plugin.save_config(config_path)
                console.print(f"[green]Changes saved to {config_path}[/green]")

        except Exception as e:
            console.print(f"[red]Unsubscribe failed: {e}[/red]")
            raise typer.Exit(1)

    handle_async(_unsubscribe())
jobs_status
jobs_status(
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
    json_output: bool = typer.Option(False, "--json", help="Output as JSON"),
)

Show job queue status.

Source code in src/flowerpower_mqtt/cli.py
@jobs_app.command("status")
def jobs_status(
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file"),
    json_output: bool = typer.Option(False, "--json", help="Output as JSON")
):
    """Show job queue status."""

    async def _jobs_status():
        try:
            plugin = _current_plugin or load_plugin(config=config)

            if not plugin.config.job_queue.enabled:
                console.print("[yellow]Job queue is not enabled in configuration[/yellow]")
                return

            stats = plugin.get_statistics()

            if json_output:
                console.print(json.dumps({
                    "job_queue_enabled": stats.get("job_queue_enabled", False),
                    "job_queue_stats": stats.get("job_queue_stats", {})
                }, indent=2))
                return

            # Rich formatted output
            queue_table = Table(title="Job Queue Status")
            queue_table.add_column("Property", style="bold blue")
            queue_table.add_column("Value", style="green")

            queue_table.add_row("Enabled", "✅ Yes" if stats.get("job_queue_enabled", False) else "❌ No")

            if "job_queue_stats" in stats:
                job_stats = stats["job_queue_stats"]
                queue_table.add_row("Queue Name", job_stats.get("queue_name", "N/A"))
                queue_table.add_row("Queue Type", job_stats.get("type", "N/A"))

            console.print(queue_table)

        except Exception as e:
            console.print(f"[red]Failed to get job queue status: {e}[/red]")
            raise typer.Exit(1)

    handle_async(_jobs_status())
jobs_worker
jobs_worker(
    action: str = typer.Argument(
        ..., help="Worker action (start, stop, status)"
    ),
    count: int = typer.Option(1, "--count", "-c", help="Number of workers"),
    config: Optional[Path] = typer.Option(
        None, "--config", "-c", help="Configuration file"
    ),
)

Manage RQ workers.

Source code in src/flowerpower_mqtt/cli.py
@jobs_app.command("worker")
def jobs_worker(
    action: str = typer.Argument(..., help="Worker action (start, stop, status)"),
    count: int = typer.Option(1, "--count", "-c", help="Number of workers"),
    config: Optional[Path] = typer.Option(None, "--config", "-c", help="Configuration file")
):
    """Manage RQ workers."""

    if action not in ["start", "stop", "status"]:
        console.print("[red]Worker action must be: start, stop, or status[/red]")
        raise typer.Exit(1)

    try:
        plugin = load_plugin(config=config)

        if not plugin.config.job_queue.enabled:
            console.print("[yellow]Job queue is not enabled in configuration[/yellow]")
            return

        if action == "start":
            import subprocess
            redis_url = plugin.config.job_queue.redis_url
            queue_name = plugin.config.job_queue.queue_name

            console.print(f"[blue]Starting {count} RQ worker(s) for queue '{queue_name}'...[/blue]")

            for i in range(count):
                cmd = f"rq worker {queue_name} --url {redis_url}"
                console.print(f"[dim]Worker {i+1} command: {cmd}[/dim]")

                # In a real implementation, you'd start these as background processes
                console.print(f"[green]Worker {i+1} started (run manually: {cmd})[/green]")

            console.print(f"\n[bold blue]To start workers manually, run:[/bold blue]")
            console.print(f"rq worker {queue_name} --url {redis_url}")

        elif action == "status":
            console.print("[blue]Checking worker status...[/blue]")

            try:
                import subprocess
                import psutil

                redis_url = plugin.config.job_queue.redis_url
                queue_name = plugin.config.job_queue.queue_name

                # Check for running RQ worker processes
                worker_processes = []
                for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
                    try:
                        if proc.info['name'] == 'python' and proc.info['cmdline']:
                            cmdline = ' '.join(proc.info['cmdline'])
                            if f'rq worker {queue_name}' in cmdline and redis_url in cmdline:
                                worker_processes.append(proc.info)
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        continue

                if worker_processes:
                    console.print(f"[green]✅ Found {len(worker_processes)} running worker(s)[/green]")

                    status_table = Table(title="Running Workers")
                    status_table.add_column("PID", style="bold blue")
                    status_table.add_column("Command", style="green")
                    status_table.add_column("Status", style="yellow")

                    for proc in worker_processes:
                        status_table.add_row(
                            str(proc['pid']),
                            ' '.join(proc['cmdline'][:4]) + '...' if len(proc['cmdline']) > 4 else ' '.join(proc['cmdline']),
                            "Running"
                        )

                    console.print(status_table)
                else:
                    console.print(f"[yellow]No running workers found for queue '{queue_name}'[/yellow]")
                    console.print(f"[dim]To start workers, run: rq worker {queue_name} --url {redis_url}[/dim]")

            except ImportError:
                console.print("[red]psutil not available. Install with: pip install psutil[/red]")
                console.print("[yellow]Cannot check worker status without psutil[/yellow]")
            except Exception as e:
                console.print(f"[red]Error checking worker status: {e}[/red]")

        elif action == "stop":
            console.print("[blue]Stopping workers...[/blue]")

            try:
                import subprocess
                import signal
                import psutil

                redis_url = plugin.config.job_queue.redis_url
                queue_name = plugin.config.job_queue.queue_name

                # Find and stop running RQ worker processes
                stopped_count = 0
                for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
                    try:
                        if proc.info['name'] == 'python' and proc.info['cmdline']:
                            cmdline = ' '.join(proc.info['cmdline'])
                            if f'rq worker {queue_name}' in cmdline and redis_url in cmdline:
                                console.print(f"[blue]Stopping worker PID {proc.info['pid']}...[/blue]")
                                proc.terminate()
                                stopped_count += 1
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        continue

                if stopped_count > 0:
                    console.print(f"[green]✅ Stopped {stopped_count} worker(s)[/green]")
                else:
                    console.print(f"[yellow]No running workers found for queue '{queue_name}'[/yellow]")

            except ImportError:
                console.print("[red]psutil not available. Install with: pip install psutil[/red]")
                console.print("[yellow]Cannot stop workers without psutil[/yellow]")
            except Exception as e:
                console.print(f"[red]Error stopping workers: {e}[/red]")

    except Exception as e:
        console.print(f"[red]Worker management failed: {e}[/red]")
        raise typer.Exit(1)
main
main()

Main CLI entry point.

Source code in src/flowerpower_mqtt/cli.py
def main():
    """Main CLI entry point."""
    app()

client

MQTT client wrapper with QoS support for FlowerPower integration.

Classes
MQTTMessage

Bases: Struct

Immutable wrapper for MQTT messages with additional metadata and deserialization support.

Source code in src/flowerpower_mqtt/client.py
class MQTTMessage(msgspec.Struct):
    """Immutable wrapper for MQTT messages with additional metadata and deserialization support."""

    topic: str
    payload: bytes
    qos: int
    retain: bool
    timestamp: float

    def __post_init__(self) -> None:
        """Validate fields after initialization."""
        if not isinstance(self.topic, str) or not self.topic.strip():
            raise ValueError("Topic must be a non-empty string")

        if not isinstance(self.payload, bytes):
            raise ValueError("Payload must be bytes")

        if self.qos not in (0, 1, 2):
            raise ValueError(f"QoS must be 0, 1, or 2, got {self.qos}")

        if not isinstance(self.timestamp, (int, float)) or self.timestamp < 0:
            raise ValueError("Timestamp must be a non-negative number")

    @property
    def payload_str(self) -> Optional[str]:
        """Attempt to decode payload as UTF-8 string."""
        if not self.payload:
            return ""

        try:
            return self.payload.decode('utf-8')
        except UnicodeDecodeError:
            logger.debug("Payload is not valid UTF-8")
            return None

    def deserialize_json(self) -> Optional[Any]:
        """Deserialize payload as JSON using msgspec."""
        payload_str = self.payload_str
        if payload_str is None:
            logger.debug("Cannot deserialize non-UTF-8 payload as JSON")
            return None

        try:
            return msgspec.json.decode(self.payload, type=Any)
        except Exception as e:
            logger.debug(f"Failed to deserialize as JSON: {e}")
            return None

    def deserialize_yaml(self) -> Optional[Any]:
        """Deserialize payload as YAML."""
        payload_str = self.payload_str
        if payload_str is None:
            logger.debug("Cannot deserialize non-UTF-8 payload as YAML")
            return None

        try:
            return msgspec.yaml.decode(payload_str, type=Any)
        except Exception as e:
            logger.debug(f"Failed to deserialize as YAML: {e}")
            return None

    def deserialize_msgpack(self) -> Optional[Any]:
        """Deserialize payload as MessagePack using msgspec."""
        try:
            return msgspec.msgpack.decode(self.payload, type=Any)
        except Exception as e:
            logger.debug(f"Failed to deserialize as MessagePack: {e}")
            return None

    def deserialize_pickle(self) -> Optional[Any]:
        """Deserialize payload as Python pickle."""
        try:
            return pickle.loads(self.payload)
        except Exception as e:
            logger.debug(f"Failed to deserialize as pickle: {e}")
            return None

    def deserialize_protobuf(self, message_class: Type[ProtobufMessage]) -> Optional[ProtobufMessage]:
        """Deserialize payload as Protocol Buffers."""
        try:
            message = message_class()
            message.ParseFromString(self.payload)
            return message
        except Exception as e:
            logger.debug(f"Failed to deserialize as protobuf: {e}")
            return None

    def deserialize_pyarrow(self) -> Optional[Any]:
        """Deserialize payload as PyArrow IPC format."""
        try:
            reader = pa.ipc.open_stream(self.payload)
            return reader.read_all()
        except Exception as e:
            logger.debug(f"Failed to deserialize as PyArrow IPC: {e}")
            return None

    def serialize_json(self, data: Any) -> bytes:
        """Serialize data to JSON format."""
        try:
            return msgspec.json.encode(data)
        except Exception as e:
            logger.error(f"Failed to serialize as JSON: {e}")
            raise

    def serialize_yaml(self, data: Any) -> bytes:
        """Serialize data to YAML format."""
        try:
            return msgspec.yaml.encode(data)
        except Exception as e:
            logger.error(f"Failed to serialize as YAML: {e}")
            raise

    def serialize_msgpack(self, data: Any) -> bytes:
        """Serialize data to MessagePack format."""
        try:
            return msgspec.msgpack.encode(data)
        except Exception as e:
            logger.error(f"Failed to serialize as MessagePack: {e}")
            raise

    def serialize_pickle(self, data: Any) -> bytes:
        """Serialize data to Python pickle format."""
        try:
            return pickle.dumps(data)
        except Exception as e:
            logger.error(f"Failed to serialize as pickle: {e}")
            raise

    def serialize_protobuf(self, message: ProtobufMessage) -> bytes:
        """Serialize Protocol Buffers message."""
        try:
            return message.SerializeToString()
        except Exception as e:
            logger.error(f"Failed to serialize as protobuf: {e}")
            raise

    def serialize_pyarrow(self, data: Any) -> bytes:
        """Serialize data to PyArrow IPC format."""
        try:
            # Handle different input types
            if isinstance(data, pa.Table):
                table = data
            elif isinstance(data, pa.RecordBatch):
                table = pa.Table.from_batches([data])
            elif isinstance(data, dict):
                # Convert dict to PyArrow table directly
                arrays = []
                names = []
                for key, value in data.items():
                    if isinstance(value, list):
                        arrays.append(pa.array(value))
                    else:
                        arrays.append(pa.array([value]))
                    names.append(key)
                table = pa.Table.from_arrays(arrays, names=names)
            elif hasattr(data, '__iter__') and not isinstance(data, (str, bytes)):
                # Handle iterable of dicts (list of records)
                if isinstance(data, list) and data and isinstance(data[0], dict):
                    # Convert list of dicts to table
                    arrays = []
                    names = list(data[0].keys())
                    for name in names:
                        values = [record.get(name) for record in data]
                        arrays.append(pa.array(values))
                    table = pa.Table.from_arrays(arrays, names=names)
                else:
                    raise ValueError("Unsupported data type for PyArrow serialization")
            else:
                raise ValueError("Unsupported data type for PyArrow serialization")

            sink = pa.BufferOutputStream()
            with pa.ipc.new_stream(sink, table.schema) as writer:
                writer.write(table)
            return sink.getvalue().to_pybytes()
        except Exception as e:
            logger.error(f"Failed to serialize as PyArrow IPC: {e}")
            raise

    def serialize(self, format_name: str, data: Any, **kwargs) -> bytes:
        """
        Serialize data using the specified format.

        Args:
            format_name: Format name ('json', 'yaml', 'msgpack', 'pickle', 'protobuf', 'pyarrow')
            data: Data to serialize
            **kwargs: Additional arguments for specific formats

        Returns:
            Serialized data as bytes

        Raises:
            ValueError: If format is not supported
        """
        format_name = format_name.lower()

        if format_name == 'json':
            return self.serialize_json(data)
        elif format_name == 'yaml':
            return self.serialize_yaml(data)
        elif format_name == 'msgpack':
            return self.serialize_msgpack(data)
        elif format_name == 'pickle':
            return self.serialize_pickle(data)
        elif format_name == 'protobuf':
            if 'message' in kwargs:
                return self.serialize_protobuf(kwargs['message'])
            else:
                raise ValueError("protobuf format requires 'message' kwarg with ProtobufMessage instance")
        elif format_name == 'pyarrow':
            return self.serialize_pyarrow(data)
        else:
            raise ValueError(f"Unsupported serialization format: {format_name}")

    def deserialize_auto(self) -> Optional[Any]:
        """
        Automatically detect and deserialize payload format.

        Detection order (most common to least common):
        1. JSON - Most common for MQTT payloads
        2. MessagePack - Efficient binary format
        3. YAML - Human-readable structured data
        4. PyArrow IPC - Columnar data format
        5. Pickle - Python-specific binary format

        Returns:
            Deserialized data or None if no format matches
        """
        # Try JSON first (most common)
        result = self.deserialize_json()
        if result is not None:
            logger.debug("Auto-detected format: JSON")
            return result

        # Try MessagePack (efficient binary)
        result = self.deserialize_msgpack()
        if result is not None:
            logger.debug("Auto-detected format: MessagePack")
            return result

        # Try YAML (human-readable)
        result = self.deserialize_yaml()
        if result is not None:
            logger.debug("Auto-detected format: YAML")
            return result

        # Try PyArrow IPC (columnar data)
        result = self.deserialize_pyarrow()
        if result is not None:
            logger.debug("Auto-detected format: PyArrow IPC")
            return result

        # Try Pickle (Python-specific, least safe)
        result = self.deserialize_pickle()
        if result is not None:
            logger.debug("Auto-detected format: Pickle")
            return result

        # No format detected
        logger.debug("Could not auto-detect payload format")
        return None

    def deserialize(self, format_name: str, **kwargs) -> Optional[Any]:
        """
        Deserialize payload using the specified format.

        Args:
            format_name: Format name ('json', 'yaml', 'msgpack', 'pickle', 'protobuf', 'pyarrow')
            **kwargs: Additional arguments for specific formats

        Returns:
            Deserialized data or None if deserialization fails

        Raises:
            ValueError: If format is not supported
        """
        format_name = format_name.lower()

        if format_name == 'auto':
            return self.deserialize_auto()
        elif format_name == 'json':
            return self.deserialize_json()
        elif format_name == 'yaml':
            return self.deserialize_yaml()
        elif format_name == 'msgpack':
            return self.deserialize_msgpack()
        elif format_name == 'pickle':
            return self.deserialize_pickle()
        elif format_name == 'protobuf':
            if 'message_class' in kwargs:
                return self.deserialize_protobuf(kwargs['message_class'])
            else:
                raise ValueError("protobuf format requires 'message_class' kwarg with message type")
        elif format_name == 'pyarrow':
            return self.deserialize_pyarrow()
        else:
            raise ValueError(f"Unsupported deserialization format: {format_name}")

    def is_empty_payload(self) -> bool:
        """Check if payload is empty."""
        return len(self.payload) == 0

    def get_payload_size(self) -> int:
        """Get payload size in bytes."""
        return len(self.payload)
Attributes
payload_str property
payload_str: Optional[str]

Attempt to decode payload as UTF-8 string.

Functions
__post_init__
__post_init__() -> None

Validate fields after initialization.

Source code in src/flowerpower_mqtt/client.py
def __post_init__(self) -> None:
    """Validate fields after initialization."""
    if not isinstance(self.topic, str) or not self.topic.strip():
        raise ValueError("Topic must be a non-empty string")

    if not isinstance(self.payload, bytes):
        raise ValueError("Payload must be bytes")

    if self.qos not in (0, 1, 2):
        raise ValueError(f"QoS must be 0, 1, or 2, got {self.qos}")

    if not isinstance(self.timestamp, (int, float)) or self.timestamp < 0:
        raise ValueError("Timestamp must be a non-negative number")
deserialize_json
deserialize_json() -> Optional[Any]

Deserialize payload as JSON using msgspec.

Source code in src/flowerpower_mqtt/client.py
def deserialize_json(self) -> Optional[Any]:
    """Deserialize payload as JSON using msgspec."""
    payload_str = self.payload_str
    if payload_str is None:
        logger.debug("Cannot deserialize non-UTF-8 payload as JSON")
        return None

    try:
        return msgspec.json.decode(self.payload, type=Any)
    except Exception as e:
        logger.debug(f"Failed to deserialize as JSON: {e}")
        return None
deserialize_yaml
deserialize_yaml() -> Optional[Any]

Deserialize payload as YAML.

Source code in src/flowerpower_mqtt/client.py
def deserialize_yaml(self) -> Optional[Any]:
    """Deserialize payload as YAML."""
    payload_str = self.payload_str
    if payload_str is None:
        logger.debug("Cannot deserialize non-UTF-8 payload as YAML")
        return None

    try:
        return msgspec.yaml.decode(payload_str, type=Any)
    except Exception as e:
        logger.debug(f"Failed to deserialize as YAML: {e}")
        return None
deserialize_msgpack
deserialize_msgpack() -> Optional[Any]

Deserialize payload as MessagePack using msgspec.

Source code in src/flowerpower_mqtt/client.py
def deserialize_msgpack(self) -> Optional[Any]:
    """Deserialize payload as MessagePack using msgspec."""
    try:
        return msgspec.msgpack.decode(self.payload, type=Any)
    except Exception as e:
        logger.debug(f"Failed to deserialize as MessagePack: {e}")
        return None
deserialize_pickle
deserialize_pickle() -> Optional[Any]

Deserialize payload as Python pickle.

Source code in src/flowerpower_mqtt/client.py
def deserialize_pickle(self) -> Optional[Any]:
    """Deserialize payload as Python pickle."""
    try:
        return pickle.loads(self.payload)
    except Exception as e:
        logger.debug(f"Failed to deserialize as pickle: {e}")
        return None
deserialize_protobuf
deserialize_protobuf(message_class: Type[Message]) -> Optional[ProtobufMessage]

Deserialize payload as Protocol Buffers.

Source code in src/flowerpower_mqtt/client.py
def deserialize_protobuf(self, message_class: Type[ProtobufMessage]) -> Optional[ProtobufMessage]:
    """Deserialize payload as Protocol Buffers."""
    try:
        message = message_class()
        message.ParseFromString(self.payload)
        return message
    except Exception as e:
        logger.debug(f"Failed to deserialize as protobuf: {e}")
        return None
deserialize_pyarrow
deserialize_pyarrow() -> Optional[Any]

Deserialize payload as PyArrow IPC format.

Source code in src/flowerpower_mqtt/client.py
def deserialize_pyarrow(self) -> Optional[Any]:
    """Deserialize payload as PyArrow IPC format."""
    try:
        reader = pa.ipc.open_stream(self.payload)
        return reader.read_all()
    except Exception as e:
        logger.debug(f"Failed to deserialize as PyArrow IPC: {e}")
        return None
serialize_json
serialize_json(data: Any) -> bytes

Serialize data to JSON format.

Source code in src/flowerpower_mqtt/client.py
def serialize_json(self, data: Any) -> bytes:
    """Serialize data to JSON format."""
    try:
        return msgspec.json.encode(data)
    except Exception as e:
        logger.error(f"Failed to serialize as JSON: {e}")
        raise
serialize_yaml
serialize_yaml(data: Any) -> bytes

Serialize data to YAML format.

Source code in src/flowerpower_mqtt/client.py
def serialize_yaml(self, data: Any) -> bytes:
    """Serialize data to YAML format."""
    try:
        return msgspec.yaml.encode(data)
    except Exception as e:
        logger.error(f"Failed to serialize as YAML: {e}")
        raise
serialize_msgpack
serialize_msgpack(data: Any) -> bytes

Serialize data to MessagePack format.

Source code in src/flowerpower_mqtt/client.py
def serialize_msgpack(self, data: Any) -> bytes:
    """Serialize data to MessagePack format."""
    try:
        return msgspec.msgpack.encode(data)
    except Exception as e:
        logger.error(f"Failed to serialize as MessagePack: {e}")
        raise
serialize_pickle
serialize_pickle(data: Any) -> bytes

Serialize data to Python pickle format.

Source code in src/flowerpower_mqtt/client.py
def serialize_pickle(self, data: Any) -> bytes:
    """Serialize data to Python pickle format."""
    try:
        return pickle.dumps(data)
    except Exception as e:
        logger.error(f"Failed to serialize as pickle: {e}")
        raise
serialize_protobuf
serialize_protobuf(message: Message) -> bytes

Serialize Protocol Buffers message.

Source code in src/flowerpower_mqtt/client.py
def serialize_protobuf(self, message: ProtobufMessage) -> bytes:
    """Serialize Protocol Buffers message."""
    try:
        return message.SerializeToString()
    except Exception as e:
        logger.error(f"Failed to serialize as protobuf: {e}")
        raise
serialize_pyarrow
serialize_pyarrow(data: Any) -> bytes

Serialize data to PyArrow IPC format.

Source code in src/flowerpower_mqtt/client.py
def serialize_pyarrow(self, data: Any) -> bytes:
    """Serialize data to PyArrow IPC format."""
    try:
        # Handle different input types
        if isinstance(data, pa.Table):
            table = data
        elif isinstance(data, pa.RecordBatch):
            table = pa.Table.from_batches([data])
        elif isinstance(data, dict):
            # Convert dict to PyArrow table directly
            arrays = []
            names = []
            for key, value in data.items():
                if isinstance(value, list):
                    arrays.append(pa.array(value))
                else:
                    arrays.append(pa.array([value]))
                names.append(key)
            table = pa.Table.from_arrays(arrays, names=names)
        elif hasattr(data, '__iter__') and not isinstance(data, (str, bytes)):
            # Handle iterable of dicts (list of records)
            if isinstance(data, list) and data and isinstance(data[0], dict):
                # Convert list of dicts to table
                arrays = []
                names = list(data[0].keys())
                for name in names:
                    values = [record.get(name) for record in data]
                    arrays.append(pa.array(values))
                table = pa.Table.from_arrays(arrays, names=names)
            else:
                raise ValueError("Unsupported data type for PyArrow serialization")
        else:
            raise ValueError("Unsupported data type for PyArrow serialization")

        sink = pa.BufferOutputStream()
        with pa.ipc.new_stream(sink, table.schema) as writer:
            writer.write(table)
        return sink.getvalue().to_pybytes()
    except Exception as e:
        logger.error(f"Failed to serialize as PyArrow IPC: {e}")
        raise
serialize
serialize(format_name: str, data: Any, **kwargs) -> bytes

Serialize data using the specified format.

Parameters:

Name Type Description Default
format_name str

Format name ('json', 'yaml', 'msgpack', 'pickle', 'protobuf', 'pyarrow')

required
data Any

Data to serialize

required
**kwargs

Additional arguments for specific formats

{}

Returns:

Type Description
bytes

Serialized data as bytes

Raises:

Type Description
ValueError

If format is not supported

Source code in src/flowerpower_mqtt/client.py
def serialize(self, format_name: str, data: Any, **kwargs) -> bytes:
    """
    Serialize data using the specified format.

    Args:
        format_name: Format name ('json', 'yaml', 'msgpack', 'pickle', 'protobuf', 'pyarrow')
        data: Data to serialize
        **kwargs: Additional arguments for specific formats

    Returns:
        Serialized data as bytes

    Raises:
        ValueError: If format is not supported
    """
    format_name = format_name.lower()

    if format_name == 'json':
        return self.serialize_json(data)
    elif format_name == 'yaml':
        return self.serialize_yaml(data)
    elif format_name == 'msgpack':
        return self.serialize_msgpack(data)
    elif format_name == 'pickle':
        return self.serialize_pickle(data)
    elif format_name == 'protobuf':
        if 'message' in kwargs:
            return self.serialize_protobuf(kwargs['message'])
        else:
            raise ValueError("protobuf format requires 'message' kwarg with ProtobufMessage instance")
    elif format_name == 'pyarrow':
        return self.serialize_pyarrow(data)
    else:
        raise ValueError(f"Unsupported serialization format: {format_name}")
deserialize_auto
deserialize_auto() -> Optional[Any]

Automatically detect and deserialize payload format.

Detection order (most common to least common): 1. JSON - Most common for MQTT payloads 2. MessagePack - Efficient binary format 3. YAML - Human-readable structured data 4. PyArrow IPC - Columnar data format 5. Pickle - Python-specific binary format

Returns:

Type Description
Optional[Any]

Deserialized data or None if no format matches

Source code in src/flowerpower_mqtt/client.py
def deserialize_auto(self) -> Optional[Any]:
    """
    Automatically detect and deserialize payload format.

    Detection order (most common to least common):
    1. JSON - Most common for MQTT payloads
    2. MessagePack - Efficient binary format
    3. YAML - Human-readable structured data
    4. PyArrow IPC - Columnar data format
    5. Pickle - Python-specific binary format

    Returns:
        Deserialized data or None if no format matches
    """
    # Try JSON first (most common)
    result = self.deserialize_json()
    if result is not None:
        logger.debug("Auto-detected format: JSON")
        return result

    # Try MessagePack (efficient binary)
    result = self.deserialize_msgpack()
    if result is not None:
        logger.debug("Auto-detected format: MessagePack")
        return result

    # Try YAML (human-readable)
    result = self.deserialize_yaml()
    if result is not None:
        logger.debug("Auto-detected format: YAML")
        return result

    # Try PyArrow IPC (columnar data)
    result = self.deserialize_pyarrow()
    if result is not None:
        logger.debug("Auto-detected format: PyArrow IPC")
        return result

    # Try Pickle (Python-specific, least safe)
    result = self.deserialize_pickle()
    if result is not None:
        logger.debug("Auto-detected format: Pickle")
        return result

    # No format detected
    logger.debug("Could not auto-detect payload format")
    return None
deserialize
deserialize(format_name: str, **kwargs) -> Optional[Any]

Deserialize payload using the specified format.

Parameters:

Name Type Description Default
format_name str

Format name ('json', 'yaml', 'msgpack', 'pickle', 'protobuf', 'pyarrow')

required
**kwargs

Additional arguments for specific formats

{}

Returns:

Type Description
Optional[Any]

Deserialized data or None if deserialization fails

Raises:

Type Description
ValueError

If format is not supported

Source code in src/flowerpower_mqtt/client.py
def deserialize(self, format_name: str, **kwargs) -> Optional[Any]:
    """
    Deserialize payload using the specified format.

    Args:
        format_name: Format name ('json', 'yaml', 'msgpack', 'pickle', 'protobuf', 'pyarrow')
        **kwargs: Additional arguments for specific formats

    Returns:
        Deserialized data or None if deserialization fails

    Raises:
        ValueError: If format is not supported
    """
    format_name = format_name.lower()

    if format_name == 'auto':
        return self.deserialize_auto()
    elif format_name == 'json':
        return self.deserialize_json()
    elif format_name == 'yaml':
        return self.deserialize_yaml()
    elif format_name == 'msgpack':
        return self.deserialize_msgpack()
    elif format_name == 'pickle':
        return self.deserialize_pickle()
    elif format_name == 'protobuf':
        if 'message_class' in kwargs:
            return self.deserialize_protobuf(kwargs['message_class'])
        else:
            raise ValueError("protobuf format requires 'message_class' kwarg with message type")
    elif format_name == 'pyarrow':
        return self.deserialize_pyarrow()
    else:
        raise ValueError(f"Unsupported deserialization format: {format_name}")
is_empty_payload
is_empty_payload() -> bool

Check if payload is empty.

Source code in src/flowerpower_mqtt/client.py
def is_empty_payload(self) -> bool:
    """Check if payload is empty."""
    return len(self.payload) == 0
get_payload_size
get_payload_size() -> int

Get payload size in bytes.

Source code in src/flowerpower_mqtt/client.py
def get_payload_size(self) -> int:
    """Get payload size in bytes."""
    return len(self.payload)
MQTTClient

MQTT client wrapper with QoS support and subscription management.

Source code in src/flowerpower_mqtt/client.py
class MQTTClient:
    """
    MQTT client wrapper with QoS support and subscription management.
    """

    def __init__(self, config: MQTTConfig):
        """
        Initialize MQTT client.

        Args:
            config: MQTT configuration
        """
        self.config = config
        self._client: Optional[aiomqtt.Client] = None
        self._subscriptions: Dict[str, RuntimeSubscription] = {}
        self._connected = False
        self._message_handlers: List[Callable[[MQTTMessage], None]] = []
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        """Async context manager entry."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.disconnect()

    async def connect(self) -> None:
        """Establish connection to MQTT broker with automatic reconnection."""
        async with self._lock:
            if self._connected:
                return

            last_exception = None

            for attempt in range(self.config.reconnect_retries + 1):
                try:
                    logger.info(f"Connecting to MQTT broker at {self.config.broker}:{self.config.port} (attempt {attempt + 1}/{self.config.reconnect_retries + 1})")

                    # Create aiomqtt client with configuration
                    self._client = aiomqtt.Client(
                        hostname=self.config.broker,
                        port=self.config.port,
                        keepalive=self.config.keepalive,
                        identifier=self.config.client_id,
                        clean_session=self.config.clean_session,
                        username=self.config.username,
                        password=self.config.password,
                    )

                    # Connect using context manager protocol
                    await self._client.__aenter__()
                    self._connected = True

                    logger.info("Successfully connected to MQTT broker")
                    return

                except Exception as e:
                    last_exception = e
                    logger.warning(f"Connection attempt {attempt + 1} failed: {e}")

                    # Don't wait after the last attempt
                    if attempt < self.config.reconnect_retries:
                        # Exponential backoff: delay = base_delay * (2 ^ attempt)
                        delay = self.config.reconnect_delay * (2 ** attempt)
                        logger.info(f"Retrying connection in {delay} seconds...")
                        await asyncio.sleep(delay)

            # All retry attempts failed
            logger.error(f"Failed to connect to MQTT broker after {self.config.reconnect_retries + 1} attempts")
            raise ConnectionError(f"Failed to connect to MQTT broker after {self.config.reconnect_retries + 1} attempts: {last_exception}") from last_exception

    async def disconnect(self) -> None:
        """Disconnect from MQTT broker."""
        async with self._lock:
            if not self._connected or self._client is None:
                return

            try:
                logger.info("Disconnecting from MQTT broker")
                await self._client.__aexit__(None, None, None)
                self._connected = False
                self._client = None

                logger.info("Successfully disconnected from MQTT broker")

            except Exception as e:
                logger.error(f"Error during disconnect: {e}")
                # Don't raise exception during disconnect

    async def subscribe(
        self,
        topic: str,
        pipeline: str,
        qos: int = 0,
        execution_mode: str = "sync",
        deserialization_format: str = "auto"
    ) -> None:
        """
        Subscribe to MQTT topic.

        Args:
            topic: MQTT topic pattern to subscribe to
            pipeline: FlowerPower pipeline name to execute
            qos: QoS level (0, 1, or 2)
            execution_mode: Pipeline execution mode (sync, async, mixed)
            deserialization_format: Format to use for deserializing message payloads
                                   (json, yaml, msgpack, pickle, protobuf, pyarrow, auto)
        """
        if not self._connected or self._client is None:
            raise ConnectionError("Client not connected to broker")

        if qos not in [0, 1, 2]:
            raise SubscriptionError(f"Invalid QoS level: {qos}. Must be 0, 1, or 2")

        try:
            logger.info(f"Subscribing to topic '{topic}' with QoS {qos}")

            await self._client.subscribe(topic, qos=qos)

            # Store subscription info
            self._subscriptions[topic] = RuntimeSubscription(
                topic=topic,
                pipeline=pipeline,
                qos=qos,
                execution_mode=execution_mode,
                deserialization_format=deserialization_format
            )

            logger.info(
                f"Successfully subscribed to '{topic}' -> pipeline '{pipeline}'"
            )

        except Exception as e:
            logger.error(f"Failed to subscribe to topic '{topic}': {e}")
            raise SubscriptionError(f"Failed to subscribe to topic '{topic}': {e}") from e

    async def unsubscribe(self, topic: str) -> None:
        """
        Unsubscribe from MQTT topic.

        Args:
            topic: MQTT topic pattern to unsubscribe from
        """
        if not self._connected or self._client is None:
            raise ConnectionError("Client not connected to broker")

        try:
            logger.info(f"Unsubscribing from topic '{topic}'")

            await self._client.unsubscribe(topic)

            # Remove subscription info
            if topic in self._subscriptions:
                del self._subscriptions[topic]

            logger.info(f"Successfully unsubscribed from '{topic}'")

        except Exception as e:
            logger.error(f"Failed to unsubscribe from topic '{topic}': {e}")
            raise SubscriptionError(f"Failed to unsubscribe from topic '{topic}': {e}") from e

    def add_message_handler(self, handler: Callable[[MQTTMessage], None]) -> None:
        """
        Add message handler function.

        Args:
            handler: Function to call when messages arrive
        """
        self._message_handlers.append(handler)

    def remove_message_handler(self, handler: Callable[[MQTTMessage], None]) -> None:
        """
        Remove message handler function.

        Args:
            handler: Handler function to remove
        """
        if handler in self._message_handlers:
            self._message_handlers.remove(handler)

    async def listen_for_messages(self) -> None:
        """
        Listen for incoming MQTT messages and dispatch to handlers.
        """
        if not self._connected or self._client is None:
            raise ConnectionError("Client not connected to broker")

        logger.info("Starting message listener")

        try:
            async for message in self._client.messages:
                # Update subscription statistics
                topic_str = str(message.topic)
                for topic_pattern, sub in self._subscriptions.items():
                    if message.topic.matches(topic_pattern):
                        sub.message_count += 1
                        sub.last_message_time = time.time()
                        break

                # Create wrapped message
                # Convert payload to bytes to match MQTTMessage type
                if isinstance(message.payload, bytes):
                    payload_bytes = message.payload
                elif isinstance(message.payload, str):
                    payload_bytes = message.payload.encode('utf-8')
                else:
                    payload_bytes = str(message.payload).encode('utf-8')

                mqtt_message = MQTTMessage(
                    topic=topic_str,
                    payload=payload_bytes,
                    qos=message.qos,
                    retain=message.retain,
                    timestamp=time.time()
                )

                # Dispatch to all handlers
                for handler in self._message_handlers:
                    try:
                        handler(mqtt_message)
                    except Exception as e:
                        logger.error(f"Error in message handler: {e}")

        except Exception as e:
            logger.error(f"Error in message listener: {e}")
            raise

    def get_subscription(self, topic: str) -> Optional[RuntimeSubscription]:
        """
        Get subscription info for a topic.

        Args:
            topic: Topic pattern

        Returns:
            RuntimeSubscription or None if not found
        """
        return self._subscriptions.get(topic)

    def get_all_subscriptions(self) -> Dict[str, RuntimeSubscription]:
        """Get all current subscriptions."""
        return self._subscriptions.copy()

    def find_subscription_for_topic(self, topic: str) -> Optional[RuntimeSubscription]:
        """
        Find subscription that matches a specific topic.

        Args:
            topic: Specific topic to match against patterns

        Returns:
            First matching RuntimeSubscription or None
        """
        for pattern, subscription in self._subscriptions.items():
            try:
                if aiomqtt.Topic(topic).matches(pattern):
                    return subscription
            except:
                # Fallback to simple string comparison
                if topic == pattern:
                    return subscription

        return None

    @property
    def is_connected(self) -> bool:
        """Check if client is connected."""
        return self._connected

    @property
    def subscription_count(self) -> int:
        """Get number of active subscriptions."""
        return len(self._subscriptions)
Attributes
is_connected property
is_connected: bool

Check if client is connected.

subscription_count property
subscription_count: int

Get number of active subscriptions.

Functions
__init__
__init__(config: MQTTConfig)

Initialize MQTT client.

Parameters:

Name Type Description Default
config MQTTConfig

MQTT configuration

required
Source code in src/flowerpower_mqtt/client.py
def __init__(self, config: MQTTConfig):
    """
    Initialize MQTT client.

    Args:
        config: MQTT configuration
    """
    self.config = config
    self._client: Optional[aiomqtt.Client] = None
    self._subscriptions: Dict[str, RuntimeSubscription] = {}
    self._connected = False
    self._message_handlers: List[Callable[[MQTTMessage], None]] = []
    self._lock = asyncio.Lock()
__aenter__ async
__aenter__()

Async context manager entry.

Source code in src/flowerpower_mqtt/client.py
async def __aenter__(self):
    """Async context manager entry."""
    await self.connect()
    return self
__aexit__ async
__aexit__(exc_type, exc_val, exc_tb)

Async context manager exit.

Source code in src/flowerpower_mqtt/client.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit."""
    await self.disconnect()
connect async
connect() -> None

Establish connection to MQTT broker with automatic reconnection.

Source code in src/flowerpower_mqtt/client.py
async def connect(self) -> None:
    """Establish connection to MQTT broker with automatic reconnection."""
    async with self._lock:
        if self._connected:
            return

        last_exception = None

        for attempt in range(self.config.reconnect_retries + 1):
            try:
                logger.info(f"Connecting to MQTT broker at {self.config.broker}:{self.config.port} (attempt {attempt + 1}/{self.config.reconnect_retries + 1})")

                # Create aiomqtt client with configuration
                self._client = aiomqtt.Client(
                    hostname=self.config.broker,
                    port=self.config.port,
                    keepalive=self.config.keepalive,
                    identifier=self.config.client_id,
                    clean_session=self.config.clean_session,
                    username=self.config.username,
                    password=self.config.password,
                )

                # Connect using context manager protocol
                await self._client.__aenter__()
                self._connected = True

                logger.info("Successfully connected to MQTT broker")
                return

            except Exception as e:
                last_exception = e
                logger.warning(f"Connection attempt {attempt + 1} failed: {e}")

                # Don't wait after the last attempt
                if attempt < self.config.reconnect_retries:
                    # Exponential backoff: delay = base_delay * (2 ^ attempt)
                    delay = self.config.reconnect_delay * (2 ** attempt)
                    logger.info(f"Retrying connection in {delay} seconds...")
                    await asyncio.sleep(delay)

        # All retry attempts failed
        logger.error(f"Failed to connect to MQTT broker after {self.config.reconnect_retries + 1} attempts")
        raise ConnectionError(f"Failed to connect to MQTT broker after {self.config.reconnect_retries + 1} attempts: {last_exception}") from last_exception
disconnect async
disconnect() -> None

Disconnect from MQTT broker.

Source code in src/flowerpower_mqtt/client.py
async def disconnect(self) -> None:
    """Disconnect from MQTT broker."""
    async with self._lock:
        if not self._connected or self._client is None:
            return

        try:
            logger.info("Disconnecting from MQTT broker")
            await self._client.__aexit__(None, None, None)
            self._connected = False
            self._client = None

            logger.info("Successfully disconnected from MQTT broker")

        except Exception as e:
            logger.error(f"Error during disconnect: {e}")
subscribe async
subscribe(
    topic: str,
    pipeline: str,
    qos: int = 0,
    execution_mode: str = "sync",
    deserialization_format: str = "auto",
) -> None

Subscribe to MQTT topic.

Parameters:

Name Type Description Default
topic str

MQTT topic pattern to subscribe to

required
pipeline str

FlowerPower pipeline name to execute

required
qos int

QoS level (0, 1, or 2)

0
execution_mode str

Pipeline execution mode (sync, async, mixed)

'sync'
deserialization_format str

Format to use for deserializing message payloads (json, yaml, msgpack, pickle, protobuf, pyarrow, auto)

'auto'
Source code in src/flowerpower_mqtt/client.py
async def subscribe(
    self,
    topic: str,
    pipeline: str,
    qos: int = 0,
    execution_mode: str = "sync",
    deserialization_format: str = "auto"
) -> None:
    """
    Subscribe to MQTT topic.

    Args:
        topic: MQTT topic pattern to subscribe to
        pipeline: FlowerPower pipeline name to execute
        qos: QoS level (0, 1, or 2)
        execution_mode: Pipeline execution mode (sync, async, mixed)
        deserialization_format: Format to use for deserializing message payloads
                               (json, yaml, msgpack, pickle, protobuf, pyarrow, auto)
    """
    if not self._connected or self._client is None:
        raise ConnectionError("Client not connected to broker")

    if qos not in [0, 1, 2]:
        raise SubscriptionError(f"Invalid QoS level: {qos}. Must be 0, 1, or 2")

    try:
        logger.info(f"Subscribing to topic '{topic}' with QoS {qos}")

        await self._client.subscribe(topic, qos=qos)

        # Store subscription info
        self._subscriptions[topic] = RuntimeSubscription(
            topic=topic,
            pipeline=pipeline,
            qos=qos,
            execution_mode=execution_mode,
            deserialization_format=deserialization_format
        )

        logger.info(
            f"Successfully subscribed to '{topic}' -> pipeline '{pipeline}'"
        )

    except Exception as e:
        logger.error(f"Failed to subscribe to topic '{topic}': {e}")
        raise SubscriptionError(f"Failed to subscribe to topic '{topic}': {e}") from e
unsubscribe async
unsubscribe(topic: str) -> None

Unsubscribe from MQTT topic.

Parameters:

Name Type Description Default
topic str

MQTT topic pattern to unsubscribe from

required
Source code in src/flowerpower_mqtt/client.py
async def unsubscribe(self, topic: str) -> None:
    """
    Unsubscribe from MQTT topic.

    Args:
        topic: MQTT topic pattern to unsubscribe from
    """
    if not self._connected or self._client is None:
        raise ConnectionError("Client not connected to broker")

    try:
        logger.info(f"Unsubscribing from topic '{topic}'")

        await self._client.unsubscribe(topic)

        # Remove subscription info
        if topic in self._subscriptions:
            del self._subscriptions[topic]

        logger.info(f"Successfully unsubscribed from '{topic}'")

    except Exception as e:
        logger.error(f"Failed to unsubscribe from topic '{topic}': {e}")
        raise SubscriptionError(f"Failed to unsubscribe from topic '{topic}': {e}") from e
add_message_handler
add_message_handler(handler: Callable[[MQTTMessage], None]) -> None

Add message handler function.

Parameters:

Name Type Description Default
handler Callable[[MQTTMessage], None]

Function to call when messages arrive

required
Source code in src/flowerpower_mqtt/client.py
def add_message_handler(self, handler: Callable[[MQTTMessage], None]) -> None:
    """
    Add message handler function.

    Args:
        handler: Function to call when messages arrive
    """
    self._message_handlers.append(handler)
remove_message_handler
remove_message_handler(handler: Callable[[MQTTMessage], None]) -> None

Remove message handler function.

Parameters:

Name Type Description Default
handler Callable[[MQTTMessage], None]

Handler function to remove

required
Source code in src/flowerpower_mqtt/client.py
def remove_message_handler(self, handler: Callable[[MQTTMessage], None]) -> None:
    """
    Remove message handler function.

    Args:
        handler: Handler function to remove
    """
    if handler in self._message_handlers:
        self._message_handlers.remove(handler)
listen_for_messages async
listen_for_messages() -> None

Listen for incoming MQTT messages and dispatch to handlers.

Source code in src/flowerpower_mqtt/client.py
async def listen_for_messages(self) -> None:
    """
    Listen for incoming MQTT messages and dispatch to handlers.
    """
    if not self._connected or self._client is None:
        raise ConnectionError("Client not connected to broker")

    logger.info("Starting message listener")

    try:
        async for message in self._client.messages:
            # Update subscription statistics
            topic_str = str(message.topic)
            for topic_pattern, sub in self._subscriptions.items():
                if message.topic.matches(topic_pattern):
                    sub.message_count += 1
                    sub.last_message_time = time.time()
                    break

            # Create wrapped message
            # Convert payload to bytes to match MQTTMessage type
            if isinstance(message.payload, bytes):
                payload_bytes = message.payload
            elif isinstance(message.payload, str):
                payload_bytes = message.payload.encode('utf-8')
            else:
                payload_bytes = str(message.payload).encode('utf-8')

            mqtt_message = MQTTMessage(
                topic=topic_str,
                payload=payload_bytes,
                qos=message.qos,
                retain=message.retain,
                timestamp=time.time()
            )

            # Dispatch to all handlers
            for handler in self._message_handlers:
                try:
                    handler(mqtt_message)
                except Exception as e:
                    logger.error(f"Error in message handler: {e}")

    except Exception as e:
        logger.error(f"Error in message listener: {e}")
        raise
get_subscription
get_subscription(topic: str) -> Optional[RuntimeSubscription]

Get subscription info for a topic.

Parameters:

Name Type Description Default
topic str

Topic pattern

required

Returns:

Type Description
Optional[RuntimeSubscription]

RuntimeSubscription or None if not found

Source code in src/flowerpower_mqtt/client.py
def get_subscription(self, topic: str) -> Optional[RuntimeSubscription]:
    """
    Get subscription info for a topic.

    Args:
        topic: Topic pattern

    Returns:
        RuntimeSubscription or None if not found
    """
    return self._subscriptions.get(topic)
get_all_subscriptions
get_all_subscriptions() -> Dict[str, RuntimeSubscription]

Get all current subscriptions.

Source code in src/flowerpower_mqtt/client.py
def get_all_subscriptions(self) -> Dict[str, RuntimeSubscription]:
    """Get all current subscriptions."""
    return self._subscriptions.copy()
find_subscription_for_topic
find_subscription_for_topic(topic: str) -> Optional[RuntimeSubscription]

Find subscription that matches a specific topic.

Parameters:

Name Type Description Default
topic str

Specific topic to match against patterns

required

Returns:

Type Description
Optional[RuntimeSubscription]

First matching RuntimeSubscription or None

Source code in src/flowerpower_mqtt/client.py
def find_subscription_for_topic(self, topic: str) -> Optional[RuntimeSubscription]:
    """
    Find subscription that matches a specific topic.

    Args:
        topic: Specific topic to match against patterns

    Returns:
        First matching RuntimeSubscription or None
    """
    for pattern, subscription in self._subscriptions.items():
        try:
            if aiomqtt.Topic(topic).matches(pattern):
                return subscription
        except:
            # Fallback to simple string comparison
            if topic == pattern:
                return subscription

    return None

config

Configuration management for FlowerPower MQTT plugin.

Classes
MQTTConfig

Bases: Struct

MQTT broker configuration.

Source code in src/flowerpower_mqtt/config.py
class MQTTConfig(Struct, frozen=True):
    """MQTT broker configuration."""
    broker: str = "localhost"
    port: int = 1883
    keepalive: int = 60
    client_id: Optional[str] = None
    clean_session: bool = True
    username: Optional[str] = None
    password: Optional[str] = None
    reconnect_retries: int = 5
    reconnect_delay: int = 5
JobQueueConfig

Bases: Struct

Job queue configuration.

Source code in src/flowerpower_mqtt/config.py
class JobQueueConfig(Struct, frozen=True):
    """Job queue configuration."""
    enabled: bool = False
    type: str = "rq"
    redis_url: str = "redis://localhost:6379"
    queue_name: str = "mqtt_pipelines"
    worker_count: int = 4
    max_retries: int = 3
SubscriptionConfig

Bases: Struct

Individual subscription configuration.

Source code in src/flowerpower_mqtt/config.py
class SubscriptionConfig(Struct):
    """Individual subscription configuration."""
    topic: str
    pipeline: str
    qos: int = 0
    execution_mode: str = "sync"
    deserialization_format: str = "auto"

    def __post_init__(self) -> None:
        """Validate fields after initialization."""
        _validate_qos(self.qos)
        _validate_execution_mode(self.execution_mode)
        _validate_deserialization_format(self.deserialization_format)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for compatibility."""
        return msgspec.to_builtins(self)
Functions
__post_init__
__post_init__() -> None

Validate fields after initialization.

Source code in src/flowerpower_mqtt/config.py
def __post_init__(self) -> None:
    """Validate fields after initialization."""
    _validate_qos(self.qos)
    _validate_execution_mode(self.execution_mode)
    _validate_deserialization_format(self.deserialization_format)
to_dict
to_dict() -> Dict[str, Any]

Convert to dictionary for compatibility.

Source code in src/flowerpower_mqtt/config.py
def to_dict(self) -> Dict[str, Any]:
    """Convert to dictionary for compatibility."""
    return msgspec.to_builtins(self)
FlowerPowerMQTTConfig

Bases: Struct

Main configuration for FlowerPower MQTT plugin.

Source code in src/flowerpower_mqtt/config.py
class FlowerPowerMQTTConfig(Struct):
    """Main configuration for FlowerPower MQTT plugin."""
    mqtt: MQTTConfig = msgspec.field(default_factory=lambda: MQTTConfig())
    job_queue: JobQueueConfig = msgspec.field(default_factory=lambda: JobQueueConfig())
    subscriptions: List[SubscriptionConfig] = msgspec.field(default_factory=list)
    base_dir: str = "."
    log_level: str = "INFO"

    @classmethod
    def from_yaml(cls, file_path: Path) -> "FlowerPowerMQTTConfig":
        """Load configuration from YAML file."""
        if not file_path.exists():
            raise FileNotFoundError(f"Configuration file not found: {file_path}")

        with open(file_path, 'r') as f:
            return msgspec.yaml.decode(f.read(), type=cls)



    def to_yaml(self, file_path: Path) -> None:
        """Save configuration to YAML file."""
        # Convert struct to dictionary
        data = msgspec.to_builtins(self)

        with open(file_path, 'wb') as f:
            f.write(msgspec.yaml.encode(data))

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for compatibility."""
        return msgspec.to_builtins(self)
Functions
from_yaml classmethod
from_yaml(file_path: Path) -> FlowerPowerMQTTConfig

Load configuration from YAML file.

Source code in src/flowerpower_mqtt/config.py
@classmethod
def from_yaml(cls, file_path: Path) -> "FlowerPowerMQTTConfig":
    """Load configuration from YAML file."""
    if not file_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {file_path}")

    with open(file_path, 'r') as f:
        return msgspec.yaml.decode(f.read(), type=cls)
to_yaml
to_yaml(file_path: Path) -> None

Save configuration to YAML file.

Source code in src/flowerpower_mqtt/config.py
def to_yaml(self, file_path: Path) -> None:
    """Save configuration to YAML file."""
    # Convert struct to dictionary
    data = msgspec.to_builtins(self)

    with open(file_path, 'wb') as f:
        f.write(msgspec.yaml.encode(data))
to_dict
to_dict() -> Dict[str, Any]

Convert to dictionary for compatibility.

Source code in src/flowerpower_mqtt/config.py
def to_dict(self) -> Dict[str, Any]:
    """Convert to dictionary for compatibility."""
    return msgspec.to_builtins(self)
RuntimeSubscription dataclass

Runtime subscription data with additional metadata.

Source code in src/flowerpower_mqtt/config.py
@dataclass
class RuntimeSubscription:
    """Runtime subscription data with additional metadata."""
    topic: str
    pipeline: str
    qos: int = 0
    execution_mode: str = "sync"
    deserialization_format: str = "auto"
    message_count: int = 0
    last_message_time: Optional[float] = None
    error_count: int = 0

exceptions

Custom exceptions for FlowerPower MQTT plugin.

Classes
FlowerPowerMQTTError

Bases: Exception

Base exception for FlowerPower MQTT plugin.

Source code in src/flowerpower_mqtt/exceptions.py
4
5
6
class FlowerPowerMQTTError(Exception):
    """Base exception for FlowerPower MQTT plugin."""
    pass
ConnectionError

Bases: FlowerPowerMQTTError

Raised when MQTT connection fails.

Source code in src/flowerpower_mqtt/exceptions.py
class ConnectionError(FlowerPowerMQTTError):
    """Raised when MQTT connection fails."""
    pass
SubscriptionError

Bases: FlowerPowerMQTTError

Raised when MQTT subscription fails.

Source code in src/flowerpower_mqtt/exceptions.py
class SubscriptionError(FlowerPowerMQTTError):
    """Raised when MQTT subscription fails."""
    pass
PipelineExecutionError

Bases: FlowerPowerMQTTError

Raised when pipeline execution fails.

Source code in src/flowerpower_mqtt/exceptions.py
class PipelineExecutionError(FlowerPowerMQTTError):
    """Raised when pipeline execution fails."""
    pass
JobQueueError

Bases: FlowerPowerMQTTError

Raised when job queue operations fail.

Source code in src/flowerpower_mqtt/exceptions.py
class JobQueueError(FlowerPowerMQTTError):
    """Raised when job queue operations fail."""
    pass
ConfigurationError

Bases: FlowerPowerMQTTError

Raised when configuration is invalid.

Source code in src/flowerpower_mqtt/exceptions.py
class ConfigurationError(FlowerPowerMQTTError):
    """Raised when configuration is invalid."""
    pass

job_handler

RQ job handler functions for pipeline execution.

Classes
Functions
execute_pipeline_job
execute_pipeline_job(
    pipeline_name: str,
    message_data: Dict[str, Any],
    base_dir: str,
    topic: str,
    qos: int,
    execution_metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]

RQ job function for executing FlowerPower pipelines.

Parameters:

Name Type Description Default
pipeline_name str

Name of the pipeline to execute

required
message_data Dict[str, Any]

MQTT message data to pass as pipeline input

required
base_dir str

FlowerPower project base directory

required
topic str

MQTT topic the message came from

required
qos int

QoS level of the message

required
execution_metadata Optional[Dict[str, Any]]

Additional metadata for execution

None

Returns:

Type Description
Dict[str, Any]

Dict containing execution status and results

Source code in src/flowerpower_mqtt/job_handler.py
def execute_pipeline_job(
    pipeline_name: str, 
    message_data: Dict[str, Any], 
    base_dir: str,
    topic: str,
    qos: int,
    execution_metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    RQ job function for executing FlowerPower pipelines.

    Args:
        pipeline_name: Name of the pipeline to execute
        message_data: MQTT message data to pass as pipeline input
        base_dir: FlowerPower project base directory
        topic: MQTT topic the message came from
        qos: QoS level of the message
        execution_metadata: Additional metadata for execution

    Returns:
        Dict containing execution status and results
    """
    start_time = datetime.now()

    try:
        logger.info(
            f"Starting pipeline execution: {pipeline_name} "
            f"for topic {topic} (QoS {qos})"
        )

        # Initialize pipeline manager
        pm = PipelineManager(base_dir=base_dir)

        # Prepare pipeline inputs
        pipeline_inputs = {
            "mqtt_message": message_data,
            "mqtt_topic": topic,
            "mqtt_qos": qos,
            "execution_timestamp": start_time.isoformat(),
            **(execution_metadata or {})
        }

        # Execute the pipeline
        result = pm.run(
            name=pipeline_name,
            inputs=pipeline_inputs
        )

        execution_time = (datetime.now() - start_time).total_seconds()

        success_result = {
            "status": "success",
            "pipeline_name": pipeline_name,
            "topic": topic,
            "qos": qos,
            "execution_time": execution_time,
            "start_time": start_time.isoformat(),
            "result": result
        }

        logger.info(
            f"Pipeline {pipeline_name} completed successfully "
            f"in {execution_time:.2f}s"
        )

        return success_result

    except Exception as e:
        execution_time = (datetime.now() - start_time).total_seconds()

        error_result = {
            "status": "error",
            "pipeline_name": pipeline_name,
            "topic": topic,
            "qos": qos,
            "execution_time": execution_time,
            "start_time": start_time.isoformat(),
            "error": str(e),
            "error_type": type(e).__name__
        }

        logger.error(
            f"Pipeline {pipeline_name} failed after {execution_time:.2f}s: {e}"
        )

        # Re-raise for RQ to handle retries
        raise PipelineExecutionError(
            f"Pipeline '{pipeline_name}' execution failed: {e}"
        ) from e
get_job_status
get_job_status(
    job_id: str, job_queue_manager: JobQueueManager
) -> Dict[str, Any]

Get the status of a job.

Parameters:

Name Type Description Default
job_id str

RQ job ID

required
job_queue_manager JobQueueManager

JobQueueManager instance

required

Returns:

Type Description
Dict[str, Any]

Dict containing job status information

Source code in src/flowerpower_mqtt/job_handler.py
def get_job_status(job_id: str, job_queue_manager: JobQueueManager) -> Dict[str, Any]:
    """
    Get the status of a job.

    Args:
        job_id: RQ job ID
        job_queue_manager: JobQueueManager instance

    Returns:
        Dict containing job status information
    """
    try:
        job = job_queue_manager.get_job(job_id)
        if job is None:
            return {"status": "not_found", "job_id": job_id}

        return {
            "status": job.get_status(),
            "job_id": job_id,
            "created_at": job.created_at.isoformat() if job.created_at else None,
            "started_at": job.started_at.isoformat() if job.started_at else None,
            "ended_at": job.ended_at.isoformat() if job.ended_at else None,
            "result": job.result if job.is_finished else None,
            "exc_info": job.exc_info if job.is_failed else None
        }
    except Exception as e:
        logger.error(f"Error getting job status for {job_id}: {e}")
        return {
            "status": "error",
            "job_id": job_id,
            "error": str(e)
        }
cleanup_completed_jobs
cleanup_completed_jobs(
    job_queue_manager: JobQueueManager, max_age_hours: int = 24
) -> Dict[str, int]

Clean up completed jobs older than specified age.

Parameters:

Name Type Description Default
job_queue_manager JobQueueManager

JobQueueManager instance

required
max_age_hours int

Maximum age of jobs to keep

24

Returns:

Type Description
Dict[str, int]

Dict with cleanup statistics

Source code in src/flowerpower_mqtt/job_handler.py
def cleanup_completed_jobs(
    job_queue_manager: JobQueueManager,
    max_age_hours: int = 24
) -> Dict[str, int]:
    """
    Clean up completed jobs older than specified age.

    Args:
        job_queue_manager: JobQueueManager instance
        max_age_hours: Maximum age of jobs to keep

    Returns:
        Dict with cleanup statistics
    """
    try:
        # This would depend on the specific JobQueueManager implementation
        # For now, return a placeholder
        logger.info(f"Job cleanup requested for jobs older than {max_age_hours} hours")
        return {"cleaned_count": 0, "error_count": 0}
    except Exception as e:
        logger.error(f"Error during job cleanup: {e}")
        return {"cleaned_count": 0, "error_count": 1}

listener

MQTT listener with FlowerPower pipeline execution and job queue integration.

Classes
MQTTListener

MQTT listener that processes messages and executes FlowerPower pipelines. Supports both synchronous and asynchronous execution via RQ job queue.

Source code in src/flowerpower_mqtt/listener.py
class MQTTListener:
    """
    MQTT listener that processes messages and executes FlowerPower pipelines.
    Supports both synchronous and asynchronous execution via RQ job queue.
    """

    def __init__(
        self, 
        mqtt_client: MQTTClient,
        config: FlowerPowerMQTTConfig
    ):
        """
        Initialize MQTT listener.

        Args:
            mqtt_client: Connected MQTT client instance
            config: FlowerPower MQTT configuration
        """
        self.mqtt_client = mqtt_client
        self.config = config

        # FlowerPower managers
        self.pipeline_manager = PipelineManager(base_dir=config.base_dir)
        self.job_queue_manager: Optional[JobQueueManager] = None

        # State management
        self._running = False
        self._listener_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()

        # Statistics
        self._message_count = 0
        self._pipeline_count = 0
        self._error_count = 0
        self._start_time: Optional[datetime] = None

        # Initialize job queue if enabled
        if config.job_queue.enabled:
            self._init_job_queue()

        # Register message handler
        self.mqtt_client.add_message_handler(self._handle_message)

    def _init_job_queue(self) -> None:
        """Initialize job queue manager."""
        try:
            jq_config = self.config.job_queue

            logger.info(f"Initializing job queue: {jq_config.type}")

            self.job_queue_manager = JobQueueManager(
                type=jq_config.type,
                name=jq_config.queue_name,
                base_dir=self.config.base_dir
            )

            logger.info("Job queue manager initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize job queue: {e}")
            raise JobQueueError(f"Failed to initialize job queue: {e}") from e

    def _handle_message(self, message: MQTTMessage) -> None:
        """
        Handle incoming MQTT message.

        Args:
            message: MQTT message to process
        """
        self._message_count += 1

        try:
            # Find matching subscription
            subscription = self.mqtt_client.find_subscription_for_topic(message.topic)
            if not subscription:
                logger.warning(f"No subscription found for topic: {message.topic}")
                return

            logger.info(
                f"Processing message from topic '{message.topic}' "
                f"-> pipeline '{subscription.pipeline}' (QoS {message.qos})"
            )

            # Parse message payload
            message_data = self._parse_message_payload(message)

            # Determine execution mode
            execution_mode = self._determine_execution_mode(subscription, message)

            # Execute pipeline based on mode
            if execution_mode == "sync":
                self._execute_pipeline_sync(subscription.pipeline, message_data, message)
            elif execution_mode == "async":
                self._execute_pipeline_async(subscription.pipeline, message_data, message)
            else:
                logger.error(f"Unknown execution mode: {execution_mode}")
                self._error_count += 1

        except Exception as e:
            logger.error(f"Error handling message from topic '{message.topic}': {e}")
            self._error_count += 1

    def _parse_message_payload(self, message: MQTTMessage) -> Dict[str, Any]:
        """
        Parse MQTT message payload.

        Args:
            message: MQTT message

        Returns:
            Parsed payload data
        """
        # Try to parse as JSON
        json_data = message.deserialize_json()
        if json_data is not None:
            return json_data

        # Fallback to string payload
        payload_str = message.payload_str
        if payload_str is not None:
            return {
                "raw_payload": payload_str,
                "payload_bytes": message.payload
            }

        # Binary payload
        return {
            "payload_bytes": message.payload,
            "is_binary": True
        }

    def _determine_execution_mode(self, subscription, message: MQTTMessage) -> str:
        """
        Determine execution mode based on configuration and message.

        Args:
            subscription: Runtime subscription info
            message: MQTT message

        Returns:
            Execution mode ("sync" or "async")
        """
        if not self.config.job_queue.enabled:
            return "sync"

        mode = subscription.execution_mode

        if mode == "mixed":
            # QoS 2 messages are executed synchronously, others async
            return "sync" if message.qos >= 2 else "async"

        return mode

    def _execute_pipeline_sync(
        self, 
        pipeline_name: str, 
        message_data: Dict[str, Any],
        message: MQTTMessage
    ) -> None:
        """
        Execute pipeline synchronously.

        Args:
            pipeline_name: Name of pipeline to execute
            message_data: Parsed message data
            message: Original MQTT message
        """
        try:
            start_time = datetime.now()

            # Prepare pipeline inputs
            pipeline_inputs = {
                "mqtt_message": message_data,
                "mqtt_topic": message.topic,
                "mqtt_qos": message.qos,
                "execution_timestamp": start_time.isoformat(),
                "execution_mode": "sync"
            }

            # Execute pipeline
            result = self.pipeline_manager.run(
                name=pipeline_name,
                inputs=pipeline_inputs
            )

            execution_time = (datetime.now() - start_time).total_seconds()
            self._pipeline_count += 1

            logger.info(
                f"Pipeline '{pipeline_name}' completed synchronously "
                f"in {execution_time:.2f}s"
            )

        except Exception as e:
            logger.error(f"Synchronous pipeline execution failed: {e}")
            self._error_count += 1
            raise PipelineExecutionError(
                f"Synchronous execution of '{pipeline_name}' failed: {e}"
            ) from e

    def _execute_pipeline_async(
        self, 
        pipeline_name: str, 
        message_data: Dict[str, Any],
        message: MQTTMessage
    ) -> Optional[str]:
        """
        Execute pipeline asynchronously via job queue.

        Args:
            pipeline_name: Name of pipeline to execute
            message_data: Parsed message data
            message: Original MQTT message

        Returns:
            Job ID if successful, None if failed
        """
        if not self.job_queue_manager:
            logger.error("Job queue not available for async execution")
            self._error_count += 1
            return None

        try:
            # Enqueue pipeline execution job
            job = self.job_queue_manager.enqueue(
                execute_pipeline_job,
                pipeline_name,
                message_data,
                self.config.base_dir,
                message.topic,
                message.qos,
                {"execution_mode": "async"}
            )

            job_id = job.id if hasattr(job, 'id') else str(job)
            self._pipeline_count += 1

            logger.info(
                f"Pipeline '{pipeline_name}' queued for async execution "
                f"(job ID: {job_id})"
            )

            return job_id

        except Exception as e:
            logger.error(f"Async pipeline execution failed: {e}")
            self._error_count += 1
            return None

    async def start_listener(self, background: bool = False) -> None:
        """
        Start MQTT message listener.

        Args:
            background: If True, run listener in background task
        """
        if self._running:
            logger.warning("Listener already running")
            return

        self._running = True
        self._start_time = datetime.now()

        logger.info(f"Starting MQTT listener (background={background})")

        if background:
            self._listener_task = asyncio.create_task(self._listen_loop())
        else:
            await self._listen_loop()

    async def _listen_loop(self) -> None:
        """Main listening loop."""
        try:
            # Set up signal handlers for graceful shutdown
            if not asyncio.current_task().cancelled():
                loop = asyncio.get_event_loop()
                for sig in [signal.SIGINT, signal.SIGTERM]:
                    try:
                        loop.add_signal_handler(sig, self._signal_handler)
                    except NotImplementedError:
                        # Windows doesn't support signal handlers
                        pass

            # Start listening for messages
            await self.mqtt_client.listen_for_messages()

        except asyncio.CancelledError:
            logger.info("Listener loop cancelled")
        except Exception as e:
            logger.error(f"Error in listener loop: {e}")
            raise
        finally:
            self._running = False

    def _signal_handler(self) -> None:
        """Handle shutdown signals."""
        logger.info("Received shutdown signal")
        self._shutdown_event.set()
        if self._listener_task:
            self._listener_task.cancel()

    async def stop_listener(self, timeout: float = 10.0) -> None:
        """
        Stop MQTT message listener.

        Args:
            timeout: Maximum time to wait for graceful shutdown
        """
        if not self._running:
            logger.warning("Listener not running")
            return

        logger.info("Stopping MQTT listener")

        # Signal shutdown
        self._shutdown_event.set()

        # Cancel listener task if running in background
        if self._listener_task:
            self._listener_task.cancel()

            try:
                await asyncio.wait_for(self._listener_task, timeout=timeout)
            except (asyncio.TimeoutError, asyncio.CancelledError):
                logger.warning("Listener task did not stop gracefully")

        self._running = False
        logger.info("MQTT listener stopped")

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get listener statistics.

        Returns:
            Dictionary with current statistics
        """
        runtime = None
        if self._start_time:
            runtime = (datetime.now() - self._start_time).total_seconds()

        stats = {
            "running": self._running,
            "start_time": self._start_time.isoformat() if self._start_time else None,
            "runtime_seconds": runtime,
            "message_count": self._message_count,
            "pipeline_count": self._pipeline_count,
            "error_count": self._error_count,
            "subscriptions": len(self.mqtt_client.get_all_subscriptions()),
            "job_queue_enabled": self.config.job_queue.enabled
        }

        # Add job queue stats if available
        if self.job_queue_manager:
            try:
                # This would depend on the JobQueueManager implementation
                stats["job_queue_stats"] = {
                    "queue_name": self.config.job_queue.queue_name,
                    "type": self.config.job_queue.type
                }
            except Exception as e:
                logger.debug(f"Could not get job queue stats: {e}")

        return stats

    @property
    def is_running(self) -> bool:
        """Check if listener is running."""
        return self._running
Attributes
is_running property
is_running: bool

Check if listener is running.

Functions
__init__
__init__(mqtt_client: MQTTClient, config: FlowerPowerMQTTConfig)

Initialize MQTT listener.

Parameters:

Name Type Description Default
mqtt_client MQTTClient

Connected MQTT client instance

required
config FlowerPowerMQTTConfig

FlowerPower MQTT configuration

required
Source code in src/flowerpower_mqtt/listener.py
def __init__(
    self, 
    mqtt_client: MQTTClient,
    config: FlowerPowerMQTTConfig
):
    """
    Initialize MQTT listener.

    Args:
        mqtt_client: Connected MQTT client instance
        config: FlowerPower MQTT configuration
    """
    self.mqtt_client = mqtt_client
    self.config = config

    # FlowerPower managers
    self.pipeline_manager = PipelineManager(base_dir=config.base_dir)
    self.job_queue_manager: Optional[JobQueueManager] = None

    # State management
    self._running = False
    self._listener_task: Optional[asyncio.Task] = None
    self._shutdown_event = asyncio.Event()

    # Statistics
    self._message_count = 0
    self._pipeline_count = 0
    self._error_count = 0
    self._start_time: Optional[datetime] = None

    # Initialize job queue if enabled
    if config.job_queue.enabled:
        self._init_job_queue()

    # Register message handler
    self.mqtt_client.add_message_handler(self._handle_message)
start_listener async
start_listener(background: bool = False) -> None

Start MQTT message listener.

Parameters:

Name Type Description Default
background bool

If True, run listener in background task

False
Source code in src/flowerpower_mqtt/listener.py
async def start_listener(self, background: bool = False) -> None:
    """
    Start MQTT message listener.

    Args:
        background: If True, run listener in background task
    """
    if self._running:
        logger.warning("Listener already running")
        return

    self._running = True
    self._start_time = datetime.now()

    logger.info(f"Starting MQTT listener (background={background})")

    if background:
        self._listener_task = asyncio.create_task(self._listen_loop())
    else:
        await self._listen_loop()
stop_listener async
stop_listener(timeout: float = 10.0) -> None

Stop MQTT message listener.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for graceful shutdown

10.0
Source code in src/flowerpower_mqtt/listener.py
async def stop_listener(self, timeout: float = 10.0) -> None:
    """
    Stop MQTT message listener.

    Args:
        timeout: Maximum time to wait for graceful shutdown
    """
    if not self._running:
        logger.warning("Listener not running")
        return

    logger.info("Stopping MQTT listener")

    # Signal shutdown
    self._shutdown_event.set()

    # Cancel listener task if running in background
    if self._listener_task:
        self._listener_task.cancel()

        try:
            await asyncio.wait_for(self._listener_task, timeout=timeout)
        except (asyncio.TimeoutError, asyncio.CancelledError):
            logger.warning("Listener task did not stop gracefully")

    self._running = False
    logger.info("MQTT listener stopped")
get_statistics
get_statistics() -> Dict[str, Any]

Get listener statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with current statistics

Source code in src/flowerpower_mqtt/listener.py
def get_statistics(self) -> Dict[str, Any]:
    """
    Get listener statistics.

    Returns:
        Dictionary with current statistics
    """
    runtime = None
    if self._start_time:
        runtime = (datetime.now() - self._start_time).total_seconds()

    stats = {
        "running": self._running,
        "start_time": self._start_time.isoformat() if self._start_time else None,
        "runtime_seconds": runtime,
        "message_count": self._message_count,
        "pipeline_count": self._pipeline_count,
        "error_count": self._error_count,
        "subscriptions": len(self.mqtt_client.get_all_subscriptions()),
        "job_queue_enabled": self.config.job_queue.enabled
    }

    # Add job queue stats if available
    if self.job_queue_manager:
        try:
            # This would depend on the JobQueueManager implementation
            stats["job_queue_stats"] = {
                "queue_name": self.config.job_queue.queue_name,
                "type": self.config.job_queue.type
            }
        except Exception as e:
            logger.debug(f"Could not get job queue stats: {e}")

    return stats
Functions