
FlowerPowerProject

Module: flowerpower.flowerpower.FlowerPowerProject

The FlowerPowerProject class represents an initialized FlowerPower project, providing an interface to manage pipelines, job queues, and project-level settings.

Initialization

__init__

__init__(self, pipeline_manager: PipelineManager, job_queue_manager: JobQueueManager | None = None)
...

Initializes a FlowerPowerProject instance. This constructor is typically called internally by FlowerPowerProject.load() or FlowerPowerProject.new().

Parameters:
- pipeline_manager (PipelineManager): An instance of PipelineManager to manage pipelines within this project.
- job_queue_manager (JobQueueManager | None): An optional instance of JobQueueManager to handle job queue operations.

Attributes

- pipeline_manager (PipelineManager): Manages pipelines within the project.
- job_queue_manager (JobQueueManager | None): Manages job queue operations, if configured.
- name (str): The name of the current project.
- _base_dir (str): The base directory of the project.
- _fs (AbstractFileSystem): The fsspec-compatible filesystem instance used by the project.
- _storage_options (dict | Munch | BaseStorageOptions): Storage options for the filesystem.
- job_queue_type (str | None): The type of job queue configured for the project (e.g., "rq").
- job_queue_backend (Any | None): The backend instance for the job queue, if configured.
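Most methods below are thin wrappers around the two managers listed above: run() goes to the pipeline manager, and the job-queue methods go to the job queue manager and raise RuntimeError when none is configured. The following is a minimal sketch of that delegation pattern using hypothetical stub classes (StubPipelineManager, StubJobQueueManager, ProjectSketch); it is not FlowerPower's actual implementation.

```python
from __future__ import annotations

from typing import Any


class StubPipelineManager:
    """Hypothetical stand-in for PipelineManager; it only echoes its arguments."""

    def run(self, name: str, **kwargs: Any) -> dict[str, Any]:
        return {"pipeline": name, "kwargs": kwargs}


class StubJobQueueManager:
    """Hypothetical stand-in for JobQueueManager; returns a fake job ID."""

    def enqueue_pipeline(self, name: str, *args: Any, **kwargs: Any) -> str:
        return f"job:{name}"


class ProjectSketch:
    """Illustrates how a project object can delegate to its managers."""

    def __init__(
        self,
        pipeline_manager: StubPipelineManager,
        job_queue_manager: StubJobQueueManager | None = None,
    ) -> None:
        self.pipeline_manager = pipeline_manager
        self.job_queue_manager = job_queue_manager

    def _require_job_queue(self) -> StubJobQueueManager:
        # Job-queue methods cannot work without a configured manager.
        if self.job_queue_manager is None:
            raise RuntimeError("Job queue manager is not configured.")
        return self.job_queue_manager

    def run(self, name: str, **kwargs: Any) -> dict[str, Any]:
        # Synchronous execution is forwarded straight to the pipeline manager.
        return self.pipeline_manager.run(name, **kwargs)

    def enqueue(self, name: str, *args: Any, **kwargs: Any) -> str:
        # Asynchronous execution requires the job queue manager.
        return self._require_job_queue().enqueue_pipeline(name, *args, **kwargs)
```

With only a pipeline manager, ProjectSketch(StubPipelineManager()).run("my_pipeline") works, while enqueue() raises RuntimeError, mirroring the "Raises" notes on the job-queue methods.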

Methods

run

run(self, name: str, inputs: dict | None = None, final_vars: list[str] | None = None, config: dict | None = None, cache: dict | None = None, executor_cfg: str | dict | ExecutorConfig | None = None, with_adapter_cfg: dict | WithAdapterConfig | None = None, pipeline_adapter_cfg: dict | PipelineAdapterConfig | None = None, project_adapter_cfg: dict | ProjectAdapterConfig | None = None, adapter: dict[str, Any] | None = None, reload: bool = False, log_level: str | None = None, max_retries: int | None = None, retry_delay: float | None = None, jitter_factor: float | None = None, retry_exceptions: tuple | list | None = None, on_success: Callable | tuple[Callable, tuple | None, dict | None] | None = None, on_failure: Callable | tuple[Callable, tuple | None, dict | None] | None = None) -> dict[str, Any]
...

Execute a pipeline synchronously and return its results.

This is a convenience method that delegates to the pipeline manager. It provides the same functionality as self.pipeline_manager.run().

Parameters:
- name (str, required): Name of the pipeline to run. Must be a valid identifier.
- inputs (dict | None, default None): Override pipeline input values. Example: {"data_date": "2025-04-28"}
- final_vars (list[str] | None, default None): Output variables to return. Example: ["model", "metrics"]
- config (dict | None, default None): Configuration for the Hamilton pipeline executor. Example: {"model": "LogisticRegression"}
- cache (dict | None, default None): Cache configuration for results. Example: {"recompute": ["node1", "final_node"]}
- executor_cfg (str | dict | ExecutorConfig | None, default None): Execution configuration. Can be:
  - str: Executor name, e.g. "threadpool", "local"
  - dict: Raw config, e.g. {"type": "threadpool", "max_workers": 4}
  - ExecutorConfig: Structured config object
- with_adapter_cfg (dict | WithAdapterConfig | None, default None): Adapter settings for pipeline execution. Example: {"opentelemetry": True, "tracker": False}
- pipeline_adapter_cfg (dict | PipelineAdapterConfig | None, default None): Pipeline-specific adapter settings. Example: {"tracker": {"project_id": "123", "tags": {"env": "prod"}}}
- project_adapter_cfg (dict | ProjectAdapterConfig | None, default None): Project-level adapter settings. Example: {"opentelemetry": {"host": "http://localhost:4317"}}
- adapter (dict[str, Any] | None, default None): Custom adapter instances for the pipeline. Example: {"ray_graph_adapter": RayGraphAdapter()}
- reload (bool, default False): Force a reload of the pipeline configuration.
- log_level (str | None, default None): Logging level for the execution. Valid values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL".
- max_retries (int | None, default None): Maximum number of retries for execution.
- retry_delay (float | None, default None): Delay between retries, in seconds.
- jitter_factor (float | None, default None): Random jitter factor added to the retry delay.
- retry_exceptions (tuple | list | None, default None): Exception types that trigger a retry.
- on_success (Callable | tuple[Callable, tuple | None, dict | None] | None, default None): Callback to run after a successful pipeline execution.
- on_failure (Callable | tuple[Callable, tuple | None, dict | None] | None, default None): Callback to run when pipeline execution fails.
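executor_cfg accepts three different shapes. The following is a minimal sketch, not FlowerPower's code, of how such a parameter can be normalized into a single dict. ExecutorConfigSketch and normalize_executor_cfg are hypothetical names, and falling back to a "local" executor when the value is None is an assumption.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class ExecutorConfigSketch:
    """Hypothetical stand-in for ExecutorConfig; field names are assumptions."""

    type: str = "local"
    max_workers: int | None = None


def normalize_executor_cfg(cfg: str | dict | ExecutorConfigSketch | None) -> dict[str, Any]:
    """Turn any accepted executor_cfg form into a plain dict."""
    if cfg is None:
        # Assumption: no config falls back to the "local" executor.
        return {"type": "local"}
    if isinstance(cfg, str):
        # A bare string names the executor type, e.g. "threadpool".
        return {"type": cfg}
    if isinstance(cfg, dict):
        # Copy so the caller's dict is never mutated.
        return dict(cfg)
    if isinstance(cfg, ExecutorConfigSketch):
        # Drop unset fields so only explicit settings remain.
        return {k: v for k, v in asdict(cfg).items() if v is not None}
    raise TypeError(f"Unsupported executor_cfg type: {type(cfg).__name__}")
```

Normalizing early means the rest of the execution path only has to handle one representation, whichever form the caller chose.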

Returns: dict[str, Any] - Pipeline execution results, mapping output variable names to their computed values.

Raises:

  • ValueError: If the pipeline name does not exist or the configuration is invalid.
  • ImportError: If the pipeline module cannot be imported.
  • RuntimeError: If execution fails due to pipeline or adapter errors.

Example

from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")

# Simple execution
result = project.run("my_pipeline")

# With custom inputs
result = project.run(
    "ml_pipeline",
    inputs={"data_date": "2025-01-01"},
    final_vars=["model", "metrics"]
)
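The retry and callback parameters of run() (max_retries, retry_delay, jitter_factor, retry_exceptions, on_success, on_failure) can be pictured with the sketch below. It illustrates the general technique under stated assumptions and is not FlowerPower's implementation: run_with_retries and invoke_callback are hypothetical helpers, jitter is assumed to scale the delay by a random factor in [1 - jitter_factor, 1 + jitter_factor], and callbacks are called only with the args and kwargs from their tuple form.

```python
from __future__ import annotations

import random
import time
from typing import Any, Callable


def invoke_callback(spec: Any) -> None:
    """Call a bare callable, or a (callable, args, kwargs) tuple."""
    if spec is None:
        return
    if isinstance(spec, tuple):
        func, args, kwargs = spec
        func(*(args or ()), **(kwargs or {}))
    else:
        spec()


def run_with_retries(
    func: Callable[[], dict[str, Any]],
    max_retries: int = 0,
    retry_delay: float = 1.0,
    jitter_factor: float = 0.0,
    retry_exceptions: tuple | list = (Exception,),
    on_success: Any = None,
    on_failure: Any = None,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Run func, retrying on the given exception types with jittered delays."""
    retry_on = tuple(retry_exceptions)  # a list is accepted too
    attempt = 0
    while True:
        try:
            result = func()
        except Exception as exc:
            if not isinstance(exc, retry_on) or attempt >= max_retries:
                # Non-retryable error or out of retries: report and re-raise.
                invoke_callback(on_failure)
                raise
            # Assumption: jitter scales the base delay by up to +/- jitter_factor.
            jitter = random.uniform(-jitter_factor, jitter_factor)
            sleep(max(0.0, retry_delay * (1 + jitter)))
            attempt += 1
        else:
            invoke_callback(on_success)
            return result
```

Injecting sleep keeps the helper testable; randomizing the delay spreads out retries from many failing runs so they do not all hit a shared resource at the same moment.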

enqueue

enqueue(self, name: str, *args, **kwargs)
...

Enqueue a pipeline for execution via the job queue.

This is a convenience method that delegates to the job queue manager's enqueue_pipeline method. It provides asynchronous pipeline execution.

Parameters:
- name (str): Name of the pipeline to enqueue.
- *args (Any): Additional positional arguments for job execution.
- **kwargs (Any): Keyword arguments for pipeline execution and job queue options. Supports all parameters from pipeline_manager.run() plus job queue specific options:
  - run_in: Schedule the job to run after a delay
  - run_at: Schedule the job to run at a specific datetime
  - queue_name: Queue to use (for RQ)
  - timeout: Job execution timeout
  - retry: Number of retries
  - result_ttl: Result time to live
  - ttl: Job time to live
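The **kwargs of enqueue mixes run() arguments with job-queue options. The following is a minimal sketch of separating the two and resolving when the job should start. It assumes run_in is a number of seconds (or a timedelta) and that run_in and run_at are mutually exclusive; split_enqueue_kwargs and resolve_start_time are hypothetical helpers, not part of FlowerPower's API.

```python
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any

# Job-queue option names taken from the list above.
JOB_QUEUE_OPTIONS = {"run_in", "run_at", "queue_name", "timeout", "retry", "result_ttl", "ttl"}


def split_enqueue_kwargs(kwargs: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Separate pipeline run() arguments from job-queue options."""
    run_kwargs = {k: v for k, v in kwargs.items() if k not in JOB_QUEUE_OPTIONS}
    queue_opts = {k: v for k, v in kwargs.items() if k in JOB_QUEUE_OPTIONS}
    return run_kwargs, queue_opts


def resolve_start_time(queue_opts: dict[str, Any], now: datetime) -> datetime:
    """Work out when the job should start (assumes run_in is in seconds)."""
    if "run_in" in queue_opts and "run_at" in queue_opts:
        # Assumption: the two scheduling options are mutually exclusive.
        raise ValueError("Pass either run_in or run_at, not both.")
    if "run_at" in queue_opts:
        return queue_opts["run_at"]
    if "run_in" in queue_opts:
        delay = queue_opts["run_in"]
        # Accept either a number of seconds or a timedelta.
        return now + (delay if isinstance(delay, timedelta) else timedelta(seconds=delay))
    return now  # no delay requested: run as soon as a worker picks it up
```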

Returns: Job - Job ID or result, depending on the implementation, or None if the job queue is not configured.

Raises: RuntimeError: If job queue manager is not configured.

Example

from flowerpower import FlowerPowerProject
from datetime import datetime

project = FlowerPowerProject.load(".")

# Immediate execution via job queue
job_id = project.enqueue("my_pipeline", inputs={"date": "today"})

# Delayed execution
job_id = project.enqueue("my_pipeline", inputs={"date": "today"}, run_in=300)

# Scheduled execution
job_id = project.enqueue(
    "my_pipeline",
    inputs={"date": "today"},
    run_at=datetime(2025, 1, 1, 9, 0)
)

schedule

schedule(self, name: str, *args, **kwargs)
...

Schedule a pipeline for recurring or future execution.

This is a convenience method that delegates to the job queue manager's schedule_pipeline method. It provides scheduled pipeline execution.

Parameters:
- name (str): Name of the pipeline to schedule.
- *args (Any): Additional positional arguments for scheduling.
- **kwargs (Any): Keyword arguments for pipeline execution and scheduling options. Supports all parameters from pipeline_manager.run() plus scheduling options:
  - cron: Cron expression for recurring execution (e.g., "0 9 * * *")
  - interval: Time interval for recurring execution (int seconds or dict)
  - date: Future date for one-time execution (datetime or ISO string)
  - schedule_id: Unique identifier for the schedule
  - overwrite: Whether to overwrite an existing schedule with the same ID
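cron, interval, and date describe three different kinds of trigger. The following sketch normalizes them into one (kind, value) pair. It assumes exactly one of the three is passed per call and that an interval dict holds timedelta keyword arguments such as {"minutes": 15}; normalize_trigger is a hypothetical helper, and the cron string is passed through unvalidated.

```python
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any


def normalize_trigger(
    cron: str | None = None,
    interval: int | dict[str, int] | None = None,
    date: datetime | str | None = None,
) -> tuple[str, Any]:
    """Return (kind, value) for exactly one of cron, interval, or date."""
    candidates = {"cron": cron, "interval": interval, "date": date}
    given = {k: v for k, v in candidates.items() if v is not None}
    if len(given) != 1:
        # Assumption: one trigger per schedule.
        raise ValueError("Specify exactly one of cron, interval, or date.")
    kind, value = next(iter(given.items()))
    if kind == "cron":
        # Passed through unchanged; validation is left to the backend.
        return kind, value
    if kind == "interval":
        # An int means seconds; a dict is unpacked into timedelta.
        return kind, timedelta(seconds=value) if isinstance(value, int) else timedelta(**value)
    # date: accept a datetime or an ISO 8601 string.
    return kind, value if isinstance(value, datetime) else datetime.fromisoformat(value)
```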

Returns: ScheduledJob - Schedule ID or job ID, depending on the implementation, or None if the job queue is not configured.

Raises: RuntimeError: If job queue manager is not configured.

Example

from flowerpower import FlowerPowerProject
from datetime import datetime, timedelta

project = FlowerPowerProject.load(".")

# Daily schedule with cron
schedule_id = project.schedule(
    "daily_metrics",
    cron="0 9 * * *",  # 9 AM daily
    inputs={"date": "{{ execution_date }}"}
)

# Interval-based schedule
schedule_id = project.schedule(
    "monitoring",
    interval={"minutes": 15},
    inputs={"check_type": "health"}
)

# Future one-time execution
future_date = datetime.now() + timedelta(days=1)
schedule_id = project.schedule(
    "batch_process",
    date=future_date,
    inputs={"process_date": "tomorrow"}
)

start_worker

start_worker(self, background: bool = False, queue_names: list[str] | None = None, with_scheduler: bool = True, **kwargs: Any) -> None
...

Start a worker process for processing jobs from the queues.

This is a convenience method that delegates to the job queue manager's start_worker method.

Parameters:
- background (bool, default False): If True, runs the worker in a non-blocking background mode. If False, runs in the current process and blocks until stopped.
- queue_names (list[str] | None, default None): List of queue names to process. If None, processes all queues defined in the backend configuration.
- with_scheduler (bool, default True): Whether to include the scheduler queue for processing scheduled jobs (if supported by the backend).
- **kwargs (Any): Additional worker configuration options specific to the job queue backend.
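The difference between background=False and background=True is the usual blocking-loop versus background-thread pattern. The toy class below (ToyWorker, hypothetical) only illustrates that pattern with the standard library; a real job queue worker does considerably more, and this is not how FlowerPower's backends are implemented.

```python
from __future__ import annotations

import queue
import threading


class ToyWorker:
    """Hypothetical worker that runs callables pulled from an in-memory queue."""

    def __init__(self) -> None:
        self.jobs: queue.Queue = queue.Queue()
        self.results: list = []
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def _loop(self) -> None:
        # Poll for jobs until asked to stop.
        while not self._stop.is_set():
            try:
                job = self.jobs.get(timeout=0.05)
            except queue.Empty:
                continue
            self.results.append(job())
            self.jobs.task_done()

    def start(self, background: bool = False) -> None:
        self._stop.clear()
        if background:
            # Non-blocking: the loop runs in a daemon thread and start() returns.
            self._thread = threading.Thread(target=self._loop, daemon=True)
            self._thread.start()
        else:
            # Blocking: the loop runs in the calling thread until stop()
            # is called from another thread.
            self._loop()

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None
```

In background mode the caller keeps control and is responsible for calling stop() later, which is the role stop_worker() plays for start_worker(background=True).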

Raises: RuntimeError: If job queue manager is not configured.

Example

from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")

# Option 1: start a worker in the foreground (blocks until stopped)
project.start_worker()

# Option 2: start a worker in the background (returns immediately)
project.start_worker(background=True)

# Option 3: process only specific queues (foreground, blocks)
project.start_worker(queue_names=["high_priority", "default"])

stop_worker

stop_worker(self) -> None
...

Stop the worker process.

This is a convenience method that delegates to the job queue manager's stop_worker method.

Raises: RuntimeError: If job queue manager is not configured.

Example

from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")
project.stop_worker()

start_worker_pool

start_worker_pool(self, num_workers: int | None = None, background: bool = False, queue_names: list[str] | None = None, with_scheduler: bool = True, **kwargs: Any) -> None
...

Start a pool of worker processes to handle jobs in parallel.

This is a convenience method that delegates to the job queue manager's start_worker_pool method.

Parameters:
- num_workers (int | None, default None): Number of worker processes to start. If None, uses the CPU count or a backend-specific default.
- background (bool, default False): If True, runs the worker pool in a non-blocking background mode. If False, runs in the current process and blocks until stopped.
- queue_names (list[str] | None, default None): List of queue names to process. If None, processes all queues defined in the backend configuration.
- with_scheduler (bool, default True): Whether to include the scheduler queue for processing scheduled jobs (if supported by the backend).
- **kwargs (Any): Additional worker pool configuration options specific to the job queue backend.
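The defaults for num_workers, queue_names, and with_scheduler can be summarized in a small helper. This is a sketch: resolve_pool_settings is hypothetical, and the scheduler queue name "scheduler" is an assumption, since the actual scheduler mechanism is backend-specific.

```python
from __future__ import annotations

import os


def resolve_pool_settings(
    num_workers: int | None,
    queue_names: list[str] | None,
    configured_queues: list[str],
    with_scheduler: bool = True,
    scheduler_queue: str = "scheduler",  # hypothetical queue name
) -> tuple[int, list[str]]:
    """Fill in the defaults described in the parameter list above."""
    # None -> CPU count; os.cpu_count() can itself return None, so fall back to 1.
    workers = num_workers if num_workers is not None else (os.cpu_count() or 1)
    if workers < 1:
        raise ValueError("num_workers must be at least 1")
    # None -> every queue defined in the backend configuration.
    queues = list(queue_names) if queue_names is not None else list(configured_queues)
    # Assumption: scheduled jobs are served from a separate queue.
    if with_scheduler and scheduler_queue not in queues:
        queues.append(scheduler_queue)
    return workers, queues
```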

Raises: RuntimeError: If job queue manager is not configured.

Example

from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")

# Option 1: start a worker pool with the default number of workers (blocks)
project.start_worker_pool()

# Option 2: start 4 workers in the background (returns immediately)
project.start_worker_pool(num_workers=4, background=True)

# Option 3: start 2 workers for specific queues (foreground, blocks)
project.start_worker_pool(
    num_workers=2,
    queue_names=["high_priority", "default"]
)

stop_worker_pool

stop_worker_pool(self) -> None
...

Stop all worker processes in the worker pool.

This is a convenience method that delegates to the job queue manager's stop_worker_pool method.

Raises: RuntimeError: If job queue manager is not configured.

Example

from flowerpower import FlowerPowerProject

project = FlowerPowerProject.load(".")
project.stop_worker_pool()

load

load(cls, base_dir: str | None = None, storage_options: dict | BaseStorageOptions | None = {}, fs: AbstractFileSystem | None = None, log_level: str | None = None) -> "FlowerPowerProject"
...

Load an existing FlowerPower project.

If the project does not exist, it will raise an error.

Parameters:
- base_dir (str | None, default None): The base directory of the project. If None, it defaults to the current working directory.
- storage_options (dict | BaseStorageOptions | None, default {}): Storage options for the filesystem.
- fs (AbstractFileSystem | None, default None): An instance of AbstractFileSystem to use for file operations. If None, uses the get_filesystem helper.
- log_level (str | None, default None): The logging level to set for the project. If None, it uses the default log level.

Returns: FlowerPowerProject - An instance of FlowerPowerProject if the project exists, otherwise None.

Raises: FileNotFoundError: If the project does not exist at the specified base directory.
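The following sketches the lookup load() performs, using pathlib on a local disk instead of an fsspec filesystem. The directory names in REQUIRED_DIRS are assumptions standing in for whatever layout FlowerPower actually checks, and locate_project is a hypothetical helper.

```python
from __future__ import annotations

from pathlib import Path

# Assumption: directories whose presence marks a project root.
REQUIRED_DIRS = ("conf", "pipelines")


def locate_project(base_dir: str | None = None) -> Path:
    """Resolve base_dir and check that it looks like a project."""
    # None -> the current working directory, as documented above.
    root = Path(base_dir) if base_dir is not None else Path.cwd()
    missing = [d for d in REQUIRED_DIRS if not (root / d).is_dir()]
    if missing:
        raise FileNotFoundError(f"No project at {root}: missing {', '.join(missing)}")
    return root.resolve()
```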

Example

from flowerpower import FlowerPowerProject

# Load a project from the current directory
project = FlowerPowerProject.load(".")

# Load a project from a specific path
project = FlowerPowerProject.load("/path/to/my/project")

new

new(cls, name: str | None = None, base_dir: str | None = None, storage_options: dict | BaseStorageOptions | None = {}, fs: AbstractFileSystem | None = None, job_queue_type: str = settings.JOB_QUEUE_TYPE, hooks_dir: str = settings.HOOKS_DIR, log_level: str | None = None) -> "FlowerPowerProject"
...

Initialize a new FlowerPower project.

Parameters:
- name (str | None, default None): The name of the project. If None, it defaults to the current directory name.
- base_dir (str | None, default None): The base directory where the project will be created. If None, it defaults to the current working directory.
- storage_options (dict | BaseStorageOptions | None, default {}): Storage options for the filesystem.
- fs (AbstractFileSystem | None, default None): An instance of AbstractFileSystem to use for file operations. If None, uses the get_filesystem helper.
- job_queue_type (str, default settings.JOB_QUEUE_TYPE): The type of job queue to use for the project.
- hooks_dir (str, default settings.HOOKS_DIR): The directory where the project hooks will be stored.
- log_level (str | None, default None): The logging level to set for the project. If None, it uses the default log level.

Returns: FlowerPowerProject - An instance of FlowerPowerProject initialized with the new project.

Raises: FileExistsError: If the project already exists at the specified base directory.
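The following sketches the defaults for new(): when name is None, the project takes the name of its directory, and an existing project raises FileExistsError. create_project and PROJECT_DIRS are hypothetical; the real project layout, and exactly how name and base_dir combine to place the project, may differ.

```python
from __future__ import annotations

from pathlib import Path

# Assumption: hypothetical directory layout for a new project.
PROJECT_DIRS = ("conf", "pipelines", "hooks")


def create_project(name: str | None = None, base_dir: str | None = None) -> tuple[str, Path]:
    """Create the hypothetical layout and return (project name, root)."""
    root = Path(base_dir) if base_dir is not None else Path.cwd()
    # None -> the name of the directory the project is created in.
    project_name = name if name is not None else root.resolve().name
    # Treat an existing config directory as an existing project.
    if (root / PROJECT_DIRS[0]).exists():
        raise FileExistsError(f"A project already exists at {root}")
    for d in PROJECT_DIRS:
        (root / d).mkdir(parents=True, exist_ok=True)
    return project_name, root.resolve()
```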

Example

from flowerpower import FlowerPowerProject

# Initialize a new project in the current directory
project = FlowerPowerProject.new()

# Initialize a new project with a specific name and job queue type
project = FlowerPowerProject.new(name="my-new-project", job_queue_type="rq")