Advanced Usage

This section delves into advanced features of flowerpower-io, covering performance optimization, integration with external systems, and specific database integrations like DuckDB and DataFusion.

1. Performance Optimization

flowerpower-io is designed for efficiency, but you can further optimize performance with these tips:

  • Batch Processing: For large datasets, consider processing data in batches to manage memory usage and improve throughput. flowerpower-io loaders and savers often support internal batching mechanisms.
  • Lazy Loading with Polars: When working with Polars, leverage LazyFrames (to_polars(lazy=True)) to defer computation until necessary. This can significantly reduce memory footprint and improve performance for complex transformations; see the sketch after this list.
  • Efficient File Formats: Prefer binary, columnar formats like Parquet over text-based formats like CSV; columnar storage and compression yield faster reads and smaller files.
  • Storage Options: Properly configure storage_options for remote storage (e.g., S3, GCS) to optimize network transfers and authentication.
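
As a sketch of the lazy-loading tip above: this assumes a Parquet reader exposing the to_polars(lazy=True) method mentioned in the list, plus a hypothetical local file; the filter and aggregation are purely illustrative.

import polars as pl
from flowerpower_io.loader import ParquetFileReader

# Hypothetical input file; to_polars(lazy=True) returns a pl.LazyFrame,
# so no data is read or computed until .collect() is called.
reader = ParquetFileReader(path="data/readings.parquet")
lf = reader.to_polars(lazy=True)

# Build the query lazily; Polars optimizes the whole plan (e.g. predicate
# pushdown) before executing it.
result = (
    lf.filter(pl.col("temperature") > 20.0)
    .group_by("sensor_id")
    .agg(pl.col("temperature").mean().alias("avg_temp"))
    .collect()  # execution happens here
)
print(result)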

2. Integration with External Systems

flowerpower-io can be integrated into larger data pipelines and workflows.

Message Queues (e.g., MQTT)

The PayloadReader can consume data from message queues, allowing flowerpower-io to act as a data ingress point for real-time data streams. The simplified sketch below uses a mock client to illustrate the pattern.

from flowerpower_io.loader import PayloadReader

# Simplified sketch: a real application would set up a paho-mqtt client,
# manage the connection, and consume messages in its network loop.

class MockMQTTClient:
    """Stands in for a real MQTT client in this example."""
    def __init__(self):
        self.payload = None

    def subscribe(self, topic):
        pass

    def on_message(self, client, userdata, msg):
        # Store the latest decoded payload.
        self.payload = msg.payload.decode("utf-8")

class MockMessage:
    """Minimal message object carrying a JSON payload."""
    payload = b'{"sensor_id": "A1", "temperature": 25.5}'

# Simulate a message arriving; in a real system the MQTT client loop
# invokes on_message for each received message.
mock_client = MockMQTTClient()
mock_client.on_message(None, None, MockMessage())

payload_reader = PayloadReader(mqtt_client=mock_client)

# In a real application you would check for new payloads continuously;
# here we check once for the simulated message.
if payload_reader.has_new_payload():
    data = payload_reader.read_payload()
    print("Received data from MQTT:", data)

Cloud Storage (S3, GCS, Azure Blob)

Utilize fsspec_utils for seamless interaction with various cloud storage providers.

# Example: reading Parquet from S3. The placeholder credentials below
# must be replaced; in practice, prefer environment variables or an IAM
# role over hard-coded keys.
from flowerpower_io.loader import ParquetFileReader

s3_reader = ParquetFileReader(
    path="s3://your-bucket/path/to/data.parquet",
    storage_options={"key": "YOUR_ACCESS_KEY", "secret": "YOUR_SECRET_KEY"},
)
df_s3 = s3_reader.to_pandas()
print("Data read from S3:")
print(df_s3.head())

3. SQL Integration

flowerpower-io provides robust integration with SQL databases, including advanced features for querying and data manipulation.

DuckDB Integration

Leverage DuckDB for in-process analytical processing with SQL. flowerpower-io can read from and write to DuckDB databases directly.

import duckdb
import pandas as pd
from flowerpower_io.loader import DuckDBReader
from flowerpower_io.saver import DuckDBWriter
import os

# Create a dummy DuckDB file for demonstration
db_path = "my_duckdb.db"
conn = duckdb.connect(database=db_path, read_only=False)
conn.execute("CREATE TABLE users (id INTEGER, name VARCHAR)")
conn.execute("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob')")
conn.close()

# Read from DuckDB
duckdb_reader = DuckDBReader(path=db_path, table_name="users")
df_users = duckdb_reader.to_pandas()
print("Users from DuckDB:")
print(df_users)

# Write to DuckDB
new_users = pd.DataFrame({"id": [3, 4], "name": ["Charlie", "Diana"]})
duckdb_writer = DuckDBWriter(path=db_path, table_name="users")
duckdb_writer.write(data=new_users, if_exists="append") # Append new users

# Verify appended data
conn = duckdb.connect(database=db_path, read_only=True)
print("\nAll users after append:")
print(conn.execute("SELECT * FROM users").fetchdf())
conn.close()

# Clean up
os.remove(db_path)

DataFusion Integration (via PyArrow)

flowerpower-io can work with data that DataFusion can process, often through PyArrow Tables. This allows for powerful query planning and execution.

# Sketch: querying flowerpower-io output with DataFusion. Replace the
# placeholder path with a real Parquet file before running.
from flowerpower_io.loader import ParquetFileReader
from datafusion import SessionContext

parquet_reader = ParquetFileReader(path="path/to/your/large_data.parquet")
arrow_table = parquet_reader.to_pyarrow_table()

# Create a DataFusion session and register the Arrow data as a table.
# register_record_batches expects a list of partitions, each a list of
# RecordBatches, so the table's batches are wrapped in a single partition.
ctx = SessionContext()
ctx.register_record_batches("my_table", [arrow_table.to_batches()])

# Execute a SQL query using DataFusion's query engine.
result = ctx.sql("SELECT COUNT(*) FROM my_table").collect()
print("Count from DataFusion query:", result)

4. Customizing I/O Behavior

  • Custom Schemas: When writing, you can often provide a schema to ensure data types are correctly interpreted by the target system.
  • Error Handling: Implement robust error handling around flowerpower-io calls to manage file-not-found errors, permission issues, or data conversion problems; see the sketch after this list.
  • Logging: Integrate flowerpower-io operations with your application's logging framework to monitor data flows and troubleshoot issues.
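
To illustrate the error-handling point above, here is a minimal sketch; the path is hypothetical and the reader call follows the usage shown earlier.

from flowerpower_io.loader import ParquetFileReader

# Hypothetical path; the goal is to show which failure modes to anticipate.
try:
    reader = ParquetFileReader(path="data/missing.parquet")
    df = reader.to_pandas()
except FileNotFoundError:
    print("Input file not found; skipping this source.")
except PermissionError:
    print("Insufficient permissions to read the input file.")
except Exception as exc:
    # Fallback for format- or driver-specific conversion errors.
    print(f"Failed to load data: {exc}")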

This section provides a glimpse into the advanced capabilities of flowerpower-io. By combining these features, you can build sophisticated and high-performance data processing solutions.