fsspeckit.datasets API Reference

datasets

Dataset-level operations for fsspeckit.

This package contains dataset-specific functionality including:

- DuckDB parquet handlers for high-performance dataset operations
- PyArrow utilities for schema management and type conversion
- Dataset merging and optimization tools
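
Example

A minimal orientation sketch; the names are the ones documented on this page and "dataset/" is a placeholder path.

```python
from fsspeckit.datasets import (
    DuckDBParquetHandler,           # backward-compatible DuckDB handler
    collect_dataset_stats_pyarrow,  # file-level dataset statistics
    compact_parquet_dataset_pyarrow,
    unify_schemas_pa,
)

# Inspect a local parquet dataset without rewriting anything.
stats = collect_dataset_stats_pyarrow("dataset/")
print(stats["total_rows"], stats["total_bytes"])
```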

Classes

fsspeckit.datasets.DuckDBParquetHandler

DuckDBParquetHandler(
    storage_options: Optional[
        Union[BaseStorageOptions, dict]
    ] = None,
    filesystem: Optional[AbstractFileSystem] = None,
)

Bases: DuckDBDatasetIO

Backward-compatibility wrapper that preserves the original DuckDBParquetHandler interface.

This class has been refactored into:

- DuckDBConnection: for connection management
- DuckDBDatasetIO: for dataset I/O operations

For new code, consider using DuckDBConnection and DuckDBDatasetIO directly.
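
Example

A brief sketch contrasting the legacy wrapper with the recommended split into connection management and dataset I/O. The file path is a placeholder; the imports follow the module layout documented further down this page.

```python
from fsspeckit.datasets import DuckDBParquetHandler
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

# Legacy wrapper: connection and I/O bundled into one object.
with DuckDBParquetHandler() as handler:
    handler.execute_sql("SELECT 42")

# Recommended for new code: explicit connection plus I/O handler.
with create_duckdb_connection() as conn:
    io = DuckDBDatasetIO(conn)
    table = io.read_parquet("/path/to/file.parquet")
```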

Initialize DuckDB parquet handler.

Parameters:

Name Type Description Default
storage_options Optional[Union[BaseStorageOptions, dict]]

Storage configuration options (deprecated)

None
filesystem Optional[AbstractFileSystem]

Filesystem instance (deprecated)

None
Source code in src/fsspeckit/datasets/duckdb.py
def __init__(
    self,
    storage_options: Optional[Union[BaseStorageOptions, dict]] = None,
    filesystem: Optional[AbstractFileSystem] = None,
):
    """Initialize DuckDB parquet handler.

    Args:
        storage_options: Storage configuration options (deprecated)
        filesystem: Filesystem instance (deprecated)
    """
    from fsspeckit.datasets.duckdb_connection import create_duckdb_connection

    # Create connection from filesystem
    self._connection = create_duckdb_connection(filesystem=filesystem)

    # Initialize the IO handler
    super().__init__(self._connection)
Functions
fsspeckit.datasets.DuckDBParquetHandler.__del__
__del__()

Destructor (deprecated).

Source code in src/fsspeckit/datasets/duckdb.py
def __del__(self):
    """Destructor (deprecated)."""
    self.close()
fsspeckit.datasets.DuckDBParquetHandler.__enter__
__enter__()

Enter context manager (deprecated).

Source code in src/fsspeckit/datasets/duckdb.py
def __enter__(self):
    """Enter context manager (deprecated)."""
    return self
fsspeckit.datasets.DuckDBParquetHandler.__exit__
__exit__(exc_type, exc_val, exc_tb)

Exit context manager (deprecated).

Source code in src/fsspeckit/datasets/duckdb.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit context manager (deprecated)."""
    self.close()
fsspeckit.datasets.DuckDBParquetHandler.close
close()

Close connection (deprecated, use connection.close).

Source code in src/fsspeckit/datasets/duckdb.py
def close(self):
    """Close connection (deprecated, use connection.close)."""
    self._connection.close()
fsspeckit.datasets.DuckDBParquetHandler.execute_sql
execute_sql(query: str, parameters=None)

Execute SQL query (deprecated, use connection.execute_sql).

Source code in src/fsspeckit/datasets/duckdb.py
def execute_sql(self, query: str, parameters=None):
    """Execute SQL query (deprecated, use connection.execute_sql)."""
    return self._connection.execute_sql(query, parameters)

Functions

fsspeckit.datasets.cast_schema

cast_schema(table: Table, schema: Schema) -> Table

Cast a PyArrow table to a target schema.

Parameters:

Name Type Description Default
table Table

Source table

required
schema Schema

Target schema

required

Returns:

Type Description
Table

Table cast to target schema
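
Example

A short usage sketch with hypothetical inputs. As the source below shows, target fields that are missing from the table are dropped before the cast.

```python
import pyarrow as pa
from fsspeckit.datasets import cast_schema

table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
target = pa.schema(
    [
        pa.field("a", pa.int32()),         # present -> cast from int64 to int32
        pa.field("b", pa.large_string()),  # present -> cast to large_string
        pa.field("c", pa.float64()),       # absent from the table -> ignored
    ]
)

print(cast_schema(table, target).schema)
# a: int32
# b: large_string
```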

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def cast_schema(table: pa.Table, schema: pa.Schema) -> pa.Table:
    """Cast a PyArrow table to a target schema.

    Args:
        table: Source table
        schema: Target schema

    Returns:
        Table cast to target schema
    """
    # Filter schema to only include columns present in the table
    table_schema = table.schema
    valid_fields = []

    for field in schema:
        if field.name in table_schema.names:
            valid_fields.append(field)

    target_schema = pa.schema(valid_fields)

    # Cast the table
    return table.cast(target_schema)

fsspeckit.datasets.collect_dataset_stats_pyarrow

collect_dataset_stats_pyarrow(
    path: str,
    filesystem: AbstractFileSystem | None = None,
    partition_filter: list[str] | None = None,
) -> dict[str, Any]

Collect file-level statistics for a parquet dataset using shared core logic.

This function delegates to the shared fsspeckit.core.maintenance.collect_dataset_stats function, ensuring consistent dataset discovery and statistics across both DuckDB and PyArrow backends.

The helper walks the given dataset directory on the provided filesystem, discovers parquet files (recursively), and returns basic statistics:

  • Per-file path, size in bytes, and number of rows
  • Aggregated total bytes and total rows

The function is intentionally streaming/metadata-driven and never materializes the full dataset as a single pyarrow.Table.

Parameters:

Name Type Description Default
path str

Root directory of the parquet dataset.

required
filesystem AbstractFileSystem | None

Optional fsspec filesystem. If omitted, a local "file" filesystem is used.

None
partition_filter list[str] | None

Optional list of partition prefix filters (e.g. ["date=2025-11-04"]). Only files whose path relative to path starts with one of these prefixes are included.

None

Returns:

Type Description
dict[str, Any]

Dict with keys:

  • files: list of {"path", "size_bytes", "num_rows"} dicts
  • total_bytes: sum of file sizes
  • total_rows: sum of row counts

Raises:

Type Description
FileNotFoundError

If the path does not exist or no parquet files match the optional partition filter.
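
Example

A hedged sketch of typical usage; "dataset/" and the partition value are placeholders.

```python
from fsspeckit.datasets import collect_dataset_stats_pyarrow

stats = collect_dataset_stats_pyarrow(
    "dataset/",
    partition_filter=["date=2025-11-04"],
)
print(f"{stats['total_rows']} rows in {stats['total_bytes']} bytes")
for f in stats["files"]:
    print(f["path"], f["size_bytes"], f["num_rows"])
```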

Note

This is a thin wrapper around the shared core function. See fsspeckit.core.maintenance.collect_dataset_stats for the authoritative implementation.

Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def collect_dataset_stats_pyarrow(
    path: str,
    filesystem: AbstractFileSystem | None = None,
    partition_filter: list[str] | None = None,
) -> dict[str, Any]:
    """Collect file-level statistics for a parquet dataset using shared core logic.

    This function delegates to the shared ``fsspeckit.core.maintenance.collect_dataset_stats``
    function, ensuring consistent dataset discovery and statistics across both DuckDB
    and PyArrow backends.

    The helper walks the given dataset directory on the provided filesystem,
    discovers parquet files (recursively), and returns basic statistics:

    - Per-file path, size in bytes, and number of rows
    - Aggregated total bytes and total rows

    The function is intentionally streaming/metadata-driven and never
    materializes the full dataset as a single :class:`pyarrow.Table`.

    Args:
        path: Root directory of the parquet dataset.
        filesystem: Optional fsspec filesystem. If omitted, a local "file"
            filesystem is used.
        partition_filter: Optional list of partition prefix filters
            (e.g. ["date=2025-11-04"]). Only files whose path relative to
            ``path`` starts with one of these prefixes are included.

    Returns:
        Dict with keys:

        - ``files``: list of ``{"path", "size_bytes", "num_rows"}`` dicts
        - ``total_bytes``: sum of file sizes
        - ``total_rows``: sum of row counts

    Raises:
        FileNotFoundError: If the path does not exist or no parquet files
            match the optional partition filter.

    Note:
        This is a thin wrapper around the shared core function. See
        :func:`fsspeckit.core.maintenance.collect_dataset_stats` for the
        authoritative implementation.
    """
    from fsspeckit.core.maintenance import collect_dataset_stats

    return collect_dataset_stats(
        path=path,
        filesystem=filesystem,
        partition_filter=partition_filter,
    )

fsspeckit.datasets.compact_parquet_dataset_pyarrow

compact_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    filesystem: AbstractFileSystem | None = None,
) -> dict[str, Any]

Compact a parquet dataset directory into fewer larger files using PyArrow and shared planning.

Groups small files based on size (MB) and/or row thresholds, rewrites grouped files into new parquet files, and optionally changes compression. Supports a dry-run mode that returns the compaction plan without modifying files.

The implementation uses the shared core planning algorithm for consistent behavior across backends. It processes data in a group-based, streaming fashion: it reads only the files in a given group into memory when processing that group and never materializes the entire dataset as a single table.

Parameters:

Name Type Description Default
path str

Dataset root directory (local path or fsspec URL).

required
target_mb_per_file int | None

Optional max output size per file; must be > 0.

None
target_rows_per_file int | None

Optional max rows per output file; must be > 0.

None
partition_filter list[str] | None

Optional list of partition prefixes (e.g. ["date=2025-11-15"]) used to limit both stats collection and rewrites to matching paths.

None
compression str | None

Optional parquet compression codec; defaults to "snappy".

None
dry_run bool

When True the function returns a plan + before/after stats without reading or writing any parquet data.

False
filesystem AbstractFileSystem | None

Optional fsspec.AbstractFileSystem to reuse existing FS clients.

None

Returns:

Type Description
dict[str, Any]

A stats dictionary describing before/after file counts, total bytes, rewritten bytes, and optional planned_groups when dry_run is enabled. The structure follows the canonical MaintenanceStats format from the shared core.

Raises:

Type Description
ValueError

If thresholds are invalid or no files match partition filter.

FileNotFoundError

If the path does not exist.

Example
result = compact_parquet_dataset_pyarrow(
    "/path/to/dataset",
    target_mb_per_file=64,
    dry_run=True,
)
print(f"Files before: {result['before_file_count']}")
print(f"Files after: {result['after_file_count']}")
Note

This function delegates dataset discovery and compaction planning to the shared fsspeckit.core.maintenance module, ensuring consistent behavior across DuckDB and PyArrow backends.

Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def compact_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    filesystem: AbstractFileSystem | None = None,
) -> dict[str, Any]:
    """Compact a parquet dataset directory into fewer larger files using PyArrow and shared planning.

    Groups small files based on size (MB) and/or row thresholds, rewrites grouped
    files into new parquet files, and optionally changes compression. Supports a
    dry-run mode that returns the compaction plan without modifying files.

    The implementation uses the shared core planning algorithm for consistent
    behavior across backends. It processes data in a group-based, streaming fashion:
    it reads only the files in a given group into memory when processing that group
    and never materializes the entire dataset as a single table.

    Args:
        path: Dataset root directory (local path or fsspec URL).
        target_mb_per_file: Optional max output size per file; must be > 0.
        target_rows_per_file: Optional max rows per output file; must be > 0.
        partition_filter: Optional list of partition prefixes (e.g. ``["date=2025-11-15"]``)
            used to limit both stats collection and rewrites to matching paths.
        compression: Optional parquet compression codec; defaults to ``"snappy"``.
        dry_run: When ``True`` the function returns a plan + before/after stats
            without reading or writing any parquet data.
        filesystem: Optional ``fsspec.AbstractFileSystem`` to reuse existing FS clients.

    Returns:
        A stats dictionary describing before/after file counts, total bytes,
        rewritten bytes, and optional ``planned_groups`` when ``dry_run`` is enabled.
        The structure follows the canonical ``MaintenanceStats`` format from the shared core.

    Raises:
        ValueError: If thresholds are invalid or no files match partition filter.
        FileNotFoundError: If the path does not exist.

    Example:
        ```python
        result = compact_parquet_dataset_pyarrow(
            "/path/to/dataset",
            target_mb_per_file=64,
            dry_run=True,
        )
        print(f"Files before: {result['before_file_count']}")
        print(f"Files after: {result['after_file_count']}")
        ```

    Note:
        This function delegates dataset discovery and compaction planning to the
        shared ``fsspeckit.core.maintenance`` module, ensuring consistent behavior
        across DuckDB and PyArrow backends.
    """
    from fsspeckit.core.maintenance import plan_compaction_groups, MaintenanceStats

    # Get dataset stats using shared logic
    stats = collect_dataset_stats_pyarrow(
        path=path, filesystem=filesystem, partition_filter=partition_filter
    )
    files = stats["files"]

    # Use shared compaction planning
    plan_result = plan_compaction_groups(
        file_infos=files,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
    )

    groups = plan_result["groups"]
    planned_stats = plan_result["planned_stats"]

    # Update planned stats with compression info
    planned_stats.compression_codec = compression
    planned_stats.dry_run = dry_run

    # If dry run, return the plan
    if dry_run:
        result = planned_stats.to_dict()
        result["planned_groups"] = groups
        return result

    # Execute compaction
    if not groups:
        return planned_stats.to_dict()

    # Execute the compaction
    for group in groups:
        # Read all files in this group
        tables = []
        for file_info in group["files"]:
            file_path = file_info["path"]
            table = pq.read_table(
                file_path,
                filesystem=filesystem,
            )
            tables.append(table)

        # Concatenate tables
        if len(tables) > 1:
            combined = pa.concat_tables(tables, promote_options="permissive")
        else:
            combined = tables[0]

        # Write to output file
        output_path = group["output_path"]
        pq.write_table(
            combined,
            output_path,
            filesystem=filesystem,
            compression=compression or "snappy",
        )

    # Remove original files
    for group in groups:
        for file_info in group["files"]:
            file_path = file_info["path"]
            filesystem.rm(file_path)

    return planned_stats.to_dict()

fsspeckit.datasets.convert_large_types_to_normal

convert_large_types_to_normal(schema: Schema) -> Schema

Convert large types (like large_string) to normal types.

Parameters:

Name Type Description Default
schema Schema

PyArrow schema

required

Returns:

Type Description
Schema

Schema with large types converted
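
Example

A small illustration with a hypothetical schema; large_string and large_list fields are rewritten, other fields pass through unchanged.

```python
import pyarrow as pa
from fsspeckit.datasets import convert_large_types_to_normal

schema = pa.schema(
    [
        pa.field("name", pa.large_string()),
        pa.field("tags", pa.large_list(pa.string())),
        pa.field("count", pa.int64()),
    ]
)

print(convert_large_types_to_normal(schema))
# name: string
# tags: list<item: string>
# count: int64
```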

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def convert_large_types_to_normal(schema: pa.Schema) -> pa.Schema:
    """Convert large types (like large_string) to normal types.

    Args:
        schema: PyArrow schema

    Returns:
        Schema with large types converted
    """
    fields = []
    for field in schema:
        if pa.types.is_large_string(field.type):
            field = field.with_type(pa.string())
        elif pa.types.is_large_utf8(field.type):
            field = field.with_type(pa.utf8())
        elif pa.types.is_large_list(field.type):
            field = field.with_type(pa.list_(field.type.value_type))
        fields.append(field)

    return pa.schema(fields)

fsspeckit.datasets.opt_dtype_pa

opt_dtype_pa(
    table: Table,
    strict: bool = False,
    columns: list[str] | None = None,
) -> Table

Optimize dtypes in a PyArrow table based on data analysis.

This function analyzes the data in each column and attempts to downcast to more appropriate types (e.g., int64 -> int32, float64 -> float32, string -> int/bool where applicable).

Parameters:

Name Type Description Default
table Table

PyArrow table to optimize

required
strict bool

Whether to use strict type checking

False
columns list[str] | None

List of columns to optimize (None for all)

None

Returns:

Type Description
Table

Table with optimized dtypes

Example
import pyarrow as pa

table = pa.table(
    {
        "a": pa.array([1, 2, 3], type=pa.int64()),
        "b": pa.array([1.0, 2.0, 3.0], type=pa.float64()),
    },
)
optimized = opt_dtype(table)
print(optimized.column(0).type)  # DataType(int32)
print(optimized.column(1).type)  # DataType(float32)
Source code in src/fsspeckit/datasets/pyarrow_schema.py
def opt_dtype(
    table: pa.Table,
    strict: bool = False,
    columns: list[str] | None = None,
) -> pa.Table:
    """Optimize dtypes in a PyArrow table based on data analysis.

    This function analyzes the data in each column and attempts to downcast
    to more appropriate types (e.g., int64 -> int32, float64 -> float32,
    string -> int/bool where applicable).

    Args:
        table: PyArrow table to optimize
        strict: Whether to use strict type checking
        columns: List of columns to optimize (None for all)

    Returns:
        Table with optimized dtypes

    Example:
        ```python
        import pyarrow as pa

        table = pa.table(
            {
                "a": pa.array([1, 2, 3], type=pa.int64()),
                "b": pa.array([1.0, 2.0, 3.0], type=pa.float64()),
            },
        )
        optimized = opt_dtype(table)
        print(optimized.column(0).type)  # DataType(int32)
        print(optimized.column(1).type)  # DataType(float32)
        ```
    """
    from fsspeckit.common.misc import run_parallel

    if columns is None:
        columns = table.column_names

    # Process columns in parallel
    results = run_parallel(
        _process_column_for_opt_dtype,
        [(table, col, strict) for col in columns],
        backend="threading",
        n_jobs=-1,
    )

    # Build new table with optimized columns
    new_columns = {}
    for col_name, optimized_array in results:
        new_columns[col_name] = optimized_array

    # Keep non-optimized columns as-is
    for col_name in table.column_names:
        if col_name not in new_columns:
            new_columns[col_name] = table.column(col_name)

    return pa.table(new_columns)

fsspeckit.datasets.optimize_parquet_dataset_pyarrow

optimize_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    filesystem: AbstractFileSystem | None = None,
    verbose: bool = False,
) -> dict[str, Any]

Optimize a parquet dataset through compaction and optional statistics recalculation.

This is a convenience function that combines compaction with optional statistics recalculation. It's particularly useful after many small write operations have created a large number of small files.

Parameters:

Name Type Description Default
path str

Dataset root directory

required
target_mb_per_file int | None

Target size per file in MB

None
target_rows_per_file int | None

Target rows per file

None
partition_filter list[str] | None

Optional partition filters

None
compression str | None

Compression codec to use

None
filesystem AbstractFileSystem | None

Optional filesystem instance

None
verbose bool

Print progress information

False

Returns:

Type Description
dict[str, Any]

Optimization statistics

Example
stats = optimize_parquet_dataset_pyarrow(
    "dataset/",
    target_mb_per_file=64,
    compression="zstd",
)
print(
    f"Reduced from {stats['before_file_count']} "
    f"to {stats['after_file_count']} files",
)
Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def optimize_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    filesystem: AbstractFileSystem | None = None,
    verbose: bool = False,
) -> dict[str, Any]:
    """Optimize a parquet dataset through compaction and optional statistics recalculation.

    This is a convenience function that combines compaction with optional statistics
    recalculation. It's particularly useful after many small write operations have
    created a large number of small files.

    Args:
        path: Dataset root directory
        target_mb_per_file: Target size per file in MB
        target_rows_per_file: Target rows per file
        partition_filter: Optional partition filters
        compression: Compression codec to use
        filesystem: Optional filesystem instance
        verbose: Print progress information

    Returns:
        Optimization statistics

    Example:
        ```python
        stats = optimize_parquet_dataset_pyarrow(
            "dataset/",
            target_mb_per_file=64,
            compression="zstd",
        )
        print(
            f"Reduced from {stats['before_file_count']} "
            f"to {stats['after_file_count']} files",
        )
        ```
    """
    # Use compaction
    result = compact_parquet_dataset_pyarrow(
        path=path,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
        partition_filter=partition_filter,
        compression=compression,
        dry_run=False,
        filesystem=filesystem,
    )

    if verbose:
        logger.info("Optimization complete: %s", result)

    return result

fsspeckit.datasets.unify_schemas_pa

unify_schemas_pa(
    schemas: list[Schema],
    standardize_timezones: bool = True,
    verbose: bool = False,
) -> Schema

Unify multiple PyArrow schemas into a single schema.

This function handles type conflicts by:

1. Finding fields with conflicting types
2. Attempting to normalize compatible types
3. Using fallback strategies for incompatible types
4. Removing problematic fields if necessary

Parameters:

Name Type Description Default
schemas list[Schema]

List of schemas to unify

required
standardize_timezones bool

Whether to standardize timezone info

True
verbose bool

Whether to print conflict information

False

Returns:

Type Description
Schema

Unified PyArrow schema

Raises:

Type Description
ValueError

If schemas cannot be unified
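
Example

A minimal sketch with two hypothetical schemas that disagree on one field's type; the exact resolution depends on the normalization and fallback strategies described above.

```python
import pyarrow as pa
from fsspeckit.datasets import unify_schemas_pa

s1 = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.int32())])
s2 = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.float64())])

unified = unify_schemas_pa([s1, s2], verbose=True)
print(unified)
```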

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def unify_schemas(
    schemas: list[pa.Schema],
    standardize_timezones: bool = True,
    verbose: bool = False,
) -> pa.Schema:
    """Unify multiple PyArrow schemas into a single schema.

    This function handles type conflicts by:
    1. Finding fields with conflicting types
    2. Attempting to normalize compatible types
    3. Using fallback strategies for incompatible types
    4. Removing problematic fields if necessary

    Args:
        schemas: List of schemas to unify
        standardize_timezones: Whether to standardize timezone info
        verbose: Whether to print conflict information

    Returns:
        Unified PyArrow schema

    Raises:
        ValueError: If schemas cannot be unified
    """
    if not schemas:
        raise ValueError("Cannot unify empty list of schemas")

    if len(schemas) == 1:
        return schemas[0]

    # Remove duplicate schemas
    schemas = _unique_schemas(schemas)

    # Standardize timezones if requested
    if standardize_timezones:
        schemas = standardize_schema_timezones(schemas, standardize_timezones)

    # Find conflicts
    conflicts = _find_conflicting_fields(schemas)

    if not conflicts:
        # No conflicts, concatenate all fields
        all_fields = []
        for schema in schemas:
            all_fields.extend(schema)
        return pa.schema(all_fields)

    if verbose:
        _log_conflict_summary(conflicts, verbose)

    # Try to normalize types
    try:
        normalized = _normalize_schema_types(schemas, conflicts)

        # Check if normalization resolved conflicts
        remaining_conflicts = _find_conflicting_fields(normalized)

        if not remaining_conflicts:
            # Normalization successful
            all_fields = []
            for schema in normalized:
                all_fields.extend(schema)
            return pa.schema(all_fields)

        # Fall through to next strategy
        conflicts = remaining_conflicts

    except (pa.ArrowInvalid, pa.ArrowTypeError, pa.ArrowNotImplementedError) as e:
        # Normalization failed, log and continue to fallback
        logger.debug(
            "Schema type normalization failed: %s. Trying aggressive fallback.",
            str(e)
        )

    # Try aggressive fallback
    try:
        return _aggressive_fallback_unification(schemas)
    except (pa.ArrowInvalid, pa.ArrowTypeError, pa.ArrowNotImplementedError) as e:
        # Aggressive fallback failed, log and try last resort
        logger.debug(
            "Aggressive fallback unification failed: %s. Trying last resort cleanup.",
            str(e)
        )

    # Last resort: remove problematic fields
    cleaned = _remove_problematic_fields(schemas)
    all_fields = []
    for schema in cleaned:
        all_fields.extend(schema)

    if verbose and conflicts:
        logger.debug("Removed %d conflicting fields during unification", len(conflicts))

    return pa.schema(all_fields)

Modules

fsspeckit.datasets.duckdb

Re-export module for backward compatibility.

This module has been decomposed into focused submodules:

- duckdb_connection: Connection management and filesystem registration
- duckdb_dataset: Dataset I/O and maintenance operations

All public APIs are re-exported here to maintain backward compatibility. New code should import directly from the submodules for better organization.

Classes
fsspeckit.datasets.duckdb.DuckDBConnection
DuckDBConnection(
    filesystem: AbstractFileSystem | None = None,
)

Manages DuckDB connection lifecycle and filesystem registration.

This class is responsible for:

- Creating and managing DuckDB connections
- Registering fsspec filesystems with DuckDB
- Connection cleanup

Parameters:

Name Type Description Default
filesystem AbstractFileSystem | None

fsspec filesystem instance to use

None

Initialize DuckDB connection manager.

Parameters:

Name Type Description Default
filesystem AbstractFileSystem | None

Filesystem to use. Defaults to local filesystem.

None
Source code in src/fsspeckit/datasets/duckdb_connection.py
def __init__(self, filesystem: AbstractFileSystem | None = None) -> None:
    """Initialize DuckDB connection manager.

    Args:
        filesystem: Filesystem to use. Defaults to local filesystem.
    """
    self._connection: duckdb.DuckDBPyConnection | None = None
    self._filesystem = filesystem or fsspec_filesystem("file")
Attributes
fsspeckit.datasets.duckdb.DuckDBConnection.connection property
connection: Any

Get active DuckDB connection, creating it if necessary.

Returns:

Type Description
Any

Active DuckDB connection

fsspeckit.datasets.duckdb.DuckDBConnection.filesystem property
filesystem: AbstractFileSystem

Get the filesystem instance.

Returns:

Type Description
AbstractFileSystem

Filesystem instance

Functions
fsspeckit.datasets.duckdb.DuckDBConnection.__del__
__del__() -> None

Destructor to ensure connection is closed.

Source code in src/fsspeckit/datasets/duckdb_connection.py
def __del__(self) -> None:
    """Destructor to ensure connection is closed."""
    self.close()
fsspeckit.datasets.duckdb.DuckDBConnection.__enter__
__enter__() -> 'DuckDBConnection'

Enter context manager.

Returns:

Type Description
'DuckDBConnection'

self

Source code in src/fsspeckit/datasets/duckdb_connection.py
def __enter__(self) -> "DuckDBConnection":
    """Enter context manager.

    Returns:
        self
    """
    return self
fsspeckit.datasets.duckdb.DuckDBConnection.__exit__
__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Exit context manager and close connection.

Source code in src/fsspeckit/datasets/duckdb_connection.py
def __exit__(
    self,
    exc_type: Any,
    exc_val: Any,
    exc_tb: Any,
) -> None:
    """Exit context manager and close connection."""
    self.close()
fsspeckit.datasets.duckdb.DuckDBConnection.close
close() -> None

Close the connection and clean up resources.

Source code in src/fsspeckit/datasets/duckdb_connection.py
def close(self) -> None:
    """Close the connection and clean up resources."""
    if self._connection is not None:
        try:
            self._connection.close()
        except (_DUCKDB_EXCEPTIONS.get("ConnectionException"), _DUCKDB_EXCEPTIONS.get("OperationalException")) as e:
            logger.warning("Error closing DuckDB connection: %s", e)
        finally:
            self._connection = None
fsspeckit.datasets.duckdb.DuckDBConnection.execute_sql
execute_sql(
    query: str, parameters: list[Any] | None = None
) -> Any

Execute a SQL query.

Parameters:

Name Type Description Default
query str

SQL query to execute

required
parameters list[Any] | None

Optional query parameters

None

Returns:

Type Description
Any

Query result
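
Example

A hedged sketch using the context-manager interface documented above; the query is arbitrary, and the returned object is whatever DuckDB's execute yields (consumed here with fetchall).

```python
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection

with create_duckdb_connection() as conn:
    rows = conn.execute_sql("SELECT ? + ? AS answer", [40, 2]).fetchall()
    print(rows)  # [(42,)]
```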

Source code in src/fsspeckit/datasets/duckdb_connection.py
def execute_sql(
    self,
    query: str,
    parameters: list[Any] | None = None,
) -> Any:
    """Execute a SQL query.

    Args:
        query: SQL query to execute
        parameters: Optional query parameters

    Returns:
        Query result
    """
    conn = self.connection

    if parameters:
        return conn.execute(query, parameters)
    else:
        return conn.execute(query)
fsspeckit.datasets.duckdb.DuckDBDatasetIO
DuckDBDatasetIO(connection: DuckDBConnection)

DuckDB-based dataset I/O operations.

This class provides methods for reading and writing parquet files and datasets using DuckDB's high-performance parquet engine.

Parameters:

Name Type Description Default
connection DuckDBConnection

DuckDB connection manager

required

Initialize DuckDB dataset I/O.

Parameters:

Name Type Description Default
connection DuckDBConnection

DuckDB connection manager

required
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def __init__(self, connection: DuckDBConnection) -> None:
    """Initialize DuckDB dataset I/O.

    Args:
        connection: DuckDB connection manager
    """
    self._connection = connection
Functions
fsspeckit.datasets.duckdb.DuckDBDatasetIO.compact_parquet_dataset
compact_parquet_dataset(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]

Compact a parquet dataset using DuckDB.

Parameters:

Name Type Description Default
path str

Dataset path

required
target_mb_per_file int | None

Target size per file

None
target_rows_per_file int | None

Target rows per file

None
partition_filter list[str] | None

Optional partition filters

None
compression str | None

Compression codec

None
dry_run bool

Whether to perform a dry run

False
verbose bool

Print progress information

False

Returns:

Type Description
dict[str, Any]

Compaction statistics
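
Example

A hedged sketch: run a dry-run first to inspect the plan, then compact for real. "dataset/" is a placeholder path; the planned_groups key is only present for dry runs, as the source below shows.

```python
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

with create_duckdb_connection() as conn:
    io = DuckDBDatasetIO(conn)

    plan = io.compact_parquet_dataset(
        "dataset/",
        target_mb_per_file=64,
        dry_run=True,
    )
    print(plan.get("planned_groups", []))

    stats = io.compact_parquet_dataset("dataset/", target_mb_per_file=64)
    print(stats)
```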

Source code in src/fsspeckit/datasets/duckdb_dataset.py
def compact_parquet_dataset(
    self,
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Compact a parquet dataset using DuckDB.

    Args:
        path: Dataset path
        target_mb_per_file: Target size per file
        target_rows_per_file: Target rows per file
        partition_filter: Optional partition filters
        compression: Compression codec
        dry_run: Whether to perform a dry run
        verbose: Print progress information

    Returns:
        Compaction statistics
    """
    from fsspeckit.core.maintenance import plan_compaction_groups, MaintenanceStats

    # Collect stats
    stats = self._collect_dataset_stats(path, partition_filter)
    files = stats["files"]

    # Plan compaction
    plan_result = plan_compaction_groups(
        file_infos=files,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
    )

    groups = plan_result["groups"]
    planned_stats = plan_result["planned_stats"]

    if dry_run:
        result = planned_stats.to_dict()
        result["planned_groups"] = groups
        return result

    # Execute compaction
    if not groups:
        return planned_stats.to_dict()

    conn = self._connection.connection

    for group in groups:
        # Read all files in this group into DuckDB
        tables = []
        for file_info in group["files"]:
            file_path = file_info["path"]
            table = conn.execute(
                f"SELECT * FROM parquet_scan('{file_path}')"
            ).fetch_arrow_table()
            tables.append(table)

        # Concatenate tables
        if len(tables) > 1:
            combined = pa.concat_tables(tables, promote_options="permissive")
        else:
            combined = tables[0]

        # Write to output
        output_path = group["output_path"]
        self.write_parquet(combined, output_path, compression=compression)

    # Remove original files
    for group in groups:
        for file_info in group["files"]:
            file_path = file_info["path"]
            self._connection.filesystem.rm(file_path)

    return planned_stats.to_dict()
fsspeckit.datasets.duckdb.DuckDBDatasetIO.merge_parquet_dataset
merge_parquet_dataset(
    sources: list[str],
    output_path: str,
    target: str | None = None,
    strategy: str | MergeStrategy = "deduplicate",
    key_columns: list[str] | str | None = None,
    compression: str | None = None,
    verbose: bool = False,
    **kwargs: Any,
) -> MergeStats

Merge multiple parquet datasets using DuckDB.

Parameters:

Name Type Description Default
sources list[str]

List of source dataset paths

required
output_path str

Path for merged output

required
target str | None

Target dataset path (for upsert/update strategies)

None
strategy str | MergeStrategy

Merge strategy to use

'deduplicate'
key_columns list[str] | str | None

Key columns for merging

None
compression str | None

Output compression codec

None
verbose bool

Print progress information

False
**kwargs Any

Additional arguments

{}

Returns:

Type Description
MergeStats

MergeStats with merge statistics

Example
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)
stats = io.merge_parquet_dataset(
    sources=["dataset1/", "dataset2/"],
    output_path="merged/",
    strategy="deduplicate",
    key_columns=["id"],
)
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def merge_parquet_dataset(
    self,
    sources: list[str],
    output_path: str,
    target: str | None = None,
    strategy: str | CoreMergeStrategy = "deduplicate",
    key_columns: list[str] | str | None = None,
    compression: str | None = None,
    verbose: bool = False,
    **kwargs: Any,
) -> MergeStats:
    """Merge multiple parquet datasets using DuckDB.

    Args:
        sources: List of source dataset paths
        output_path: Path for merged output
        target: Target dataset path (for upsert/update strategies)
        strategy: Merge strategy to use
        key_columns: Key columns for merging
        compression: Output compression codec
        verbose: Print progress information
        **kwargs: Additional arguments

    Returns:
        MergeStats with merge statistics

    Example:
        ```python
        from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
        from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

        conn = create_duckdb_connection()
        io = DuckDBDatasetIO(conn)
        stats = io.merge_parquet_dataset(
            sources=["dataset1/", "dataset2/"],
            output_path="merged/",
            strategy="deduplicate",
            key_columns=["id"],
        )
        ```
    """
    # Validate inputs using shared core logic
    validate_merge_inputs(
        sources=sources,
        strategy=strategy,
        key_columns=key_columns,
        target=target,
    )

    validate_strategy_compatibility(strategy, key_columns, target)

    # Normalize parameters
    if key_columns is not None:
        key_columns = normalize_key_columns(key_columns)

    # Process merge using DuckDB
    result = self._execute_merge_strategy(
        sources=sources,
        output_path=output_path,
        target=target,
        strategy=strategy,
        key_columns=key_columns,
        compression=compression,
        verbose=verbose,
    )

    return result
fsspeckit.datasets.duckdb.DuckDBDatasetIO.optimize_parquet_dataset
optimize_parquet_dataset(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    verbose: bool = False,
) -> dict[str, Any]

Optimize a parquet dataset.

Parameters:

Name Type Description Default
path str

Dataset path

required
target_mb_per_file int | None

Target size per file

None
target_rows_per_file int | None

Target rows per file

None
partition_filter list[str] | None

Optional partition filters

None
compression str | None

Compression codec

None
verbose bool

Print progress information

False

Returns:

Type Description
dict[str, Any]

Optimization statistics
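
Example

A short sketch mirroring the PyArrow optimize example earlier on this page; the before/after keys are assumed to follow the shared MaintenanceStats format.

```python
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

with create_duckdb_connection() as conn:
    io = DuckDBDatasetIO(conn)
    stats = io.optimize_parquet_dataset(
        "dataset/",
        target_mb_per_file=128,
        compression="zstd",
    )
    print(stats["before_file_count"], "->", stats["after_file_count"])
```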

Source code in src/fsspeckit/datasets/duckdb_dataset.py
def optimize_parquet_dataset(
    self,
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    verbose: bool = False,
) -> dict[str, Any]:
    """Optimize a parquet dataset.

    Args:
        path: Dataset path
        target_mb_per_file: Target size per file
        target_rows_per_file: Target rows per file
        partition_filter: Optional partition filters
        compression: Compression codec
        verbose: Print progress information

    Returns:
        Optimization statistics
    """
    # Use compaction for optimization
    result = self.compact_parquet_dataset(
        path=path,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
        partition_filter=partition_filter,
        compression=compression,
        dry_run=False,
        verbose=verbose,
    )

    if verbose:
        logger.info("Optimization complete: %s", result)

    return result
fsspeckit.datasets.duckdb.DuckDBDatasetIO.read_parquet
read_parquet(
    path: str,
    columns: list[str] | None = None,
    filter: str | None = None,
    use_threads: bool = True,
) -> Table

Read parquet file(s) using DuckDB.

Parameters:

Name Type Description Default
path str

Path to parquet file or directory

required
columns list[str] | None

Optional list of columns to read

None
filter str | None

Optional SQL WHERE clause

None
use_threads bool

Whether to use parallel reading

True

Returns:

Type Description
Table

PyArrow table containing the data

Example
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)
table = io.read_parquet("/path/to/file.parquet")
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def read_parquet(
    self,
    path: str,
    columns: list[str] | None = None,
    filter: str | None = None,
    use_threads: bool = True,
) -> pa.Table:
    """Read parquet file(s) using DuckDB.

    Args:
        path: Path to parquet file or directory
        columns: Optional list of columns to read
        filter: Optional SQL WHERE clause
        use_threads: Whether to use parallel reading

    Returns:
        PyArrow table containing the data

    Example:
        ```python
        from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
        from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

        conn = create_duckdb_connection()
        io = DuckDBDatasetIO(conn)
        table = io.read_parquet("/path/to/file.parquet")
        ```
    """
    validate_path(path)

    conn = self._connection.connection
    fs = self._connection.filesystem

    # Build the query
    query = "SELECT * FROM parquet_scan(?)"

    params = [path]

    if columns:
        # Escape column names and build select list
        quoted_cols = [f'"{col}"' for col in columns]
        select_list = ", ".join(quoted_cols)
        query = f"SELECT {select_list} FROM parquet_scan(?)"

    if filter:
        query += f" WHERE {filter}"

    try:
        # Execute query
        if use_threads:
            result = conn.execute(query, params).fetch_arrow_table()
        else:
            result = conn.execute(query, params).fetch_arrow_table()

        return result

    except (_DUCKDB_EXCEPTIONS.get("IOException"), _DUCKDB_EXCEPTIONS.get("InvalidInputException"), _DUCKDB_EXCEPTIONS.get("ParserException")) as e:
        raise RuntimeError(
            f"Failed to read parquet from {path}: {safe_format_error(e)}"
        ) from e
fsspeckit.datasets.duckdb.DuckDBDatasetIO.write_parquet
write_parquet(
    data: Table | list[Table],
    path: str,
    compression: str | None = "snappy",
    row_group_size: int | None = None,
    use_threads: bool = True,
) -> None

Write parquet file using DuckDB.

Parameters:

Name Type Description Default
data Table | list[Table]

PyArrow table or list of tables to write

required
path str

Output file path

required
compression str | None

Compression codec to use

'snappy'
row_group_size int | None

Rows per row group

None
use_threads bool

Whether to use parallel writing

True
Example
import pyarrow as pa
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)
io.write_parquet(table, "/tmp/data.parquet")
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def write_parquet(
    self,
    data: pa.Table | list[pa.Table],
    path: str,
    compression: str | None = "snappy",
    row_group_size: int | None = None,
    use_threads: bool = True,
) -> None:
    """Write parquet file using DuckDB.

    Args:
        data: PyArrow table or list of tables to write
        path: Output file path
        compression: Compression codec to use
        row_group_size: Rows per row group
        use_threads: Whether to use parallel writing

    Example:
        ```python
        import pyarrow as pa
        from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
        from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

        table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
        conn = create_duckdb_connection()
        io = DuckDBDatasetIO(conn)
        io.write_parquet(table, "/tmp/data.parquet")
        ```
    """
    validate_path(path)
    validate_compression_codec(compression)

    conn = self._connection.connection

    # Register the data as a temporary table
    table_name = f"temp_{uuid.uuid4().hex[:16]}"
    conn.register("data_table", data)

    try:
        # Build the COPY command
        copy_query = f"COPY data_table TO ?"

        params = [path]

        if compression:
            copy_query += f" (COMPRESSION {compression})"

        if row_group_size:
            copy_query += f" (ROW_GROUP_SIZE {row_group_size})"

        # Execute the copy
        if use_threads:
            conn.execute(copy_query, params)
        else:
            conn.execute(copy_query, params)

    finally:
        # Clean up temporary table
        try:
            conn.unregister("data_table")
        except (_DUCKDB_EXCEPTIONS.get("CatalogException"), _DUCKDB_EXCEPTIONS.get("ConnectionException")) as e:
            logger.warning("Failed to unregister temporary table: %s", e)
fsspeckit.datasets.duckdb.DuckDBDatasetIO.write_parquet_dataset
write_parquet_dataset(
    data: Table | list[Table],
    path: str,
    basename_template: str | None = None,
    schema: Schema | None = None,
    partition_by: str | list[str] | None = None,
    compression: str | None = "snappy",
    max_rows_per_file: int | None = 5000000,
    row_group_size: int | None = 500000,
    use_threads: bool = True,
) -> None

Write a parquet dataset using DuckDB.

Parameters:

Name Type Description Default
data Table | list[Table]

PyArrow table or list of tables to write

required
path str

Output directory path

required
basename_template str | None

Template for file names

None
schema Schema | None

Optional schema to enforce

None
partition_by str | list[str] | None

Column(s) to partition by

None
compression str | None

Compression codec

'snappy'
max_rows_per_file int | None

Maximum rows per file

5000000
row_group_size int | None

Rows per row group

500000
use_threads bool

Whether to use parallel writing

True
Example
import pyarrow as pa
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)
io.write_parquet_dataset(table, "/tmp/dataset/")
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def write_parquet_dataset(
    self,
    data: pa.Table | list[pa.Table],
    path: str,
    basename_template: str | None = None,
    schema: pa.Schema | None = None,
    partition_by: str | list[str] | None = None,
    compression: str | None = "snappy",
    max_rows_per_file: int | None = 5_000_000,
    row_group_size: int | None = 500_000,
    use_threads: bool = True,
) -> None:
    """Write a parquet dataset using DuckDB.

    Args:
        data: PyArrow table or list of tables to write
        path: Output directory path
        basename_template: Template for file names
        schema: Optional schema to enforce
        partition_by: Column(s) to partition by
        compression: Compression codec
        max_rows_per_file: Maximum rows per file
        row_group_size: Rows per row group
        use_threads: Whether to use parallel writing

    Example:
        ```python
        import pyarrow as pa
        from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
        from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

        table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
        conn = create_duckdb_connection()
        io = DuckDBDatasetIO(conn)
        io.write_parquet_dataset(table, "/tmp/dataset/")
        ```
    """
    from fsspeckit.common.optional import _import_pyarrow

    validate_path(path)
    validate_compression_codec(compression)

    pa_mod = _import_pyarrow()

    conn = self._connection.connection

    # Register the data as a temporary table
    table_name = f"temp_{uuid.uuid4().hex[:16]}"
    conn.register("data_table", data)

    try:
        # Build the COPY command for dataset
        copy_query = "COPY data_table TO ? (FORMAT PARQUET"

        params = [path + "/{i}.parquet"]

        if compression:
            copy_query += f", COMPRESSION {compression}"

        if max_rows_per_file:
            copy_query += f", MAX_ROWS_PER_FILE {max_rows_per_file}"

        if row_group_size:
            copy_query += f", ROW_GROUP_SIZE {row_group_size}"

        copy_query += ")"

        # Execute with file numbering
        conn.execute(copy_query, params)

    finally:
        # Clean up temporary table
        try:
            conn.unregister("data_table")
        except (_DUCKDB_EXCEPTIONS.get("CatalogException"), _DUCKDB_EXCEPTIONS.get("ConnectionException")) as e:
            logger.warning("Failed to unregister temporary table: %s", e)
fsspeckit.datasets.duckdb.DuckDBParquetHandler
DuckDBParquetHandler(
    storage_options: Optional[
        Union[BaseStorageOptions, dict]
    ] = None,
    filesystem: Optional[AbstractFileSystem] = None,
)

Bases: DuckDBDatasetIO

Backward-compatibility wrapper that preserves the original DuckDBParquetHandler interface.

This class has been refactored into:

- DuckDBConnection: for connection management
- DuckDBDatasetIO: for dataset I/O operations

For new code, consider using DuckDBConnection and DuckDBDatasetIO directly.

Initialize DuckDB parquet handler.

Parameters:

Name Type Description Default
storage_options Optional[Union[BaseStorageOptions, dict]]

Storage configuration options (deprecated)

None
filesystem Optional[AbstractFileSystem]

Filesystem instance (deprecated)

None
Source code in src/fsspeckit/datasets/duckdb.py
def __init__(
    self,
    storage_options: Optional[Union[BaseStorageOptions, dict]] = None,
    filesystem: Optional[AbstractFileSystem] = None,
):
    """Initialize DuckDB parquet handler.

    Args:
        storage_options: Storage configuration options (deprecated)
        filesystem: Filesystem instance (deprecated)
    """
    from fsspeckit.datasets.duckdb_connection import create_duckdb_connection

    # Create connection from filesystem
    self._connection = create_duckdb_connection(filesystem=filesystem)

    # Initialize the IO handler
    super().__init__(self._connection)
Functions
fsspeckit.datasets.duckdb.DuckDBParquetHandler.__del__
__del__()

Destructor (deprecated).

Source code in src/fsspeckit/datasets/duckdb.py
def __del__(self):
    """Destructor (deprecated)."""
    self.close()
fsspeckit.datasets.duckdb.DuckDBParquetHandler.__enter__
__enter__()

Enter context manager (deprecated).

Source code in src/fsspeckit/datasets/duckdb.py
def __enter__(self):
    """Enter context manager (deprecated)."""
    return self
fsspeckit.datasets.duckdb.DuckDBParquetHandler.__exit__
__exit__(exc_type, exc_val, exc_tb)

Exit context manager (deprecated).

Source code in src/fsspeckit/datasets/duckdb.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit context manager (deprecated)."""
    self.close()
fsspeckit.datasets.duckdb.DuckDBParquetHandler.close
close()

Close connection (deprecated, use connection.close).

Source code in src/fsspeckit/datasets/duckdb.py
def close(self):
    """Close connection (deprecated, use connection.close)."""
    self._connection.close()
fsspeckit.datasets.duckdb.DuckDBParquetHandler.execute_sql
execute_sql(query: str, parameters=None)

Execute SQL query (deprecated, use connection.execute_sql).

Source code in src/fsspeckit/datasets/duckdb.py
def execute_sql(self, query: str, parameters=None):
    """Execute SQL query (deprecated, use connection.execute_sql)."""
    return self._connection.execute_sql(query, parameters)
Functions
fsspeckit.datasets.duckdb.create_duckdb_connection
create_duckdb_connection(
    filesystem: AbstractFileSystem | None = None,
) -> DuckDBConnection

Create a DuckDB connection manager.

Parameters:

Name Type Description Default
filesystem AbstractFileSystem | None

fsspec filesystem to use

None

Returns:

Type Description
DuckDBConnection

DuckDB connection manager
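
Example

A minimal sketch showing how an existing fsspec filesystem might be passed in; the local filesystem here stands in for any fsspec-compatible backend.

```python
import fsspec
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection

fs = fsspec.filesystem("file")
with create_duckdb_connection(filesystem=fs) as conn:
    print(conn.execute_sql("SELECT 1").fetchone())  # (1,)
```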

Source code in src/fsspeckit/datasets/duckdb_connection.py
def create_duckdb_connection(
    filesystem: AbstractFileSystem | None = None,
) -> DuckDBConnection:
    """Create a DuckDB connection manager.

    Args:
        filesystem: fsspec filesystem to use

    Returns:
        DuckDB connection manager
    """
    return DuckDBConnection(filesystem=filesystem)

fsspeckit.datasets.duckdb_cleanup_helpers

fsspeckit.datasets.duckdb_connection

DuckDB connection and filesystem registration helpers.

This module contains functions and classes for managing DuckDB connections and integrating with fsspec filesystems.

Classes
fsspeckit.datasets.duckdb_connection.DuckDBConnection
DuckDBConnection(
    filesystem: AbstractFileSystem | None = None,
)

Manages DuckDB connection lifecycle and filesystem registration.

This class is responsible for:

- Creating and managing DuckDB connections
- Registering fsspec filesystems with DuckDB
- Connection cleanup

Parameters:

Name Type Description Default
filesystem AbstractFileSystem | None

fsspec filesystem instance to use

None

Initialize DuckDB connection manager.

Parameters:

Name Type Description Default
filesystem AbstractFileSystem | None

Filesystem to use. Defaults to local filesystem.

None
Source code in src/fsspeckit/datasets/duckdb_connection.py
def __init__(self, filesystem: AbstractFileSystem | None = None) -> None:
    """Initialize DuckDB connection manager.

    Args:
        filesystem: Filesystem to use. Defaults to local filesystem.
    """
    self._connection: duckdb.DuckDBPyConnection | None = None
    self._filesystem = filesystem or fsspec_filesystem("file")
Attributes
fsspeckit.datasets.duckdb_connection.DuckDBConnection.connection property
connection: Any

Get active DuckDB connection, creating it if necessary.

Returns:

Type Description
Any

Active DuckDB connection

fsspeckit.datasets.duckdb_connection.DuckDBConnection.filesystem property
filesystem: AbstractFileSystem

Get the filesystem instance.

Returns:

Type Description
AbstractFileSystem

Filesystem instance

Functions
fsspeckit.datasets.duckdb_connection.DuckDBConnection.__del__
__del__() -> None

Destructor to ensure connection is closed.

Source code in src/fsspeckit/datasets/duckdb_connection.py
def __del__(self) -> None:
    """Destructor to ensure connection is closed."""
    self.close()
fsspeckit.datasets.duckdb_connection.DuckDBConnection.__enter__
__enter__() -> 'DuckDBConnection'

Enter context manager.

Returns:

Type Description
'DuckDBConnection'

self

Source code in src/fsspeckit/datasets/duckdb_connection.py
def __enter__(self) -> "DuckDBConnection":
    """Enter context manager.

    Returns:
        self
    """
    return self
fsspeckit.datasets.duckdb_connection.DuckDBConnection.__exit__
__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Exit context manager and close connection.

Source code in src/fsspeckit/datasets/duckdb_connection.py
def __exit__(
    self,
    exc_type: Any,
    exc_val: Any,
    exc_tb: Any,
) -> None:
    """Exit context manager and close connection."""
    self.close()
fsspeckit.datasets.duckdb_connection.DuckDBConnection.close
close() -> None

Close the connection and clean up resources.

Source code in src/fsspeckit/datasets/duckdb_connection.py
def close(self) -> None:
    """Close the connection and clean up resources."""
    if self._connection is not None:
        try:
            self._connection.close()
        except (_DUCKDB_EXCEPTIONS.get("ConnectionException"), _DUCKDB_EXCEPTIONS.get("OperationalException")) as e:
            logger.warning("Error closing DuckDB connection: %s", e)
        finally:
            self._connection = None
fsspeckit.datasets.duckdb_connection.DuckDBConnection.execute_sql
execute_sql(
    query: str, parameters: list[Any] | None = None
) -> Any

Execute a SQL query.

Parameters:

Name Type Description Default
query str

SQL query to execute

required
parameters list[Any] | None

Optional query parameters

None

Returns:

Type Description
Any

Query result

Source code in src/fsspeckit/datasets/duckdb_connection.py
def execute_sql(
    self,
    query: str,
    parameters: list[Any] | None = None,
) -> Any:
    """Execute a SQL query.

    Args:
        query: SQL query to execute
        parameters: Optional query parameters

    Returns:
        Query result
    """
    conn = self.connection

    if parameters:
        return conn.execute(query, parameters)
    else:
        return conn.execute(query)
Functions
fsspeckit.datasets.duckdb_connection.create_duckdb_connection
create_duckdb_connection(
    filesystem: AbstractFileSystem | None = None,
) -> DuckDBConnection

Create a DuckDB connection manager.

Parameters:

Name Type Description Default
filesystem AbstractFileSystem | None

fsspec filesystem to use

None

Returns:

Type Description
DuckDBConnection

DuckDB connection manager

Source code in src/fsspeckit/datasets/duckdb_connection.py
def create_duckdb_connection(
    filesystem: AbstractFileSystem | None = None,
) -> DuckDBConnection:
    """Create a DuckDB connection manager.

    Args:
        filesystem: fsspec filesystem to use

    Returns:
        DuckDB connection manager
    """
    return DuckDBConnection(filesystem=filesystem)
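A hedged sketch of the factory with an explicit fsspec filesystem (the "memory" filesystem is chosen purely for illustration):

import fsspec

from fsspeckit.datasets.duckdb_connection import create_duckdb_connection

# Reuse an existing fsspec filesystem instead of the default local one.
fs = fsspec.filesystem("memory")
conn = create_duckdb_connection(filesystem=fs)
try:
    print(conn.filesystem)        # the filesystem passed in
    conn.execute_sql("SELECT 1")  # the DuckDB connection is created lazily on first use
finally:
    conn.close()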

fsspeckit.datasets.duckdb_dataset

DuckDB dataset I/O and maintenance operations.

This module contains functions for reading, writing, and maintaining parquet datasets using DuckDB.

Classes
fsspeckit.datasets.duckdb_dataset.DuckDBDatasetIO
DuckDBDatasetIO(connection: DuckDBConnection)

DuckDB-based dataset I/O operations.

This class provides methods for reading and writing parquet files and datasets using DuckDB's high-performance parquet engine.

Parameters:

Name Type Description Default
connection DuckDBConnection

DuckDB connection manager

required

Initialize DuckDB dataset I/O.

Parameters:

Name Type Description Default
connection DuckDBConnection

DuckDB connection manager

required
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def __init__(self, connection: DuckDBConnection) -> None:
    """Initialize DuckDB dataset I/O.

    Args:
        connection: DuckDB connection manager
    """
    self._connection = connection
Functions
fsspeckit.datasets.duckdb_dataset.DuckDBDatasetIO.compact_parquet_dataset
compact_parquet_dataset(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]

Compact a parquet dataset using DuckDB.

Parameters:

Name Type Description Default
path str

Dataset path

required
target_mb_per_file int | None

Target size per file

None
target_rows_per_file int | None

Target rows per file

None
partition_filter list[str] | None

Optional partition filters

None
compression str | None

Compression codec

None
dry_run bool

Whether to perform a dry run

False
verbose bool

Print progress information

False

Returns:

Type Description
dict[str, Any]

Compaction statistics

Source code in src/fsspeckit/datasets/duckdb_dataset.py
def compact_parquet_dataset(
    self,
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Compact a parquet dataset using DuckDB.

    Args:
        path: Dataset path
        target_mb_per_file: Target size per file
        target_rows_per_file: Target rows per file
        partition_filter: Optional partition filters
        compression: Compression codec
        dry_run: Whether to perform a dry run
        verbose: Print progress information

    Returns:
        Compaction statistics
    """
    from fsspeckit.core.maintenance import plan_compaction_groups, MaintenanceStats

    # Collect stats
    stats = self._collect_dataset_stats(path, partition_filter)
    files = stats["files"]

    # Plan compaction
    plan_result = plan_compaction_groups(
        file_infos=files,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
    )

    groups = plan_result["groups"]
    planned_stats = plan_result["planned_stats"]

    if dry_run:
        result = planned_stats.to_dict()
        result["planned_groups"] = groups
        return result

    # Execute compaction
    if not groups:
        return planned_stats.to_dict()

    conn = self._connection.connection

    for group in groups:
        # Read all files in this group into DuckDB
        tables = []
        for file_info in group["files"]:
            file_path = file_info["path"]
            table = conn.execute(
                f"SELECT * FROM parquet_scan('{file_path}')"
            ).fetch_arrow_table()
            tables.append(table)

        # Concatenate tables
        if len(tables) > 1:
            combined = pa.concat_tables(tables, promote_options="permissive")
        else:
            combined = tables[0]

        # Write to output
        output_path = group["output_path"]
        self.write_parquet(combined, output_path, compression=compression)

    # Remove original files
    for group in groups:
        for file_info in group["files"]:
            file_path = file_info["path"]
            self._connection.filesystem.rm(file_path)

    return planned_stats.to_dict()
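A dry-run sketch (the dataset path is hypothetical): with dry_run=True only the compaction plan and statistics are returned and no files are rewritten.

from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)

# Plan a compaction towards ~128 MB output files without touching any data.
plan = io.compact_parquet_dataset(
    "/path/to/dataset",  # hypothetical path
    target_mb_per_file=128,
    dry_run=True,
)
print(plan.get("planned_groups", []))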
fsspeckit.datasets.duckdb_dataset.DuckDBDatasetIO.merge_parquet_dataset
merge_parquet_dataset(
    sources: list[str],
    output_path: str,
    target: str | None = None,
    strategy: str | MergeStrategy = "deduplicate",
    key_columns: list[str] | str | None = None,
    compression: str | None = None,
    verbose: bool = False,
    **kwargs: Any,
) -> MergeStats

Merge multiple parquet datasets using DuckDB.

Parameters:

Name Type Description Default
sources list[str]

List of source dataset paths

required
output_path str

Path for merged output

required
target str | None

Target dataset path (for upsert/update strategies)

None
strategy str | MergeStrategy

Merge strategy to use

'deduplicate'
key_columns list[str] | str | None

Key columns for merging

None
compression str | None

Output compression codec

None
verbose bool

Print progress information

False
**kwargs Any

Additional arguments

{}

Returns:

Type Description
MergeStats

MergeStats with merge statistics

Example
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)
stats = io.merge_parquet_dataset(
    sources=["dataset1/", "dataset2/"],
    output_path="merged/",
    strategy="deduplicate",
    key_columns=["id"],
)
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def merge_parquet_dataset(
    self,
    sources: list[str],
    output_path: str,
    target: str | None = None,
    strategy: str | CoreMergeStrategy = "deduplicate",
    key_columns: list[str] | str | None = None,
    compression: str | None = None,
    verbose: bool = False,
    **kwargs: Any,
) -> MergeStats:
    """Merge multiple parquet datasets using DuckDB.

    Args:
        sources: List of source dataset paths
        output_path: Path for merged output
        target: Target dataset path (for upsert/update strategies)
        strategy: Merge strategy to use
        key_columns: Key columns for merging
        compression: Output compression codec
        verbose: Print progress information
        **kwargs: Additional arguments

    Returns:
        MergeStats with merge statistics

    Example:
        ```python
        from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
        from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

        conn = create_duckdb_connection()
        io = DuckDBDatasetIO(conn)
        stats = io.merge_parquet_dataset(
            sources=["dataset1/", "dataset2/"],
            output_path="merged/",
            strategy="deduplicate",
            key_columns=["id"],
        )
        ```
    """
    # Validate inputs using shared core logic
    validate_merge_inputs(
        sources=sources,
        strategy=strategy,
        key_columns=key_columns,
        target=target,
    )

    validate_strategy_compatibility(strategy, key_columns, target)

    # Normalize parameters
    if key_columns is not None:
        key_columns = normalize_key_columns(key_columns)

    # Process merge using DuckDB
    result = self._execute_merge_strategy(
        sources=sources,
        output_path=output_path,
        target=target,
        strategy=strategy,
        key_columns=key_columns,
        compression=compression,
        verbose=verbose,
    )

    return result
fsspeckit.datasets.duckdb_dataset.DuckDBDatasetIO.optimize_parquet_dataset
optimize_parquet_dataset(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    verbose: bool = False,
) -> dict[str, Any]

Optimize a parquet dataset.

Parameters:

Name Type Description Default
path str

Dataset path

required
target_mb_per_file int | None

Target size per file

None
target_rows_per_file int | None

Target rows per file

None
partition_filter list[str] | None

Optional partition filters

None
compression str | None

Compression codec

None
verbose bool

Print progress information

False

Returns:

Type Description
dict[str, Any]

Optimization statistics

Source code in src/fsspeckit/datasets/duckdb_dataset.py
def optimize_parquet_dataset(
    self,
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    verbose: bool = False,
) -> dict[str, Any]:
    """Optimize a parquet dataset.

    Args:
        path: Dataset path
        target_mb_per_file: Target size per file
        target_rows_per_file: Target rows per file
        partition_filter: Optional partition filters
        compression: Compression codec
        verbose: Print progress information

    Returns:
        Optimization statistics
    """
    # Use compaction for optimization
    result = self.compact_parquet_dataset(
        path=path,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
        partition_filter=partition_filter,
        compression=compression,
        dry_run=False,
        verbose=verbose,
    )

    if verbose:
        logger.info("Optimization complete: %s", result)

    return result
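A minimal sketch (path hypothetical): optimize_parquet_dataset is a thin wrapper around compaction, so it accepts the same sizing and compression knobs.

from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)
stats = io.optimize_parquet_dataset(
    "/path/to/dataset",  # hypothetical path
    target_rows_per_file=1_000_000,
    compression="zstd",
    verbose=True,
)
print(stats)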
fsspeckit.datasets.duckdb_dataset.DuckDBDatasetIO.read_parquet
read_parquet(
    path: str,
    columns: list[str] | None = None,
    filter: str | None = None,
    use_threads: bool = True,
) -> Table

Read parquet file(s) using DuckDB.

Parameters:

Name Type Description Default
path str

Path to parquet file or directory

required
columns list[str] | None

Optional list of columns to read

None
filter str | None

Optional SQL WHERE clause

None
use_threads bool

Whether to use parallel reading

True

Returns:

Type Description
Table

PyArrow table containing the data

Example
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)
table = io.read_parquet("/path/to/file.parquet")
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def read_parquet(
    self,
    path: str,
    columns: list[str] | None = None,
    filter: str | None = None,
    use_threads: bool = True,
) -> pa.Table:
    """Read parquet file(s) using DuckDB.

    Args:
        path: Path to parquet file or directory
        columns: Optional list of columns to read
        filter: Optional SQL WHERE clause
        use_threads: Whether to use parallel reading

    Returns:
        PyArrow table containing the data

    Example:
        ```python
        from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
        from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

        conn = create_duckdb_connection()
        io = DuckDBDatasetIO(conn)
        table = io.read_parquet("/path/to/file.parquet")
        ```
    """
    validate_path(path)

    conn = self._connection.connection
    fs = self._connection.filesystem

    # Build the query
    query = "SELECT * FROM parquet_scan(?)"

    params = [path]

    if columns:
        # Escape column names and build select list
        quoted_cols = [f'"{col}"' for col in columns]
        select_list = ", ".join(quoted_cols)
        query = f"SELECT {select_list} FROM parquet_scan(?)"

    if filter:
        query += f" WHERE {filter}"

    try:
        # Execute query
        if use_threads:
            result = conn.execute(query, params).fetch_arrow_table()
        else:
            result = conn.execute(query, params).fetch_arrow_table()

        return result

    except (_DUCKDB_EXCEPTIONS.get("IOException"), _DUCKDB_EXCEPTIONS.get("InvalidInputException"), _DUCKDB_EXCEPTIONS.get("ParserException")) as e:
        raise RuntimeError(
            f"Failed to read parquet from {path}: {safe_format_error(e)}"
        ) from e
fsspeckit.datasets.duckdb_dataset.DuckDBDatasetIO.write_parquet
write_parquet(
    data: Table | list[Table],
    path: str,
    compression: str | None = "snappy",
    row_group_size: int | None = None,
    use_threads: bool = True,
) -> None

Write parquet file using DuckDB.

Parameters:

Name Type Description Default
data Table | list[Table]

PyArrow table or list of tables to write

required
path str

Output file path

required
compression str | None

Compression codec to use

'snappy'
row_group_size int | None

Rows per row group

None
use_threads bool

Whether to use parallel writing

True
Example
import pyarrow as pa
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)
io.write_parquet(table, "/tmp/data.parquet")
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def write_parquet(
    self,
    data: pa.Table | list[pa.Table],
    path: str,
    compression: str | None = "snappy",
    row_group_size: int | None = None,
    use_threads: bool = True,
) -> None:
    """Write parquet file using DuckDB.

    Args:
        data: PyArrow table or list of tables to write
        path: Output file path
        compression: Compression codec to use
        row_group_size: Rows per row group
        use_threads: Whether to use parallel writing

    Example:
        ```python
        import pyarrow as pa
        from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
        from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

        table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
        conn = create_duckdb_connection()
        io = DuckDBDatasetIO(conn)
        io.write_parquet(table, "/tmp/data.parquet")
        ```
    """
    validate_path(path)
    validate_compression_codec(compression)

    conn = self._connection.connection

    # Register the data as a temporary table
    table_name = f"temp_{uuid.uuid4().hex[:16]}"
    conn.register("data_table", data)

    try:
        # Build the COPY command
        copy_query = f"COPY data_table TO ?"

        params = [path]

        if compression:
            copy_query += f" (COMPRESSION {compression})"

        if row_group_size:
            copy_query += f" (ROW_GROUP_SIZE {row_group_size})"

        # Execute the copy
        if use_threads:
            conn.execute(copy_query, params)
        else:
            conn.execute(copy_query, params)

    finally:
        # Clean up temporary table
        try:
            conn.unregister("data_table")
        except (_DUCKDB_EXCEPTIONS.get("CatalogException"), _DUCKDB_EXCEPTIONS.get("ConnectionException")) as e:
            logger.warning("Failed to unregister temporary table: %s", e)
fsspeckit.datasets.duckdb_dataset.DuckDBDatasetIO.write_parquet_dataset
write_parquet_dataset(
    data: Table | list[Table],
    path: str,
    basename_template: str | None = None,
    schema: Schema | None = None,
    partition_by: str | list[str] | None = None,
    compression: str | None = "snappy",
    max_rows_per_file: int | None = 5000000,
    row_group_size: int | None = 500000,
    use_threads: bool = True,
) -> None

Write a parquet dataset using DuckDB.

Parameters:

Name Type Description Default
data Table | list[Table]

PyArrow table or list of tables to write

required
path str

Output directory path

required
basename_template str | None

Template for file names

None
schema Schema | None

Optional schema to enforce

None
partition_by str | list[str] | None

Column(s) to partition by

None
compression str | None

Compression codec

'snappy'
max_rows_per_file int | None

Maximum rows per file

5000000
row_group_size int | None

Rows per row group

500000
use_threads bool

Whether to use parallel writing

True
Example
import pyarrow as pa
from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
conn = create_duckdb_connection()
io = DuckDBDatasetIO(conn)
io.write_parquet_dataset(table, "/tmp/dataset/")
Source code in src/fsspeckit/datasets/duckdb_dataset.py
def write_parquet_dataset(
    self,
    data: pa.Table | list[pa.Table],
    path: str,
    basename_template: str | None = None,
    schema: pa.Schema | None = None,
    partition_by: str | list[str] | None = None,
    compression: str | None = "snappy",
    max_rows_per_file: int | None = 5_000_000,
    row_group_size: int | None = 500_000,
    use_threads: bool = True,
) -> None:
    """Write a parquet dataset using DuckDB.

    Args:
        data: PyArrow table or list of tables to write
        path: Output directory path
        basename_template: Template for file names
        schema: Optional schema to enforce
        partition_by: Column(s) to partition by
        compression: Compression codec
        max_rows_per_file: Maximum rows per file
        row_group_size: Rows per row group
        use_threads: Whether to use parallel writing

    Example:
        ```python
        import pyarrow as pa
        from fsspeckit.datasets.duckdb_connection import create_duckdb_connection
        from fsspeckit.datasets.duckdb_dataset import DuckDBDatasetIO

        table = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
        conn = create_duckdb_connection()
        io = DuckDBDatasetIO(conn)
        io.write_parquet_dataset(table, "/tmp/dataset/")
        ```
    """
    from fsspeckit.common.optional import _import_pyarrow

    validate_path(path)
    validate_compression_codec(compression)

    pa_mod = _import_pyarrow()

    conn = self._connection.connection

    # Register the data as a temporary table
    table_name = f"temp_{uuid.uuid4().hex[:16]}"
    conn.register("data_table", data)

    try:
        # Build the COPY command for dataset
        copy_query = "COPY data_table TO ? (FORMAT PARQUET"

        params = [path + "/{i}.parquet"]

        if compression:
            copy_query += f", COMPRESSION {compression}"

        if max_rows_per_file:
            copy_query += f", MAX_ROWS_PER_FILE {max_rows_per_file}"

        if row_group_size:
            copy_query += f", ROW_GROUP_SIZE {row_group_size}"

        copy_query += ")"

        # Execute with file numbering
        conn.execute(copy_query, params)

    finally:
        # Clean up temporary table
        try:
            conn.unregister("data_table")
        except (_DUCKDB_EXCEPTIONS.get("CatalogException"), _DUCKDB_EXCEPTIONS.get("ConnectionException")) as e:
            logger.warning("Failed to unregister temporary table: %s", e)
Functions

fsspeckit.datasets.pyarrow

Re-export module for backward compatibility.

This module has been decomposed into focused submodules:

- pyarrow_schema: Schema unification, type inference, and optimization
- pyarrow_dataset: Dataset merge and maintenance operations

All public APIs are re-exported here to maintain backward compatibility. New code should import directly from the submodules for better organization.
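For illustration (a sketch based on the re-exports documented on this page), both import styles resolve to the same objects; the submodule imports are the recommended form for new code:

# Backward-compatible imports via the re-export module
from fsspeckit.datasets.pyarrow import opt_dtype, unify_schemas

# Preferred for new code: import directly from the focused submodules
from fsspeckit.datasets.pyarrow_schema import opt_dtype, unify_schemas
from fsspeckit.datasets.pyarrow_dataset import merge_parquet_dataset_pyarrow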

Functions
fsspeckit.datasets.pyarrow.cast_schema
cast_schema(table: Table, schema: Schema) -> Table

Cast a PyArrow table to a target schema.

Parameters:

Name Type Description Default
table Table

Source table

required
schema Schema

Target schema

required

Returns:

Type Description
Table

Table cast to target schema

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def cast_schema(table: pa.Table, schema: pa.Schema) -> pa.Table:
    """Cast a PyArrow table to a target schema.

    Args:
        table: Source table
        schema: Target schema

    Returns:
        Table cast to target schema
    """
    # Filter schema to only include columns present in the table
    table_schema = table.schema
    valid_fields = []

    for field in schema:
        if field.name in table_schema.names:
            valid_fields.append(field)

    target_schema = pa.schema(valid_fields)

    # Cast the table
    return table.cast(target_schema)
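A small sketch of cast_schema: target fields that are missing from the table are filtered out before casting, so only columns present in the table are converted.

import pyarrow as pa

from fsspeckit.datasets.pyarrow import cast_schema

table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
target = pa.schema(
    [
        pa.field("a", pa.int32()),
        pa.field("b", pa.string()),
        pa.field("extra", pa.bool_()),  # not in the table -> dropped from the cast target
    ]
)
result = cast_schema(table, target)
print(result.schema)  # a: int32, b: string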
fsspeckit.datasets.pyarrow.collect_dataset_stats_pyarrow
collect_dataset_stats_pyarrow(
    path: str,
    filesystem: AbstractFileSystem | None = None,
    partition_filter: list[str] | None = None,
) -> dict[str, Any]

Collect file-level statistics for a parquet dataset using shared core logic.

This function delegates to the shared fsspeckit.core.maintenance.collect_dataset_stats function, ensuring consistent dataset discovery and statistics across both DuckDB and PyArrow backends.

The helper walks the given dataset directory on the provided filesystem, discovers parquet files (recursively), and returns basic statistics:

  • Per-file path, size in bytes, and number of rows
  • Aggregated total bytes and total rows

The function is intentionally streaming/metadata-driven and never materializes the full dataset as a single pyarrow.Table.

Parameters:

Name Type Description Default
path str

Root directory of the parquet dataset.

required
filesystem AbstractFileSystem | None

Optional fsspec filesystem. If omitted, a local "file" filesystem is used.

None
partition_filter list[str] | None

Optional list of partition prefix filters (e.g. ["date=2025-11-04"]). Only files whose path relative to path starts with one of these prefixes are included.

None

Returns:

Type Description
dict[str, Any]

Dict with keys:

  • files: list of {"path", "size_bytes", "num_rows"} dicts
  • total_bytes: sum of file sizes
  • total_rows: sum of row counts

Raises:

Type Description
FileNotFoundError

If the path does not exist or no parquet files match the optional partition filter.

Note

This is a thin wrapper around the shared core function. See fsspeckit.core.maintenance.collect_dataset_stats for the authoritative implementation.

Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def collect_dataset_stats_pyarrow(
    path: str,
    filesystem: AbstractFileSystem | None = None,
    partition_filter: list[str] | None = None,
) -> dict[str, Any]:
    """Collect file-level statistics for a parquet dataset using shared core logic.

    This function delegates to the shared ``fsspeckit.core.maintenance.collect_dataset_stats``
    function, ensuring consistent dataset discovery and statistics across both DuckDB
    and PyArrow backends.

    The helper walks the given dataset directory on the provided filesystem,
    discovers parquet files (recursively), and returns basic statistics:

    - Per-file path, size in bytes, and number of rows
    - Aggregated total bytes and total rows

    The function is intentionally streaming/metadata-driven and never
    materializes the full dataset as a single :class:`pyarrow.Table`.

    Args:
        path: Root directory of the parquet dataset.
        filesystem: Optional fsspec filesystem. If omitted, a local "file"
            filesystem is used.
        partition_filter: Optional list of partition prefix filters
            (e.g. ["date=2025-11-04"]). Only files whose path relative to
            ``path`` starts with one of these prefixes are included.

    Returns:
        Dict with keys:

        - ``files``: list of ``{"path", "size_bytes", "num_rows"}`` dicts
        - ``total_bytes``: sum of file sizes
        - ``total_rows``: sum of row counts

    Raises:
        FileNotFoundError: If the path does not exist or no parquet files
            match the optional partition filter.

    Note:
        This is a thin wrapper around the shared core function. See
        :func:`fsspeckit.core.maintenance.collect_dataset_stats` for the
        authoritative implementation.
    """
    from fsspeckit.core.maintenance import collect_dataset_stats

    return collect_dataset_stats(
        path=path,
        filesystem=filesystem,
        partition_filter=partition_filter,
    )
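A hedged sketch (dataset path hypothetical) showing the shape of the returned dictionary:

from fsspeckit.datasets.pyarrow import collect_dataset_stats_pyarrow

stats = collect_dataset_stats_pyarrow("/path/to/dataset")  # hypothetical path
print(stats["total_bytes"], stats["total_rows"])
for f in stats["files"]:
    print(f["path"], f["size_bytes"], f["num_rows"])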
fsspeckit.datasets.pyarrow.compact_parquet_dataset_pyarrow
compact_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    filesystem: AbstractFileSystem | None = None,
) -> dict[str, Any]

Compact a parquet dataset directory into fewer larger files using PyArrow and shared planning.

Groups small files based on size (MB) and/or row thresholds, rewrites grouped files into new parquet files, and optionally changes compression. Supports a dry-run mode that returns the compaction plan without modifying files.

The implementation uses the shared core planning algorithm for consistent behavior across backends. It processes data in a group-based, streaming fashion: it reads only the files in a given group into memory when processing that group and never materializes the entire dataset as a single table.

Parameters:

Name Type Description Default
path str

Dataset root directory (local path or fsspec URL).

required
target_mb_per_file int | None

Optional max output size per file; must be > 0.

None
target_rows_per_file int | None

Optional max rows per output file; must be > 0.

None
partition_filter list[str] | None

Optional list of partition prefixes (e.g. ["date=2025-11-15"]) used to limit both stats collection and rewrites to matching paths.

None
compression str | None

Optional parquet compression codec; defaults to "snappy".

None
dry_run bool

When True the function returns a plan + before/after stats without reading or writing any parquet data.

False
filesystem AbstractFileSystem | None

Optional fsspec.AbstractFileSystem to reuse existing FS clients.

None

Returns:

Type Description
dict[str, Any]

A stats dictionary describing before/after file counts, total bytes, rewritten bytes, and optional planned_groups when dry_run is enabled. The structure follows the canonical MaintenanceStats format from the shared core.

Raises:

Type Description
ValueError

If thresholds are invalid or no files match partition filter.

FileNotFoundError

If the path does not exist.

Example
result = compact_parquet_dataset_pyarrow(
    "/path/to/dataset",
    target_mb_per_file=64,
    dry_run=True,
)
print(f"Files before: {result['before_file_count']}")
print(f"Files after: {result['after_file_count']}")
Note

This function delegates dataset discovery and compaction planning to the shared fsspeckit.core.maintenance module, ensuring consistent behavior across DuckDB and PyArrow backends.

Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def compact_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    filesystem: AbstractFileSystem | None = None,
) -> dict[str, Any]:
    """Compact a parquet dataset directory into fewer larger files using PyArrow and shared planning.

    Groups small files based on size (MB) and/or row thresholds, rewrites grouped
    files into new parquet files, and optionally changes compression. Supports a
    dry-run mode that returns the compaction plan without modifying files.

    The implementation uses the shared core planning algorithm for consistent
    behavior across backends. It processes data in a group-based, streaming fashion:
    it reads only the files in a given group into memory when processing that group
    and never materializes the entire dataset as a single table.

    Args:
        path: Dataset root directory (local path or fsspec URL).
        target_mb_per_file: Optional max output size per file; must be > 0.
        target_rows_per_file: Optional max rows per output file; must be > 0.
        partition_filter: Optional list of partition prefixes (e.g. ``["date=2025-11-15"]``)
            used to limit both stats collection and rewrites to matching paths.
        compression: Optional parquet compression codec; defaults to ``"snappy"``.
        dry_run: When ``True`` the function returns a plan + before/after stats
            without reading or writing any parquet data.
        filesystem: Optional ``fsspec.AbstractFileSystem`` to reuse existing FS clients.

    Returns:
        A stats dictionary describing before/after file counts, total bytes,
        rewritten bytes, and optional ``planned_groups`` when ``dry_run`` is enabled.
        The structure follows the canonical ``MaintenanceStats`` format from the shared core.

    Raises:
        ValueError: If thresholds are invalid or no files match partition filter.
        FileNotFoundError: If the path does not exist.

    Example:
        ```python
        result = compact_parquet_dataset_pyarrow(
            "/path/to/dataset",
            target_mb_per_file=64,
            dry_run=True,
        )
        print(f"Files before: {result['before_file_count']}")
        print(f"Files after: {result['after_file_count']}")
        ```

    Note:
        This function delegates dataset discovery and compaction planning to the
        shared ``fsspeckit.core.maintenance`` module, ensuring consistent behavior
        across DuckDB and PyArrow backends.
    """
    from fsspeckit.core.maintenance import plan_compaction_groups, MaintenanceStats

    # Get dataset stats using shared logic
    stats = collect_dataset_stats_pyarrow(
        path=path, filesystem=filesystem, partition_filter=partition_filter
    )
    files = stats["files"]

    # Use shared compaction planning
    plan_result = plan_compaction_groups(
        file_infos=files,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
    )

    groups = plan_result["groups"]
    planned_stats = plan_result["planned_stats"]

    # Update planned stats with compression info
    planned_stats.compression_codec = compression
    planned_stats.dry_run = dry_run

    # If dry run, return the plan
    if dry_run:
        result = planned_stats.to_dict()
        result["planned_groups"] = groups
        return result

    # Execute compaction
    if not groups:
        return planned_stats.to_dict()

    # Execute the compaction
    for group in groups:
        # Read all files in this group
        tables = []
        for file_info in group["files"]:
            file_path = file_info["path"]
            table = pq.read_table(
                file_path,
                filesystem=filesystem,
            )
            tables.append(table)

        # Concatenate tables
        if len(tables) > 1:
            combined = pa.concat_tables(tables, promote_options="permissive")
        else:
            combined = tables[0]

        # Write to output file
        output_path = group["output_path"]
        pq.write_table(
            combined,
            output_path,
            filesystem=filesystem,
            compression=compression or "snappy",
        )

    # Remove original files
    for group in groups:
        for file_info in group["files"]:
            file_path = file_info["path"]
            filesystem.rm(file_path)

    return planned_stats.to_dict()
fsspeckit.datasets.pyarrow.convert_large_types_to_normal
convert_large_types_to_normal(schema: Schema) -> Schema

Convert large types (like large_string) to normal types.

Parameters:

Name Type Description Default
schema Schema

PyArrow schema

required

Returns:

Type Description
Schema

Schema with large types converted

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def convert_large_types_to_normal(schema: pa.Schema) -> pa.Schema:
    """Convert large types (like large_string) to normal types.

    Args:
        schema: PyArrow schema

    Returns:
        Schema with large types converted
    """
    fields = []
    for field in schema:
        if pa.types.is_large_string(field.type):
            field = field.with_type(pa.string())
        elif pa.types.is_large_utf8(field.type):
            field = field.with_type(pa.utf8())
        elif pa.types.is_large_list(field.type):
            field = field.with_type(pa.list_(field.type.value_type))
        fields.append(field)

    return pa.schema(fields)
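A brief sketch of convert_large_types_to_normal on a schema containing large types; note that for large_list only the outer list type is converted (the element type is kept as-is by the implementation shown above).

import pyarrow as pa

from fsspeckit.datasets.pyarrow import convert_large_types_to_normal

schema = pa.schema(
    [
        pa.field("name", pa.large_string()),                 # -> string
        pa.field("tags", pa.large_list(pa.large_string())),  # -> list<large_string>
        pa.field("count", pa.int64()),                       # unchanged
    ]
)
print(convert_large_types_to_normal(schema))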
fsspeckit.datasets.pyarrow.merge_parquet_dataset_pyarrow
merge_parquet_dataset_pyarrow(
    sources: list[str],
    output_path: str,
    target: str | None = None,
    strategy: str | MergeStrategy = "deduplicate",
    key_columns: list[str] | str | None = None,
    filesystem: AbstractFileSystem | None = None,
    compression: str | None = None,
    row_group_size: int | None = 500000,
    max_rows_per_file: int | None = 5000000,
    verbose: bool = False,
    **kwargs: Any,
) -> MergeStats

Merge multiple parquet datasets using PyArrow with various strategies.

This function provides dataset merging capabilities with support for:

- Multiple merge strategies (upsert, insert, update, full_merge, deduplicate)
- Key-based merging for relational operations
- Batch processing for large datasets
- Configurable output settings

Parameters:

Name Type Description Default
sources list[str]

List of source dataset paths

required
output_path str

Path for merged output

required
target str | None

Target dataset path (for upsert/update strategies)

None
strategy str | MergeStrategy

Merge strategy to use

'deduplicate'
key_columns list[str] | str | None

Key columns for merging (required for relational strategies)

None
filesystem AbstractFileSystem | None

fsspec filesystem instance

None
compression str | None

Output compression codec

None
row_group_size int | None

Rows per parquet row group

500000
max_rows_per_file int | None

Max rows per output file

5000000
verbose bool

Print progress information

False
**kwargs Any

Additional arguments

{}

Returns:

Type Description
MergeStats

MergeStats with merge statistics

Raises:

Type Description
ValueError

If required parameters are missing

FileNotFoundError

If sources don't exist

Example
stats = merge_parquet_dataset_pyarrow(
    sources=["dataset1/", "dataset2/"],
    output_path="merged/",
    strategy="deduplicate",
    key_columns=["id"],
    verbose=True,
)
print(f"Merged {stats.total_rows} rows")
Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def merge_parquet_dataset_pyarrow(
    sources: list[str],
    output_path: str,
    target: str | None = None,
    strategy: str | CoreMergeStrategy = "deduplicate",
    key_columns: list[str] | str | None = None,
    filesystem: AbstractFileSystem | None = None,
    compression: str | None = None,
    row_group_size: int | None = 500_000,
    max_rows_per_file: int | None = 5_000_000,
    verbose: bool = False,
    **kwargs: Any,
) -> MergeStats:
    """Merge multiple parquet datasets using PyArrow with various strategies.

    This function provides dataset merging capabilities with support for:
    - Multiple merge strategies (upsert, insert, update, full_merge, deduplicate)
    - Key-based merging for relational operations
    - Batch processing for large datasets
    - Configurable output settings

    Args:
        sources: List of source dataset paths
        output_path: Path for merged output
        target: Target dataset path (for upsert/update strategies)
        strategy: Merge strategy to use
        key_columns: Key columns for merging (required for relational strategies)
        filesystem: fsspec filesystem instance
        compression: Output compression codec
        row_group_size: Rows per parquet row group
        max_rows_per_file: Max rows per output file
        verbose: Print progress information
        **kwargs: Additional arguments

    Returns:
        MergeStats with merge statistics

    Raises:
        ValueError: If required parameters are missing
        FileNotFoundError: If sources don't exist

    Example:
        ```python
        stats = merge_parquet_dataset_pyarrow(
            sources=["dataset1/", "dataset2/"],
            output_path="merged/",
            strategy="deduplicate",
            key_columns=["id"],
            verbose=True,
        )
        print(f"Merged {stats.total_rows} rows")
        ```
    """
    # Validate inputs using shared core logic
    validate_merge_inputs(
        sources=sources,
        strategy=strategy,
        key_columns=key_columns,
        target=target,
    )

    validate_strategy_compatibility(strategy, key_columns, target)

    # Normalize parameters
    if key_columns is not None:
        key_columns = _normalize_key_columns(key_columns)

    # Get filesystem
    if filesystem is None:
        filesystem = fsspec_filesystem("file")

    pa_filesystem = _ensure_pyarrow_filesystem(filesystem)

    # Load target if provided
    target_table = None
    if target and strategy in ["upsert", "update"]:
        target_table = _load_source_table_pyarrow(target, filesystem)

        if key_columns:
            _ensure_no_null_keys_table(target_table, key_columns)

    # Process sources
    merged_data = []
    total_rows = 0

    for source_path in sources:
        if verbose:
            logger.info("Processing source: %s", source_path)

        source_table = _load_source_table_pyarrow(source_path, filesystem)

        if key_columns:
            _ensure_no_null_keys_table(source_table, key_columns)

        if strategy == "full_merge":
            # Simply concatenate all data
            merged_data.append(source_table)
            total_rows += source_table.num_rows

        elif strategy == "deduplicate":
            # Remove duplicates based on key columns
            if key_columns:
                # Group by keys and keep first occurrence
                table = source_table

                # Use PyArrow's group_by for deduplication
                # This is a simplified implementation
                groups = table.group_by(key_columns).aggregate([])
                keys = groups.select(key_columns)

                # Get unique keys
                unique_keys = []
                for row in keys.to_pylist():
                    unique_keys.append(tuple(row[col] for col in key_columns))

                # Filter to keep only unique rows
                filtered = []
                for row in table.to_pylist():
                    key = tuple(row[col] for col in key_columns)
                    if key in unique_keys:
                        filtered.append(row)
                        unique_keys.remove(key)  # Remove to avoid duplicates

                if filtered:
                    deduped = pa.Table.from_pylist(filtered, schema=table.schema)
                    merged_data.append(deduped)
                    total_rows += deduped.num_rows
            else:
                # No key columns, remove exact duplicates
                merged_data.append(source_table)
                total_rows += source_table.num_rows

        elif strategy in ["upsert", "insert", "update"] and target_table is not None:
            # Key-based relational operations
            if strategy == "insert":
                # Only insert non-existing rows
                target_keys = _extract_key_tuples(target_table, key_columns)
                source_keys = _extract_key_tuples(source_table, key_columns)

                # Find keys that don't exist in target
                new_keys = set(source_keys) - set(target_keys)

                # Filter source for new keys
                new_rows = []
                for row in source_table.to_pylist():
                    key = tuple(row[col] for col in key_columns)
                    if key in new_keys:
                        new_rows.append(row)

                if new_rows:
                    new_table = pa.Table.from_pylist(new_rows, schema=source_table.schema)
                    merged_data.append(new_table)
                    total_rows += new_table.num_rows

            elif strategy == "update":
                # Update existing rows
                target_keys = _extract_key_tuples(target_table, key_columns)
                source_keys = _extract_key_tuples(source_table, key_columns)

                # Find common keys
                common_keys = set(source_keys) & set(target_keys)

                # Build updated target
                updated_data = []

                # Keep non-matching rows from target
                for row in target_table.to_pylist():
                    key = tuple(row[col] for col in key_columns)
                    if key not in common_keys:
                        updated_data.append(row)

                # Add updated rows from source
                for row in source_table.to_pylist():
                    key = tuple(row[col] for col in key_columns)
                    if key in common_keys:
                        updated_data.append(row)

                if updated_data:
                    updated_table = pa.Table.from_pylist(updated_data, schema=target_table.schema)
                    merged_data.append(updated_table)
                    total_rows += updated_table.num_rows

            elif strategy == "upsert":
                # Insert or update
                all_data = [target_table] if target_table else []
                all_data.append(source_table)

                if all_data:
                    combined = pa.concat_tables(all_data, promote_options="permissive")

                    # Deduplicate based on keys
                    if key_columns:
                        # Group by keys and keep last occurrence
                        # This is a simplified implementation
                        groups = combined.group_by(key_columns).aggregate([])
                        keys = groups.select(key_columns)

                        unique_keys = []
                        for row in keys.to_pylist():
                            unique_keys.append(tuple(row[col] for col in key_columns))

                        # Keep only last occurrence of each key
                        filtered = []
                        seen = set()
                        for row in reversed(combined.to_pylist()):
                            key = tuple(row[col] for col in key_columns)
                            if key not in seen:
                                filtered.append(row)
                                seen.add(key)

                        if filtered:
                            deduped = pa.Table.from_pylist(
                                list(reversed(filtered)), schema=combined.schema
                            )
                            merged_data.append(deduped)
                            total_rows += deduped.num_rows
                    else:
                        merged_data.append(combined)
                        total_rows += combined.num_rows

    # Combine all data
    if merged_data:
        if len(merged_data) == 1:
            final_table = merged_data[0]
        else:
            final_table = pa.concat_tables(
                merged_data, promote_options="permissive"
            )
    else:
        # No data to merge
        final_table = pa.table({})

    # Write output
    pq.write_table(
        final_table,
        output_path,
        filesystem=pa_filesystem,
        compression=compression,
        row_group_size=row_group_size,
        max_rows_per_file=max_rows_per_file,
    )

    # Calculate stats
    stats = calculate_merge_stats(
        sources=sources,
        target=output_path,
        strategy=strategy,
        total_rows=total_rows,
        output_rows=final_table.num_rows,
    )

    if verbose:
        logger.info("\nMerge complete: %s", stats)

    return stats
fsspeckit.datasets.pyarrow.opt_dtype
opt_dtype(
    table: Table,
    strict: bool = False,
    columns: list[str] | None = None,
) -> Table

Optimize dtypes in a PyArrow table based on data analysis.

This function analyzes the data in each column and attempts to downcast to more appropriate types (e.g., int64 -> int32, float64 -> float32, string -> int/bool where applicable).

Parameters:

Name Type Description Default
table Table

PyArrow table to optimize

required
strict bool

Whether to use strict type checking

False
columns list[str] | None

List of columns to optimize (None for all)

None

Returns:

Type Description
Table

Table with optimized dtypes

Example
import pyarrow as pa

table = pa.table(
    {
        "a": pa.array([1, 2, 3], type=pa.int64()),
        "b": pa.array([1.0, 2.0, 3.0], type=pa.float64()),
    },
)
optimized = opt_dtype(table)
print(optimized.column(0).type)  # DataType(int32)
print(optimized.column(1).type)  # DataType(float32)
Source code in src/fsspeckit/datasets/pyarrow_schema.py
def opt_dtype(
    table: pa.Table,
    strict: bool = False,
    columns: list[str] | None = None,
) -> pa.Table:
    """Optimize dtypes in a PyArrow table based on data analysis.

    This function analyzes the data in each column and attempts to downcast
    to more appropriate types (e.g., int64 -> int32, float64 -> float32,
    string -> int/bool where applicable).

    Args:
        table: PyArrow table to optimize
        strict: Whether to use strict type checking
        columns: List of columns to optimize (None for all)

    Returns:
        Table with optimized dtypes

    Example:
        ```python
        import pyarrow as pa

        table = pa.table(
            {
                "a": pa.array([1, 2, 3], type=pa.int64()),
                "b": pa.array([1.0, 2.0, 3.0], type=pa.float64()),
            },
        )
        optimized = opt_dtype(table)
        print(optimized.column(0).type)  # DataType(int32)
        print(optimized.column(1).type)  # DataType(float32)
        ```
    """
    from fsspeckit.common.misc import run_parallel

    if columns is None:
        columns = table.column_names

    # Process columns in parallel
    results = run_parallel(
        _process_column_for_opt_dtype,
        [(table, col, strict) for col in columns],
        backend="threading",
        n_jobs=-1,
    )

    # Build new table with optimized columns
    new_columns = {}
    for col_name, optimized_array in results:
        new_columns[col_name] = optimized_array

    # Keep non-optimized columns as-is
    for col_name in table.column_names:
        if col_name not in new_columns:
            new_columns[col_name] = table.column(col_name)

    return pa.table(new_columns)
fsspeckit.datasets.pyarrow.optimize_parquet_dataset_pyarrow
optimize_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    filesystem: AbstractFileSystem | None = None,
    verbose: bool = False,
) -> dict[str, Any]

Optimize a parquet dataset through compaction and optional statistics recalculation.

This is a convenience function that combines compaction with optional statistics recalculation. It's particularly useful after many small write operations have created a large number of small files.

Parameters:

Name Type Description Default
path str

Dataset root directory

required
target_mb_per_file int | None

Target size per file in MB

None
target_rows_per_file int | None

Target rows per file

None
partition_filter list[str] | None

Optional partition filters

None
compression str | None

Compression codec to use

None
filesystem AbstractFileSystem | None

Optional filesystem instance

None
verbose bool

Print progress information

False

Returns:

Type Description
dict[str, Any]

Optimization statistics

Example
stats = optimize_parquet_dataset_pyarrow(
    "dataset/",
    target_mb_per_file=64,
    compression="zstd",
)
print(
    f"Reduced from {stats['before_file_count']} "
    f"to {stats['after_file_count']} files",
)
Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def optimize_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    filesystem: AbstractFileSystem | None = None,
    verbose: bool = False,
) -> dict[str, Any]:
    """Optimize a parquet dataset through compaction and optional statistics recalculation.

    This is a convenience function that combines compaction with optional statistics
    recalculation. It's particularly useful after many small write operations have
    created a large number of small files.

    Args:
        path: Dataset root directory
        target_mb_per_file: Target size per file in MB
        target_rows_per_file: Target rows per file
        partition_filter: Optional partition filters
        compression: Compression codec to use
        filesystem: Optional filesystem instance
        verbose: Print progress information

    Returns:
        Optimization statistics

    Example:
        ```python
        stats = optimize_parquet_dataset_pyarrow(
            "dataset/",
            target_mb_per_file=64,
            compression="zstd",
        )
        print(
            f"Reduced from {stats['before_file_count']} "
            f"to {stats['after_file_count']} files",
        )
        ```
    """
    # Use compaction
    result = compact_parquet_dataset_pyarrow(
        path=path,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
        partition_filter=partition_filter,
        compression=compression,
        dry_run=False,
        filesystem=filesystem,
    )

    if verbose:
        logger.info("Optimization complete: %s", result)

    return result
fsspeckit.datasets.pyarrow.unify_schemas
unify_schemas(
    schemas: list[Schema],
    standardize_timezones: bool = True,
    verbose: bool = False,
) -> Schema

Unify multiple PyArrow schemas into a single schema.

This function handles type conflicts by:

1. Finding fields with conflicting types
2. Attempting to normalize compatible types
3. Using fallback strategies for incompatible types
4. Removing problematic fields if necessary

Parameters:

Name Type Description Default
schemas list[Schema]

List of schemas to unify

required
standardize_timezones bool

Whether to standardize timezone info

True
verbose bool

Whether to print conflict information

False

Returns:

Type Description
Schema

Unified PyArrow schema

Raises:

Type Description
ValueError

If schemas cannot be unified

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def unify_schemas(
    schemas: list[pa.Schema],
    standardize_timezones: bool = True,
    verbose: bool = False,
) -> pa.Schema:
    """Unify multiple PyArrow schemas into a single schema.

    This function handles type conflicts by:
    1. Finding fields with conflicting types
    2. Attempting to normalize compatible types
    3. Using fallback strategies for incompatible types
    4. Removing problematic fields if necessary

    Args:
        schemas: List of schemas to unify
        standardize_timezones: Whether to standardize timezone info
        verbose: Whether to print conflict information

    Returns:
        Unified PyArrow schema

    Raises:
        ValueError: If schemas cannot be unified
    """
    if not schemas:
        raise ValueError("Cannot unify empty list of schemas")

    if len(schemas) == 1:
        return schemas[0]

    # Remove duplicate schemas
    schemas = _unique_schemas(schemas)

    # Standardize timezones if requested
    if standardize_timezones:
        schemas = standardize_schema_timezones(schemas, standardize_timezones)

    # Find conflicts
    conflicts = _find_conflicting_fields(schemas)

    if not conflicts:
        # No conflicts, concatenate all fields
        all_fields = []
        for schema in schemas:
            all_fields.extend(schema)
        return pa.schema(all_fields)

    if verbose:
        _log_conflict_summary(conflicts, verbose)

    # Try to normalize types
    try:
        normalized = _normalize_schema_types(schemas, conflicts)

        # Check if normalization resolved conflicts
        remaining_conflicts = _find_conflicting_fields(normalized)

        if not remaining_conflicts:
            # Normalization successful
            all_fields = []
            for schema in normalized:
                all_fields.extend(schema)
            return pa.schema(all_fields)

        # Fall through to next strategy
        conflicts = remaining_conflicts

    except (pa.ArrowInvalid, pa.ArrowTypeError, pa.ArrowNotImplementedError) as e:
        # Normalization failed, log and continue to fallback
        logger.debug(
            "Schema type normalization failed: %s. Trying aggressive fallback.",
            str(e)
        )

    # Try aggressive fallback
    try:
        return _aggressive_fallback_unification(schemas)
    except (pa.ArrowInvalid, pa.ArrowTypeError, pa.ArrowNotImplementedError) as e:
        # Aggressive fallback failed, log and try last resort
        logger.debug(
            "Aggressive fallback unification failed: %s. Trying last resort cleanup.",
            str(e)
        )

    # Last resort: remove problematic fields
    cleaned = _remove_problematic_fields(schemas)
    all_fields = []
    for schema in cleaned:
        all_fields.extend(schema)

    if verbose and conflicts:
        logger.debug("Removed %d conflicting fields during unification", len(conflicts))

    return pa.schema(all_fields)
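A compact sketch of unify_schemas on two overlapping schemas (field names are illustrative); the result concatenates the fields of the input schemas, applying the conflict-handling steps above when types disagree.

import pyarrow as pa

from fsspeckit.datasets.pyarrow import unify_schemas

s1 = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.float64())])
s2 = pa.schema([pa.field("id", pa.int64()), pa.field("label", pa.string())])

# With verbose=True, any detected type conflicts are logged during unification.
unified = unify_schemas([s1, s2], verbose=True)
print(unified)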

fsspeckit.datasets.pyarrow_dataset

PyArrow dataset operations including merge and maintenance helpers.

This module contains functions for dataset-level operations including:

- Dataset merging with various strategies
- Dataset statistics collection
- Dataset compaction and optimization
- Maintenance operations

Classes
Functions
fsspeckit.datasets.pyarrow_dataset.collect_dataset_stats_pyarrow
collect_dataset_stats_pyarrow(
    path: str,
    filesystem: AbstractFileSystem | None = None,
    partition_filter: list[str] | None = None,
) -> dict[str, Any]

Collect file-level statistics for a parquet dataset using shared core logic.

This function delegates to the shared fsspeckit.core.maintenance.collect_dataset_stats function, ensuring consistent dataset discovery and statistics across both DuckDB and PyArrow backends.

The helper walks the given dataset directory on the provided filesystem, discovers parquet files (recursively), and returns basic statistics:

  • Per-file path, size in bytes, and number of rows
  • Aggregated total bytes and total rows

The function is intentionally streaming/metadata-driven and never materializes the full dataset as a single pyarrow.Table.

Parameters:

Name Type Description Default
path str

Root directory of the parquet dataset.

required
filesystem AbstractFileSystem | None

Optional fsspec filesystem. If omitted, a local "file" filesystem is used.

None
partition_filter list[str] | None

Optional list of partition prefix filters (e.g. ["date=2025-11-04"]). Only files whose path relative to path starts with one of these prefixes are included.

None

Returns:

Type Description
dict[str, Any]

Dict with keys:

  • files: list of {"path", "size_bytes", "num_rows"} dicts
  • total_bytes: sum of file sizes
  • total_rows: sum of row counts

Raises:

Type Description
FileNotFoundError

If the path does not exist or no parquet files match the optional partition filter.

Note

This is a thin wrapper around the shared core function. See fsspeckit.core.maintenance.collect_dataset_stats for the authoritative implementation.

Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def collect_dataset_stats_pyarrow(
    path: str,
    filesystem: AbstractFileSystem | None = None,
    partition_filter: list[str] | None = None,
) -> dict[str, Any]:
    """Collect file-level statistics for a parquet dataset using shared core logic.

    This function delegates to the shared ``fsspeckit.core.maintenance.collect_dataset_stats``
    function, ensuring consistent dataset discovery and statistics across both DuckDB
    and PyArrow backends.

    The helper walks the given dataset directory on the provided filesystem,
    discovers parquet files (recursively), and returns basic statistics:

    - Per-file path, size in bytes, and number of rows
    - Aggregated total bytes and total rows

    The function is intentionally streaming/metadata-driven and never
    materializes the full dataset as a single :class:`pyarrow.Table`.

    Args:
        path: Root directory of the parquet dataset.
        filesystem: Optional fsspec filesystem. If omitted, a local "file"
            filesystem is used.
        partition_filter: Optional list of partition prefix filters
            (e.g. ["date=2025-11-04"]). Only files whose path relative to
            ``path`` starts with one of these prefixes are included.

    Returns:
        Dict with keys:

        - ``files``: list of ``{"path", "size_bytes", "num_rows"}`` dicts
        - ``total_bytes``: sum of file sizes
        - ``total_rows``: sum of row counts

    Raises:
        FileNotFoundError: If the path does not exist or no parquet files
            match the optional partition filter.

    Note:
        This is a thin wrapper around the shared core function. See
        :func:`fsspeckit.core.maintenance.collect_dataset_stats` for the
        authoritative implementation.
    """
    from fsspeckit.core.maintenance import collect_dataset_stats

    return collect_dataset_stats(
        path=path,
        filesystem=filesystem,
        partition_filter=partition_filter,
    )
fsspeckit.datasets.pyarrow_dataset.compact_parquet_dataset_pyarrow
compact_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    filesystem: AbstractFileSystem | None = None,
) -> dict[str, Any]

Compact a parquet dataset directory into fewer larger files using PyArrow and shared planning.

Groups small files based on size (MB) and/or row thresholds, rewrites grouped files into new parquet files, and optionally changes compression. Supports a dry-run mode that returns the compaction plan without modifying files.

The implementation uses the shared core planning algorithm for consistent behavior across backends. It processes data in a group-based, streaming fashion: it reads only the files in a given group into memory when processing that group and never materializes the entire dataset as a single table.

Parameters:

Name Type Description Default
path str

Dataset root directory (local path or fsspec URL).

required
target_mb_per_file int | None

Optional max output size per file; must be > 0.

None
target_rows_per_file int | None

Optional max rows per output file; must be > 0.

None
partition_filter list[str] | None

Optional list of partition prefixes (e.g. ["date=2025-11-15"]) used to limit both stats collection and rewrites to matching paths.

None
compression str | None

Optional parquet compression codec; defaults to "snappy".

None
dry_run bool

When True the function returns a plan + before/after stats without reading or writing any parquet data.

False
filesystem AbstractFileSystem | None

Optional fsspec.AbstractFileSystem to reuse existing FS clients.

None

Returns:

Type Description
dict[str, Any]

A stats dictionary describing before/after file counts, total bytes, rewritten bytes, and optional planned_groups when dry_run is enabled. The structure follows the canonical MaintenanceStats format from the shared core.

Raises:

Type Description
ValueError

If thresholds are invalid or no files match partition filter.

FileNotFoundError

If the path does not exist.

Example
result = compact_parquet_dataset_pyarrow(
    "/path/to/dataset",
    target_mb_per_file=64,
    dry_run=True,
)
print(f"Files before: {result['before_file_count']}")
print(f"Files after: {result['after_file_count']}")
Note

This function delegates dataset discovery and compaction planning to the shared fsspeckit.core.maintenance module, ensuring consistent behavior across DuckDB and PyArrow backends.
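
For an actual rewrite, the same call can be repeated with dry_run=False. A sketch, with a hypothetical dataset path and illustrative thresholds:

```python
from fsspeckit.datasets.pyarrow_dataset import compact_parquet_dataset_pyarrow

# hypothetical dataset path; groups of small files are rewritten into ~128 MB outputs
stats = compact_parquet_dataset_pyarrow(
    "/path/to/dataset",
    target_mb_per_file=128,
    compression="zstd",
    dry_run=False,
)
print(stats["before_file_count"], "->", stats["after_file_count"])
```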

Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def compact_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    dry_run: bool = False,
    filesystem: AbstractFileSystem | None = None,
) -> dict[str, Any]:
    """Compact a parquet dataset directory into fewer larger files using PyArrow and shared planning.

    Groups small files based on size (MB) and/or row thresholds, rewrites grouped
    files into new parquet files, and optionally changes compression. Supports a
    dry-run mode that returns the compaction plan without modifying files.

    The implementation uses the shared core planning algorithm for consistent
    behavior across backends. It processes data in a group-based, streaming fashion:
    it reads only the files in a given group into memory when processing that group
    and never materializes the entire dataset as a single table.

    Args:
        path: Dataset root directory (local path or fsspec URL).
        target_mb_per_file: Optional max output size per file; must be > 0.
        target_rows_per_file: Optional max rows per output file; must be > 0.
        partition_filter: Optional list of partition prefixes (e.g. ``["date=2025-11-15"]``)
            used to limit both stats collection and rewrites to matching paths.
        compression: Optional parquet compression codec; defaults to ``"snappy"``.
        dry_run: When ``True`` the function returns a plan + before/after stats
            without reading or writing any parquet data.
        filesystem: Optional ``fsspec.AbstractFileSystem`` to reuse existing FS clients.

    Returns:
        A stats dictionary describing before/after file counts, total bytes,
        rewritten bytes, and optional ``planned_groups`` when ``dry_run`` is enabled.
        The structure follows the canonical ``MaintenanceStats`` format from the shared core.

    Raises:
        ValueError: If thresholds are invalid or no files match partition filter.
        FileNotFoundError: If the path does not exist.

    Example:
        ```python
        result = compact_parquet_dataset_pyarrow(
            "/path/to/dataset",
            target_mb_per_file=64,
            dry_run=True,
        )
        print(f"Files before: {result['before_file_count']}")
        print(f"Files after: {result['after_file_count']}")
        ```

    Note:
        This function delegates dataset discovery and compaction planning to the
        shared ``fsspeckit.core.maintenance`` module, ensuring consistent behavior
        across DuckDB and PyArrow backends.
    """
    from fsspeckit.core.maintenance import plan_compaction_groups, MaintenanceStats

    # Get dataset stats using shared logic
    stats = collect_dataset_stats_pyarrow(
        path=path, filesystem=filesystem, partition_filter=partition_filter
    )
    files = stats["files"]

    # Use shared compaction planning
    plan_result = plan_compaction_groups(
        file_infos=files,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
    )

    groups = plan_result["groups"]
    planned_stats = plan_result["planned_stats"]

    # Update planned stats with compression info
    planned_stats.compression_codec = compression
    planned_stats.dry_run = dry_run

    # If dry run, return the plan
    if dry_run:
        result = planned_stats.to_dict()
        result["planned_groups"] = groups
        return result

    # Execute compaction
    if not groups:
        return planned_stats.to_dict()

    # Execute the compaction
    for group in groups:
        # Read all files in this group
        tables = []
        for file_info in group["files"]:
            file_path = file_info["path"]
            table = pq.read_table(
                file_path,
                filesystem=filesystem,
            )
            tables.append(table)

        # Concatenate tables
        if len(tables) > 1:
            combined = pa.concat_tables(tables, promote_options="permissive")
        else:
            combined = tables[0]

        # Write to output file
        output_path = group["output_path"]
        pq.write_table(
            combined,
            output_path,
            filesystem=filesystem,
            compression=compression or "snappy",
        )

    # Remove original files
    for group in groups:
        for file_info in group["files"]:
            file_path = file_info["path"]
            filesystem.rm(file_path)

    return planned_stats.to_dict()
fsspeckit.datasets.pyarrow_dataset.merge_parquet_dataset_pyarrow
merge_parquet_dataset_pyarrow(
    sources: list[str],
    output_path: str,
    target: str | None = None,
    strategy: str | MergeStrategy = "deduplicate",
    key_columns: list[str] | str | None = None,
    filesystem: AbstractFileSystem | None = None,
    compression: str | None = None,
    row_group_size: int | None = 500000,
    max_rows_per_file: int | None = 5000000,
    verbose: bool = False,
    **kwargs: Any,
) -> MergeStats

Merge multiple parquet datasets using PyArrow with various strategies.

This function provides dataset merging capabilities with support for:

- Multiple merge strategies (upsert, insert, update, full_merge, deduplicate)
- Key-based merging for relational operations
- Batch processing for large datasets
- Configurable output settings

Parameters:

Name Type Description Default
sources list[str]

List of source dataset paths

required
output_path str

Path for merged output

required
target str | None

Target dataset path (for upsert/update strategies)

None
strategy str | MergeStrategy

Merge strategy to use

'deduplicate'
key_columns list[str] | str | None

Key columns for merging (required for relational strategies)

None
filesystem AbstractFileSystem | None

fsspec filesystem instance

None
compression str | None

Output compression codec

None
row_group_size int | None

Rows per parquet row group

500000
max_rows_per_file int | None

Max rows per output file

5000000
verbose bool

Print progress information

False
**kwargs Any

Additional arguments

{}

Returns:

Type Description
MergeStats

MergeStats with merge statistics

Raises:

Type Description
ValueError

If required parameters are missing

FileNotFoundError

If sources don't exist

Example
stats = merge_parquet_dataset_pyarrow(
    sources=["dataset1/", "dataset2/"],
    output_path="merged/",
    strategy="deduplicate",
    key_columns=["id"],
    verbose=True,
)
print(f"Merged {stats.total_rows} rows")
Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def merge_parquet_dataset_pyarrow(
    sources: list[str],
    output_path: str,
    target: str | None = None,
    strategy: str | CoreMergeStrategy = "deduplicate",
    key_columns: list[str] | str | None = None,
    filesystem: AbstractFileSystem | None = None,
    compression: str | None = None,
    row_group_size: int | None = 500_000,
    max_rows_per_file: int | None = 5_000_000,
    verbose: bool = False,
    **kwargs: Any,
) -> MergeStats:
    """Merge multiple parquet datasets using PyArrow with various strategies.

    This function provides dataset merging capabilities with support for:
    - Multiple merge strategies (upsert, insert, update, full_merge, deduplicate)
    - Key-based merging for relational operations
    - Batch processing for large datasets
    - Configurable output settings

    Args:
        sources: List of source dataset paths
        output_path: Path for merged output
        target: Target dataset path (for upsert/update strategies)
        strategy: Merge strategy to use
        key_columns: Key columns for merging (required for relational strategies)
        filesystem: fsspec filesystem instance
        compression: Output compression codec
        row_group_size: Rows per parquet row group
        max_rows_per_file: Max rows per output file
        verbose: Print progress information
        **kwargs: Additional arguments

    Returns:
        MergeStats with merge statistics

    Raises:
        ValueError: If required parameters are missing
        FileNotFoundError: If sources don't exist

    Example:
        ```python
        stats = merge_parquet_dataset_pyarrow(
            sources=["dataset1/", "dataset2/"],
            output_path="merged/",
            strategy="deduplicate",
            key_columns=["id"],
            verbose=True,
        )
        print(f"Merged {stats.total_rows} rows")
        ```
    """
    # Validate inputs using shared core logic
    validate_merge_inputs(
        sources=sources,
        strategy=strategy,
        key_columns=key_columns,
        target=target,
    )

    validate_strategy_compatibility(strategy, key_columns, target)

    # Normalize parameters
    if key_columns is not None:
        key_columns = _normalize_key_columns(key_columns)

    # Get filesystem
    if filesystem is None:
        filesystem = fsspec_filesystem("file")

    pa_filesystem = _ensure_pyarrow_filesystem(filesystem)

    # Load target if provided
    target_table = None
    if target and strategy in ["upsert", "update"]:
        target_table = _load_source_table_pyarrow(target, filesystem)

        if key_columns:
            _ensure_no_null_keys_table(target_table, key_columns)

    # Process sources
    merged_data = []
    total_rows = 0

    for source_path in sources:
        if verbose:
            logger.info("Processing source: %s", source_path)

        source_table = _load_source_table_pyarrow(source_path, filesystem)

        if key_columns:
            _ensure_no_null_keys_table(source_table, key_columns)

        if strategy == "full_merge":
            # Simply concatenate all data
            merged_data.append(source_table)
            total_rows += source_table.num_rows

        elif strategy == "deduplicate":
            # Remove duplicates based on key columns
            if key_columns:
                # Group by keys and keep first occurrence
                table = source_table

                # Use PyArrow's group_by for deduplication
                # This is a simplified implementation
                groups = table.group_by(key_columns).aggregate([])
                keys = groups.select(key_columns)

                # Get unique keys
                unique_keys = []
                for row in keys.to_pylist():
                    unique_keys.append(tuple(row[col] for col in key_columns))

                # Filter to keep only unique rows
                filtered = []
                for row in table.to_pylist():
                    key = tuple(row[col] for col in key_columns)
                    if key in unique_keys:
                        filtered.append(row)
                        unique_keys.remove(key)  # Remove to avoid duplicates

                if filtered:
                    deduped = pa.Table.from_pylist(filtered, schema=table.schema)
                    merged_data.append(deduped)
                    total_rows += deduped.num_rows
            else:
                # No key columns, remove exact duplicates
                merged_data.append(source_table)
                total_rows += source_table.num_rows

        elif strategy in ["upsert", "insert", "update"] and target_table is not None:
            # Key-based relational operations
            if strategy == "insert":
                # Only insert non-existing rows
                target_keys = _extract_key_tuples(target_table, key_columns)
                source_keys = _extract_key_tuples(source_table, key_columns)

                # Find keys that don't exist in target
                new_keys = set(source_keys) - set(target_keys)

                # Filter source for new keys
                new_rows = []
                for row in source_table.to_pylist():
                    key = tuple(row[col] for col in key_columns)
                    if key in new_keys:
                        new_rows.append(row)

                if new_rows:
                    new_table = pa.Table.from_pylist(new_rows, schema=source_table.schema)
                    merged_data.append(new_table)
                    total_rows += new_table.num_rows

            elif strategy == "update":
                # Update existing rows
                target_keys = _extract_key_tuples(target_table, key_columns)
                source_keys = _extract_key_tuples(source_table, key_columns)

                # Find common keys
                common_keys = set(source_keys) & set(target_keys)

                # Build updated target
                updated_data = []

                # Keep non-matching rows from target
                for row in target_table.to_pylist():
                    key = tuple(row[col] for col in key_columns)
                    if key not in common_keys:
                        updated_data.append(row)

                # Add updated rows from source
                for row in source_table.to_pylist():
                    key = tuple(row[col] for col in key_columns)
                    if key in common_keys:
                        updated_data.append(row)

                if updated_data:
                    updated_table = pa.Table.from_pylist(updated_data, schema=target_table.schema)
                    merged_data.append(updated_table)
                    total_rows += updated_table.num_rows

            elif strategy == "upsert":
                # Insert or update
                all_data = [target_table] if target_table else []
                all_data.append(source_table)

                if all_data:
                    combined = pa.concat_tables(all_data, promote_options="permissive")

                    # Deduplicate based on keys
                    if key_columns:
                        # Group by keys and keep last occurrence
                        # This is a simplified implementation
                        groups = combined.group_by(key_columns).aggregate([])
                        keys = groups.select(key_columns)

                        unique_keys = []
                        for row in keys.to_pylist():
                            unique_keys.append(tuple(row[col] for col in key_columns))

                        # Keep only last occurrence of each key
                        filtered = []
                        seen = set()
                        for row in reversed(combined.to_pylist()):
                            key = tuple(row[col] for col in key_columns)
                            if key not in seen:
                                filtered.append(row)
                                seen.add(key)

                        if filtered:
                            deduped = pa.Table.from_pylist(
                                list(reversed(filtered)), schema=combined.schema
                            )
                            merged_data.append(deduped)
                            total_rows += deduped.num_rows
                    else:
                        merged_data.append(combined)
                        total_rows += combined.num_rows

    # Combine all data
    if merged_data:
        if len(merged_data) == 1:
            final_table = merged_data[0]
        else:
            final_table = pa.concat_tables(
                merged_data, promote_options="permissive"
            )
    else:
        # No data to merge
        final_table = pa.table({})

    # Write output
    pq.write_table(
        final_table,
        output_path,
        filesystem=pa_filesystem,
        compression=compression,
        row_group_size=row_group_size,
        max_rows_per_file=max_rows_per_file,
    )

    # Calculate stats
    stats = calculate_merge_stats(
        sources=sources,
        target=output_path,
        strategy=strategy,
        total_rows=total_rows,
        output_rows=final_table.num_rows,
    )

    if verbose:
        logger.info("\nMerge complete: %s", stats)

    return stats
fsspeckit.datasets.pyarrow_dataset.optimize_parquet_dataset_pyarrow
optimize_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    filesystem: AbstractFileSystem | None = None,
    verbose: bool = False,
) -> dict[str, Any]

Optimize a parquet dataset through compaction and optional statistics recalculation.

This is a convenience function that combines compaction with optional statistics recalculation. It's particularly useful after many small write operations have created a large number of small files.

Parameters:

Name Type Description Default
path str

Dataset root directory

required
target_mb_per_file int | None

Target size per file in MB

None
target_rows_per_file int | None

Target rows per file

None
partition_filter list[str] | None

Optional partition filters

None
compression str | None

Compression codec to use

None
filesystem AbstractFileSystem | None

Optional filesystem instance

None
verbose bool

Print progress information

False

Returns:

Type Description
dict[str, Any]

Optimization statistics

Example
stats = optimize_parquet_dataset_pyarrow(
    "dataset/",
    target_mb_per_file=64,
    compression="zstd",
)
print(
    f"Reduced from {stats['before_file_count']} "
    f"to {stats['after_file_count']} files",
)
Source code in src/fsspeckit/datasets/pyarrow_dataset.py
def optimize_parquet_dataset_pyarrow(
    path: str,
    target_mb_per_file: int | None = None,
    target_rows_per_file: int | None = None,
    partition_filter: list[str] | None = None,
    compression: str | None = None,
    filesystem: AbstractFileSystem | None = None,
    verbose: bool = False,
) -> dict[str, Any]:
    """Optimize a parquet dataset through compaction and optional statistics recalculation.

    This is a convenience function that combines compaction with optional statistics
    recalculation. It's particularly useful after many small write operations have
    created a large number of small files.

    Args:
        path: Dataset root directory
        target_mb_per_file: Target size per file in MB
        target_rows_per_file: Target rows per file
        partition_filter: Optional partition filters
        compression: Compression codec to use
        filesystem: Optional filesystem instance
        verbose: Print progress information

    Returns:
        Optimization statistics

    Example:
        ```python
        stats = optimize_parquet_dataset_pyarrow(
            "dataset/",
            target_mb_per_file=64,
            compression="zstd",
        )
        print(
            f"Reduced from {stats['before_file_count']} "
            f"to {stats['after_file_count']} files",
        )
        ```
    """
    # Use compaction
    result = compact_parquet_dataset_pyarrow(
        path=path,
        target_mb_per_file=target_mb_per_file,
        target_rows_per_file=target_rows_per_file,
        partition_filter=partition_filter,
        compression=compression,
        dry_run=False,
        filesystem=filesystem,
    )

    if verbose:
        logger.info("Optimization complete: %s", result)

    return result

fsspeckit.datasets.pyarrow_schema

PyArrow schema utilities for type inference, unification, and optimization.

This module contains functions for working with PyArrow schemas including:

- Schema unification across multiple tables
- Type inference and optimization
- Timezone handling
- Schema casting

Classes
Functions
fsspeckit.datasets.pyarrow_schema.cast_schema
cast_schema(table: Table, schema: Schema) -> Table

Cast a PyArrow table to a target schema.

Parameters:

Name Type Description Default
table Table

Source table

required
schema Schema

Target schema

required

Returns:

Type Description
Table

Table cast to target schema
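
A short sketch of the filtering behaviour (column names are illustrative): fields in the target schema that are missing from the table are dropped before casting.

```python
import pyarrow as pa
from fsspeckit.datasets.pyarrow_schema import cast_schema

table = pa.table({"id": pa.array([1, 2, 3], type=pa.int64()), "name": ["a", "b", "c"]})

# "extra" is not present in the table, so it is ignored; "id" is cast down to int32
target = pa.schema(
    [
        pa.field("id", pa.int32()),
        pa.field("name", pa.string()),
        pa.field("extra", pa.float64()),
    ]
)

print(cast_schema(table, target).schema)  # id: int32, name: string
```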

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def cast_schema(table: pa.Table, schema: pa.Schema) -> pa.Table:
    """Cast a PyArrow table to a target schema.

    Args:
        table: Source table
        schema: Target schema

    Returns:
        Table cast to target schema
    """
    # Filter schema to only include columns present in the table
    table_schema = table.schema
    valid_fields = []

    for field in schema:
        if field.name in table_schema.names:
            valid_fields.append(field)

    target_schema = pa.schema(valid_fields)

    # Cast the table
    return table.cast(target_schema)
fsspeckit.datasets.pyarrow_schema.convert_large_types_to_normal
convert_large_types_to_normal(schema: Schema) -> Schema

Convert large types (like large_string) to normal types.

Parameters:

Name Type Description Default
schema Schema

PyArrow schema

required

Returns:

Type Description
Schema

Schema with large types converted
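
A small sketch (field names are illustrative): large string and large list fields are rewritten to their regular counterparts, everything else passes through unchanged.

```python
import pyarrow as pa
from fsspeckit.datasets.pyarrow_schema import convert_large_types_to_normal

schema = pa.schema(
    [
        pa.field("text", pa.large_string()),
        pa.field("tags", pa.large_list(pa.string())),
        pa.field("count", pa.int64()),
    ]
)

print(convert_large_types_to_normal(schema))
# text: string, tags: list<item: string>, count: int64
```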

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def convert_large_types_to_normal(schema: pa.Schema) -> pa.Schema:
    """Convert large types (like large_string) to normal types.

    Args:
        schema: PyArrow schema

    Returns:
        Schema with large types converted
    """
    fields = []
    for field in schema:
        if pa.types.is_large_string(field.type):
            field = field.with_type(pa.string())
        elif pa.types.is_large_utf8(field.type):
            field = field.with_type(pa.utf8())
        elif pa.types.is_large_list(field.type):
            field = field.with_type(pa.list_(field.type.value_type))
        fields.append(field)

    return pa.schema(fields)
fsspeckit.datasets.pyarrow_schema.dominant_timezone_per_column
dominant_timezone_per_column(
    schemas: list[Schema],
) -> dict[str, str]

Determine the dominant timezone for each column across schemas.

Parameters:

Name Type Description Default
schemas list[Schema]

List of PyArrow schemas to analyze

required

Returns:

Name Type Description
dict dict[str, str]

Mapping of column names to dominant timezone strings
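
A small sketch: with two schemas using UTC and one using Europe/Berlin for the same column, UTC wins the majority vote; columns without timezone information are not reported.

```python
import pyarrow as pa
from fsspeckit.datasets.pyarrow_schema import dominant_timezone_per_column

schemas = [
    pa.schema([pa.field("ts", pa.timestamp("us", tz="UTC"))]),
    pa.schema([pa.field("ts", pa.timestamp("us", tz="UTC"))]),
    pa.schema([pa.field("ts", pa.timestamp("us", tz="Europe/Berlin"))]),
]

print(dominant_timezone_per_column(schemas))  # {'ts': 'UTC'}
```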

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def dominant_timezone_per_column(schemas: list[pa.Schema]) -> dict[str, str]:
    """Determine the dominant timezone for each column across schemas.

    Args:
        schemas: List of PyArrow schemas to analyze

    Returns:
        dict: Mapping of column names to dominant timezone strings
    """
    from collections import Counter

    timezone_counts: dict[str, Counter] = {}

    for schema in schemas:
        for field in schema:
            if pa.types.is_timestamp(field.type):
                tz = field.type.tz
                if tz:
                    if field.name not in timezone_counts:
                        timezone_counts[field.name] = Counter()
                    timezone_counts[field.name][tz] += 1

    # Select dominant timezone for each column
    result = {}
    for col_name, counter in timezone_counts.items():
        result[col_name] = counter.most_common(1)[0][0]

    return result
fsspeckit.datasets.pyarrow_schema.opt_dtype
opt_dtype(
    table: Table,
    strict: bool = False,
    columns: list[str] | None = None,
) -> Table

Optimize dtypes in a PyArrow table based on data analysis.

This function analyzes the data in each column and attempts to downcast to more appropriate types (e.g., int64 -> int32, float64 -> float32, string -> int/bool where applicable).

Parameters:

Name Type Description Default
table Table

PyArrow table to optimize

required
strict bool

Whether to use strict type checking

False
columns list[str] | None

List of columns to optimize (None for all)

None

Returns:

Type Description
Table

Table with optimized dtypes

Example
import pyarrow as pa

table = pa.table(
    {
        "a": pa.array([1, 2, 3], type=pa.int64()),
        "b": pa.array([1.0, 2.0, 3.0], type=pa.float64()),
    },
)
optimized = opt_dtype(table)
print(optimized.column(0).type)  # DataType(int32)
print(optimized.column(1).type)  # DataType(float32)
Source code in src/fsspeckit/datasets/pyarrow_schema.py
def opt_dtype(
    table: pa.Table,
    strict: bool = False,
    columns: list[str] | None = None,
) -> pa.Table:
    """Optimize dtypes in a PyArrow table based on data analysis.

    This function analyzes the data in each column and attempts to downcast
    to more appropriate types (e.g., int64 -> int32, float64 -> float32,
    string -> int/bool where applicable).

    Args:
        table: PyArrow table to optimize
        strict: Whether to use strict type checking
        columns: List of columns to optimize (None for all)

    Returns:
        Table with optimized dtypes

    Example:
        ```python
        import pyarrow as pa

        table = pa.table(
            {
                "a": pa.array([1, 2, 3], type=pa.int64()),
                "b": pa.array([1.0, 2.0, 3.0], type=pa.float64()),
            },
        )
        optimized = opt_dtype(table)
        print(optimized.column(0).type)  # DataType(int32)
        print(optimized.column(1).type)  # DataType(float32)
        ```
    """
    from fsspeckit.common.misc import run_parallel

    if columns is None:
        columns = table.column_names

    # Process columns in parallel
    results = run_parallel(
        _process_column_for_opt_dtype,
        [(table, col, strict) for col in columns],
        backend="threading",
        n_jobs=-1,
    )

    # Build new table with optimized columns
    new_columns = {}
    for col_name, optimized_array in results:
        new_columns[col_name] = optimized_array

    # Keep non-optimized columns as-is
    for col_name in table.column_names:
        if col_name not in new_columns:
            new_columns[col_name] = table.column(col_name)

    return pa.table(new_columns)
fsspeckit.datasets.pyarrow_schema.remove_empty_columns
remove_empty_columns(table: Table) -> Table

Remove empty columns from a PyArrow table.

Parameters:

Name Type Description Default
table Table

PyArrow table

required

Returns:

Type Description
Table

Table with empty columns removed
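
A minimal sketch, assuming a column containing only nulls counts as empty (the exact check lives in the private _identify_empty_columns helper):

```python
import pyarrow as pa
from fsspeckit.datasets.pyarrow_schema import remove_empty_columns

table = pa.table(
    {
        "id": [1, 2, 3],
        # assumed to be treated as an empty column because every value is null
        "notes": pa.array([None, None, None], type=pa.string()),
    }
)

print(remove_empty_columns(table).column_names)  # expected: ['id']
```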

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def remove_empty_columns(table: pa.Table) -> pa.Table:
    """Remove empty columns from a PyArrow table.

    Args:
        table: PyArrow table

    Returns:
        Table with empty columns removed
    """
    empty_cols = _identify_empty_columns(table)
    if not empty_cols:
        return table

    return table.drop(empty_cols)
fsspeckit.datasets.pyarrow_schema.standardize_schema_timezones
standardize_schema_timezones(
    schemas: list[Schema],
    standardize_timezones: bool = True,
) -> list[Schema]

Standardize timezone information across schemas.

Parameters:

Name Type Description Default
schemas list[Schema]

List of schemas to standardize

required
standardize_timezones bool

Whether to standardize timezones

True

Returns:

Type Description
list[Schema]

List of schemas with standardized timezones
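
A short sketch (column name and timezones are illustrative): with the flag enabled, timestamp columns are aligned to the per-column majority timezone; with it disabled, the input list is returned untouched.

```python
import pyarrow as pa
from fsspeckit.datasets.pyarrow_schema import standardize_schema_timezones

schemas = [
    pa.schema([pa.field("ts", pa.timestamp("us", tz="UTC"))]),
    pa.schema([pa.field("ts", pa.timestamp("us", tz="UTC"))]),
    pa.schema([pa.field("ts", pa.timestamp("us", tz="America/New_York"))]),
]

# majority timezone (UTC) is applied to every "ts" field
aligned = standardize_schema_timezones(schemas)
print({s.field("ts").type.tz for s in aligned})  # {'UTC'}

# with standardization disabled the original list is returned as-is
print(standardize_schema_timezones(schemas, standardize_timezones=False) is schemas)  # True
```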

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def standardize_schema_timezones(
    schemas: list[pa.Schema],
    standardize_timezones: bool = True,
) -> list[pa.Schema]:
    """Standardize timezone information across schemas.

    Args:
        schemas: List of schemas to standardize
        standardize_timezones: Whether to standardize timezones

    Returns:
        List of schemas with standardized timezones
    """
    if not standardize_timezones:
        return schemas

    return standardize_schema_timezones_by_majority(schemas)
fsspeckit.datasets.pyarrow_schema.standardize_schema_timezones_by_majority
standardize_schema_timezones_by_majority(
    schemas: list[Schema],
) -> list[Schema]

Standardize timezone information across schemas based on majority.

Parameters:

Name Type Description Default
schemas list[Schema]

List of schemas to standardize

required

Returns:

Type Description
list[Schema]

List of schemas with standardized timezones

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def standardize_schema_timezones_by_majority(
    schemas: list[pa.Schema],
) -> list[pa.Schema]:
    """Standardize timezone information across schemas based on majority.

    Args:
        schemas: List of schemas to standardize

    Returns:
        List of schemas with standardized timezones
    """
    # Get dominant timezones
    dominant_tz = dominant_timezone_per_column(schemas)

    # Apply dominant timezone to all schemas
    standardized = []
    for schema in schemas:
        fields = []
        for field in schema:
            if pa.types.is_timestamp(field.type) and field.name in dominant_tz:
                # Update timezone to dominant one
                tz = dominant_tz[field.name]
                if field.type.tz != tz:
                    field = field.with_type(
                        pa.timestamp("us", tz=tz) if tz else pa.timestamp("us")
                    )
            fields.append(field)
        standardized.append(pa.schema(fields))

    return standardized
fsspeckit.datasets.pyarrow_schema.unify_schemas
unify_schemas(
    schemas: list[Schema],
    standardize_timezones: bool = True,
    verbose: bool = False,
) -> Schema

Unify multiple PyArrow schemas into a single schema.

This function handles type conflicts by:

1. Finding fields with conflicting types
2. Attempting to normalize compatible types
3. Using fallback strategies for incompatible types
4. Removing problematic fields if necessary

Parameters:

Name Type Description Default
schemas list[Schema]

List of schemas to unify

required
standardize_timezones bool

Whether to standardize timezone info

True
verbose bool

Whether to print conflict information

False

Returns:

Type Description
Schema

Unified PyArrow schema

Raises:

Type Description
ValueError

If schemas cannot be unified
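
A minimal sketch of the happy path (field names are illustrative, and it assumes identical field types do not count as a conflict): the two schemas share a compatible id field and each contribute one column of their own, so no fallback strategy is needed and the result simply combines the fields of both inputs.

```python
import pyarrow as pa
from fsspeckit.datasets.pyarrow_schema import unify_schemas

schemas = [
    pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())]),
    pa.schema([pa.field("id", pa.int64()), pa.field("price", pa.float64())]),
]

unified = unify_schemas(schemas, verbose=True)
print(unified)
```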

Source code in src/fsspeckit/datasets/pyarrow_schema.py
def unify_schemas(
    schemas: list[pa.Schema],
    standardize_timezones: bool = True,
    verbose: bool = False,
) -> pa.Schema:
    """Unify multiple PyArrow schemas into a single schema.

    This function handles type conflicts by:
    1. Finding fields with conflicting types
    2. Attempting to normalize compatible types
    3. Using fallback strategies for incompatible types
    4. Removing problematic fields if necessary

    Args:
        schemas: List of schemas to unify
        standardize_timezones: Whether to standardize timezone info
        verbose: Whether to print conflict information

    Returns:
        Unified PyArrow schema

    Raises:
        ValueError: If schemas cannot be unified
    """
    if not schemas:
        raise ValueError("Cannot unify empty list of schemas")

    if len(schemas) == 1:
        return schemas[0]

    # Remove duplicate schemas
    schemas = _unique_schemas(schemas)

    # Standardize timezones if requested
    if standardize_timezones:
        schemas = standardize_schema_timezones(schemas, standardize_timezones)

    # Find conflicts
    conflicts = _find_conflicting_fields(schemas)

    if not conflicts:
        # No conflicts, concatenate all fields
        all_fields = []
        for schema in schemas:
            all_fields.extend(schema)
        return pa.schema(all_fields)

    if verbose:
        _log_conflict_summary(conflicts, verbose)

    # Try to normalize types
    try:
        normalized = _normalize_schema_types(schemas, conflicts)

        # Check if normalization resolved conflicts
        remaining_conflicts = _find_conflicting_fields(normalized)

        if not remaining_conflicts:
            # Normalization successful
            all_fields = []
            for schema in normalized:
                all_fields.extend(schema)
            return pa.schema(all_fields)

        # Fall through to next strategy
        conflicts = remaining_conflicts

    except (pa.ArrowInvalid, pa.ArrowTypeError, pa.ArrowNotImplementedError) as e:
        # Normalization failed, log and continue to fallback
        logger.debug(
            "Schema type normalization failed: %s. Trying aggressive fallback.",
            str(e)
        )

    # Try aggressive fallback
    try:
        return _aggressive_fallback_unification(schemas)
    except (pa.ArrowInvalid, pa.ArrowTypeError, pa.ArrowNotImplementedError) as e:
        # Aggressive fallback failed, log and try last resort
        logger.debug(
            "Aggressive fallback unification failed: %s. Trying last resort cleanup.",
            str(e)
        )

    # Last resort: remove problematic fields
    cleaned = _remove_problematic_fields(schemas)
    all_fields = []
    for schema in cleaned:
        all_fields.extend(schema)

    if verbose and conflicts:
        logger.debug("Removed %d conflicting fields during unification", len(conflicts))

    return pa.schema(all_fields)