PipelineManager
Module: flowerpower.pipeline.PipelineManager
The PipelineManager is the central class for managing pipeline operations in FlowerPower. It provides a unified interface for creating, running, and managing pipelines.
Initialization
__init__
Initializes the PipelineManager, setting up project paths and loading configurations.
Parameter | Type | Description |
---|---|---|
base_dir | str \| None | The base directory of the project. Defaults to the current working directory. |
storage_options | dict \| Munch \| BaseStorageOptions \| None | Configuration options for filesystem access (e.g., S3, GCS). |
fs | AbstractFileSystem \| None | An fsspec-compatible filesystem instance. |
cfg_dir | str \| None | Override the default configuration directory name. |
pipelines_dir | str \| None | Override the default pipelines directory name. |
job_queue_type | str | The type of job queue to use for the project. |
log_level | str \| None | The logging level for the manager. |
Example:
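A minimal sketch of how base_dir, cfg_dir, and pipelines_dir could combine into project paths. It assumes default directory names of "conf" and "pipelines" (assumptions, not stated in this reference) and uses plain local paths instead of an fsspec filesystem; resolve_project_paths is a hypothetical helper, not part of FlowerPower.

```python
import os
from dataclasses import dataclass
from typing import Optional

# Assumed default directory names; they are not stated in this reference.
DEFAULT_CFG_DIR = "conf"
DEFAULT_PIPELINES_DIR = "pipelines"


@dataclass(frozen=True)
class ProjectPaths:
    base_dir: str
    cfg_dir: str
    pipelines_dir: str


def resolve_project_paths(
    base_dir: Optional[str] = None,
    cfg_dir: Optional[str] = None,
    pipelines_dir: Optional[str] = None,
) -> ProjectPaths:
    """Hypothetical helper: resolve project directories as __init__ describes."""
    # base_dir falls back to the current working directory.
    base = os.path.abspath(base_dir or os.getcwd())
    # cfg_dir / pipelines_dir override directory *names*, resolved under base_dir.
    return ProjectPaths(
        base_dir=base,
        cfg_dir=os.path.join(base, cfg_dir or DEFAULT_CFG_DIR),
        pipelines_dir=os.path.join(base, pipelines_dir or DEFAULT_PIPELINES_DIR),
    )


paths = resolve_project_paths("my_project", pipelines_dir="flows")
print(paths.cfg_dir, paths.pipelines_dir)
```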
Attributes
Attribute | Type | Description |
---|---|---|
registry | PipelineRegistry | Handles pipeline registration and discovery. |
scheduler | PipelineScheduler | Manages job scheduling and execution. |
visualizer | PipelineVisualizer | Handles pipeline visualization. |
io | PipelineIOManager | Handles pipeline import/export operations, consolidating their common logic. |
project_cfg | ProjectConfig | Current project configuration. |
pipeline_cfg | PipelineConfig | Current pipeline configuration. |
pipelines | list[str] | List of available pipeline names. |
current_pipeline_name | str | Name of the currently loaded pipeline. |
summary | dict[str, dict \| str] | Summary of all pipelines. |
_base_dir | str | The base directory of the project. |
_fs | AbstractFileSystem | The filesystem instance used by the manager. |
_storage_options | dict \| Munch \| BaseStorageOptions | Storage options for the filesystem. |
_cfg_dir | str | The directory for configuration files. |
_pipelines_dir | str | The directory for pipeline modules. |
_project_context | FlowerPowerProject \| None | Reference to the FlowerPowerProject instance, if any. |
Methods
run
Execute a pipeline synchronously and return its results. The retry-related parameters (max_retries, retry_delay, jitter_factor, retry_exceptions) configure the internal retry mechanism.
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name of the pipeline to run. Must be a valid identifier. | |
inputs | dict \| None | Override pipeline input values. Example: {"data_date": "2025-04-28"} | None |
final_vars | list[str] \| None | Output variables to return. Example: ["model", "metrics"] | None |
config | dict \| None | Configuration for the Hamilton pipeline executor. Example: {"model": "LogisticRegression"} | None |
cache | dict \| None | Cache configuration for results. Example: {"recompute": ["node1", "final_node"]} | None |
executor_cfg | str \| dict \| ExecutorConfig \| None | Execution configuration: an executor name as a str (e.g., "threadpool", "local"), a raw dict (e.g., {"type": "threadpool", "max_workers": 4}), or a structured ExecutorConfig object. | None |
with_adapter_cfg | dict \| WithAdapterConfig \| None | Adapter settings for pipeline execution. Example: {"opentelemetry": True, "tracker": False} | None |
pipeline_adapter_cfg | dict \| PipelineAdapterConfig \| None | Pipeline-specific adapter settings. Example: {"tracker": {"project_id": "123", "tags": {"env": "prod"}}} | None |
project_adapter_cfg | dict \| ProjectAdapterConfig \| None | Project-level adapter settings. Example: {"opentelemetry": {"host": "http://localhost:4317"}} | None |
adapter | dict[str, Any] \| None | Custom adapter instances for the pipeline, keyed by name. Example: {"ray_graph_adapter": RayGraphAdapter()} | None |
reload | bool | Force a reload of the pipeline configuration. | False |
log_level | str \| None | Logging level for the execution. Valid values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". | None |
max_retries | int \| None | Maximum number of retries for execution. | None |
retry_delay | float \| None | Delay between retries, in seconds. | None |
jitter_factor | float \| None | Random jitter factor added to the retry delay. | None |
retry_exceptions | tuple \| list \| None | Exceptions that trigger a retry. | None |
on_success | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on successful pipeline execution. | None |
on_failure | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on pipeline execution failure. | None |
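Going by the type hints, on_success and on_failure accept either a bare callable or a (callable, args, kwargs) tuple in which args and kwargs may be None. The sketch below shows one way to normalize and call such a spec; invoke_callback and notify are hypothetical, and the sketch does not model whatever arguments (such as a result or an exception) FlowerPower itself might pass to the callback.

```python
from typing import Any, Callable, Optional, Tuple, Union

CallbackSpec = Union[
    Callable[..., Any],
    Tuple[Callable[..., Any], Optional[tuple], Optional[dict]],
    None,
]


def invoke_callback(spec: CallbackSpec) -> Any:
    """Hypothetical helper: call a callable or a (callable, args, kwargs) tuple."""
    if spec is None:
        return None
    if callable(spec):  # a tuple is not callable, so this only matches bare callables
        return spec()
    func, args, kwargs = spec
    # None stands for "no positional / keyword arguments".
    return func(*(args or ()), **(kwargs or {}))


def notify(channel: str, *, level: str = "info") -> str:
    return f"[{level}] pipeline finished -> {channel}"


print(invoke_callback((notify, ("slack",), {"level": "debug"})))
print(invoke_callback(lambda: "done"))
```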
Returns: dict[str, Any]. Pipeline execution results, mapping output variable names to their computed values.
Raises:
- ValueError: If the pipeline name doesn't exist or the configuration is invalid.
- ImportError: If the pipeline module cannot be imported.
- RuntimeError: If execution fails due to pipeline or adapter errors.
Example
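One plausible reading of the retry parameters of run: make one initial attempt plus up to max_retries retries, wait retry_delay seconds plus random jitter between attempts, and retry only when the raised exception is one of retry_exceptions. run_with_retries is a hypothetical helper, not FlowerPower's implementation; the attempt counting and the jitter formula (up to jitter_factor * retry_delay extra seconds) are assumptions.

```python
import random
import time
from typing import Any, Callable, Sequence


def run_with_retries(
    func: Callable[[], Any],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    jitter_factor: float = 0.1,
    retry_exceptions: Sequence[type] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Hypothetical helper: call func, retrying on the listed exceptions."""
    retry_on = tuple(retry_exceptions)  # an except clause needs a class or a tuple
    for attempt in range(max_retries + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            # Assumed jitter: add up to jitter_factor * retry_delay seconds.
            sleep(retry_delay + random.uniform(0, jitter_factor * retry_delay))


calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


print(run_with_retries(flaky, retry_exceptions=[ConnectionError], sleep=lambda s: None))
```

Injecting sleep keeps the sketch testable without real waiting, and converting retry_exceptions to a tuple matters because a list in an except clause raises TypeError.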
new
Create a new pipeline with the given name.
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name for the new pipeline. Must be a valid Python identifier. | |
overwrite | bool | Whether to overwrite an existing pipeline with the same name. | False |
Returns: None
Raises:
- ValueError: If the name is invalid, or the pipeline exists and overwrite=False.
- RuntimeError: If file creation fails.
- PermissionError: If lacking write permissions.
Example
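A minimal sketch of the name checks that new is documented to perform. It assumes the existing pipeline names are already known (for example from the pipelines attribute) and additionally rejects Python keywords such as class, which str.isidentifier() alone accepts; that extra rule may be stricter than FlowerPower's own check. validate_new_pipeline_name is hypothetical.

```python
import keyword
from typing import Iterable


def validate_new_pipeline_name(name: str, existing: Iterable[str], overwrite: bool = False) -> None:
    """Hypothetical helper: raise ValueError the way new() is documented to."""
    # str.isidentifier() accepts keywords such as "class", so reject those too.
    if not name.isidentifier() or keyword.iskeyword(name):
        raise ValueError(f"{name!r} is not a valid Python identifier")
    if name in set(existing) and not overwrite:
        raise ValueError(f"Pipeline {name!r} already exists; pass overwrite=True to replace it")


validate_new_pipeline_name("daily_sales", existing=["hello_world"])  # passes silently
```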
delete
Delete an existing pipeline.
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name of the pipeline to delete. | |
Returns: None
Raises:
- FileNotFoundError: If the pipeline does not exist.
- RuntimeError: If deletion fails.
Example
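A minimal sketch of a delete that honors the documented FileNotFoundError. It assumes each pipeline consists of a module at pipelines/<name>.py and a config at conf/pipelines/<name>.yml (a layout assumption, not stated in this reference) and works on local paths rather than an fsspec filesystem; delete_pipeline_files is hypothetical.

```python
import tempfile
from pathlib import Path
from typing import List


def delete_pipeline_files(base_dir: str, name: str) -> List[Path]:
    """Hypothetical helper: remove a pipeline's module and config files."""
    base = Path(base_dir)
    candidates = [
        base / "pipelines" / f"{name}.py",
        base / "conf" / "pipelines" / f"{name}.yml",
    ]
    existing = [p for p in candidates if p.exists()]
    if not existing:
        raise FileNotFoundError(f"Pipeline {name!r} not found under {base}")
    for path in existing:
        path.unlink()
    return existing


# Usage: build a throwaway project, then delete one pipeline from it.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "pipelines").mkdir()
    Path(tmp, "conf", "pipelines").mkdir(parents=True)
    Path(tmp, "pipelines", "etl.py").write_text("def step() -> int:\n    return 1\n")
    Path(tmp, "conf", "pipelines", "etl.yml").write_text("run: {}\n")
    removed = delete_pipeline_files(tmp, "etl")
    print([p.name for p in removed])
```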
show_pipelines
Display a summary of all available pipelines.
Parameter | Type | Description | Default |
---|---|---|---|
format | str | Output format for the list ("table", "json", "yaml"). | "table" |
Returns: None
Example
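A minimal, stdlib-only sketch of rendering pipeline metadata in two of the documented formats. The row fields (name, path) are hypothetical, fmt stands in for the format parameter (renamed to avoid shadowing the builtin), and "yaml" is left out because it would need a third-party library such as PyYAML. render_pipeline_list is not part of FlowerPower.

```python
import json
from typing import Dict, List


def render_pipeline_list(rows: List[Dict[str, str]], fmt: str = "table") -> str:
    """Hypothetical helper: render pipeline rows as a text table or JSON."""
    if fmt == "json":
        return json.dumps(rows, indent=2)
    if fmt == "table":
        headers = list(rows[0]) if rows else []
        # Each column is as wide as its longest cell, header included.
        widths = {h: max([len(h)] + [len(str(r[h])) for r in rows]) for h in headers}
        lines = [" | ".join(h.ljust(widths[h]) for h in headers)]
        lines.append("-+-".join("-" * widths[h] for h in headers))
        lines += [" | ".join(str(r[h]).ljust(widths[h]) for h in headers) for r in rows]
        return "\n".join(lines)
    # "yaml" is also documented but would need PyYAML, so it is omitted here.
    raise ValueError(f"Unsupported format: {fmt!r}")


rows = [
    {"name": "hello_world", "path": "pipelines/hello_world.py"},
    {"name": "etl", "path": "pipelines/etl.py"},
]
print(render_pipeline_list(rows))
```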
add_hook
Add a hook to a specific pipeline.
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name of the pipeline to add the hook to. | |
type | HookType | Type of the hook (e.g., HookType.MQTT_BUILD_CONFIG). | |
to | str | Destination of the hook (e.g., "mqtt"). | |
function_name | str | Name of the function to be called as the hook. | |
Returns: None
Raises:
- ValueError: If the pipeline does not exist or the hook type is invalid.
- FileExistsError: If a hook with the same name and type already exists.
Example
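A minimal sketch of the duplicate check behind the documented FileExistsError, assuming a hook is stored as a top-level function in a Python module whose source is handled here as a string. The check looks at the function name only. HookType below is a hypothetical stand-in that mirrors just the MQTT_BUILD_CONFIG member named above, and add_hook_stub is not part of FlowerPower.

```python
import ast
from enum import Enum


class HookType(Enum):
    # Hypothetical stand-in for FlowerPower's HookType; value is illustrative.
    MQTT_BUILD_CONFIG = "mqtt_build_config"


def add_hook_stub(source: str, hook_type: HookType, to: str, function_name: str) -> str:
    """Hypothetical helper: append a stub hook function to module source."""
    tree = ast.parse(source)
    defined = {node.name for node in tree.body if isinstance(node, ast.FunctionDef)}
    if function_name in defined:
        raise FileExistsError(f"Hook {function_name!r} ({hook_type.name}) already exists")
    stub = (
        f"\n\ndef {function_name}(*args, **kwargs):\n"
        f'    """{hook_type.name} hook for {to!r}."""\n'
        "    raise NotImplementedError\n"
    )
    return source.rstrip("\n") + stub


hooks_src = add_hook_stub("", HookType.MQTT_BUILD_CONFIG, "mqtt", "build_config")
print(hooks_src)
```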
remove_hook
Remove a hook from a specific pipeline.
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name of the pipeline to remove the hook from. | |
type | HookType | Type of the hook to remove. | |
function_name | str | Name of the function that was used as the hook. | |
Returns: None
Raises:
- FileNotFoundError: If the pipeline or hook does not exist.
Example
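A minimal sketch, assuming a hook is a top-level function in a Python module: locate the function with ast and drop its lines, raising FileNotFoundError when it is absent, as remove_hook is documented to do. remove_hook_function is hypothetical and needs Python 3.8+ for end_lineno.

```python
import ast


def remove_hook_function(source: str, function_name: str) -> str:
    """Hypothetical helper: delete a top-level hook function from module source."""
    tree = ast.parse(source)
    for node in tree.body:
        if isinstance(node, ast.FunctionDef) and node.name == function_name:
            lines = source.splitlines(keepends=True)
            # Decorators sit above `def`, so start at the first decorator line.
            start = min([node.lineno] + [d.lineno for d in node.decorator_list]) - 1
            del lines[start:node.end_lineno]
            return "".join(lines)
    raise FileNotFoundError(f"Hook function {function_name!r} not found")


hooks_src = (
    "import json\n\n\n"
    "def keep_me():\n    return 1\n\n\n"
    "def build_config(*args, **kwargs):\n    return {}\n"
)
print(remove_hook_function(hooks_src, "build_config"))
```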
import_pipeline
Import a pipeline from another FlowerPower project.
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name for the new pipeline in the current project. | |
src_base_dir | str | Source FlowerPower project directory or URI. Examples: local "/path/to/other/project", S3 "s3://bucket/project", GitHub "github://org/repo/project". | |
src_fs | AbstractFileSystem \| None | Pre-configured source filesystem. Example: S3FileSystem(anon=False) | None |
src_storage_options | dict \| BaseStorageOptions \| None | Options for source filesystem access. Example: {"key": "ACCESS_KEY", "secret": "SECRET_KEY"} | None |
overwrite | bool | Whether to replace an existing pipeline with the same name. | False |
Returns: None
Raises:
- ValueError: If a pipeline with that name already exists and overwrite=False.
- FileNotFoundError: If the source pipeline is not found.
- RuntimeError: If the import fails.
Example
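A minimal sketch of importing one pipeline between two local projects with the documented overwrite semantics. It assumes each pipeline consists of pipelines/<name>.py and conf/pipelines/<name>.yml (a layout assumption) and uses shutil on local paths where FlowerPower would go through an fsspec filesystem; copy_pipeline is hypothetical.

```python
import shutil
import tempfile
from pathlib import Path

# Assumed project layout (not stated in this reference).
PIPELINE_FILES = ("pipelines/{name}.py", "conf/pipelines/{name}.yml")


def copy_pipeline(src_base_dir: str, dest_base_dir: str, name: str, overwrite: bool = False) -> None:
    """Hypothetical helper: copy one pipeline's files between local projects."""
    src = [Path(src_base_dir, p.format(name=name)) for p in PIPELINE_FILES]
    dest = [Path(dest_base_dir, p.format(name=name)) for p in PIPELINE_FILES]
    if not all(p.exists() for p in src):
        raise FileNotFoundError(f"Pipeline {name!r} not found in {src_base_dir}")
    if any(p.exists() for p in dest) and not overwrite:
        raise ValueError(f"Pipeline {name!r} already exists; pass overwrite=True")
    for s, d in zip(src, dest):
        d.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(s, d)


with tempfile.TemporaryDirectory() as src_dir, tempfile.TemporaryDirectory() as dest_dir:
    for rel in PIPELINE_FILES:  # create a fake source pipeline named "etl"
        path = Path(src_dir, rel.format(name="etl"))
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text("# placeholder\n")
    copy_pipeline(src_dir, dest_dir, "etl")
    imported = sorted(p.relative_to(dest_dir).as_posix() for p in Path(dest_dir).rglob("*.*"))
    print(imported)
```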
import_many
Import multiple pipelines from another FlowerPower project.
Parameter | Type | Description | Default |
---|---|---|---|
names | list[str] | List of pipeline names to import. | |
src_base_dir | str | Source FlowerPower project directory or URI. Examples: local "/path/to/other/project", S3 "s3://bucket/project", GitHub "github://org/repo/project". | |
src_fs | AbstractFileSystem \| None | Pre-configured source filesystem. Example: S3FileSystem(anon=False) | None |
src_storage_options | dict \| BaseStorageOptions \| None | Options for source filesystem access. Example: {"key": "ACCESS_KEY", "secret": "SECRET_KEY"} | None |
overwrite | bool | Whether to replace existing pipelines with the same names. | False |
Returns: None
Raises:
- ValueError: If any pipeline name already exists and overwrite=False.
- FileNotFoundError: If any source pipeline is not found.
- RuntimeError: If the import fails.
Example
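When importing several pipelines at once, checking every name before copying anything avoids leaving the target project half-updated. The sketch below does that validation on plain sets of names; plan_import_many is hypothetical, and whether FlowerPower validates all names up front or fails partway through is not stated here.

```python
from typing import Iterable, List


def plan_import_many(
    names: Iterable[str],
    source_pipelines: Iterable[str],
    dest_pipelines: Iterable[str],
    overwrite: bool = False,
) -> List[str]:
    """Hypothetical helper: validate every name before any pipeline is copied."""
    wanted = list(dict.fromkeys(names))  # de-duplicate, keep the caller's order
    source, dest = set(source_pipelines), set(dest_pipelines)
    missing = [n for n in wanted if n not in source]
    if missing:
        raise FileNotFoundError(f"Not found in source project: {missing}")
    clashes = [n for n in wanted if n in dest]
    if clashes and not overwrite:
        raise ValueError(f"Already exist in target project: {clashes}")
    return wanted


print(plan_import_many(["etl", "report", "etl"], {"etl", "report", "ml"}, {"hello_world"}))
```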
export_pipeline
Export a pipeline to another FlowerPower project.
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name of the pipeline to export. | |
dest_base_dir | str | Destination FlowerPower project directory or URI. Examples: local "/path/to/backup", S3 "s3://bucket/backups", GCS "gs://bucket/backups". | |
dest_fs | AbstractFileSystem \| None | Pre-configured destination filesystem. Example: GCSFileSystem(project='my-project') | None |
dest_storage_options | dict \| BaseStorageOptions \| None | Options for destination filesystem access. Example: {"token": "my_token"} | None |
overwrite | bool | Whether to replace an existing pipeline with the same name in the destination. | False |
Returns: None
Raises:
- FileNotFoundError: If the pipeline does not exist in the current project.
- FileExistsError: If the destination pipeline exists and overwrite=False.
- RuntimeError: If the export fails.
Example
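The destination examples above differ only in their protocol prefix. A minimal sketch of splitting such a URI into protocol and path; with fsspec installed, fsspec.filesystem(protocol, **options) returns a filesystem for a given protocol, and passing dest_storage_options through as those options is an assumption about how FlowerPower wires it up. split_protocol here is a hypothetical stdlib helper.

```python
from typing import Tuple


def split_protocol(uri: str) -> Tuple[str, str]:
    """Hypothetical helper: split 'proto://path' into (protocol, path)."""
    if "://" in uri:
        protocol, path = uri.split("://", 1)
        return protocol, path
    # Plain paths are local; fsspec names the local protocol "file".
    return "file", uri


for uri in ("/path/to/backup", "s3://bucket/backups", "gs://bucket/backups"):
    print(split_protocol(uri))
```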
export_many
Export multiple pipelines to another FlowerPower project.
Parameter | Type | Description | Default |
---|---|---|---|
names | list[str] | List of pipeline names to export. | |
dest_base_dir | str | Destination FlowerPower project directory or URI. Examples: local "/path/to/backup", S3 "s3://bucket/backups", GCS "gs://bucket/backups". | |
dest_fs | AbstractFileSystem \| None | Pre-configured destination filesystem. Example: GCSFileSystem(project='my-project') | None |
dest_storage_options | dict \| BaseStorageOptions \| None | Options for destination filesystem access. Example: {"token": "my_token"} | None |
overwrite | bool | Whether to replace existing pipelines with the same names in the destination. | False |
Returns: None
Raises:
- FileNotFoundError: If any pipeline does not exist in the current project.
- FileExistsError: If any destination pipeline exists and overwrite=False.
- RuntimeError: If the export fails.
Example
show_dag
Generate and display the Directed Acyclic Graph (DAG) of a pipeline.
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name of the pipeline to visualize. | |
format | str | Output format for the DAG ("png", "svg", "html", "dot"). | "png" |
show_outputs | bool | Whether to include output nodes in the DAG. | False |
display_html | bool | Whether to display the HTML directly in the notebook (only for the "html" format). | False |
Returns: None (displays the DAG directly or saves it to a file).
Raises:
- FileNotFoundError: If the pipeline does not exist.
- ValueError: If the format is invalid or visualization fails.
Example
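Hamilton, the pipeline executor referenced under run, derives a DAG from plain functions: each function name is a node and each parameter name is a dependency. The sketch below reproduces that rule in simplified form (no decorators or config-dependent functions) and emits Graphviz DOT text, corresponding to the "dot" format; rendering png, svg, or html would additionally need Graphviz. build_dag and to_dot are hypothetical helpers, not FlowerPower's visualizer.

```python
import inspect
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Callable, Dict, List, Set


def build_dag(funcs: List[Callable]) -> Dict[str, Set[str]]:
    """Hypothetical helper: map each node to the nodes it depends on."""
    # Hamilton-style rule: function name = node, parameter names = dependencies.
    return {f.__name__: set(inspect.signature(f).parameters) for f in funcs}


def to_dot(dag: Dict[str, Set[str]]) -> str:
    """Render the DAG as DOT text, listing nodes in topological order."""
    order = list(TopologicalSorter(dag).static_order())  # raises CycleError on cycles
    edges = sorted((dep, node) for node, deps in dag.items() for dep in deps)
    lines = ["digraph pipeline {"]
    lines += [f'  "{n}";' for n in order]
    lines += [f'  "{a}" -> "{b}";' for a, b in edges]
    lines.append("}")
    return "\n".join(lines)


def raw_data(data_date: str) -> List[int]:  # data_date is an external input
    return [1, 2, 3]


def cleaned(raw_data: List[int]) -> List[int]:
    return [x for x in raw_data if x > 1]


def metrics(cleaned: List[int]) -> Dict[str, int]:
    return {"count": len(cleaned)}


dag = build_dag([raw_data, cleaned, metrics])
print(to_dot(dag))
```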
show_execution_graph
Generate and display the execution graph of a pipeline, considering inputs and configuration.
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name of the pipeline to visualize. | |
format | str | Output format for the graph ("png", "svg", "html", "dot"). | "png" |
show_outputs | bool | Whether to include output nodes in the graph. | False |
display_html | bool | Whether to display the HTML directly in the notebook (only for the "html" format). | False |
inputs | dict \| None | Input values to consider for graph generation. | None |
config | dict \| None | Configuration for the Hamilton pipeline executor. | None |
Returns: None (displays the graph directly or saves it to a file).
Raises:
- FileNotFoundError: If the pipeline does not exist.
- ValueError: If the format is invalid or visualization fails.
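Unlike the full DAG, an execution graph covers only what a particular run needs. A minimal sketch over a node-to-dependencies mapping: walk upstream from the requested outputs, then report external inputs (nodes that no function produces) that were not supplied. execution_plan is hypothetical, and this is only one way to interpret "considering inputs and configuration"; configuration is not modeled.

```python
from typing import Any, Dict, Iterable, Mapping, Set, Tuple


def execution_plan(
    dag: Dict[str, Set[str]],
    final_vars: Iterable[str],
    inputs: Mapping[str, Any],
) -> Tuple[Set[str], Set[str]]:
    """Hypothetical helper: nodes needed for final_vars, plus missing inputs."""
    needed: Set[str] = set()
    stack = list(final_vars)
    while stack:  # depth-first walk upstream from the requested outputs
        node = stack.pop()
        if node in needed:
            continue
        needed.add(node)
        stack.extend(dag.get(node, ()))
    external = {n for n in needed if n not in dag}  # no function produces these
    return needed, external - set(inputs)


dag = {
    "raw_data": {"data_date"},
    "cleaned": {"raw_data"},
    "metrics": {"cleaned"},
    "model": {"cleaned"},
    "report": {"metrics", "model"},
}
needed, missing = execution_plan(dag, ["metrics"], {"data_date": "2025-04-28"})
print(sorted(needed), missing)
```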