FlowerPowerProject¶
Module: flowerpower.flowerpower.FlowerPowerProject
The FlowerPowerProject class represents an initialized FlowerPower project, providing an interface to manage pipelines, job queues, and project-level settings.
Initialization¶
__init__¶
Initializes a FlowerPowerProject instance. This constructor is typically called internally by FlowerPowerProject.load() or FlowerPowerProject.new().
Parameter | Type | Description |
---|---|---|
pipeline_manager | PipelineManager | An instance of PipelineManager to manage pipelines within this project. |
job_queue_manager | JobQueueManager \| None | An optional instance of JobQueueManager to handle job queue operations. |
Attributes¶
Attribute | Type | Description |
---|---|---|
pipeline_manager | PipelineManager | Manages pipelines within the project. |
job_queue_manager | JobQueueManager \| None | Manages job queue operations, if configured. |
name | str | The name of the current project. |
_base_dir | str | The base directory of the project. |
_fs | AbstractFileSystem | The fsspec-compatible filesystem instance used by the project. |
_storage_options | dict \| Munch \| BaseStorageOptions | Storage options for the filesystem. |
job_queue_type | str \| None | The type of job queue configured for the project (e.g., "rq"). |
job_queue_backend | Any \| None | The backend instance for the job queue, if configured. |
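As a rough illustration of how the public attributes above might be read, the sketch below builds a one-line summary of a project. The helper name describe_project is hypothetical and not part of FlowerPower; it only assumes that the object exposes name, job_queue_manager, and job_queue_type as listed in the table.

```python
from typing import Any


def describe_project(project: Any) -> str:
    """Return a short summary built from a project's public attributes.

    `project` is assumed to be a FlowerPowerProject (or any object exposing
    `name`, `job_queue_manager`, and `job_queue_type`).
    """
    # job_queue_manager is documented as optional, so check it before
    # reporting the configured queue type.
    if project.job_queue_manager is not None:
        queue = project.job_queue_type
    else:
        queue = "none"
    return f"{project.name} (job queue: {queue})"
```

Checking job_queue_manager before calling any queue-related method is one way to avoid the RuntimeError that those methods are documented to raise when no job queue is configured.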
Methods¶
run¶
Execute a pipeline synchronously and return its results.
This is a convenience method that delegates to the pipeline manager. It provides the same functionality as self.pipeline_manager.run().
Parameter | Type | Description | Default |
---|---|---|---|
name | str | Name of the pipeline to run. Must be a valid identifier. | |
inputs | dict \| None | Override pipeline input values. Example: {"data_date": "2025-04-28"} | None |
final_vars | list[str] \| None | Specify which output variables to return. Example: ["model", "metrics"] | None |
config | dict \| None | Configuration for the Hamilton pipeline executor. Example: {"model": "LogisticRegression"} | None |
cache | dict \| None | Cache configuration for results. Example: {"recompute": ["node1", "final_node"]} | None |
executor_cfg | str \| dict \| ExecutorConfig \| None | Execution configuration. Can be a str (executor name, e.g. "threadpool", "local"), a dict (raw config, e.g. {"type": "threadpool", "max_workers": 4}), or an ExecutorConfig (structured config object). | None |
with_adapter_cfg | dict \| WithAdapterConfig \| None | Adapter settings for pipeline execution. Example: {"opentelemetry": True, "tracker": False} | None |
pipeline_adapter_cfg | dict \| PipelineAdapterConfig \| None | Pipeline-specific adapter settings. Example: {"tracker": {"project_id": "123", "tags": {"env": "prod"}}} | None |
project_adapter_cfg | dict \| ProjectAdapterConfig \| None | Project-level adapter settings. Example: {"opentelemetry": {"host": "http://localhost:4317"}} | None |
adapter | dict[str, Any] \| None | Custom adapter instances for the pipeline. Example: {"ray_graph_adapter": RayGraphAdapter()} | None |
reload | bool | Force reload of pipeline configuration. | False |
log_level | str \| None | Logging level for the execution. Valid values: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" | None |
max_retries | int \| None | Maximum number of retries for execution. | None |
retry_delay | float \| None | Delay between retries in seconds. | None |
jitter_factor | float \| None | Random jitter factor to add to the retry delay. | None |
retry_exceptions | tuple \| list \| None | Exceptions that trigger a retry. | None |
on_success | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on successful pipeline execution. | None |
on_failure | Callable \| tuple[Callable, tuple \| None, dict \| None] \| None | Callback to run on pipeline execution failure. | None |
Returns: dict[str, Any] - Pipeline execution results, mapping output variable names to their computed values.
Raises:
- ValueError: If the pipeline name doesn't exist or the configuration is invalid.
- ImportError: If the pipeline module cannot be imported.
- RuntimeError: If execution fails due to pipeline or adapter errors.
Example¶
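A minimal sketch of a run() call using the parameters documented above. The pipeline name "train_model" and the wrapper function run_training are hypothetical; project is assumed to be a FlowerPowerProject instance, for example one returned by FlowerPowerProject.load().

```python
from typing import Any


def run_training(project: Any) -> dict[str, Any]:
    """Run a hypothetical "train_model" pipeline and return its results.

    `project` is assumed to be a FlowerPowerProject instance, e.g.:
        from flowerpower.flowerpower import FlowerPowerProject
        project = FlowerPowerProject.load(".")
    """
    return project.run(
        "train_model",  # hypothetical pipeline name
        inputs={"data_date": "2025-04-28"},  # override pipeline inputs
        final_vars=["model", "metrics"],  # outputs to return
        executor_cfg={"type": "threadpool", "max_workers": 4},
        max_retries=3,  # retry up to three times
        retry_delay=2.0,  # seconds between retries
        log_level="INFO",
    )
```

The returned dictionary is keyed by the names passed in final_vars, so the caller would read results as result["model"] and result["metrics"].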
enqueue¶
Enqueue a pipeline for execution via the job queue.
This is a convenience method that delegates to the job queue manager's enqueue_pipeline method. It provides asynchronous pipeline execution.
Parameter | Type | Description |
---|---|---|
name | str | Name of the pipeline to enqueue. |
*args | Any | Additional positional arguments for job execution. |
**kwargs | Any | Keyword arguments for pipeline execution and job queue options. Supports all parameters from pipeline_manager.run() plus job-queue-specific options: run_in (schedule the job to run after a delay), run_at (schedule the job to run at a specific datetime), queue_name (queue to use, for RQ), timeout (job execution timeout), retry (number of retries), result_ttl (result time to live), ttl (job time to live). |
Returns: Job - Job ID or result, depending on the implementation, or None if the job queue is not configured.
Raises: RuntimeError - If the job queue manager is not configured.
Example¶
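The following sketch shows one way enqueue() might be called with the run_at and queue_name options listed above. The pipeline name "daily_report", the queue name "default", and the helper enqueue_report are hypothetical, and whether run_at expects a timezone-aware datetime is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Any


def enqueue_report(project: Any, delay_minutes: int = 10) -> Any:
    """Enqueue a hypothetical "daily_report" pipeline to run later.

    `project` is assumed to be a FlowerPowerProject with a job queue.
    Returns whatever enqueue() returns, or None if no queue is configured.
    """
    # Assumption: a timezone-aware datetime is acceptable for run_at.
    run_at = datetime.now(timezone.utc) + timedelta(minutes=delay_minutes)
    try:
        return project.enqueue(
            "daily_report",  # hypothetical pipeline name
            inputs={"data_date": "2025-04-28"},
            run_at=run_at,
            queue_name="default",  # assumed queue name
        )
    except RuntimeError:
        # Documented failure mode: the job queue manager is not configured.
        return None
```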
schedule¶
Schedule a pipeline for recurring or future execution.
This is a convenience method that delegates to the job queue manager's schedule_pipeline method. It provides scheduled pipeline execution.
Parameter | Type | Description |
---|---|---|
name | str | Name of the pipeline to schedule. |
*args | Any | Additional positional arguments for scheduling. |
**kwargs | Any | Keyword arguments for pipeline execution and scheduling options. Supports all parameters from pipeline_manager.run() plus scheduling options: cron (cron expression for recurring execution, e.g., "0 9 * * *"), interval (time interval for recurring execution, int seconds or dict), date (future date for one-time execution, datetime or ISO string), schedule_id (unique identifier for the schedule), overwrite (whether to overwrite an existing schedule with the same ID). |
Returns: ScheduledJob - Schedule ID or job ID, depending on the implementation, or None if the job queue is not configured.
Raises: RuntimeError - If the job queue manager is not configured.
Example¶
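A minimal sketch of a recurring schedule built from the cron, schedule_id, and overwrite options described above. The pipeline name "daily_report", the schedule ID, and the helper schedule_daily are hypothetical.

```python
from typing import Any


def schedule_daily(project: Any, hour: int = 9) -> Any:
    """Schedule a hypothetical "daily_report" pipeline once per day.

    `project` is assumed to be a FlowerPowerProject with a job queue.
    """
    if not 0 <= hour <= 23:
        raise ValueError("hour must be between 0 and 23")
    # Standard five-field cron expression: minute hour day month weekday.
    cron = f"0 {hour} * * *"
    return project.schedule(
        "daily_report",  # hypothetical pipeline name
        cron=cron,
        schedule_id="daily_report_morning",  # hypothetical schedule ID
        overwrite=True,  # replace an existing schedule with the same ID
    )
```

Passing a fixed schedule_id together with overwrite=True is intended to make repeated calls replace the schedule rather than create duplicates, assuming the backend honours both options as documented.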
start_worker¶
Start a worker process for processing jobs from the queues.
This is a convenience method that delegates to the job queue manager's start_worker method.
Parameter | Type | Description | Default |
---|---|---|---|
background | bool | If True, runs the worker in a non-blocking background mode. If False, runs in the current process and blocks until stopped. | False |
queue_names | list[str] \| None | List of queue names to process. If None, processes all queues defined in the backend configuration. | None |
with_scheduler | bool | Whether to include the scheduler queue for processing scheduled jobs (if supported by the backend). | True |
**kwargs | Any | Additional worker configuration options specific to the job queue backend. | |
Raises: RuntimeError - If the job queue manager is not configured.
Example¶
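A short sketch of starting a worker without blocking the calling process, using only the parameters in the table above. The helper start_background_worker and the queue name "default" are hypothetical.

```python
from typing import Any, Optional


def start_background_worker(
    project: Any, queue_names: Optional[list[str]] = None
) -> None:
    """Start a non-blocking worker on the given queues.

    `project` is assumed to be a FlowerPowerProject with a job queue.
    With queue_names=None, all configured queues are processed.
    """
    project.start_worker(
        background=True,  # return immediately instead of blocking
        queue_names=queue_names,
        with_scheduler=True,  # also process scheduled jobs, if supported
    )
```

For example, start_background_worker(project, ["default"]) would restrict the worker to a single queue, while calling it with no queue list leaves the choice to the backend configuration.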
stop_worker¶
Stop the worker process.
This is a convenience method that delegates to the job queue manager's stop_worker method.
Raises: RuntimeError - If the job queue manager is not configured.
Example¶
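One possible pattern, sketched here under the assumption that a background worker should never outlive the code that started it, is to pair start_worker() and stop_worker() in a context manager. The name background_worker is hypothetical and not part of FlowerPower.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def background_worker(project: Any, **worker_kwargs: Any) -> Iterator[Any]:
    """Run a background worker for the duration of a `with` block.

    `project` is assumed to be a FlowerPowerProject with a job queue.
    Extra keyword arguments are forwarded to start_worker().
    """
    project.start_worker(background=True, **worker_kwargs)
    try:
        yield project
    finally:
        # Always stop the worker, even if the block raised an exception.
        project.stop_worker()
```

Inside the block, the caller can enqueue jobs and wait for them; the finally clause ensures stop_worker() still runs if that code fails.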
start_worker_pool¶
Start a pool of worker processes to handle jobs in parallel.
This is a convenience method that delegates to the job queue manager's start_worker_pool method.
Parameter | Type | Description | Default |
---|---|---|---|
num_workers | int \| None | Number of worker processes to start. If None, uses the CPU count or a backend-specific default. | None |
background | bool | If True, runs the worker pool in a non-blocking background mode. If False, runs in the current process and blocks until stopped. | False |
queue_names | list[str] \| None | List of queue names to process. If None, processes all queues defined in the backend configuration. | None |
with_scheduler | bool | Whether to include the scheduler queue for processing scheduled jobs (if supported by the backend). | True |
**kwargs | Any | Additional worker pool configuration options specific to the job queue backend. | |
Raises: RuntimeError - If the job queue manager is not configured.
Example¶
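The sketch below sizes a worker pool explicitly rather than relying on the default, leaving some CPU cores free for other work. The helper start_pool and its reserve_cores parameter are hypothetical; only num_workers and background come from the table above.

```python
import os
from typing import Any


def start_pool(project: Any, reserve_cores: int = 1) -> int:
    """Start a background worker pool and return the number of workers.

    `project` is assumed to be a FlowerPowerProject with a job queue.
    """
    # os.cpu_count() can return None, so fall back to a single core.
    cpu_count = os.cpu_count() or 1
    num_workers = max(1, cpu_count - reserve_cores)
    project.start_worker_pool(num_workers=num_workers, background=True)
    return num_workers
```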
stop_worker_pool¶
Stop all worker processes in the worker pool.
This is a convenience method that delegates to the job queue manager's stop_worker_pool method.
Raises: RuntimeError - If the job queue manager is not configured.
Example¶
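A minimal sketch of stopping the pool during shutdown while tolerating the documented RuntimeError. The helper stop_pool_safely is hypothetical; treating a missing job queue as a non-fatal condition is a design assumption, not library behaviour.

```python
from typing import Any


def stop_pool_safely(project: Any) -> bool:
    """Stop the worker pool; return False if no job queue is configured.

    `project` is assumed to be a FlowerPowerProject instance.
    """
    try:
        project.stop_worker_pool()
    except RuntimeError:
        # Documented failure mode: the job queue manager is not configured.
        return False
    return True
```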
load¶
Load an existing FlowerPower project.
If the project does not exist, it will raise an error.
Parameter | Type | Description | Default |
---|---|---|---|
base_dir | str \| None | The base directory of the project. If None, it defaults to the current working directory. | None |
storage_options | dict \| BaseStorageOptions \| None | Storage options for the filesystem. | {} |
fs | AbstractFileSystem \| None | An instance of AbstractFileSystem to use for file operations. If None, uses the get_filesystem helper. | None |
log_level | str \| None | The logging level to set for the project. If None, it uses the default log level. | None |
Returns: FlowerPowerProject - An instance of FlowerPowerProject if the project exists, otherwise None.
Raises: FileNotFoundError - If the project does not exist at the specified base directory.
Example¶
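A hedged sketch of loading a project while handling the missing-project case described above. The helper load_or_none is hypothetical; it takes the class as an argument so the snippet stays self-contained, and in practice project_cls would be FlowerPowerProject.

```python
from typing import Any, Optional


def load_or_none(project_cls: Any, base_dir: Optional[str] = None) -> Any:
    """Load a project from base_dir, returning None if it does not exist.

    `project_cls` is assumed to be FlowerPowerProject, e.g.:
        from flowerpower.flowerpower import FlowerPowerProject
        project = load_or_none(FlowerPowerProject, "path/to/project")
    """
    try:
        return project_cls.load(base_dir=base_dir, log_level="INFO")
    except FileNotFoundError:
        # Documented failure mode: no project at the given base directory.
        return None
```

Because the reference mentions both a None return value and FileNotFoundError for a missing project, callers of this helper only need to check the result for None.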
new¶
Initialize a new FlowerPower project.
Parameter | Type | Description | Default |
---|---|---|---|
name | str \| None | The name of the project. If None, it defaults to the current directory name. | None |
base_dir | str \| None | The base directory where the project will be created. If None, it defaults to the current working directory. | None |
storage_options | dict \| BaseStorageOptions \| None | Storage options for the filesystem. | {} |
fs | AbstractFileSystem \| None | An instance of AbstractFileSystem to use for file operations. If None, uses the get_filesystem helper. | None |
job_queue_type | str | The type of job queue to use for the project. | settings.JOB_QUEUE_TYPE |
hooks_dir | str | The directory where the project hooks will be stored. | settings.HOOKS_DIR |
log_level | str \| None | The logging level to set for the project. If None, it uses the default log level. | None |
Returns: FlowerPowerProject - An instance of FlowerPowerProject initialized with the new project.
Raises: FileExistsError - If the project already exists at the specified base directory.
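Since new() raises FileExistsError for an existing project and load() opens one, the two can be combined into a create-or-open helper. The sketch below is one way to do that; load_or_create is a hypothetical name, and project_cls stands in for FlowerPowerProject so the snippet remains self-contained.

```python
from typing import Any


def load_or_create(project_cls: Any, name: str, base_dir: str) -> Any:
    """Create a project at base_dir, or load it if it already exists.

    `project_cls` is assumed to be FlowerPowerProject.
    """
    try:
        return project_cls.new(name=name, base_dir=base_dir)
    except FileExistsError:
        # Documented failure mode: a project already exists at base_dir.
        return project_cls.load(base_dir=base_dir)
```

Trying new() first and falling back to load() avoids a separate existence check, which would otherwise need to know how the project is laid out on the filesystem.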