Skip to content

JobQueueManager

Module: flowerpower.job_queue.JobQueueManager

The JobQueueManager is an abstract base class that defines the interface for job queue operations in FlowerPower. It is responsible for enqueuing, scheduling, and managing jobs.

Initialization

init

__init__(self, type: str | None = None, name: str | None = None, base_dir: str | None = None, backend: BaseBackend | None = None, storage_options: dict | None = None, fs: AbstractFileSystem | None = None, **kwargs)

Initializes the JobQueueManager.

Parameter Type Description Default
type str \| None The type of job queue backend (e.g., "rq"). None
name str \| None The name of the scheduler. None
base_dir str \| None The base directory of the project. None
backend BaseBackend \| None A backend instance. None
storage_options dict \| None Storage options for the filesystem. None
fs AbstractFileSystem \| None An fsspec-compatible filesystem instance. None

Attributes

Attribute Type Description
is_worker_running bool Indicates if a worker is currently running.
is_scheduler_running bool Indicates if the scheduler is currently running.

Methods

enqueue_pipeline

enqueue_pipeline(self, name: str, *args, **kwargs)

Enqueues a pipeline for immediate execution.

Parameter Type Description
name str The name of the pipeline.
*args Any Positional arguments for the job.
**kwargs Any Keyword arguments for the job.

Returns: Job - The enqueued job object.

Raises: ValueError: If the pipeline name is invalid.

Example

1
2
3
4
5
from flowerpower.job_queue import JobQueueManager

# Assuming manager is an instance of a concrete JobQueueManager subclass
job = manager.enqueue_pipeline("my_data_pipeline", data_path="/data/new.csv")
print(f"Enqueued job: {job.id}")

schedule_pipeline

schedule_pipeline(self, name: str, *args, **kwargs)

Schedules a pipeline for future or recurring execution.

Parameter Type Description
name str The name of the pipeline.
*args Any Positional arguments for the job.
**kwargs Any Keyword arguments for the job (e.g., cron_string, interval).

Returns: ScheduledJob - The scheduled job object.

Raises: ValueError: If the pipeline name is invalid or scheduling parameters are insufficient.

Example

1
2
3
4
5
6
7
8
from flowerpower.job_queue import JobQueueManager

# Schedule a pipeline to run every day at midnight
scheduled_job = manager.schedule_pipeline(
    "daily_report_pipeline",
    cron_string="0 0 * * *"
)
print(f"Scheduled job: {scheduled_job.id}")

start_worker

start_worker(self, queue_name: str | list[str] | None = None, **kwargs)

Starts a worker process to process jobs from the queue.

Parameter Type Description
queue_name str \| list[str] \| None The name(s) of the queue(s) to listen to. Defaults to all queues.
**kwargs Any Additional keyword arguments for the worker.

Returns: None

Raises: RuntimeError: If the worker fails to start.

Example

1
2
3
4
5
6
7
from flowerpower.job_queue import JobQueueManager

# Start a worker for a specific queue
manager.start_worker("high_priority_queue")

# Start a worker for multiple queues
manager.start_worker(["default", "low_priority"])

stop_worker

stop_worker(self)

Stops the currently running worker process.

Returns: None

Raises: RuntimeError: If stopping the worker fails.

Example

1
2
3
from flowerpower.job_queue import JobQueueManager

manager.stop_worker()

start_worker_pool

start_worker_pool(self, num_workers: int = 1, queue_name: str | list[str] | None = None, **kwargs)

Starts a pool of worker processes.

Parameter Type Description
num_workers int The number of worker processes to start.
queue_name str \| list[str] \| None The name(s) of the queue(s) for the workers to listen to. Defaults to all queues.
**kwargs Any Additional keyword arguments for the worker processes.

Returns: None

Raises: RuntimeError: If the worker pool fails to start.

Example

1
2
3
4
from flowerpower.job_queue import JobQueueManager

# Start a pool of 4 workers
manager.start_worker_pool(num_workers=4)

stop_worker_pool

stop_worker_pool(self)

Stops all worker processes in the pool.

Returns: None

Raises: RuntimeError: If stopping the worker pool fails.

Example

1
2
3
from flowerpower.job_queue import JobQueueManager

manager.stop_worker_pool()

enqueue

enqueue(self, func: Callable, *args, **kwargs)

Enqueues a job for immediate, delayed, or scheduled execution.

Parameter Type Description
func Callable The function to execute.
*args Any Positional arguments for the function.
**kwargs Any Keyword arguments for the function and job (e.g., job_id, timeout).

Returns: Job - The enqueued job object.

Raises: ValueError: If func is not callable.

Example

1
2
3
4
5
6
7
from flowerpower.job_queue import JobQueueManager

def my_task(x, y):
    return x + y

job = manager.enqueue(my_task, 1, 2, job_id="my_sum_job")
print(f"Enqueued job: {job.id}")

enqueue_in

enqueue_in(self, delay: timedelta | int | str, func: Callable, *args, **kwargs)

Enqueues a job to run after a specified delay.

Parameter Type Description
delay timedelta | int | str The delay before execution. Can be a timedelta object, an integer (seconds), or a string (e.g., "1m" for 1 minute).
func Callable The function to execute.
*args Any Positional arguments for the function.
**kwargs Any Keyword arguments for the function and job.

Returns: Job - The enqueued job object.

Raises: ValueError: If delay is invalid or func is not callable.

Example

from flowerpower.job_queue import JobQueueManager
from datetime import timedelta

def send_notification(message):
    print(f"Notification: {message}")

# Enqueue a job to run in 5 minutes
job = manager.enqueue_in(timedelta(minutes=5), send_notification, "Your report is ready!")

# Enqueue a job to run in 30 seconds (integer delay)
job = manager.enqueue_in(30, send_notification, "Quick update!")

# Enqueue a job to run in 1 hour (string delay)
job = manager.enqueue_in("1h", send_notification, "Hourly reminder!")

enqueue_at

enqueue_at(self, datetime_obj: datetime, func: Callable, *args, **kwargs)

Enqueues a job to run at a specific datetime.

Parameter Type Description
datetime_obj datetime The datetime to execute the job.
func Callable The function to execute.
*args Any Positional arguments for the function.
**kwargs Any Keyword arguments for the function and job.

Returns: Job - The enqueued job object.

Raises: ValueError: If datetime_obj is in the past or func is not callable.

Example

1
2
3
4
5
6
7
8
9
from flowerpower.job_queue import JobQueueManager
from datetime import datetime

def generate_monthly_report(month, year):
    print(f"Generating report for {month}/{year}")

# Enqueue a job to run at a specific future date and time
target_time = datetime(2025, 1, 1, 9, 0, 0)
job = manager.enqueue_at(target_time, generate_monthly_report, 1, 2025)

add_schedule

add_schedule(self, id: str, func: Callable, cron_string: str | None = None, interval: int | None = None, repeat: int | None = None, enabled: bool = True, **kwargs)

Schedules a job for repeated or one-time execution.

Parameter Type Description
id str A unique identifier for the scheduled job.
func Callable The function to execute.
cron_string str | None A cron string for recurring schedules (e.g., "0 0 * * *" for daily at midnight).
interval int | None Interval in seconds for recurring schedules.
repeat int | None Number of times to repeat the job. None for infinite.
enabled bool Whether the schedule is active.
**kwargs Any Additional keyword arguments for the function and job.

Returns: ScheduledJob - The scheduled job object.

Raises: ValueError: If scheduling parameters are invalid or insufficient.

Example

from flowerpower.job_queue import JobQueueManager

def clean_temp_files():
    print("Cleaning temporary files...")

# Schedule a job to clean temp files every hour
scheduled_job = manager.add_schedule(
    id="hourly_cleanup",
    func=clean_temp_files,
    interval=3600 # Every hour
)

# Schedule a job using a cron string (every Monday at 9 AM)
scheduled_job = manager.add_schedule(
    id="weekly_summary",
    func=lambda: print("Generating weekly summary..."),
    cron_string="0 9 * * MON"
)

get_job_result

get_job_result(self, job: str | Job, delete_result: bool = False)

Gets the result of a completed job.

Parameter Type Description
job str | Job The job ID or Job object.
delete_result bool If True, deletes the result after retrieval.

Returns: Any - The result of the job execution.

Raises:

  • JobNotFinishedError: If the job has not completed yet.
  • JobDoesNotExistError: If the job ID is not found.

Example

1
2
3
4
5
from flowerpower.job_queue import JobQueueManager

# Assuming 'my_job_id' is the ID of a completed job
result = manager.get_job_result("my_job_id")
print(f"Job result: {result}")

get_jobs

get_jobs(self, queue_name: str | list[str] | None = None)

Gets all jobs from specified queues.

Parameter Type Description
queue_name str | list[str] | None The name of the queue(s). Defaults to all queues.

Returns: list[Job] - A list of job objects.

Example

1
2
3
4
5
6
7
from flowerpower.job_queue import JobQueueManager

# Get all jobs from the default queue
all_jobs = manager.get_jobs("default")

# Get jobs from multiple queues
priority_jobs = manager.get_jobs(["high_priority", "medium_priority"])

get_schedules

get_schedules(self, id: str | list[str] | None = None)

Gets all schedules from the scheduler.

Parameter Type Description
id str | list[str] | None The ID(s) of the schedule(s). Defaults to all schedules.

Returns: list[ScheduledJob] - A list of scheduled job objects.

Example

1
2
3
4
5
6
7
from flowerpower.job_queue import JobQueueManager

# Get all active schedules
all_schedules = manager.get_schedules()

# Get a specific schedule
my_schedule = manager.get_schedules(id="hourly_cleanup")