PipelineManager

Module: flowerpower.pipeline.PipelineManager

The PipelineManager is the central class for managing pipeline operations in FlowerPower. It provides a unified interface for creating, running, and managing pipelines.

Initialization

init

__init__(self, base_dir: str | None = None, storage_options: dict | Munch | BaseStorageOptions | None = None, fs: AbstractFileSystem | None = None, cfg_dir: str | None = None, pipelines_dir: str | None = None, job_queue_type: str = settings.JOB_QUEUE_TYPE, log_level: str | None = None)

Initializes the PipelineManager, setting up project paths and loading configurations.

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Name of the pipeline to run. Must be a valid identifier. | required |
| inputs | dict \| None | Override pipeline input values. Example: {"data_date": "2025-04-28"} | None |
| final_vars | list[str] \| None | Specify which output variables to return. Example: ["model", "metrics"] | None |
| config | dict \| None | Configuration for the Hamilton pipeline executor. Example: {"model": "LogisticRegression"} | None |
| cache | dict \| None | Cache configuration for results. Example: {"recompute": ["node1", "final_node"]} | None |
| executor_cfg | str \| dict \| ExecutorConfig \| None | Execution configuration. Can be a str (executor name, e.g. "threadpool", "local"), a dict (raw config, e.g. {"type": "threadpool", "max_workers": 4}), or an ExecutorConfig (structured config object). | None |
| with_adapter_cfg | dict \| WithAdapterConfig \| None | Adapter settings for pipeline execution. Example: {"opentelemetry": True, "tracker": False} | None |
| pipeline_adapter_cfg | dict \| PipelineAdapterConfig \| None | Pipeline-specific adapter settings. Example: {"tracker": {"project_id": "123", "tags": {"env": "prod"}}} | None |
| project_adapter_cfg | dict \| ProjectAdapterConfig \| None | Project-level adapter settings. Example: {"opentelemetry": {"host": "http://localhost:4317"}} | None |
| adapter | dict[str, Any] \| None | Custom adapter instances for the pipeline. Example: {"ray_graph_adapter": RayGraphAdapter()} | None |
| reload | bool | Force reload of the pipeline configuration. | False |
| log_level | str \| None | Logging level for the execution. Valid values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" | None |
| max_retries | int \| None | Maximum number of retries for execution. | None |
| retry_delay | float \| None | Delay between retries, in seconds. | None |
| jitter_factor | float \| None | Random jitter factor added to the retry delay. | None |
| retry_exceptions | tuple \| list \| None | Exceptions that trigger a retry. | None |
| on_success | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on successful pipeline execution. | None |
| on_failure | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on pipeline execution failure. | None |

Example:

1
2
3
4
from flowerpower.pipeline import PipelineManager

# Initialize a manager for the project in the current directory
manager = PipelineManager()

Methods

Attributes

Attribute Type Description
registry PipelineRegistry Handles pipeline registration and discovery.
scheduler PipelineScheduler Manages job scheduling and execution.
visualizer PipelineVisualizer Handles pipeline visualization.
io PipelineIOManager Handles pipeline import/export operations, consolidating common logic.
project_cfg ProjectConfig Current project configuration.
pipeline_cfg PipelineConfig Current pipeline configuration.
pipelines list[str] List of available pipeline names.
current_pipeline_name str Name of the currently loaded pipeline.
summary dict[str, dict \| str] Summary of all pipelines.
_base_dir str The base directory of the project.
_fs AbstractFileSystem The filesystem instance used by the manager.
_storage_options dict \| Munch \| BaseStorageOptions Storage options for the filesystem.
_cfg_dir str The directory for configuration files.
_pipelines_dir str The directory for pipeline modules.
_project_context FlowerPowerProject \| None Reference to the FlowerPowerProject instance.

Methods

run

run(self, name: str, inputs: dict | None = None, final_vars: list[str] | None = None, config: dict | None = None, cache: dict | None = None, executor_cfg: str | dict | ExecutorConfig | None = None, with_adapter_cfg: dict | WithAdapterConfig | None = None, pipeline_adapter_cfg: dict | PipelineAdapterConfig | None = None, project_adapter_cfg: dict | ProjectAdapterConfig | None = None, adapter: dict[str, Any] | None = None, reload: bool = False, log_level: str | None = None, max_retries: int | None = None, retry_delay: float | None = None, jitter_factor: float | None = None, retry_exceptions: tuple | list | None = None, on_success: Callable | tuple[Callable, tuple | None, dict | None] | None = None, on_failure: Callable | tuple[Callable, tuple | None, dict | None] | None = None)

Execute a pipeline synchronously and return its results. Parameters related to retries (max_retries, retry_delay, jitter_factor, retry_exceptions) configure the internal retry mechanism.

Parameter Type Description Default
name str Name of the pipeline to run. Must be a valid identifier.
inputs dict \| None Override pipeline input values. Example: {"data_date": "2025-04-28"} None
final_vars list[str] \| None Specify which output variables to return. Example: ["model", "metrics"] None
config dict \| None Configuration for Hamilton pipeline executor. Example: {"model": "LogisticRegression"} None
cache dict \| None Cache configuration for results. Example: {"recompute": ["node1", "final_node"]} None
executor_cfg str \| dict \| ExecutorConfig \| None Execution configuration, can be:
- str: Executor name, e.g. "threadpool", "local"
- dict: Raw config, e.g. {"type": "threadpool", "max_workers": 4}
- ExecutorConfig: Structured config object
None
with_adapter_cfg dict \| WithAdapterConfig \| None Adapter settings for pipeline execution. Example: {"opentelemetry": True, "tracker": False} None
pipeline_adapter_cfg dict \| PipelineAdapterConfig \| None Pipeline-specific adapter settings. Example: {"tracker": {"project_id": "123", "tags": {"env": "prod"}}} None
project_adapter_cfg dict \| ProjectAdapterConfig \| None Project-level adapter settings. Example: {"opentelemetry": {"host": "http://localhost:4317"}} None
adapter dict[str, Any] \| None Custom adapter instance for pipeline Example: {"ray_graph_adapter": RayGraphAdapter()} None
reload bool Force reload of pipeline configuration. False
log_level str \| None Logging level for the execution. Valid values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" None
max_retries int \| None Maximum number of retries for execution. None
retry_delay float \| None Delay between retries in seconds. None
jitter_factor float \| None Random jitter factor to add to retry delay None
retry_exceptions tuple \| list \| None Exceptions that trigger a retry. None
on_success Callable \| tuple[Callable, tuple \| None, dict \| None] \| None Callback to run on successful pipeline execution. None
on_failure Callable \| tuple[Callable, tuple \| None, dict \| None] \| None Callback to run on pipeline execution failure. None

Returns: dict[str, Any] - Pipeline execution results, mapping output variable names to their computed values.

Raises:

  • ValueError: If pipeline name doesn't exist or configuration is invalid.
  • ImportError: If pipeline module cannot be imported.
  • RuntimeError: If execution fails due to pipeline or adapter errors.
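The three accepted forms of executor_cfg can be reduced to a common shape. A minimal sketch of that normalization (illustrative only, not FlowerPower's actual code):

```python
def normalize_executor_cfg(executor_cfg):
    """Normalize the documented executor_cfg forms to a plain dict.

    Accepts a str (executor name), a dict (raw config), or a structured
    ExecutorConfig-like object (passed through unchanged). This mirrors
    the forms listed in the parameter table; it is a sketch, not the
    library's implementation.
    """
    if executor_cfg is None:
        return None
    if isinstance(executor_cfg, str):
        return {"type": executor_cfg}
    if isinstance(executor_cfg, dict):
        return dict(executor_cfg)
    return executor_cfg  # assume a structured ExecutorConfig object

print(normalize_executor_cfg("threadpool"))  # {'type': 'threadpool'}
```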

Example

from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Simple execution
result = manager.run("my_pipeline")

# With custom inputs
result = manager.run(
    "ml_pipeline",
    inputs={"data_date": "2025-01-01"},
    final_vars=["model", "metrics"]
)

new

new(self, name: str, overwrite: bool = False)

Create a new pipeline with the given name.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name for the new pipeline. Must be a valid Python identifier. | required |
| overwrite | bool | Whether to overwrite an existing pipeline with the same name. | False |

Returns: None

Raises:

  • ValueError: If name is invalid or pipeline exists and overwrite=False.
  • RuntimeError: If file creation fails.
  • PermissionError: If lacking write permissions.

Example

from flowerpower.pipeline import PipelineManager

# Create new pipeline
manager = PipelineManager()
manager.new("data_transformation")

# Overwrite existing pipeline
manager.new("data_transformation", overwrite=True)
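The ValueError documented above is triggered by names that are not valid Python identifiers (the name becomes a module filename). The check amounts to the following sketch, which is illustrative rather than the library's actual code:

```python
def validate_pipeline_name(name: str) -> str:
    # new() requires a valid Python identifier because the pipeline name
    # becomes a module name; str.isidentifier() captures that constraint.
    if not name.isidentifier():
        raise ValueError(f"invalid pipeline name: {name!r}")
    return name

validate_pipeline_name("data_transformation")    # ok
# validate_pipeline_name("data-transformation")  # raises ValueError
```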

delete

delete(self, name: str)

Delete an existing pipeline.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the pipeline to delete. | required |

Returns: None

Raises:

  • FileNotFoundError: If the pipeline does not exist.
  • RuntimeError: If deletion fails.

Example

from flowerpower.pipeline import PipelineManager

manager = PipelineManager()
manager.delete("old_pipeline")

show_pipelines

show_pipelines(self, format: str = "table")

Display a summary of all available pipelines.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| format | str | Output format for the list ("table", "json", "yaml"). | "table" |

Returns: None

Example

from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Show pipelines in table format (default)
manager.show_pipelines()

# Show pipelines in JSON format
manager.show_pipelines(format="json")

add_hook

add_hook(self, name: str, type: HookType, to: str, function_name: str)

Add a hook to a specific pipeline.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the pipeline to add the hook to. | required |
| type | HookType | Type of the hook (e.g., HookType.MQTT_BUILD_CONFIG). | required |
| to | str | Destination of the hook (e.g., "mqtt"). | required |
| function_name | str | Name of the function to be called as the hook. | required |

Returns: None

Raises:

  • ValueError: If the pipeline does not exist or hook type is invalid.
  • FileExistsError: If a hook with the same name and type already exists.

Example

from flowerpower.pipeline import PipelineManager, HookType

manager = PipelineManager()
manager.add_hook(
    name="my_pipeline",
    type=HookType.MQTT_BUILD_CONFIG,
    to="mqtt",
    function_name="build_mqtt_config"
)

remove_hook

remove_hook(self, name: str, type: HookType, function_name: str)

Remove a hook from a specific pipeline.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the pipeline to remove the hook from. | required |
| type | HookType | Type of the hook to remove. | required |
| function_name | str | Name of the function that was used as the hook. | required |

Returns: None

Raises:

  • FileNotFoundError: If the pipeline or hook does not exist.

Example

from flowerpower.pipeline import PipelineManager, HookType

manager = PipelineManager()
manager.remove_hook(
    name="my_pipeline",
    type=HookType.MQTT_BUILD_CONFIG,
    function_name="build_mqtt_config"
)

import_pipeline

import_pipeline(self, name: str, src_base_dir: str, src_fs: AbstractFileSystem | None = None, src_storage_options: dict | BaseStorageOptions | None = None, overwrite: bool = False)

Import a pipeline from another FlowerPower project.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name for the new pipeline in the current project. | required |
| src_base_dir | str | Source FlowerPower project directory or URI. Examples: local "/path/to/other/project", S3 "s3://bucket/project", GitHub "github://org/repo/project". | required |
| src_fs | AbstractFileSystem \| None | Pre-configured source filesystem. Example: S3FileSystem(anon=False) | None |
| src_storage_options | dict \| BaseStorageOptions \| None | Options for source filesystem access. Example: {"key": "ACCESS_KEY", "secret": "SECRET_KEY"} | None |
| overwrite | bool | Whether to replace an existing pipeline if the name exists. | False |

Returns: None

Raises:

  • ValueError: If pipeline name exists and overwrite=False.
  • FileNotFoundError: If source pipeline not found.
  • RuntimeError: If import fails.

Example

from flowerpower.pipeline import PipelineManager
from s3fs import S3FileSystem

manager = PipelineManager()

# Import from local filesystem
manager.import_pipeline(
    "new_pipeline",
    "/path/to/other/project"
)

# Import from S3 with custom filesystem
s3 = S3FileSystem(anon=False)
manager.import_pipeline(
    "s3_pipeline",
    "s3://bucket/project",
    src_fs=s3
)

import_many

import_many(self, names: list[str], src_base_dir: str, src_fs: AbstractFileSystem | None = None, src_storage_options: dict | BaseStorageOptions | None = None, overwrite: bool = False)

Import multiple pipelines from another FlowerPower project.

Parameter Type Description Default
names list[str] List of pipeline names to import.
src_base_dir str Source FlowerPower project directory or URI. Examples:
- Local: "/path/to/other/project"
- S3: "s3://bucket/project"
- GitHub: "github://org/repo/project"
src_fs AbstractFileSystem \| None Pre-configured source filesystem. Example: S3FileSystem(anon=False) None
src_storage_options dict \| BaseStorageOptions \| None Options for source filesystem access. Example: {"key": "ACCESS_KEY", "secret": "SECRET_KEY"} None
overwrite bool Whether to replace existing pipelines if names exist. False

Returns: None

Raises:

  • ValueError: If any pipeline name exists and overwrite=False.
  • FileNotFoundError: If any source pipeline not found.
  • RuntimeError: If import fails.

Example

from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Import multiple pipelines
manager.import_many(
    names=["pipeline1", "pipeline2"],
    src_base_dir="/path/to/other/project"
)

# Import multiple pipelines from S3
manager.import_many(
    names=["s3_pipeline_a", "s3_pipeline_b"],
    src_base_dir="s3://bucket/source",
    src_storage_options={
        "key": "ACCESS_KEY",
        "secret": "SECRET_KEY"
    }
)

export_pipeline

export_pipeline(self, name: str, dest_base_dir: str, dest_fs: AbstractFileSystem | None = None, dest_storage_options: dict | BaseStorageOptions | None = None, overwrite: bool = False)

Export a pipeline to another FlowerPower project.

Parameter Type Description Default
name str Name of the pipeline to export.
dest_base_dir str Destination FlowerPower project directory or URI. Examples:
- Local: "/path/to/backup"
- S3: "s3://bucket/backups"
- GCS: "gs://bucket/backups"
dest_fs AbstractFileSystem \| None Pre-configured destination filesystem. Example: GCSFileSystem(project='my-project') None
dest_storage_options dict \| BaseStorageOptions \| None Options for destination filesystem access. Example: {"token": "my_token"} None
overwrite bool Whether to replace existing pipeline in destination if name exists. False

Returns: None

Raises:

  • FileNotFoundError: If the pipeline does not exist in the current project.
  • FileExistsError: If destination pipeline exists and overwrite=False.
  • RuntimeError: If export fails.

Example

from flowerpower.pipeline import PipelineManager
from gcsfs import GCSFileSystem

manager = PipelineManager()

# Export to local backup
manager.export_pipeline(
    "my_pipeline",
    "/path/to/backup"
)

# Export to Google Cloud Storage
gcs = GCSFileSystem(project='my-project')
manager.export_pipeline(
    "prod_pipeline",
    "gs://my-bucket/backups",
    dest_fs=gcs
)

export_many

export_many(self, names: list[str], dest_base_dir: str, dest_fs: AbstractFileSystem | None = None, dest_storage_options: dict | BaseStorageOptions | None = None, overwrite: bool = False)

Export multiple pipelines to another FlowerPower project.

Parameter Type Description Default
names list[str] List of pipeline names to export.
dest_base_dir str Destination FlowerPower project directory or URI. Examples:
- Local: "/path/to/backup"
- S3: "s3://bucket/backups"
- GCS: "gs://bucket/backups"
dest_fs AbstractFileSystem \| None Pre-configured destination filesystem. Example: GCSFileSystem(project='my-project') None
dest_storage_options dict \| BaseStorageOptions \| None Options for destination filesystem access. Example: {"token": "my_token"} None
overwrite bool Whether to replace existing pipelines in destination if names exist. False

Returns: None

Raises:

  • FileNotFoundError: If any pipeline does not exist in the current project.
  • FileExistsError: If any destination pipeline exists and overwrite=False.
  • RuntimeError: If export fails.

Example

from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Export multiple pipelines
manager.export_many(
    names=["pipeline1", "pipeline2"],
    dest_base_dir="/path/to/backup"
)

# Export multiple pipelines to S3
manager.export_many(
    names=["s3_pipeline_a", "s3_pipeline_b"],
    dest_base_dir="s3://bucket/backups",
    dest_storage_options={
        "key": "ACCESS_KEY",
        "secret": "SECRET_KEY"
    }
)

show_dag

show_dag(self, name: str, format: str = "png", show_outputs: bool = False, display_html: bool = False)

Generate and display the Directed Acyclic Graph (DAG) of a pipeline.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the pipeline to visualize. | required |
| format | str | Output format for the DAG ("png", "svg", "html", "dot"). | "png" |
| show_outputs | bool | Whether to include output nodes in the DAG. | False |
| display_html | bool | Whether to display the HTML directly in the notebook ("html" format only). | False |

Returns: None (displays the DAG directly or saves it to a file).

Raises:

  • FileNotFoundError: If the pipeline does not exist.
  • ValueError: If format is invalid or visualization fails.

Example

from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Show DAG as PNG
manager.show_dag("my_pipeline")

# Show DAG as SVG with outputs
manager.show_dag("ml_pipeline", format="svg", show_outputs=True)

show_execution_graph

show_execution_graph(self, name: str, format: str = "png", show_outputs: bool = False, display_html: bool = False, inputs: dict | None = None, config: dict | None = None)

Generate and display the execution graph of a pipeline, considering inputs and configuration.

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the pipeline to visualize. | required |
| format | str | Output format for the graph ("png", "svg", "html", "dot"). | "png" |
| show_outputs | bool | Whether to include output nodes in the graph. | False |
| display_html | bool | Whether to display the HTML directly in the notebook ("html" format only). | False |
| inputs | dict \| None | Input values to consider for graph generation. | None |
| config | dict \| None | Configuration for the Hamilton pipeline executor. | None |

Returns: None (displays the graph directly or saves it to a file).

Raises:

  • FileNotFoundError: If the pipeline does not exist.
  • ValueError: If format is invalid or visualization fails.

Example

from flowerpower.pipeline import PipelineManager

manager = PipelineManager()

# Show execution graph
manager.show_execution_graph("my_pipeline", inputs={"data_date": "2025-01-01"})