FlowerPowerProject
Module: flowerpower.flowerpower.FlowerPowerProject
The FlowerPowerProject class represents an initialized FlowerPower project, providing an interface to manage pipelines and project-level settings.
Initialization
__init__
Initializes a FlowerPowerProject instance. This constructor is typically called internally by FlowerPowerProject.load() or FlowerPowerProject.new().
| Parameter | Type | Description |
|---|---|---|
| pipeline_manager | PipelineManager | An instance of PipelineManager to manage pipelines within this project. |
Attributes
| Attribute | Type | Description |
|---|---|---|
| pipeline_manager | PipelineManager | Manages pipelines within the project. |
| name | str | The name of the current project. |
Methods
run
Execute a pipeline synchronously and return its results.
Legacy retry kwargs
The standalone retry kwargs (max_retries, retry_delay, jitter_factor, retry_exceptions) are retained for backwards compatibility and now emit a DeprecationWarning. Prefer supplying retry settings via run_config.retry or the builder helpers.
This convenience method delegates to the pipeline manager and provides the same functionality as self.pipeline_manager.run().
This method supports two primary ways of providing execution configuration:
1. Using a RunConfig object (recommended): Provides a structured way to pass all execution parameters.
2. Using individual parameters (**kwargs): Allows specifying parameters directly.
When both run_config and individual parameters (**kwargs) are provided, the individual parameters take precedence over the corresponding values in run_config.
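The precedence rule can be illustrated with a minimal sketch. RunConfigSketch and merge_run_options are hypothetical stand-ins, not FlowerPower's actual classes or internals; they only show explicitly passed keyword arguments overriding the matching fields of a config object. Treating None as "not provided" is an assumption of this sketch.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class RunConfigSketch:
    """Hypothetical stand-in for RunConfig with a few of the documented fields."""

    inputs: dict[str, Any] | None = None
    final_vars: list[str] | None = None
    log_level: str | None = None
    reload: bool = False


def merge_run_options(
    run_config: RunConfigSketch | None = None, **kwargs: Any
) -> RunConfigSketch:
    """Return a config in which explicitly passed kwargs win over run_config."""
    base = run_config or RunConfigSketch()
    # Assumption: a kwarg left at None means "not provided" and does not override.
    overrides = {key: value for key, value in kwargs.items() if value is not None}
    return replace(base, **overrides)


cfg = RunConfigSketch(inputs={"data_date": "2025-04-28"}, log_level="INFO")
merged = merge_run_options(cfg, log_level="DEBUG")
print(merged.log_level)  # DEBUG
print(merged.inputs)     # {'data_date': '2025-04-28'}
```

Values that are only set in the config object (inputs here) survive the merge, while log_level is replaced by the keyword argument.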
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline to run. Must be a valid identifier. | |
| run_config | RunConfig \| None | Configuration object containing all execution parameters. See RunConfig for details. | None |
| inputs | dict \| None | Override pipeline input values. Example: {"data_date": "2025-04-28"} | None |
| final_vars | list[str] \| None | Specify which output variables to return. Example: ["model", "metrics"] | None |
| config | dict \| None | Configuration for the Hamilton pipeline executor. Example: {"model": "LogisticRegression"} | None |
| cache | dict \| None | Cache configuration for results. Example: {"recompute": ["node1", "final_node"]} | None |
| executor_cfg | str \| dict \| ExecutorConfig \| None | Execution configuration. Can be a str naming the executor type (one of "synchronous", "threadpool", "processpool", "ray", "dask"), a dict with raw config, e.g. {"type": "threadpool", "max_workers": 4}, or a structured ExecutorConfig object. | None |
| with_adapter_cfg | dict \| WithAdapterConfig \| None | Adapter settings for pipeline execution. Example: {"opentelemetry": True, "tracker": False} | None |
| pipeline_adapter_cfg | dict \| PipelineAdapterConfig \| None | Pipeline-specific adapter settings. Example: {"tracker": {"project_id": "123", "tags": {"env": "prod"}}} | None |
| project_adapter_cfg | dict \| ProjectAdapterConfig \| None | Project-level adapter settings. Example: {"opentelemetry": {"host": "http://localhost:4317"}} | None |
| adapter | dict[str, Any] \| None | Custom adapter instances for the pipeline. Example: {"ray_graph_adapter": RayGraphAdapter()} | None |
| reload | bool | Force reload of pipeline configuration. | False |
| log_level | str \| None | Logging level for the execution. Valid values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" | None |
| max_retries | int \| None | Deprecated. Legacy retry override; use run_config.retry. | None |
| retry_delay | float \| None | Deprecated. Legacy retry override; use run_config.retry. | None |
| jitter_factor | float \| None | Deprecated. Legacy retry override; use run_config.retry. | None |
| retry_exceptions | tuple \| list \| None | Deprecated. Legacy retry override; use run_config.retry. | None |
| on_success | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on successful pipeline execution. | None |
| on_failure | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on pipeline execution failure. | None |
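The on_success and on_failure types allow either a bare callable or a three-element tuple. The sketch below shows one plausible way such a value could be normalized and invoked. It assumes the tuple is (callable, positional args, keyword args); invoke_callback and notify are hypothetical names used only for illustration and do not describe FlowerPower's internal handling.

```python
from __future__ import annotations

from typing import Any, Callable


def invoke_callback(spec: Callable[..., Any] | tuple | None) -> Any:
    """Call a callback given as a callable or as (callable, args, kwargs)."""
    if spec is None:
        return None
    if callable(spec):
        return spec()
    func, args, kwargs = spec
    # Assumption: None in the args or kwargs slot means "no extra arguments".
    return func(*(args or ()), **(kwargs or {}))


def notify(channel: str = "log", *, status: str = "ok") -> str:
    """Hypothetical callback that reports where and what it would notify."""
    return f"{channel}:{status}"


print(invoke_callback(notify))                                    # log:ok
print(invoke_callback((notify, ("email",), {"status": "done"})))  # email:done
print(invoke_callback((notify, None, None)))                      # log:ok
```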
Returns: dict[str, Any] - Pipeline execution results, mapping output variable names to their computed values.
Raises:
- ValueError: If the pipeline name doesn't exist or the configuration is invalid.
- ImportError: If the pipeline module cannot be imported.
- RuntimeError: If execution fails due to pipeline or adapter errors.
Example
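A minimal usage sketch, assuming a pipeline named "my_pipeline" exists (the name is hypothetical). The keyword arguments and the RuntimeError handling follow the parameter table and Raises list for run; run_report and StubProject are illustrative helpers, with the stub standing in for a loaded project so the snippet runs without a project on disk.

```python
from __future__ import annotations

from typing import Any


def run_report(project: Any, data_date: str) -> dict[str, Any] | None:
    """Run a pipeline via project.run() and return its results, or None on failure."""
    try:
        return project.run(
            "my_pipeline",  # hypothetical pipeline name
            inputs={"data_date": data_date},
            final_vars=["model", "metrics"],
            log_level="INFO",
        )
    except RuntimeError as exc:
        # Documented as raised when execution fails due to pipeline or adapter errors.
        print(f"pipeline failed: {exc}")
        return None


class StubProject:
    """Stand-in so the sketch runs without a project on disk."""

    def run(self, name: str, **kwargs: Any) -> dict[str, Any]:
        date = kwargs["inputs"]["data_date"]
        return {var: f"<{var} for {date}>" for var in kwargs["final_vars"]}


results = run_report(StubProject(), "2025-04-28")
print(sorted(results))  # ['metrics', 'model']
```

With a real project, the object returned by FlowerPowerProject.load() would take the place of StubProject().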
load
Load an existing FlowerPower project.
If no project exists at base_dir, the method logs an error message and returns None.
| Parameter | Type | Description | Default |
|---|---|---|---|
| base_dir | str \| None | The base directory of the project. If None, it defaults to the current working directory. | None |
| storage_options | dict \| BaseStorageOptions \| None | Storage options for the filesystem. | {} |
| fs | AbstractFileSystem \| None | An instance of AbstractFileSystem to use for file operations. | None |
| log_level | str \| None | The logging level to set for the project. If None, it uses the default log level. | None |
Returns: FlowerPowerProject | None - An instance if the project exists, otherwise None.
Example
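Because load returns None rather than raising when no project is found, callers may want to check the result before using it. The sketch below wraps that check; open_project is a hypothetical helper, and the import path is taken from the module name given at the top of this page. The load parameter exists only so the helper can be exercised without FlowerPower installed.

```python
from __future__ import annotations

from typing import Any, Callable


def open_project(base_dir: str, load: Callable[..., Any] | None = None) -> Any:
    """Load a project and fail loudly instead of passing None along."""
    if load is None:
        # Imported lazily; path assumed from the module name stated on this page.
        from flowerpower.flowerpower import FlowerPowerProject

        load = FlowerPowerProject.load
    project = load(base_dir=base_dir, log_level="INFO")
    if project is None:
        raise FileNotFoundError(f"No FlowerPower project found in {base_dir!r}")
    return project
```

Calling open_project("path/to/project") would then either return the loaded project or raise FileNotFoundError, so later code never has to handle a None project.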
new
Initialize a new FlowerPower project.
| Parameter | Type | Description | Default |
|---|---|---|---|
| name | str \| None | The name of the project. If None, it defaults to the current directory name. | None |
| base_dir | str \| None | The base directory where the project will be created. If None, it defaults to the current working directory. | None |
| storage_options | dict \| BaseStorageOptions \| None | Storage options for the filesystem. | {} |
| fs | AbstractFileSystem \| None | An instance of AbstractFileSystem to use for file operations. If None, uses the get_filesystem helper. | None |
| hooks_dir | str | The directory where the project hooks will be stored. | settings.HOOKS_DIR |
| log_level | str \| None | The logging level to set for the project. If None, it uses the default log level. | None |
| overwrite | bool | Whether to overwrite an existing project at the specified base directory. | False |
Returns: FlowerPowerProject - An instance of FlowerPowerProject initialized with the new project.
Raises: FileExistsError: If the project already exists at the specified base directory and overwrite is False.
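A common pattern is to create a project on first use and fall back to load when one already exists. The sketch below relies only on the documented new and load signatures and the FileExistsError noted above; create_or_load is a hypothetical helper, and the project_cls parameter exists so the logic can be tried with a stand-in class instead of the real FlowerPowerProject.

```python
from __future__ import annotations

from typing import Any


def create_or_load(name: str, base_dir: str, project_cls: Any = None) -> Any:
    """Create a project at base_dir, or load the existing one if present."""
    if project_cls is None:
        # Imported lazily; path assumed from the module name stated on this page.
        from flowerpower.flowerpower import FlowerPowerProject as project_cls
    try:
        return project_cls.new(name=name, base_dir=base_dir, overwrite=False)
    except FileExistsError:
        # A project is already there: keep it and load it instead of overwriting.
        return project_cls.load(base_dir=base_dir)
```

Keeping overwrite=False makes the fallback explicit; passing overwrite=True would replace the existing project rather than reuse it.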