Advanced Usage

Welcome to the advanced usage guide for FlowerPower. This document covers more complex configurations and use cases to help you get the most out of the library.

Configuration Flexibility

FlowerPower offers multiple ways to configure your project, so the same code can run in different environments and workflows. Configuration sources are applied in the following order of precedence, from highest to lowest:

  1. Programmatic Overrides: Highest priority.
  2. Environment Variables: Set in your shell or .env file.
  3. settings.py: A dedicated settings module.
  4. YAML files: anypath.yaml for your project.
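
The precedence order above can be pictured as a layered lookup in which the first source that defines a key wins. The following is a minimal sketch using Python's standard `collections.ChainMap`. The dictionaries, keys, and values are hypothetical stand-ins chosen for illustration, and this is not FlowerPower's actual loading code.

```python
from collections import ChainMap

# Hypothetical values for each configuration source (illustration only).
programmatic = {"redis.host": "localhost"}
environment = {"redis.host": "redis.internal", "redis.port": 6380}
settings_module = {"redis.port": 6379, "log_level": "INFO"}
yaml_file = {"log_level": "DEBUG", "pipelines.my_pipeline.retries": 1}

# ChainMap searches its maps left to right, so earlier maps take precedence.
config = ChainMap(programmatic, environment, settings_module, yaml_file)

print(config["redis.host"])                     # localhost (programmatic override)
print(config["redis.port"])                     # 6380 (environment beats settings.py)
print(config["log_level"])                      # INFO (settings.py beats YAML)
print(config["pipelines.my_pipeline.retries"])  # 1 (only defined in YAML)
```

A key defined only in a lower-priority source still resolves, which is why YAML files work well for defaults while overrides stay small.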

Programmatic Configuration

You can override configuration settings directly in your Python code. This is useful for dynamic adjustments or for settings that are determined at runtime.

from flowerpower.core.config import settings

# Override the default Redis host
settings.set('redis.host', 'localhost')

# You can also update nested settings
settings.set('pipelines.my_pipeline.retries', 3)

Direct Module Usage

For fine-grained control, you can work directly with PipelineManager and JobQueueManager.

PipelineManager

The PipelineManager is responsible for loading, validating, and executing data pipelines.

from flowerpower.core.pipeline import PipelineManager

# Initialize the manager
pipeline_manager = PipelineManager()

# Load a specific pipeline
pipeline = pipeline_manager.get_pipeline("sales_etl")

# Execute the pipeline
result = pipeline.run(input_data="path/to/data.csv")
print(result)

JobQueueManager

The JobQueueManager handles job queuing, scheduling, and worker management.

from flowerpower.core.job_queue import JobQueueManager

# Initialize the manager
job_queue_manager = JobQueueManager()

# Enqueue a job
job = job_queue_manager.enqueue("my_task", arg1="value1", arg2="value2")
print(f"Job {job.id} enqueued.")

# Schedule a job to run at a specific time
job_queue_manager.schedule("my_task", cron="0 0 * * *")  # Daily at midnight

Adapters

Integrate with popular MLOps and observability tools using adapters.

  • Hamilton Tracker: For dataflow and lineage tracking.
  • MLflow: For experiment tracking.
  • OpenTelemetry: For distributed tracing and metrics.

Filesystem Abstraction

FlowerPower uses the fsspec-utils library to provide a unified interface to different filesystems, including local storage, S3, and GCS. This lets you switch between storage backends without changing your pipeline code.

Worker Management

Workers process your queued jobs. You can run a single worker in the foreground or a pool of workers in the background.

Single Worker

Start a single worker in the foreground:

flowerpower job-queue start-worker

Worker Pool

Start a pool of workers in the background:

flowerpower job-queue start-worker --pool-size 5 --background

To stop background workers:

flowerpower job-queue stop-worker

Scheduling Options

FlowerPower supports several scheduling strategies for your jobs:

  • Cron: For recurring jobs at specific times (e.g., 0 2 * * *).
  • Interval: For jobs that run at regular intervals (e.g., every 30 minutes).
  • Date: For jobs that run once at a specific date and time.
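
To make the three strategies concrete, here is a minimal sketch in plain Python of what each one describes. The helper names `next_interval_run` and `next_date_run` are hypothetical and are not part of FlowerPower's API. The cron portion only labels the five standard fields and does not evaluate the expression.

```python
from datetime import datetime, timedelta
from typing import Optional

# Cron: five space-separated fields. "0 2 * * *" means every day at 02:00.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")
cron_parts = dict(zip(CRON_FIELDS, "0 2 * * *".split()))
print(cron_parts["hour"], cron_parts["minute"])  # 2 0


def next_interval_run(last_run: datetime, every: timedelta) -> datetime:
    """Interval: the next run is a fixed offset after the previous one."""
    return last_run + every


def next_date_run(run_at: datetime, now: datetime) -> Optional[datetime]:
    """Date: a one-off job runs at run_at, or never if that time has passed."""
    return run_at if run_at > now else None


now = datetime(2024, 1, 1, 12, 0)
print(next_interval_run(now, timedelta(minutes=30)))     # 2024-01-01 12:30:00
print(next_date_run(datetime(2024, 1, 2, 9, 0), now))    # 2024-01-02 09:00:00
print(next_date_run(datetime(2023, 12, 31, 9, 0), now))  # None
```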

Extensible I/O Plugins

The flowerpower-io plugin extends FlowerPower's I/O capabilities, letting you connect to various data sources and sinks through a simple plugin architecture.

Supported types include:

  • CSV, JSON, Parquet
  • DeltaTable
  • DuckDB, PostgreSQL, MySQL, MSSQL, Oracle, SQLite
  • MQTT

To use a plugin, simply specify its type in your pipeline configuration.

Troubleshooting

Here are some common issues and how to resolve them:

  • Redis Connection Error: Ensure your Redis server is running and accessible. Check the redis.host and redis.port settings in your configuration.
  • Configuration Errors: Use the flowerpower config show command to inspect the loaded configuration and identify any misconfigurations.
  • Module Not Found: Make sure your pipeline and task modules are in Python's path. You can add directories to the path using the PYTHONPATH environment variable.
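
The first and last checks above can be scripted with the standard library alone. The sketch below is an illustration, not a FlowerPower utility: `can_connect` and `is_importable` are hypothetical helper names, and the host and port are assumptions (6379 is Redis's default port). Substitute the values from your own redis.host and redis.port settings.

```python
import importlib.util
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def is_importable(module_name: str) -> bool:
    """Return True if Python can locate a top-level module on its path."""
    return importlib.util.find_spec(module_name) is not None


# Assumed host and port; replace with your configured values.
print("Redis port reachable:", can_connect("localhost", 6379))
# "json" is a standard-library module used here as a stand-in for your own module.
print("Module importable:", is_importable("json"))
```

A successful TCP connection only shows that something is listening on that port. It does not confirm that the service is Redis or that authentication will succeed.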

Note

For more detailed information, refer to the API documentation.