PipelineManager
Module: flowerpower.pipeline.PipelineManager
The PipelineManager is the central class for managing pipeline operations in FlowerPower. It provides a unified interface for creating, running, and managing pipelines.
Initialization
`__init__`
Initializes the PipelineManager, setting up project paths and loading configurations.
| Parameter | Type | Description |
|---|---|---|
| base_dir | str \| None | The base directory of the project. Defaults to the current working directory. |
| storage_options | dict \| Munch \| BaseStorageOptions \| None | Configuration options for filesystem access (e.g., S3, GCS). |
| fs | AbstractFileSystem \| None | An fsspec-compatible filesystem instance. |
| cfg_dir | str \| None | Override the default configuration directory name. |
| pipelines_dir | str \| None | Override the default pipelines directory name. |
| log_level | str \| None | The logging level for the manager. |
Example:
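A minimal sketch of the path handling described above, assuming that cfg_dir and pipelines_dir are directory names joined onto base_dir and that base_dir falls back to the current working directory. The default names "conf" and "pipelines" and the helper `resolve_project_paths` are hypothetical illustrations, not FlowerPower's API.

```python
from __future__ import annotations

import os
from dataclasses import dataclass

# Hypothetical default directory names; FlowerPower's real defaults may differ.
DEFAULT_CFG_DIR = "conf"
DEFAULT_PIPELINES_DIR = "pipelines"


@dataclass(frozen=True)
class ProjectPaths:
    base_dir: str
    cfg_dir: str
    pipelines_dir: str


def resolve_project_paths(
    base_dir: str | None = None,
    cfg_dir: str | None = None,
    pipelines_dir: str | None = None,
) -> ProjectPaths:
    """Hypothetical helper: resolve project paths as the parameters describe."""
    # base_dir defaults to the current working directory.
    root = base_dir if base_dir is not None else os.getcwd()
    # cfg_dir / pipelines_dir override directory *names* relative to base_dir.
    return ProjectPaths(
        base_dir=root,
        cfg_dir=os.path.join(root, cfg_dir or DEFAULT_CFG_DIR),
        pipelines_dir=os.path.join(root, pipelines_dir or DEFAULT_PIPELINES_DIR),
    )
```

This only covers local paths; when fs or storage_options point at remote storage, the same names would be resolved against that filesystem instead.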
Attributes
| Attribute | Type | Description |
|---|---|---|
| registry | PipelineRegistry | Handles pipeline registration and discovery. |
| visualizer | PipelineVisualizer | Handles pipeline visualization. |
| io | PipelineIOManager | Handles pipeline import/export operations, consolidating common logic. |
| project_cfg | ProjectConfig | Current project configuration. |
| pipeline_cfg | PipelineConfig | Current pipeline configuration. |
| pipelines | list[str] | List of available pipeline names. |
| current_pipeline_name | str | Name of the currently loaded pipeline. |
| summary | dict[str, dict \| str] | Summary of all pipelines. |
| _base_dir | str | The base directory of the project. |
| _fs | AbstractFileSystem | The filesystem instance used by the manager. |
| _storage_options | dict \| Munch \| BaseStorageOptions | Storage options for the filesystem. |
| _cfg_dir | str | The directory for configuration files. |
| _pipelines_dir | str | The directory for pipeline modules. |
| _project_context | FlowerPowerProject \| None | Reference to the FlowerPowerProject instance. |
Methods
run
Execute a pipeline synchronously and return its results. The retry parameters (max_retries, retry_delay, jitter_factor, retry_exceptions) configure the internal retry mechanism.
Legacy retry kwargs
The standalone retry kwargs are retained for backwards compatibility but now emit a DeprecationWarning. Prefer supplying retry settings via run_config.retry or the builder helpers.
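The four retry settings can be pictured as a simple retry loop. This is a minimal sketch, not FlowerPower's implementation: `call_with_retry` is a hypothetical name, and the policy (a fixed retry_delay plus random jitter of up to jitter_factor * retry_delay, with exceptions outside retry_exceptions propagating immediately) is an assumption.

```python
from __future__ import annotations

import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    jitter_factor: float = 0.1,
    retry_exceptions: tuple[type[BaseException], ...] = (Exception,),
) -> T:
    """Hypothetical helper: call func, retrying on the listed exception types."""
    attempt = 0
    while True:
        try:
            return func()
        except retry_exceptions:
            if attempt >= max_retries:
                raise  # Retries exhausted: re-raise the last error.
            attempt += 1
            # Assumed policy: fixed delay plus random jitter up to jitter_factor * delay.
            time.sleep(retry_delay + retry_delay * jitter_factor * random.random())
```

With max_retries=3, func runs at most four times: the first attempt plus three retries.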
This method accepts execution configuration in two ways:
1. A RunConfig object (recommended), which bundles all execution parameters into one structured object.
2. Individual keyword arguments (`**kwargs`), which set parameters directly.
When both run_config and individual keyword arguments are provided, the keyword arguments take precedence over the corresponding values in run_config.
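The precedence rule can be sketched with a frozen dataclass and dataclasses.replace. `RunSettings` and `merge_run_settings` are hypothetical stand-ins for RunConfig and FlowerPower's internal merge logic, and the sketch assumes that a keyword argument left as None means "not provided".

```python
from __future__ import annotations

from dataclasses import dataclass, field, fields, replace
from typing import Any


@dataclass(frozen=True)
class RunSettings:
    """Hypothetical stand-in for a few RunConfig fields."""
    inputs: dict[str, Any] = field(default_factory=dict)
    final_vars: list[str] = field(default_factory=list)
    log_level: str | None = None
    reload: bool = False


def merge_run_settings(run_config: RunSettings | None = None, **kwargs: Any) -> RunSettings:
    """Overlay explicitly passed keyword arguments on top of run_config."""
    base = run_config if run_config is not None else RunSettings()
    known = {f.name for f in fields(RunSettings)}
    unknown = set(kwargs) - known
    if unknown:
        raise ValueError(f"Unknown run parameters: {sorted(unknown)}")
    # Assumption: None means "not provided", so it never overrides run_config.
    overrides = {k: v for k, v in kwargs.items() if v is not None}
    return replace(base, **overrides)
```

Because replace returns a new object, the caller's run_config is left unchanged.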
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to run. Must be a valid identifier. | |
| run_config | RunConfig \| None | Configuration object containing all execution parameters. See RunConfig for details. | None |
| inputs | dict \| None | Override pipeline input values. Example: {"data_date": "2025-04-28"} | None |
| final_vars | list[str] \| None | Specify which output variables to return. Example: ["model", "metrics"] | None |
| config | dict \| None | Configuration for the Hamilton pipeline executor. Example: {"model": "LogisticRegression"} | None |
| cache | dict \| None | Cache configuration for results. Example: {"recompute": ["node1", "final_node"]} | None |
| executor_cfg | str \| dict \| ExecutorConfig \| None | Execution configuration. Accepts a str executor type (one of "synchronous", "threadpool", "processpool", "ray", "dask"), a raw config dict (e.g. {"type": "threadpool", "max_workers": 4}), or a structured ExecutorConfig object. | None |
| with_adapter_cfg | dict \| WithAdapterConfig \| None | Adapter settings for pipeline execution. Example: {"opentelemetry": True, "tracker": False} | None |
| pipeline_adapter_cfg | dict \| PipelineAdapterConfig \| None | Pipeline-specific adapter settings. Example: {"tracker": {"project_id": "123", "tags": {"env": "prod"}}} | None |
| project_adapter_cfg | dict \| ProjectAdapterConfig \| None | Project-level adapter settings. Example: {"opentelemetry": {"host": "http://localhost:4317"}} | None |
| adapter | dict[str, Any] \| None | Custom adapter instances for the pipeline. Example: {"ray_graph_adapter": RayGraphAdapter()} | None |
| reload | bool | Force reload of the pipeline configuration. | False |
| log_level | str \| None | Logging level for the execution. Valid values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" | None |
| max_retries | int \| None | Deprecated. Legacy retry override; use run_config.retry. | None |
| retry_delay | float \| None | Deprecated. Legacy retry override; use run_config.retry. | None |
| jitter_factor | float \| None | Deprecated. Legacy retry override; use run_config.retry. | None |
| retry_exceptions | tuple \| list \| None | Deprecated. Legacy retry override; use run_config.retry. | None |
| on_success | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on successful pipeline execution. | None |
| on_failure | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on pipeline execution failure. | None |
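The three accepted forms of executor_cfg can be normalized into one structured object before execution. In this sketch, `ExecutorSettings` and `normalize_executor_cfg` are hypothetical stand-ins (ExecutorConfig likely carries more fields), and treating None as the "synchronous" executor is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

VALID_EXECUTORS = {"synchronous", "threadpool", "processpool", "ray", "dask"}


@dataclass(frozen=True)
class ExecutorSettings:
    """Hypothetical stand-in for ExecutorConfig."""
    type: str = "synchronous"
    max_workers: int | None = None


def normalize_executor_cfg(cfg: str | dict[str, Any] | ExecutorSettings | None) -> ExecutorSettings:
    """Hypothetical helper: turn any accepted executor_cfg form into one object."""
    if cfg is None:
        settings = ExecutorSettings()  # Assumed default executor.
    elif isinstance(cfg, ExecutorSettings):
        settings = cfg
    elif isinstance(cfg, str):
        settings = ExecutorSettings(type=cfg)
    elif isinstance(cfg, dict):
        settings = ExecutorSettings(**cfg)  # Unknown keys raise TypeError.
    else:
        raise TypeError(f"Unsupported executor_cfg: {cfg!r}")
    if settings.type not in VALID_EXECUTORS:
        raise ValueError(f"Unknown executor type: {settings.type!r}")
    return settings
```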
Returns: dict[str, Any] - Pipeline execution results, mapping output variable names to their computed values.
Raises:
- ValueError: If the pipeline name doesn't exist or the configuration is invalid.
- ImportError: If the pipeline module cannot be imported.
- RuntimeError: If execution fails due to pipeline or adapter errors.
Example
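on_success and on_failure accept either a bare callable or a (callable, args, kwargs) tuple. The sketch below shows how such a spec can be normalized and invoked; `invoke_callback` is hypothetical, and because this reference does not say whether FlowerPower also passes results or exceptions to callbacks, the sketch passes only the configured args and kwargs.

```python
from typing import Any, Callable, Dict, Optional, Tuple, Union

# The two documented callback forms.
CallbackSpec = Union[
    Callable[..., Any],
    Tuple[Callable[..., Any], Optional[tuple], Optional[Dict[str, Any]]],
]


def invoke_callback(spec: Optional[CallbackSpec]) -> Any:
    """Hypothetical helper: call a bare callable or a (func, args, kwargs) tuple."""
    if spec is None:
        return None  # No callback configured.
    if isinstance(spec, tuple):
        func, args, kwargs = spec
        # None for args/kwargs means "call with no extra arguments".
        return func(*(args or ()), **(kwargs or {}))
    if callable(spec):
        return spec()
    raise TypeError(f"Invalid callback spec: {spec!r}")
```

In a call to run, the tuple form would then look like on_success=(notify, ("slack",), None), where notify is a function of your own.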
new
Create a new pipeline with the given name.
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name for the new pipeline. Must be a valid Python identifier. | |
| overwrite | bool | Whether to overwrite an existing pipeline with the same name. | False |
Returns: None
Raises:
- ValueError: If the name is invalid, or the pipeline exists and overwrite=False.
- RuntimeError: If file creation fails.
- PermissionError: If lacking write permissions.
Example
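The naming rule for new can be checked with the standard library. A minimal sketch, assuming that "valid Python identifier" also excludes keywords (a module named class could not be imported with a plain import statement); `validate_new_pipeline_name` is a hypothetical helper that mirrors the documented ValueError cases.

```python
from __future__ import annotations

import keyword
from collections.abc import Collection


def validate_new_pipeline_name(name: str, existing: Collection[str], overwrite: bool = False) -> None:
    """Hypothetical helper mirroring the ValueError cases documented for new()."""
    # isidentifier() rejects names like "2fast" or "my-pipeline"; keywords are valid
    # identifiers syntactically but cannot be imported as module names.
    if not name.isidentifier() or keyword.iskeyword(name):
        raise ValueError(f"Invalid pipeline name: {name!r}")
    if name in existing and not overwrite:
        raise ValueError(f"Pipeline {name!r} already exists; pass overwrite=True to replace it.")
```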
delete
Delete an existing pipeline.
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to delete. | |
Returns: None
Raises:
- FileNotFoundError: If the pipeline does not exist.
- RuntimeError: If deletion fails.
Example
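A sketch of what deleting a pipeline involves on a local filesystem. It assumes a hypothetical layout in which a pipeline consists of a module at pipelines/<name>.py and a config at conf/pipelines/<name>.yml; FlowerPower's actual layout and its handling of remote filesystems may differ, and `delete_pipeline_files` is not part of its API.

```python
from __future__ import annotations

from pathlib import Path


def delete_pipeline_files(base_dir: str, name: str) -> list[Path]:
    """Hypothetical helper: remove a pipeline's module and config files."""
    root = Path(base_dir)
    # Assumed project layout; adjust to the real cfg_dir / pipelines_dir.
    candidates = [
        root / "pipelines" / f"{name}.py",
        root / "conf" / "pipelines" / f"{name}.yml",
    ]
    existing = [p for p in candidates if p.exists()]
    if not existing:
        raise FileNotFoundError(f"Pipeline {name!r} not found under {root}")
    for path in existing:
        path.unlink()
    return existing
```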
show_pipelines
Display a summary of all available pipelines.
| Parameter | Type | Description | Default |
|---|---|---|---|
| format | str | Output format ("table", "json", "yaml"). | "table" |
Returns: None
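show_pipelines renders one summary in several formats. A minimal sketch of that dispatch using only the standard library; `render_summary` and the {pipeline: {field: value}} summary shape are assumptions, and a "yaml" branch is omitted because it would need a third-party package such as PyYAML.

```python
from __future__ import annotations

import json
from typing import Any


def render_summary(summary: dict[str, dict[str, Any]], format: str = "table") -> str:
    """Hypothetical helper: render a {pipeline: {field: value}} summary."""
    if format == "json":
        return json.dumps(summary, indent=2, sort_keys=True)
    if format == "table":
        columns = sorted({key for info in summary.values() for key in info})
        header = ["name", *columns]
        rows = [
            [name, *(str(info.get(col, "")) for col in columns)]
            for name, info in sorted(summary.items())
        ]
        # Pad each column to its widest cell so the output lines up.
        widths = [max(len(row[i]) for row in [header, *rows]) for i in range(len(header))]
        return "\n".join(
            "  ".join(cell.ljust(w) for cell, w in zip(row, widths)).rstrip()
            for row in [header, *rows]
        )
    raise ValueError(f"Unsupported format: {format!r}")
```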
list_pipelines
Return a sorted list of available pipeline names.
Example
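One way to derive a sorted list of pipeline names, assuming (hypothetically) that every .py module in the pipelines directory other than `__init__.py` is a pipeline. `list_pipeline_names` is illustrative only; the manager itself may consult configuration files as well.

```python
from __future__ import annotations

from pathlib import Path


def list_pipeline_names(pipelines_dir: str) -> list[str]:
    """Hypothetical helper: sorted module names found in pipelines_dir."""
    return sorted(
        path.stem
        for path in Path(pipelines_dir).glob("*.py")
        if path.stem != "__init__"  # Package marker, not a pipeline.
    )
```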
add_hook
Add a hook to a specific pipeline.
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to add the hook to. | |
| type | HookType | Type of the hook (e.g., HookType.MQTT_BUILD_CONFIG). | |
| to | str | Destination of the hook (e.g., "mqtt"). | |
| function_name | str | Name of the function to be called as the hook. | |
Returns: None
Raises:
- ValueError: If the pipeline does not exist or the hook type is invalid.
- FileExistsError: If a hook with the same name and type already exists.
Example
remove_hook
Remove a hook from a specific pipeline.
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to remove the hook from. | |
| type | HookType | Type of the hook to remove. | |
| function_name | str | Name of the function that was used as the hook. | |
Returns: None
Raises:
- FileNotFoundError: If the pipeline or hook does not exist.
Example
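The error cases of add_hook and remove_hook can be modelled with a small in-memory registry. This is a sketch only: `HookRegistry` is hypothetical, its dict stands in for wherever FlowerPower actually stores hooks, and the HookType enum here defines only the MQTT_BUILD_CONFIG member named in this reference, with an assumed value.

```python
from __future__ import annotations

from enum import Enum


class HookType(Enum):
    # Only the member named in this reference; the value is an assumption.
    MQTT_BUILD_CONFIG = "mqtt-build-config"


class HookRegistry:
    """Hypothetical in-memory model of per-pipeline hooks."""

    def __init__(self, pipelines: set[str]) -> None:
        self._pipelines = set(pipelines)
        # pipeline name -> {(hook type, function name): destination}
        self._hooks: dict[str, dict[tuple[HookType, str], str]] = {p: {} for p in self._pipelines}

    def add_hook(self, name: str, type: HookType, to: str, function_name: str) -> None:
        if name not in self._pipelines:
            raise ValueError(f"Pipeline {name!r} does not exist")
        if not isinstance(type, HookType):
            raise ValueError(f"Invalid hook type: {type!r}")
        key = (type, function_name)
        if key in self._hooks[name]:
            raise FileExistsError(f"Hook {function_name!r} of type {type.name} already exists")
        self._hooks[name][key] = to

    def remove_hook(self, name: str, type: HookType, function_name: str) -> None:
        hooks = self._hooks.get(name)
        if hooks is None or (type, function_name) not in hooks:
            raise FileNotFoundError(f"Hook {function_name!r} not found for pipeline {name!r}")
        del hooks[(type, function_name)]
```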
import_pipeline
Import a pipeline from another FlowerPower project.
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name for the new pipeline in the current project. | |
| src_base_dir | str | Source FlowerPower project directory or URI. Examples: local "/path/to/other/project", S3 "s3://bucket/project", GitHub "github://org/repo/project" | |
| src_fs | AbstractFileSystem \| None | Pre-configured source filesystem. Example: S3FileSystem(anon=False) | None |
| src_storage_options | dict \| BaseStorageOptions \| None | Options for source filesystem access. Example: {"key": "ACCESS_KEY", "secret": "SECRET_KEY"} | None |
| overwrite | bool | Whether to replace an existing pipeline if the name exists. | False |
Returns: None
Raises:
- ValueError: If the pipeline name exists and overwrite=False.
- FileNotFoundError: If the source pipeline is not found.
- RuntimeError: If the import fails.
Example
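src_base_dir may be a local path or a URI, and fsspec-style libraries pick a filesystem implementation from the URI's protocol prefix. A minimal sketch of that split using plain string handling; `split_protocol` here is a hypothetical helper (fsspec ships a similar utility), and treating a bare path as the local "file" protocol follows the usual convention.

```python
from __future__ import annotations


def split_protocol(uri: str) -> tuple[str, str]:
    """Hypothetical helper: split 'proto://path' into ('proto', 'path')."""
    if "://" in uri:
        protocol, path = uri.split("://", 1)
        return protocol, path
    # No protocol prefix: treat as a local filesystem path.
    return "file", uri
```

The protocol then determines which entries in src_storage_options are relevant, for example key and secret for S3.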
import_many
Import multiple pipelines from another FlowerPower project.
| Parameter | Type | Description | Default |
|---|---|---|---|
| names | list[str] | List of pipeline names to import. | |
| src_base_dir | str | Source FlowerPower project directory or URI. Examples: local "/path/to/other/project", S3 "s3://bucket/project", GitHub "github://org/repo/project" | |
| src_fs | AbstractFileSystem \| None | Pre-configured source filesystem. Example: S3FileSystem(anon=False) | None |
| src_storage_options | dict \| BaseStorageOptions \| None | Options for source filesystem access. Example: {"key": "ACCESS_KEY", "secret": "SECRET_KEY"} | None |
| overwrite | bool | Whether to replace existing pipelines if the names exist. | False |
Returns: None
Raises:
- ValueError: If any pipeline name exists and overwrite=False.
- FileNotFoundError: If any source pipeline is not found.
- RuntimeError: If the import fails.
Example
export_pipeline
Export a pipeline to another FlowerPower project.
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to export. | |
| dest_base_dir | str | Destination FlowerPower project directory or URI. Examples: local "/path/to/backup", S3 "s3://bucket/backups", GCS "gs://bucket/backups" | |
| dest_fs | AbstractFileSystem \| None | Pre-configured destination filesystem. Example: GCSFileSystem(project='my-project') | None |
| dest_storage_options | dict \| BaseStorageOptions \| None | Options for destination filesystem access. Example: {"token": "my_token"} | None |
| overwrite | bool | Whether to replace an existing pipeline in the destination if the name exists. | False |
Returns: None
Raises:
- FileNotFoundError: If the pipeline does not exist in the current project.
- FileExistsError: If the destination pipeline exists and overwrite=False.
- RuntimeError: If the export fails.
Example
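Between two local projects, exporting amounts to copying the pipeline's files while honoring overwrite. A sketch assuming a hypothetical layout of pipelines/<name>.py plus an optional conf/pipelines/<name>.yml; `pipeline_files` and `export_local` are illustrative names, and the real method also handles remote destinations via dest_fs or dest_storage_options.

```python
from __future__ import annotations

import shutil
from pathlib import Path


def pipeline_files(name: str) -> list[Path]:
    """Assumed relative paths of a pipeline's files inside a project."""
    return [Path("pipelines") / f"{name}.py", Path("conf") / "pipelines" / f"{name}.yml"]


def export_local(name: str, src_base_dir: str, dest_base_dir: str, overwrite: bool = False) -> None:
    """Hypothetical helper: copy one pipeline between two local projects."""
    src, dest = Path(src_base_dir), Path(dest_base_dir)
    rel_paths = pipeline_files(name)
    if not (src / rel_paths[0]).exists():
        raise FileNotFoundError(f"Pipeline {name!r} not found in {src}")
    if not overwrite and any((dest / rel).exists() for rel in rel_paths):
        raise FileExistsError(f"Pipeline {name!r} already exists in {dest}")
    for rel in rel_paths:
        if (src / rel).exists():  # The config file is treated as optional here.
            (dest / rel).parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src / rel, dest / rel)
```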
export_many
Export multiple pipelines to another FlowerPower project.
| Parameter | Type | Description | Default |
|---|---|---|---|
| names | list[str] | List of pipeline names to export. | |
| dest_base_dir | str | Destination FlowerPower project directory or URI. Examples: local "/path/to/backup", S3 "s3://bucket/backups", GCS "gs://bucket/backups" | |
| dest_fs | AbstractFileSystem \| None | Pre-configured destination filesystem. Example: GCSFileSystem(project='my-project') | None |
| dest_storage_options | dict \| BaseStorageOptions \| None | Options for destination filesystem access. Example: {"token": "my_token"} | None |
| overwrite | bool | Whether to replace existing pipelines in the destination if the names exist. | False |
Returns: None
Raises:
- FileNotFoundError: If any pipeline does not exist in the current project.
- FileExistsError: If any destination pipeline exists and overwrite=False.
- RuntimeError: If the export fails.
Example
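The batch methods raise if any single name is missing or conflicts. One reasonable design, sketched here, validates every name before copying anything, so a conflict leaves the destination untouched; this reference does not say whether FlowerPower's batch methods are all-or-nothing in this way. `plan_batch` is hypothetical and raises FileExistsError as export_many documents (import_many documents ValueError for the same case).

```python
from __future__ import annotations

from collections.abc import Iterable


def plan_batch(names: Iterable[str], source: set[str], dest: set[str], overwrite: bool = False) -> list[str]:
    """Hypothetical helper: validate a batch transfer before touching any files."""
    ordered = list(dict.fromkeys(names))  # Drop duplicates, keep order.
    missing = [n for n in ordered if n not in source]
    if missing:
        raise FileNotFoundError(f"Source pipelines not found: {missing}")
    conflicts = [n for n in ordered if n in dest]
    if conflicts and not overwrite:
        raise FileExistsError(f"Pipelines already exist in destination: {conflicts}")
    return ordered
```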
show_dag
Generate and display the Directed Acyclic Graph (DAG) of a pipeline.
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to visualize. | |
| format | str | Output format for the DAG ("png", "svg", "html", "dot"). | "png" |
| show_outputs | bool | Whether to include output nodes in the DAG. | False |
| display_html | bool | Whether to display the HTML directly in the notebook (only for "html" format). | False |
Returns: None (displays the DAG directly or saves it to a file).
Raises:
- FileNotFoundError: If the pipeline does not exist.
- ValueError: If the format is invalid or visualization fails.
Example
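Hamilton, which executes FlowerPower pipelines, builds its DAG from plain functions: each function is a node, and each parameter names an upstream node or an input. A minimal sketch of turning such functions into the "dot" format with inspect; `to_dot` is hypothetical and ignores Hamilton decorators and config-dependent nodes.

```python
import inspect
from typing import Callable, Iterable


def to_dot(functions: Iterable[Callable]) -> str:
    """Hypothetical helper: emit Graphviz DOT for Hamilton-style functions."""
    lines = ["digraph pipeline {"]
    for func in functions:
        for param in inspect.signature(func).parameters:
            # Edge from each dependency (a parameter name) to the function's node.
            lines.append(f'  "{param}" -> "{func.__name__}";')
    lines.append("}")
    return "\n".join(lines)
```

The resulting text can be rendered with the Graphviz command line, e.g. dot -Tsvg pipeline.dot -o pipeline.svg.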
show_execution_graph
Generate and display the execution graph of a pipeline, taking inputs and configuration into account.
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to visualize. | |
| format | str | Output format for the graph ("png", "svg", "html", "dot"). | "png" |
| show_outputs | bool | Whether to include output nodes in the graph. | False |
| display_html | bool | Whether to display the HTML directly in the notebook (only for "html" format). | False |
| inputs | dict \| None | Input values to consider for graph generation. | None |
| config | dict \| None | Configuration for the Hamilton pipeline executor. | None |
Returns: None (displays the graph directly or saves it to a file).
Raises:
- FileNotFoundError: If the pipeline does not exist.
- ValueError: If the format is invalid or visualization fails.
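An execution graph can be smaller than the full DAG because inputs prune it: only nodes upstream of the requested outputs must run, and a node whose value is supplied as an input need not be computed. A sketch of that pruning over a dependency mapping; `execution_nodes`, the mapping format, and the `targets` argument are illustrative assumptions rather than show_execution_graph's internals.

```python
from __future__ import annotations

from collections.abc import Iterable, Mapping


def execution_nodes(
    deps: Mapping[str, Iterable[str]],
    targets: Iterable[str],
    inputs: Mapping[str, object],
) -> set[str]:
    """Hypothetical helper: nodes that must run to compute targets given inputs."""
    needed: set[str] = set()
    stack = list(targets)
    while stack:
        node = stack.pop()
        # Skip nodes already visited or supplied directly as inputs.
        if node in needed or node in inputs:
            continue
        needed.add(node)
        stack.extend(deps.get(node, ()))
    return needed
```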