
fsspeckit.common API Reference

common

Cross-cutting utilities for fsspeckit.

This package contains utilities that are shared across different components:

- Datetime parsing and manipulation utilities
- Logging configuration and helpers
- General purpose utility functions
- Polars DataFrame optimization and manipulation
- Type conversion and data transformation utilities

Functions

fsspeckit.common.dict_to_dataframe

dict_to_dataframe(
    data: Union[dict, list[dict]],
    unique: Union[bool, list[str], str] = False,
) -> Any

Convert a dictionary or list of dictionaries to a Polars DataFrame.

Handles various input formats:

- Single dict with list values → DataFrame rows
- Single dict with scalar values → Single row DataFrame
- List of dicts with scalar values → Multi-row DataFrame
- List of dicts with list values → DataFrame with list columns

Parameters:

    data (Union[dict, list[dict]]): Dictionary or list of dictionaries to convert. Required.
    unique (Union[bool, list[str], str]): If True, remove duplicate rows. Can also specify columns. Default: False.

Returns:

    Any: Polars DataFrame containing the converted data.

Examples:

>>> # Single dict with list values
>>> data = {'a': [1, 2, 3], 'b': [4, 5, 6]}
>>> dict_to_dataframe(data)
shape: (3, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1   ┆ 4   │
│ 2   ┆ 5   │
│ 3   ┆ 6   │
└─────┴─────┘
>>> # Single dict with scalar values
>>> data = {'a': 1, 'b': 2}
>>> dict_to_dataframe(data)
shape: (1, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1   ┆ 2   │
└─────┴─────┘
>>> # List of dicts with scalar values
>>> data = [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
>>> dict_to_dataframe(data)
shape: (2, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1   ┆ 2   │
│ 3   ┆ 4   │
└─────┴─────┘
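
The unique flag removes duplicate rows during conversion. A short illustrative sketch (not part of the original docstring; it assumes dict_to_dataframe is imported from fsspeckit.common and Polars is installed):

>>> rows = [{'a': 1, 'b': 2}, {'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
>>> dict_to_dataframe(rows, unique=True).shape
(2, 2)
>>> # Deduplicate on a column subset instead of full rows
>>> dict_to_dataframe(rows, unique='a').shape
(2, 2)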
Source code in src/fsspeckit/common/types.py
def dict_to_dataframe(
    data: Union[dict, list[dict]], unique: Union[bool, list[str], str] = False
) -> Any:
    """Convert a dictionary or list of dictionaries to a Polars DataFrame.

    Handles various input formats:
    - Single dict with list values → DataFrame rows
    - Single dict with scalar values → Single row DataFrame
    - List of dicts with scalar values → Multi-row DataFrame
    - List of dicts with list values → DataFrame with list columns

    Args:
        data: Dictionary or list of dictionaries to convert.
        unique: If True, remove duplicate rows. Can also specify columns.

    Returns:
        Polars DataFrame containing the converted data.

    Examples:
        >>> # Single dict with list values
        >>> data = {'a': [1, 2, 3], 'b': [4, 5, 6]}
        >>> dict_to_dataframe(data)
        shape: (3, 2)
        ┌─────┬─────┐
        │ a   ┆ b   │
        │ --- ┆ --- │
        │ i64 ┆ i64 │
        ╞═════╪═════╡
        │ 1   ┆ 4   │
        │ 2   ┆ 5   │
        │ 3   ┆ 6   │
        └─────┴─────┘

        >>> # Single dict with scalar values
        >>> data = {'a': 1, 'b': 2}
        >>> dict_to_dataframe(data)
        shape: (1, 2)
        ┌─────┬─────┐
        │ a   ┆ b   │
        │ --- ┆ --- │
        │ i64 ┆ i64 │
        ╞═════╪═════╡
        │ 1   ┆ 2   │
        └─────┴─────┘

        >>> # List of dicts with scalar values
        >>> data = [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
        >>> dict_to_dataframe(data)
        shape: (2, 2)
        ┌─────┬─────┐
        │ a   ┆ b   │
        │ --- ┆ --- │
        │ i64 ┆ i64 │
        ╞═════╪═════╡
        │ 1   ┆ 2   │
        │ 3   ┆ 4   │
        └─────┴─────┘
    """
    from fsspeckit.common.optional import _import_polars

    pl = _import_polars()

    if isinstance(data, list):
        # If it's a single-element list, just use the first element
        if len(data) == 1:
            data = data[0]
        # If it's a list of dicts
        else:
            first_item = data[0]
            # Check if the dict values are lists/tuples
            if any(isinstance(v, (list, tuple)) for v in first_item.values()):
                # Each dict becomes a row with list/tuple values
                data = pl.DataFrame(data)
            else:
                # If values are scalars, convert list of dicts to DataFrame
                data = pl.DataFrame(data)

            if unique:
                data = data.unique(
                    subset=None if not isinstance(unique, (str, list)) else unique,
                    maintain_order=True,
                )
            return data

    # If it's a single dict
    if isinstance(data, dict):
        # Check if values are lists/tuples
        if any(isinstance(v, (list, tuple)) for v in data.values()):
            # Get the length of any list value (assuming all lists have same length)
            length = len(next(v for v in data.values() if isinstance(v, (list, tuple))))
            # Convert to DataFrame where each list element becomes a row
            data = pl.DataFrame(
                {
                    k: v if isinstance(v, (list, tuple)) else [v] * length
                    for k, v in data.items()
                }
            )
        else:
            # If values are scalars, wrap them in a list to create a single row
            data = pl.DataFrame({k: [v] for k, v in data.items()})

        if unique:
            data = data.unique(
                subset=None if not isinstance(unique, (str, list)) else unique,
                maintain_order=True,
            )
        return data

    raise ValueError("Input must be a dictionary or list of dictionaries")

fsspeckit.common.get_logger

get_logger(name: str = 'fsspeckit') -> logger

Get a logger instance for the given name.

Parameters:

    name (str): Logger name, typically the module name. Default: 'fsspeckit'.

Returns:

    logger: Configured logger instance.

Example
logger = get_logger(__name__)
logger.info("This is a log message")
Source code in src/fsspeckit/common/logging.py
def get_logger(name: str = "fsspeckit") -> "logger":
    """Get a logger instance for the given name.

    Args:
        name: Logger name, typically the module name.

    Returns:
        Configured logger instance.

    Example:
        ```python
        logger = get_logger(__name__)
        logger.info("This is a log message")
        ```
    """
    return logger.bind(name=name)

fsspeckit.common.get_partitions_from_path

get_partitions_from_path(
    path: str,
    partitioning: Union[str, list[str], None] = None,
) -> list[tuple]

Extract dataset partitions from a file path.

Parses file paths to extract partition information based on different partitioning schemes. This is the canonical implementation used across all fsspeckit backends.

Parameters:

    path (str): File path potentially containing partition information. Required.
    partitioning (Union[str, list[str], None]): Partitioning scheme. Default: None.
        - "hive": Hive-style partitioning (key=value)
        - str: Single partition column name
        - list[str]: Multiple partition column names
        - None: Return empty list

Returns:

    list[tuple]: List of tuples containing (column, value) pairs.

Examples:

>>> # Hive-style partitioning
>>> get_partitions_from_path("data/year=2023/month=01/file.parquet", "hive")
[('year', '2023'), ('month', '01')]
>>> # Single partition column
>>> get_partitions_from_path("data/2023/01/file.parquet", "year")
[('year', '2023')]
>>> # Multiple partition columns
>>> get_partitions_from_path("data/2023/01/file.parquet", ["year", "month"])
[('year', '2023'), ('month', '01')]
>>> # No partitioning
>>> get_partitions_from_path("data/file.parquet", None)
[]
Source code in src/fsspeckit/common/partitions.py
def get_partitions_from_path(
    path: str, partitioning: Union[str, list[str], None] = None
) -> list[tuple]:
    """
    Extract dataset partitions from a file path.

    Parses file paths to extract partition information based on
    different partitioning schemes. This is the canonical implementation
    used across all fsspeckit backends.

    Args:
        path: File path potentially containing partition information.
        partitioning: Partitioning scheme:
            - "hive": Hive-style partitioning (key=value)
            - str: Single partition column name
            - list[str]: Multiple partition column names
            - None: Return empty list

    Returns:
        List of tuples containing (column, value) pairs.

    Examples:
        >>> # Hive-style partitioning
        >>> get_partitions_from_path("data/year=2023/month=01/file.parquet", "hive")
        [('year', '2023'), ('month', '01')]

        >>> # Single partition column
        >>> get_partitions_from_path("data/2023/01/file.parquet", "year")
        [('year', '2023')]

        >>> # Multiple partition columns
        >>> get_partitions_from_path("data/2023/01/file.parquet", ["year", "month"])
        [('year', '2023'), ('month', '01')]

        >>> # No partitioning
        >>> get_partitions_from_path("data/file.parquet", None)
        []
    """
    if "." in path:
        path = os.path.dirname(path)

    parts = path.split("/")

    if isinstance(partitioning, str):
        if partitioning == "hive":
            return [tuple(p.split("=")) for p in parts if "=" in p]
        else:
            # Single partition column - take the first directory that looks like a value
            # This is a simple heuristic for cases like data/2023/file.parquet
            if parts:
                return [(partitioning, parts[0])]
            return []
    elif isinstance(partitioning, list):
        # Multiple partition columns - map column names to path parts from right to left
        if not parts:
            return []

        # Take the last N parts where N is the number of partition columns
        partition_parts = (
            parts[-len(partitioning) :] if len(parts) >= len(partitioning) else parts
        )
        return list(zip(partitioning, partition_parts))
    else:
        return []

fsspeckit.common.get_timedelta_str

get_timedelta_str(
    timedelta_string: str, to: str = "polars"
) -> str

Convert timedelta strings between different formats.

Converts timedelta strings between Polars and DuckDB formats, with graceful fallback for unknown units. Never raises errors for unknown units - instead returns a reasonable string representation.

Parameters:

    timedelta_string (str): Input timedelta string (e.g., "1h", "2d", "5invalid"). Required.
    to (str): Target format, "polars" or "duckdb". Default: 'polars'.

Returns:

    str: String in the target format. For unknown units, returns "value unit" format without raising errors.

Examples:

>>> # Valid Polars units
>>> get_timedelta_str("1h", to="polars")
'1h'
>>> get_timedelta_str("1d", to="polars")
'1d'
>>>
>>> # Convert to DuckDB format
>>> get_timedelta_str("1h", to="duckdb")
'1 hour'
>>> get_timedelta_str("1s", to="duckdb")
'1 second'
>>>
>>> # Unknown units - graceful fallback
>>> get_timedelta_str("1invalid", to="polars")
'1 invalid'
>>> get_timedelta_str("5unknown", to="duckdb")
'5 unknown'
Source code in src/fsspeckit/common/datetime.py
def get_timedelta_str(timedelta_string: str, to: str = "polars") -> str:
    """Convert timedelta strings between different formats.

    Converts timedelta strings between Polars and DuckDB formats, with graceful
    fallback for unknown units. Never raises errors for unknown units - instead
    returns a reasonable string representation.

    Args:
        timedelta_string: Input timedelta string (e.g., "1h", "2d", "5invalid").
        to: Target format - "polars" or "duckdb". Defaults to "polars".

    Returns:
        String in the target format. For unknown units, returns "value unit"
        format without raising errors.

    Examples:
        >>> # Valid Polars units
        >>> get_timedelta_str("1h", to="polars")
        '1h'
        >>> get_timedelta_str("1d", to="polars")
        '1d'
        >>>
        >>> # Convert to DuckDB format
        >>> get_timedelta_str("1h", to="duckdb")
        '1 hour'
        >>> get_timedelta_str("1s", to="duckdb")
        '1 second'
        >>>
        >>> # Unknown units - graceful fallback
        >>> get_timedelta_str("1invalid", to="polars")
        '1 invalid'
        >>> get_timedelta_str("5unknown", to="duckdb")
        '5 unknown'
    """
    polars_timedelta_units = [
        "ns",
        "us",
        "ms",
        "s",
        "m",
        "h",
        "d",
        "w",
        "mo",
        "y",
    ]
    duckdb_timedelta_units = [
        "nanosecond",
        "microsecond",
        "millisecond",
        "second",
        "minute",
        "hour",
        "day",
        "week",
        "month",
        "year",
    ]

    unit = re.sub("[0-9]", "", timedelta_string).strip()
    val = timedelta_string.replace(unit, "").strip()
    base_unit = re.sub("s$", "", unit)

    if to == "polars":
        # Already a valid Polars unit
        if unit in polars_timedelta_units:
            return timedelta_string
        # Try to map known DuckDB units to Polars units
        mapping = dict(zip(duckdb_timedelta_units, polars_timedelta_units))
        target = mapping.get(base_unit)
        if target is not None:
            return f"{val}{target}"
        # Fallback for unknown units: preserve value and unit as-is
        return f"{val} {unit}".strip()

    # DuckDB branch
    if unit in polars_timedelta_units:
        mapping = dict(zip(polars_timedelta_units, duckdb_timedelta_units))
        return f"{val} {mapping[unit]}"

    # Unknown Polars-style unit when targeting DuckDB: keep unit without trailing "s"
    return f"{val} {base_unit}".strip()

fsspeckit.common.get_timestamp_column

get_timestamp_column(df: Any) -> Union[str, list[str]]

Get timestamp column names from a DataFrame or PyArrow Table.

Automatically detects and normalizes different DataFrame types to work with pandas DataFrames, Polars DataFrames/LazyFrames, and PyArrow Tables.

Parameters:

    df (Any): A Polars DataFrame/LazyFrame, PyArrow Table, or pandas DataFrame. The function automatically converts pandas DataFrames and PyArrow Tables to Polars LazyFrames for consistent timestamp detection. Required.

Returns:

    Union[str, list[str]]: List of strings containing timestamp column names. Returns an empty list if no timestamp columns are found.

Examples:

>>> import pandas as pd
>>> import polars as pl
>>> import pyarrow as pa
>>>
>>> # Works with pandas DataFrames
>>> df_pd = pd.DataFrame({"ts": pd.date_range("2023-01-01", periods=3)})
>>> get_timestamp_column(df_pd)
['ts']
>>>
>>> # Works with Polars DataFrames
>>> df_pl = pl.DataFrame({"ts": [datetime(2023, 1, 1)]})
>>> get_timestamp_column(df_pl)
['ts']
>>>
>>> # Works with PyArrow Tables
>>> table = pa.table({"ts": pa.array([datetime(2023, 1, 1)])})
>>> get_timestamp_column(table)
['ts']
Source code in src/fsspeckit/common/datetime.py
def get_timestamp_column(df: Any) -> Union[str, list[str]]:
    """Get timestamp column names from a DataFrame or PyArrow Table.

    Automatically detects and normalizes different DataFrame types to work with
    pandas DataFrames, Polars DataFrames/LazyFrames, and PyArrow Tables.

    Args:
        df: A Polars DataFrame/LazyFrame, PyArrow Table, or pandas DataFrame.
            The function automatically converts pandas DataFrames and PyArrow Tables
            to Polars LazyFrames for consistent timestamp detection.

    Returns:
        List of strings containing timestamp column names. Returns an empty list
        if no timestamp columns are found.

    Examples:
        >>> import pandas as pd
        >>> import polars as pl
        >>> import pyarrow as pa
        >>>
        >>> # Works with pandas DataFrames
        >>> df_pd = pd.DataFrame({"ts": pd.date_range("2023-01-01", periods=3)})
        >>> get_timestamp_column(df_pd)
        ['ts']
        >>>
        >>> # Works with Polars DataFrames
        >>> df_pl = pl.DataFrame({"ts": [datetime(2023, 1, 1)]})
        >>> get_timestamp_column(df_pl)
        ['ts']
        >>>
        >>> # Works with PyArrow Tables
        >>> table = pa.table({"ts": pa.array([datetime(2023, 1, 1)])})
        >>> get_timestamp_column(table)
        ['ts']
    """
    from fsspeckit.common.optional import (
        _import_pandas,
        _import_polars,
        _import_pyarrow,
    )

    pl = _import_polars()
    pa = _import_pyarrow()
    pd = _import_pandas()

    # Import polars.selectors at runtime
    import polars.selectors as cs

    # Normalise supported input types to a Polars LazyFrame
    if isinstance(df, pa.Table):
        df = pl.from_arrow(df).lazy()
    elif isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df).lazy()

    return df.select(cs.datetime() | cs.date()).collect_schema().names()

fsspeckit.common.opt_dtype_pl

opt_dtype_pl(
    df: DataFrame,
    include: str | list[str] | None = None,
    exclude: str | list[str] | None = None,
    time_zone: str | None = None,
    shrink_numerics: bool = False,
    allow_unsigned: bool = True,
    allow_null: bool = True,
    sample_size: int | None = 1024,
    sample_method: SampleMethod = "first",
    strict: bool = False,
    *,
    force_timezone: str | None = None,
) -> DataFrame

Optimize data types of a Polars DataFrame for performance and memory efficiency.

This function analyzes each column and converts it to the most appropriate data type based on content, handling string-to-type conversions and numeric type downcasting.

Parameters:

    df (DataFrame): The Polars DataFrame to optimize. Required.
    include (str | list[str] | None): Column(s) to include in optimization (default: all columns). Default: None.
    exclude (str | list[str] | None): Column(s) to exclude from optimization. Default: None.
    time_zone (str | None): Optional time zone hint during datetime parsing. Default: None.
    shrink_numerics (bool): Whether to downcast numeric types when possible. Default: False.
    allow_unsigned (bool): Whether to allow unsigned integer types. Default: True.
    allow_null (bool): Whether to allow columns with all null values to be cast to Null type. Default: True.
    sample_size (int | None): Maximum number of cleaned values to inspect for regex-based inference. Use None to inspect the entire column. Default: 1024.
    sample_method (SampleMethod): Which subset to inspect ("first" or "random"). Default: 'first'.
    strict (bool): If True, raise an error if any column cannot be optimized. Default: False.
    force_timezone (str | None): If set, ensure all parsed datetime columns end up with this timezone. Default: None.

Returns:

    DataFrame: DataFrame with optimized data types.
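
A minimal usage sketch (not from the original reference; it assumes Polars is installed and that fsspeckit.common exports the function under the name shown above):

import polars as pl
from fsspeckit.common import opt_dtype_pl

df = pl.DataFrame(
    {
        "id": ["1", "2", "3"],                     # numeric strings
        "ts": ["2023-01-01", "2023-01-02", None],  # date-like strings
        "note": ["x", "y", "z"],                   # excluded from optimization below
    }
)

optimized = opt_dtype_pl(df, exclude="note", shrink_numerics=True)
print(optimized.schema)  # inspect the inferred dtypes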

Source code in src/fsspeckit/common/polars.py
def opt_dtype(
    df: pl.DataFrame,
    include: str | list[str] | None = None,
    exclude: str | list[str] | None = None,
    time_zone: str | None = None,
    shrink_numerics: bool = False,
    allow_unsigned: bool = True,
    allow_null: bool = True,
    sample_size: int | None = 1024,
    sample_method: SampleMethod = "first",
    strict: bool = False,
    *,
    force_timezone: str | None = None,
) -> pl.DataFrame:
    """
    Optimize data types of a Polars DataFrame for performance and memory efficiency.

    This function analyzes each column and converts it to the most appropriate
    data type based on content, handling string-to-type conversions and
    numeric type downcasting.

    Args:
        df: The Polars DataFrame to optimize.
        include: Column(s) to include in optimization (default: all columns).
        exclude: Column(s) to exclude from optimization.
        time_zone: Optional time zone hint during datetime parsing.
        shrink_numerics: Whether to downcast numeric types when possible.
        allow_unsigned: Whether to allow unsigned integer types.
        allow_null: Whether to allow columns with all null values to be cast to Null type.
        sample_size: Maximum number of cleaned values to inspect for regex-based inference. Use None to inspect the entire column.
        sample_method: Which subset to inspect (`"first"` or `"random"`).
        strict: If True, will raise an error if any column cannot be optimized.
        force_timezone: If set, ensure all parsed datetime columns end up with this timezone.

    Returns:
        DataFrame with optimized data types.
    """
    if sample_method not in ("first", "random"):
        raise ValueError("sample_method must be 'first' or 'random'")

    if isinstance(df, pl.LazyFrame):
        return opt_dtype(
            df.collect(),
            include=include,
            exclude=exclude,
            time_zone=time_zone,
            shrink_numerics=shrink_numerics,
            allow_unsigned=allow_unsigned,
            allow_null=allow_null,
            sample_size=sample_size,
            sample_method=sample_method,
            strict=strict,
            force_timezone=force_timezone,
        ).lazy()

    # Normalize include/exclude parameters
    if isinstance(include, str):
        include = [include]
    if isinstance(exclude, str):
        exclude = [exclude]

    # Determine columns to process
    cols_to_process = df.columns
    if include:
        cols_to_process = [col for col in include if col in df.columns]
    if exclude:
        cols_to_process = [col for col in cols_to_process if col not in exclude]

    # Generate optimization expressions for all columns
    expressions = []
    for col_name in cols_to_process:
        try:
            expressions.append(
                _get_column_expr(
                    df,
                    col_name,
                    shrink_numerics,
                    allow_unsigned,
                    allow_null,
                    time_zone,
                    force_timezone,
                    sample_size,
                    sample_method,
                    strict,
                )
            )
        except Exception as e:
            if strict:
                raise e
            # If strict mode is off, just keep the original column
            continue

    # Apply all transformations at once if any exist
    return df if not expressions else df.with_columns(expressions)

fsspeckit.common.safe_format_error

safe_format_error(
    operation: str,
    path: str | None = None,
    error: BaseException | None = None,
    **context: Any,
) -> str

Format an error message with credentials scrubbed.

Parameters:

    operation (str): Description of the operation that failed. Required.
    path (str | None): Optional path involved in the operation. Default: None.
    error (BaseException | None): Optional exception that occurred. Default: None.
    **context (Any): Additional context key-value pairs. Default: {}.

Returns:

    str: A formatted, credential-scrubbed error message.
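
A short illustrative sketch (not part of the original reference; the path and context values are hypothetical):

from fsspeckit.common import safe_format_error

try:
    raise OSError("connection refused")
except OSError as exc:
    msg = safe_format_error(
        "upload file",                    # operation that failed
        path="s3://bucket/data.parquet",  # hypothetical path
        error=exc,
        attempt=3,                        # extra context key/value pair
    )
    print(msg)
    # Something like: Failed to upload file at 's3://bucket/data.parquet' : connection refused (attempt=3)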

Source code in src/fsspeckit/common/security.py
def safe_format_error(
    operation: str,
    path: str | None = None,
    error: BaseException | None = None,
    **context: Any,
) -> str:
    """Format an error message with credentials scrubbed.

    Args:
        operation: Description of the operation that failed.
        path: Optional path involved in the operation.
        error: Optional exception that occurred.
        **context: Additional context key-value pairs.

    Returns:
        A formatted, credential-scrubbed error message.
    """
    parts = [f"Failed to {operation}"]

    if path:
        parts.append(f"at '{path}'")

    if error:
        parts.append(f": {scrub_exception(error)}")

    if context:
        context_str = ", ".join(f"{k}={scrub_credentials(str(v))}" for k, v in context.items())
        parts.append(f" ({context_str})")

    return " ".join(parts)

fsspeckit.common.scrub_credentials

scrub_credentials(message: str) -> str

Remove or mask credential-like values from a string.

This is intended for use before logging error messages that might contain sensitive information like access keys or tokens.

Parameters:

    message (str): The string to scrub. Required.

Returns:

    str: The string with credential-like values replaced with [REDACTED].

Examples:

>>> scrub_credentials("Error: access_key_id=AKIAIOSFODNN7EXAMPLE")
'Error: access_key_id=[REDACTED]'
>>> scrub_credentials("Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...")
'[REDACTED]'
Source code in src/fsspeckit/common/security.py
def scrub_credentials(message: str) -> str:
    """Remove or mask credential-like values from a string.

    This is intended for use before logging error messages that might
    contain sensitive information like access keys or tokens.

    Args:
        message: The string to scrub.

    Returns:
        The string with credential-like values replaced with [REDACTED].

    Examples:
        >>> scrub_credentials("Error: access_key_id=AKIAIOSFODNN7EXAMPLE")
        'Error: access_key_id=[REDACTED]'

        >>> scrub_credentials("Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...")
        '[REDACTED]'
    """
    if not message:
        return message

    result = message

    for pattern in _CREDENTIAL_PATTERNS:
        # Replace matched groups with [REDACTED]
        def redact_match(match: re.Match) -> str:
            groups = match.groups()
            if len(groups) >= 2:
                # Pattern with key=value format - keep the key, redact the value
                return match.group(0).replace(groups[-1], "[REDACTED]")
            else:
                # Single match - redact entire thing
                return "[REDACTED]"

        result = pattern.sub(redact_match, result)

    return result

fsspeckit.common.scrub_exception

scrub_exception(exc: BaseException) -> str

Scrub credentials from an exception's string representation.

Parameters:

    exc (BaseException): The exception to scrub. Required.

Returns:

    str: A scrubbed string representation of the exception.
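
A brief sketch (not part of the original reference), reusing the credential pattern shown for scrub_credentials above:

from fsspeckit.common import scrub_exception

try:
    raise RuntimeError("Error: access_key_id=AKIAIOSFODNN7EXAMPLE")
except RuntimeError as exc:
    print(scrub_exception(exc))  # Error: access_key_id=[REDACTED]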

Source code in src/fsspeckit/common/security.py
def scrub_exception(exc: BaseException) -> str:
    """Scrub credentials from an exception's string representation.

    Args:
        exc: The exception to scrub.

    Returns:
        A scrubbed string representation of the exception.
    """
    return scrub_credentials(str(exc))

fsspeckit.common.setup_logging

setup_logging(
    level: Optional[str] = None,
    disable: bool = False,
    format_string: Optional[str] = None,
) -> None

Configure the Loguru logger for fsspeckit.

Removes the default handler and adds a new one targeting stderr with customizable level and format.

Parameters:

    level (Optional[str]): Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). If None, uses the fsspeckit_LOG_LEVEL environment variable or defaults to "INFO". Default: None.
    disable (bool): Whether to disable logging for the fsspeckit package. Default: False.
    format_string (Optional[str]): Custom format string for log messages. If None, uses a default comprehensive format. Default: None.
Example
# Basic setup
setup_logging()

# Custom level and format
setup_logging(level="DEBUG", format_string="{time} | {level} | {message}")

# Disable logging
setup_logging(disable=True)
Source code in src/fsspeckit/common/logging.py
def setup_logging(
    level: Optional[str] = None,
    disable: bool = False,
    format_string: Optional[str] = None,
) -> None:
    """Configure the Loguru logger for fsspeckit.

    Removes the default handler and adds a new one targeting stderr
    with customizable level and format.

    Args:
        level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
               If None, uses fsspeckit_LOG_LEVEL environment variable
               or defaults to "INFO".
        disable: Whether to disable logging for fsspeckit package.
        format_string: Custom format string for log messages.
                      If None, uses a default comprehensive format.

    Example:
        ```python
        # Basic setup
        setup_logging()

        # Custom level and format
        setup_logging(level="DEBUG", format_string="{time} | {level} | {message}")

        # Disable logging
        setup_logging(disable=True)
        ```
    """
    # Determine log level
    if level is None:
        level = os.getenv("fsspeckit_LOG_LEVEL", "INFO")

    # Default format if none provided
    if format_string is None:
        format_string = (
            "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
            "<level>{message}</level>"
        )

    # Remove the default handler added by Loguru
    logger.remove()

    # Add new handler with custom configuration
    logger.add(
        sys.stderr,
        level=level.upper(),
        format=format_string,
    )

    # Optionally disable logging for this package
    if disable:
        logger.disable("fsspeckit")

fsspeckit.common.sync_dir

sync_dir(
    src_fs: AbstractFileSystem,
    dst_fs: AbstractFileSystem,
    src_path: str = "",
    dst_path: str = "",
    server_side: bool = True,
    chunk_size: int = 8 * 1024 * 1024,
    parallel: bool = False,
    n_jobs: int = -1,
    verbose: bool = True,
) -> dict[str, list[str]]

Sync two directories between different filesystems.

Compares files in the source and destination directories, copies new or updated files from source to destination, and deletes stale files from destination.

Parameters:

    src_fs (AbstractFileSystem): Source filesystem (fsspec AbstractFileSystem). Required.
    dst_fs (AbstractFileSystem): Destination filesystem (fsspec AbstractFileSystem). Required.
    src_path (str): Path in source filesystem to sync. Default: '' (root).
    dst_path (str): Path in destination filesystem to sync. Default: '' (root).
    server_side (bool): Whether to use server-side copy if supported (see sync_files). Default: True.
    chunk_size (int): Size of chunks to read/write files (in bytes). Default: 8 * 1024 * 1024 (8 MB).
    parallel (bool): Whether to perform copy/delete operations in parallel. Default: False.
    n_jobs (int): Number of parallel jobs if parallel=True. Default: -1 (all cores).
    verbose (bool): Whether to show progress bars. Default: True.

Returns:

    dict[str, list[str]]: Summary of added and deleted files.
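
A minimal usage sketch (not from the original reference; the directory paths are hypothetical and fsspec must be installed):

import fsspec
from fsspeckit.common import sync_dir

src_fs = fsspec.filesystem("local")
dst_fs = fsspec.filesystem("memory")

summary = sync_dir(
    src_fs,
    dst_fs,
    src_path="/data/source",  # hypothetical source directory
    dst_path="/mirror",
    parallel=False,
    verbose=False,
)
print(summary["added_files"], summary["deleted_files"])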

Source code in src/fsspeckit/common/misc.py
def sync_dir(
    src_fs: AbstractFileSystem,
    dst_fs: AbstractFileSystem,
    src_path: str = "",
    dst_path: str = "",
    server_side: bool = True,
    chunk_size: int = 8 * 1024 * 1024,
    parallel: bool = False,
    n_jobs: int = -1,
    verbose: bool = True,
) -> dict[str, list[str]]:
    """Sync two directories between different filesystems.

    Compares files in the source and destination directories, copies new or updated files from source to destination,
    and deletes stale files from destination.

    Args:
        src_fs: Source filesystem (fsspec AbstractFileSystem)
        dst_fs: Destination filesystem (fsspec AbstractFileSystem)
        src_path: Path in source filesystem to sync. Default is root ('').
        dst_path: Path in destination filesystem to sync. Default is root ('').
        chunk_size: Size of chunks to read/write files (in bytes). Default is 8MB.
        parallel: Whether to perform copy/delete operations in parallel. Default is False.
        n_jobs: Number of parallel jobs if parallel=True. Default is -1 (all cores).
        verbose: Whether to show progress bars. Default is True.

    Returns:
        dict: Summary of added and deleted files
    """

    src_mapper = src_fs.get_mapper(src_path)
    dst_mapper = dst_fs.get_mapper(dst_path)

    add_files = sorted(src_mapper.keys() - dst_mapper.keys())
    delete_files = sorted(dst_mapper.keys() - src_mapper.keys())

    return sync_files(
        add_files=add_files,
        delete_files=delete_files,
        src_fs=src_fs,
        dst_fs=dst_fs,
        src_path=src_path,
        dst_path=dst_path,
        chunk_size=chunk_size,
        server_side=server_side,
        parallel=parallel,
        n_jobs=n_jobs,
        verbose=verbose,
    )

fsspeckit.common.sync_files

sync_files(
    add_files: list[str],
    delete_files: list[str],
    src_fs: AbstractFileSystem,
    dst_fs: AbstractFileSystem,
    src_path: str = "",
    dst_path: str = "",
    server_side: bool = False,
    chunk_size: int = 8 * 1024 * 1024,
    parallel: bool = False,
    n_jobs: int = -1,
    verbose: bool = True,
) -> dict[str, list[str]]

Sync files between two filesystems by copying new files and deleting old ones.

Parameters:

    add_files (list[str]): List of file paths to add (copy from source to destination). Required.
    delete_files (list[str]): List of file paths to delete from destination. Required.
    src_fs (AbstractFileSystem): Source filesystem (fsspec AbstractFileSystem). Required.
    dst_fs (AbstractFileSystem): Destination filesystem (fsspec AbstractFileSystem). Required.
    src_path (str): Base path in source filesystem. Default: '' (root).
    dst_path (str): Base path in destination filesystem. Default: '' (root).
    server_side (bool): Whether to use server-side copy if supported. Default: False.
    chunk_size (int): Size of chunks to read/write files (in bytes). Default: 8 * 1024 * 1024 (8 MB).
    parallel (bool): Whether to perform copy/delete operations in parallel. Default: False.
    n_jobs (int): Number of parallel jobs if parallel=True. Default: -1 (all cores).
    verbose (bool): Whether to show progress bars. Default: True.

Returns:

    dict[str, list[str]]: Summary of added and deleted files.
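
A short sketch (not from the original reference) that applies a precomputed add/delete plan between two in-memory fsspec filesystems:

import fsspec
from fsspeckit.common import sync_files

src_fs = fsspec.filesystem("memory")
dst_fs = fsspec.filesystem("memory")
src_fs.pipe("/src/a.txt", b"hello")  # seed one source file

summary = sync_files(
    add_files=["a.txt"],  # copy from source to destination
    delete_files=[],      # nothing stale to remove
    src_fs=src_fs,
    dst_fs=dst_fs,
    src_path="/src",
    dst_path="/dst",
    verbose=False,
)
print(summary)  # {'added_files': ['a.txt'], 'deleted_files': []}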

Source code in src/fsspeckit/common/misc.py
def sync_files(
    add_files: list[str],
    delete_files: list[str],
    src_fs: AbstractFileSystem,
    dst_fs: AbstractFileSystem,
    src_path: str = "",
    dst_path: str = "",
    server_side: bool = False,
    chunk_size: int = 8 * 1024 * 1024,
    parallel: bool = False,
    n_jobs: int = -1,
    verbose: bool = True,
) -> dict[str, list[str]]:
    """Sync files between two filesystems by copying new files and deleting old ones.

    Args:
        add_files: List of file paths to add (copy from source to destination)
        delete_files: List of file paths to delete from destination
        src_fs: Source filesystem (fsspec AbstractFileSystem)
        dst_fs: Destination filesystem (fsspec AbstractFileSystem)
        src_path: Base path in source filesystem. Default is root ('').
        dst_path: Base path in destination filesystem. Default is root ('').
        server_side: Whether to use server-side copy if supported. Default is False.
        chunk_size: Size of chunks to read/write files (in bytes). Default is 8MB.
        parallel: Whether to perform copy/delete operations in parallel. Default is False.
        n_jobs: Number of parallel jobs if parallel=True. Default is -1 (all cores).
        verbose: Whether to show progress bars. Default is True.

    Returns:
        dict: Summary of added and deleted files
    """
    CHUNK = chunk_size
    RETRIES = 3

    server_side = check_fs_identical(src_fs, dst_fs) and server_side

    src_mapper = src_fs.get_mapper(src_path)
    dst_mapper = dst_fs.get_mapper(dst_path)

    def server_side_copy_file(key, src_mapper, dst_mapper, RETRIES):
        last_exc = None
        for attempt in range(1, RETRIES + 1):
            try:
                dst_mapper[key] = src_mapper[key]
                break
            except (OSError, IOError) as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Failed to copy file %s after %d attempts: %s",
                        key,
                        RETRIES,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Failed to copy file {key} after {RETRIES} attempts") from e
            except Exception as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Unexpected error copying file %s: %s",
                        key,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Unexpected error copying file {key}: {e}") from e

    def copy_file(key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES):
        last_exc = None
        for attempt in range(1, RETRIES + 1):
            try:
                with (
                    src_fs.open(posixpath.join(src_path, key), "rb") as r,
                    dst_fs.open(posixpath.join(dst_path, key), "wb") as w,
                ):
                    while True:
                        chunk = r.read(CHUNK)
                        if not chunk:
                            break
                        w.write(chunk)
                break
            except (OSError, IOError) as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Failed to copy file %s after %d attempts: %s",
                        key,
                        RETRIES,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Failed to copy file {key} after {RETRIES} attempts") from e
            except Exception as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Unexpected error copying file %s: %s",
                        key,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Unexpected error copying file {key}: {e}") from e

    def delete_file(key, dst_fs, dst_path, RETRIES):
        last_exc = None
        for attempt in range(1, RETRIES + 1):
            try:
                dst_fs.rm(posixpath.join(dst_path, key))
                break
            except (OSError, IOError) as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Failed to delete file %s after %d attempts: %s",
                        key,
                        RETRIES,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Failed to delete file {key} after {RETRIES} attempts") from e
            except Exception as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Unexpected error deleting file %s: %s",
                        key,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Unexpected error deleting file {key}: {e}") from e

    if len(add_files):
        # Copy new files
        if parallel:
            if server_side:
                try:
                    run_parallel(
                        server_side_copy_file,
                        add_files,
                        src_mapper=src_mapper,
                        dst_mapper=dst_mapper,
                        RETRIES=RETRIES,
                        n_jobs=n_jobs,
                        verbose=verbose,
                    )
                except (RuntimeError, OSError) as e:
                    logger.warning(
                        "Server-side copy failed for some files, falling back to client-side: %s",
                        str(e),
                    )
                    # Fallback to client-side copy if server-side fails
                    run_parallel(
                        copy_file,
                        add_files,
                        src_fs=src_fs,
                        dst_fs=dst_fs,
                        src_path=src_path,
                        dst_path=dst_path,
                        CHUNK=CHUNK,
                        RETRIES=RETRIES,
                        n_jobs=n_jobs,
                        verbose=verbose,
                    )

            else:
                run_parallel(
                    copy_file,
                    add_files,
                    src_fs=src_fs,
                    dst_fs=dst_fs,
                    src_path=src_path,
                    dst_path=dst_path,
                    CHUNK=CHUNK,
                    RETRIES=RETRIES,
                    n_jobs=n_jobs,
                    verbose=verbose,
                )
        else:
            if verbose:
                from rich.progress import track

                for key in track(
                    add_files,
                    description="Copying new files...",
                    total=len(add_files),
                ):
                    if server_side:
                        try:
                            server_side_copy_file(key, src_mapper, dst_mapper, RETRIES)
                        except (RuntimeError, OSError):
                            copy_file(
                                key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES
                            )
                    else:
                        copy_file(
                            key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES
                        )
            else:
                for key in add_files:
                    if server_side:
                        try:
                            server_side_copy_file(key, src_mapper, dst_mapper, RETRIES)
                        except (RuntimeError, OSError):
                            copy_file(
                                key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES
                            )
                    else:
                        copy_file(
                            key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES
                        )

    if len(delete_files):
        # Delete old files from destination
        if parallel:
            run_parallel(
                delete_file,
                delete_files,
                dst_fs=dst_fs,
                dst_path=dst_path,
                RETRIES=RETRIES,
                n_jobs=n_jobs,
                verbose=verbose,
            )
        else:
            if verbose:
                from rich.progress import track

                for key in track(
                    delete_files,
                    description="Deleting stale files...",
                    total=len(delete_files),
                ):
                    delete_file(key, dst_fs, dst_path, RETRIES)
            else:
                for key in delete_files:
                    delete_file(key, dst_fs, dst_path, RETRIES)

    return {"added_files": add_files, "deleted_files": delete_files}

fsspeckit.common.timestamp_from_string (cached)

timestamp_from_string(
    timestamp_str: str,
    tz: Union[str, None] = None,
    naive: bool = False,
) -> Union[datetime, date, time]

Converts a timestamp string (ISO 8601 format) into a datetime, date, or time object using only standard Python libraries.

Handles strings with or without timezone information (e.g., '2023-01-01T10:00:00+02:00', '2023-01-01', '10:00:00'). Supports timezone offsets like '+HH:MM' or '+HHMM'. For named timezones (e.g., 'Europe/Paris'), requires Python 3.9+ and the 'tzdata' package to be installed.

Parameters:

    timestamp_str (str): The string representation of the timestamp (ISO 8601 format). Required.
    tz (str, optional): Target timezone identifier (e.g., 'UTC', '+02:00', 'Europe/Paris'). If provided, the output datetime/time will be localized or converted to this timezone. Default: None.
    naive (bool, optional): If True, return a naive datetime/time (no timezone info), even if the input string or tz parameter specifies one. Default: False.

Returns:

    Union[dt.datetime, dt.date, dt.time]: The parsed datetime, date, or time object.

Raises:

    ValueError: If the timestamp string format is invalid or the timezone is invalid/unsupported.
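
A few illustrative calls (not part of the original reference), based on the behaviours described above:

from fsspeckit.common import timestamp_from_string

timestamp_from_string("2023-01-01T10:00:00+02:00")              # timezone-aware datetime
timestamp_from_string("2023-01-01T10:00:00", tz="UTC")           # naive input localized to UTC
timestamp_from_string("2023-01-01T10:00:00+02:00", naive=True)   # tzinfo stripped from the result
timestamp_from_string("2023-01-01")                              # datetime.date
timestamp_from_string("10:00:00")                                # datetime.time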

Source code in src/fsspeckit/common/datetime.py
@lru_cache(maxsize=128)
def timestamp_from_string(
    timestamp_str: str,
    tz: Union[str, None] = None,
    naive: bool = False,
) -> Union[dt.datetime, dt.date, dt.time]:
    """
    Converts a timestamp string (ISO 8601 format) into a datetime, date, or time object
    using only standard Python libraries.

    Handles strings with or without timezone information (e.g., '2023-01-01T10:00:00+02:00',
    '2023-01-01', '10:00:00'). Supports timezone offsets like '+HH:MM' or '+HHMM'.
    For named timezones (e.g., 'Europe/Paris'), requires Python 3.9+ and the 'tzdata'
    package to be installed.

    Args:
        timestamp_str (str): The string representation of the timestamp (ISO 8601 format).
        tz (str, optional): Target timezone identifier (e.g., 'UTC', '+02:00', 'Europe/Paris').
            If provided, the output datetime/time will be localized or converted to this timezone.
            Defaults to None.
        naive (bool, optional): If True, return a naive datetime/time (no timezone info),
            even if the input string or `tz` parameter specifies one. Defaults to False.

    Returns:
        Union[dt.datetime, dt.date, dt.time]: The parsed datetime, date, or time object.

    Raises:
        ValueError: If the timestamp string format is invalid or the timezone is
                    invalid/unsupported.
    """

    # Regex to parse timezone offsets like +HH:MM or +HHMM
    _TZ_OFFSET_REGEX = re.compile(r"([+-])(\d{2}):?(\d{2})")

    def _parse_tz_offset(tz_str: str) -> dt.tzinfo | None:
        """Parses a timezone offset string into a timezone object."""
        match = _TZ_OFFSET_REGEX.fullmatch(tz_str)
        if match:
            sign, hours, minutes = match.groups()
            offset_seconds = (int(hours) * 3600 + int(minutes) * 60) * (
                -1 if sign == "-" else 1
            )
            if abs(offset_seconds) >= 24 * 3600:
                raise ValueError(f"Invalid timezone offset: {tz_str}")
            return dt.timezone(dt.timedelta(seconds=offset_seconds), name=tz_str)
        return None

    def _get_tzinfo(tz_identifier: str | None) -> dt.tzinfo | None:
        """Gets a tzinfo object from a string (offset or IANA name)."""
        if tz_identifier is None:
            return None
        if tz_identifier.upper() == "UTC":
            return dt.timezone.utc

        # Try parsing as offset first
        offset_tz = _parse_tz_offset(tz_identifier)
        if offset_tz:
            return offset_tz

        # Try parsing as IANA name using zoneinfo (if available)
        if ZoneInfo:
            try:
                return ZoneInfo(tz_identifier)
            except ZoneInfoNotFoundError:
                raise ValueError(
                    f"Timezone '{tz_identifier}' not found. Install 'tzdata' or use offset format."
                )
            except Exception as e:  # Catch other potential zoneinfo errors
                raise ValueError(f"Error loading timezone '{tz_identifier}': {e}")
        else:
            # zoneinfo not available
            raise ValueError(
                f"Invalid timezone: '{tz_identifier}'. Use offset format (e.g., '+02:00') "
                "or run Python 3.9+ with 'tzdata' installed for named timezones."
            )

    target_tz: dt.tzinfo | None = _get_tzinfo(tz)
    parsed_obj: dt.datetime | dt.date | dt.time | None = None

    # Preprocess: Replace space separator, strip whitespace
    processed_str = timestamp_str.strip().replace(" ", "T")

    # Attempt parsing (datetime, date, time) using fromisoformat
    try:
        # Python < 3.11 fromisoformat has limitations (e.g., no Z, no +HHMM offset)
        # This implementation assumes Python 3.11+ for full ISO 8601 support via fromisoformat
        # or that input strings use formats compatible with older versions (e.g., +HH:MM)
        parsed_obj = dt.datetime.fromisoformat(processed_str)
    except ValueError:
        try:
            parsed_obj = dt.date.fromisoformat(processed_str)
        except ValueError:
            try:
                # Time parsing needs care, especially with offsets in older Python
                parsed_obj = dt.time.fromisoformat(processed_str)
            except ValueError:
                # Add fallback for simple HH:MM:SS if needed, though less robust
                # try:
                #     parsed_obj = dt.datetime.strptime(processed_str, "%H:%M:%S").time()
                # except ValueError:
                raise ValueError(f"Invalid timestamp format: '{timestamp_str}'")

    # Apply timezone logic if we have a datetime or time object
    if isinstance(parsed_obj, (dt.datetime, dt.time)):
        is_aware = (
            parsed_obj.tzinfo is not None
            and parsed_obj.tzinfo.utcoffset(
                parsed_obj if isinstance(parsed_obj, dt.datetime) else None
            )
            is not None
        )

        if target_tz:
            if is_aware:
                # Convert existing aware object to target timezone (only for datetime)
                if isinstance(parsed_obj, dt.datetime):
                    parsed_obj = parsed_obj.astimezone(target_tz)
                # else: dt.time cannot be converted without a date context. Keep original tz.
            else:
                # Localize naive object to target timezone
                parsed_obj = parsed_obj.replace(tzinfo=target_tz)
            is_aware = True  # Object is now considered aware

        # Handle naive flag: remove tzinfo if requested
        if naive and is_aware:
            parsed_obj = parsed_obj.replace(tzinfo=None)

    # If it's a date object, tz/naive flags are ignored
    elif isinstance(parsed_obj, dt.date):
        pass

    return parsed_obj

fsspeckit.common.to_pyarrow_table

to_pyarrow_table(
    data: Union[Any, dict, list[Any]],
    concat: bool = False,
    unique: Union[bool, list[str], str] = False,
) -> Any

Convert various data formats to PyArrow Table.

Handles conversion from Polars DataFrames, Pandas DataFrames, dictionaries, and lists of these types to PyArrow Tables.

Parameters:

    data (Union[Any, dict, list[Any]]): Input data to convert. Required.
    concat (bool): Whether to concatenate multiple inputs into a single table. Default: False.
    unique (Union[bool, list[str], str]): Whether to remove duplicates. Can specify columns. Default: False.

Returns:

    Any: PyArrow Table containing the converted data.

Example
df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
table = to_pyarrow_table(df)
print(table.schema)
# a: int64
# b: int64
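
A further sketch (not from the original reference) that concatenates several Polars frames into one PyArrow table and drops duplicate rows:

import polars as pl
from fsspeckit.common import to_pyarrow_table

frames = [
    pl.DataFrame({"a": [1, 2], "b": [3, 4]}),
    pl.DataFrame({"a": [2, 5], "b": [4, 6]}),
]
table = to_pyarrow_table(frames, concat=True, unique=True)
print(table.num_rows)  # 3 rows remain after the duplicate (2, 4) is dropped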
Source code in src/fsspeckit/common/types.py
def to_pyarrow_table(
    data: Union[
        Any,  # pl.DataFrame, pl.LazyFrame, pd.DataFrame
        dict,
        list[Any],  # list of DataFrames or dicts
    ],
    concat: bool = False,
    unique: Union[bool, list[str], str] = False,
) -> Any:
    """Convert various data formats to PyArrow Table.

    Handles conversion from Polars DataFrames, Pandas DataFrames,
    dictionaries, and lists of these types to PyArrow Tables.

    Args:
        data: Input data to convert.
        concat: Whether to concatenate multiple inputs into single table.
        unique: Whether to remove duplicates. Can specify columns.

    Returns:
        PyArrow Table containing the converted data.

    Example:
        ```python
        df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
        table = to_pyarrow_table(df)
        print(table.schema)
        # a: int64
        # b: int64
        ```
    """
    from fsspeckit.common.optional import (
        _import_pandas,
        _import_polars,
        _import_pyarrow,
    )
    from fsspeckit.datasets.pyarrow import convert_large_types_to_normal

    pl = _import_polars()
    pd = _import_pandas()
    pa = _import_pyarrow()

    # Convert dict to DataFrame first
    if isinstance(data, dict):
        data = dict_to_dataframe(data)
    if isinstance(data, list):
        if isinstance(data[0], dict):
            data = dict_to_dataframe(data, unique=unique)

    # Ensure data is a list for uniform processing
    if not isinstance(data, list):
        data = [data]

    # Collect lazy frames
    if isinstance(data[0], pl.LazyFrame):
        data = [dd.collect() for dd in data]

    # Convert based on the first item's type
    if isinstance(data[0], pl.DataFrame):
        if concat:
            data = pl.concat(data, how="diagonal_relaxed")
            if unique:
                data = data.unique(
                    subset=None if not isinstance(unique, (str, list)) else unique,
                    maintain_order=True,
                )
            data = data.to_arrow()
            data = data.cast(convert_large_types_to_normal(data.schema))
        else:
            data = [dd.to_arrow() for dd in data]
            data = [dd.cast(convert_large_types_to_normal(dd.schema)) for dd in data]

    elif isinstance(data[0], pd.DataFrame):
        data = [pa.Table.from_pandas(dd, preserve_index=False) for dd in data]
        if concat:
            data = pa.concat_tables(data, promote_options="permissive")
            if unique:
                data = (
                    pl.from_arrow(data)
                    .unique(
                        subset=None if not isinstance(unique, (str, list)) else unique,
                        maintain_order=True,
                    )
                    .to_arrow()
                )
                data = data.cast(convert_large_types_to_normal(data.schema))

    elif isinstance(data[0], (pa.RecordBatch, Generator)):
        if concat:
            data = pa.Table.from_batches(data)
            if unique:
                data = (
                    pl.from_arrow(data)
                    .unique(
                        subset=None if not isinstance(unique, (str, list)) else unique,
                        maintain_order=True,
                    )
                    .to_arrow()
                )
                data = data.cast(convert_large_types_to_normal(data.schema))
        else:
            data = [pa.Table.from_batches([dd]) for dd in data]

    return data

fsspeckit.common.validate_columns

validate_columns(
    columns: list[str] | None, valid_columns: list[str]
) -> list[str] | None

Validate that requested columns exist in the schema.

This is a helper to prevent column injection in SQL-like operations.

Parameters:

    columns (list[str] | None): List of column names to validate, or None. Required.
    valid_columns (list[str]): List of valid column names from the schema. Required.

Returns:

    list[str] | None: The validated columns list, or None if columns was None.

Raises:

    ValueError: If any column is not in the valid set.
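
A brief sketch (not part of the original reference; the column names are hypothetical):

from fsspeckit.common import validate_columns

schema_columns = ["id", "name", "created_at"]

validate_columns(["id", "name"], schema_columns)  # returns ['id', 'name']
validate_columns(None, schema_columns)            # returns None (no projection)

try:
    validate_columns(["id; DROP TABLE users"], schema_columns)
except ValueError as exc:
    print(exc)  # names the invalid column and lists the valid ones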

Source code in src/fsspeckit/common/security.py
def validate_columns(columns: list[str] | None, valid_columns: list[str]) -> list[str] | None:
    """Validate that requested columns exist in the schema.

    This is a helper to prevent column injection in SQL-like operations.

    Args:
        columns: List of column names to validate, or None.
        valid_columns: List of valid column names from the schema.

    Returns:
        The validated columns list, or None if columns was None.

    Raises:
        ValueError: If any column is not in the valid set.
    """
    if columns is None:
        return None

    valid_set = set(valid_columns)
    invalid = [col for col in columns if col not in valid_set]

    if invalid:
        raise ValueError(
            f"Invalid column(s): {', '.join(invalid)}. "
            f"Valid columns are: {', '.join(sorted(valid_set))}"
        )

    return columns

fsspeckit.common.validate_compression_codec

validate_compression_codec(codec: str) -> str

Validate that a compression codec is in the allowed set.

This prevents injection of arbitrary values into SQL queries or filesystem operations that accept codec parameters.

Parameters:

    codec (str): The compression codec name to validate. Required.

Returns:

    str: The validated codec name (lowercased).

Raises:

    ValueError: If the codec is not in the allowed set.

Examples:

>>> validate_compression_codec("snappy")
'snappy'
>>> validate_compression_codec("GZIP")
'gzip'
>>> validate_compression_codec("malicious; DROP TABLE")
ValueError: Invalid compression codec
Source code in src/fsspeckit/common/security.py
def validate_compression_codec(codec: str) -> str:
    """Validate that a compression codec is in the allowed set.

    This prevents injection of arbitrary values into SQL queries or
    filesystem operations that accept codec parameters.

    Args:
        codec: The compression codec name to validate.

    Returns:
        The validated codec name (lowercased).

    Raises:
        ValueError: If the codec is not in the allowed set.

    Examples:
        >>> validate_compression_codec("snappy")
        'snappy'

        >>> validate_compression_codec("GZIP")
        'gzip'

        >>> validate_compression_codec("malicious; DROP TABLE")
        ValueError: Invalid compression codec
    """
    if not codec or not isinstance(codec, str):
        raise ValueError("Compression codec must be a non-empty string")

    normalized = codec.lower().strip()

    if normalized not in VALID_COMPRESSION_CODECS:
        valid_list = ", ".join(sorted(VALID_COMPRESSION_CODECS - {"none"}))
        raise ValueError(
            f"Invalid compression codec: '{codec}'. "
            f"Must be one of: {valid_list}"
        )

    return normalized

fsspeckit.common.validate_path

validate_path(
    path: str, base_dir: str | None = None
) -> str

Validate a filesystem path for security issues.

Checks for: - Embedded null bytes and control characters - Path traversal attempts (../ sequences escaping base_dir) - Empty or whitespace-only paths

Parameters:

Name Type Description Default
path str

The path to validate.

required
base_dir str | None

Optional base directory. If provided, the path must resolve to a location within this directory (prevents path traversal).

None

Returns:

Type Description
str

The validated path (unchanged if valid).

Raises:

Type Description
ValueError

If the path contains forbidden characters, is empty, or escapes the base directory.

Examples:

>>> validate_path("/data/file.parquet")
'/data/file.parquet'
>>> validate_path("../../../etc/passwd", base_dir="/data")
ValueError: Path escapes base directory
>>> validate_path("file\x00.parquet")
ValueError: Path contains forbidden characters
Source code in src/fsspeckit/common/security.py
def validate_path(path: str, base_dir: str | None = None) -> str:
    """Validate a filesystem path for security issues.

    Checks for:
    - Embedded null bytes and control characters
    - Path traversal attempts (../ sequences escaping base_dir)
    - Empty or whitespace-only paths

    Args:
        path: The path to validate.
        base_dir: Optional base directory. If provided, the path must resolve
            to a location within this directory (prevents path traversal).

    Returns:
        The validated path (unchanged if valid).

    Raises:
        ValueError: If the path contains forbidden characters, is empty,
            or escapes the base directory.

    Examples:
        >>> validate_path("/data/file.parquet")
        '/data/file.parquet'

        >>> validate_path("../../../etc/passwd", base_dir="/data")
        ValueError: Path escapes base directory

        >>> validate_path("file\\x00.parquet")
        ValueError: Path contains forbidden characters
    """
    if not path or not path.strip():
        raise ValueError("Path cannot be empty or whitespace-only")

    # Check for forbidden control characters
    for char in path:
        if char in _FORBIDDEN_PATH_CHARS:
            raise ValueError(
                f"Path contains forbidden control character: {repr(char)}"
            )

    # Check for path traversal when base_dir is specified
    if base_dir is not None:
        import os

        # Normalize both paths for comparison
        base_resolved = os.path.normpath(os.path.abspath(base_dir))

        # Handle relative paths by joining with base
        if not os.path.isabs(path):
            full_path = os.path.join(base_dir, path)
        else:
            full_path = path

        path_resolved = os.path.normpath(os.path.abspath(full_path))

        # Check if resolved path starts with base directory
        if not path_resolved.startswith(base_resolved + os.sep) and path_resolved != base_resolved:
            raise ValueError(
                f"Path '{path}' escapes base directory '{base_dir}'"
            )

    return path

Modules

fsspeckit.common.datetime

Functions
fsspeckit.common.datetime.get_timedelta_str
get_timedelta_str(
    timedelta_string: str, to: str = "polars"
) -> str

Convert timedelta strings between different formats.

Converts timedelta strings between Polars and DuckDB formats, with graceful fallback for unknown units. Never raises errors for unknown units - instead returns a reasonable string representation.

Parameters:

Name Type Description Default
timedelta_string str

Input timedelta string (e.g., "1h", "2d", "5invalid").

required
to str

Target format - "polars" or "duckdb". Defaults to "polars".

'polars'

Returns:

Type Description
str

String in the target format. For unknown units, returns "value unit" format without raising errors.

Examples:

>>> # Valid Polars units
>>> get_timedelta_str("1h", to="polars")
'1h'
>>> get_timedelta_str("1d", to="polars")
'1d'
>>>
>>> # Convert to DuckDB format
>>> get_timedelta_str("1h", to="duckdb")
'1 hour'
>>> get_timedelta_str("1s", to="duckdb")
'1 second'
>>>
>>> # Unknown units - graceful fallback
>>> get_timedelta_str("1invalid", to="polars")
'1 invalid'
>>> get_timedelta_str("5unknown", to="duckdb")
'5 unknown'
Source code in src/fsspeckit/common/datetime.py
def get_timedelta_str(timedelta_string: str, to: str = "polars") -> str:
    """Convert timedelta strings between different formats.

    Converts timedelta strings between Polars and DuckDB formats, with graceful
    fallback for unknown units. Never raises errors for unknown units - instead
    returns a reasonable string representation.

    Args:
        timedelta_string: Input timedelta string (e.g., "1h", "2d", "5invalid").
        to: Target format - "polars" or "duckdb". Defaults to "polars".

    Returns:
        String in the target format. For unknown units, returns "value unit"
        format without raising errors.

    Examples:
        >>> # Valid Polars units
        >>> get_timedelta_str("1h", to="polars")
        '1h'
        >>> get_timedelta_str("1d", to="polars")
        '1d'
        >>>
        >>> # Convert to DuckDB format
        >>> get_timedelta_str("1h", to="duckdb")
        '1 hour'
        >>> get_timedelta_str("1s", to="duckdb")
        '1 second'
        >>>
        >>> # Unknown units - graceful fallback
        >>> get_timedelta_str("1invalid", to="polars")
        '1 invalid'
        >>> get_timedelta_str("5unknown", to="duckdb")
        '5 unknown'
    """
    polars_timedelta_units = [
        "ns",
        "us",
        "ms",
        "s",
        "m",
        "h",
        "d",
        "w",
        "mo",
        "y",
    ]
    duckdb_timedelta_units = [
        "nanosecond",
        "microsecond",
        "millisecond",
        "second",
        "minute",
        "hour",
        "day",
        "week",
        "month",
        "year",
    ]

    unit = re.sub("[0-9]", "", timedelta_string).strip()
    val = timedelta_string.replace(unit, "").strip()
    base_unit = re.sub("s$", "", unit)

    if to == "polars":
        # Already a valid Polars unit
        if unit in polars_timedelta_units:
            return timedelta_string
        # Try to map known DuckDB units to Polars units
        mapping = dict(zip(duckdb_timedelta_units, polars_timedelta_units))
        target = mapping.get(base_unit)
        if target is not None:
            return f"{val}{target}"
        # Fallback for unknown units: preserve value and unit as-is
        return f"{val} {unit}".strip()

    # DuckDB branch
    if unit in polars_timedelta_units:
        mapping = dict(zip(polars_timedelta_units, duckdb_timedelta_units))
        return f"{val} {mapping[unit]}"

    # Unknown Polars-style unit when targeting DuckDB: keep unit without trailing "s"
    return f"{val} {base_unit}".strip()
fsspeckit.common.datetime.get_timestamp_column
get_timestamp_column(df: Any) -> Union[str, list[str]]

Get timestamp column names from a DataFrame or PyArrow Table.

Automatically detects and normalizes different DataFrame types to work with pandas DataFrames, Polars DataFrames/LazyFrames, and PyArrow Tables.

Parameters:

Name Type Description Default
df Any

A Polars DataFrame/LazyFrame, PyArrow Table, or pandas DataFrame. The function automatically converts pandas DataFrames and PyArrow Tables to Polars LazyFrames for consistent timestamp detection.

required

Returns:

Type Description
Union[str, list[str]]

List of strings containing timestamp column names. Returns an empty list if no timestamp columns are found.

Examples:

>>> import pandas as pd
>>> import polars as pl
>>> import pyarrow as pa
>>>
>>> # Works with pandas DataFrames
>>> df_pd = pd.DataFrame({"ts": pd.date_range("2023-01-01", periods=3)})
>>> get_timestamp_column(df_pd)
['ts']
>>>
>>> # Works with Polars DataFrames
>>> df_pl = pl.DataFrame({"ts": [datetime(2023, 1, 1)]})
>>> get_timestamp_column(df_pl)
['ts']
>>>
>>> # Works with PyArrow Tables
>>> table = pa.table({"ts": pa.array([datetime(2023, 1, 1)])})
>>> get_timestamp_column(table)
['ts']
Source code in src/fsspeckit/common/datetime.py
def get_timestamp_column(df: Any) -> Union[str, list[str]]:
    """Get timestamp column names from a DataFrame or PyArrow Table.

    Automatically detects and normalizes different DataFrame types to work with
    pandas DataFrames, Polars DataFrames/LazyFrames, and PyArrow Tables.

    Args:
        df: A Polars DataFrame/LazyFrame, PyArrow Table, or pandas DataFrame.
            The function automatically converts pandas DataFrames and PyArrow Tables
            to Polars LazyFrames for consistent timestamp detection.

    Returns:
        List of strings containing timestamp column names. Returns an empty list
        if no timestamp columns are found.

    Examples:
        >>> import pandas as pd
        >>> import polars as pl
        >>> import pyarrow as pa
        >>>
        >>> # Works with pandas DataFrames
        >>> df_pd = pd.DataFrame({"ts": pd.date_range("2023-01-01", periods=3)})
        >>> get_timestamp_column(df_pd)
        ['ts']
        >>>
        >>> # Works with Polars DataFrames
        >>> df_pl = pl.DataFrame({"ts": [datetime(2023, 1, 1)]})
        >>> get_timestamp_column(df_pl)
        ['ts']
        >>>
        >>> # Works with PyArrow Tables
        >>> table = pa.table({"ts": pa.array([datetime(2023, 1, 1)])})
        >>> get_timestamp_column(table)
        ['ts']
    """
    from fsspeckit.common.optional import (
        _import_pandas,
        _import_polars,
        _import_pyarrow,
    )

    pl = _import_polars()
    pa = _import_pyarrow()
    pd = _import_pandas()

    # Import polars.selectors at runtime
    import polars.selectors as cs

    # Normalise supported input types to a Polars LazyFrame
    if isinstance(df, pa.Table):
        df = pl.from_arrow(df).lazy()
    elif isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df).lazy()

    return df.select(cs.datetime() | cs.date()).collect_schema().names()
fsspeckit.common.datetime.timestamp_from_string cached
timestamp_from_string(
    timestamp_str: str,
    tz: Union[str, None] = None,
    naive: bool = False,
) -> Union[datetime, date, time]

Converts a timestamp string (ISO 8601 format) into a datetime, date, or time object using only standard Python libraries.

Handles strings with or without timezone information (e.g., '2023-01-01T10:00:00+02:00', '2023-01-01', '10:00:00'). Supports timezone offsets like '+HH:MM' or '+HHMM'. For named timezones (e.g., 'Europe/Paris'), requires Python 3.9+ and the 'tzdata' package to be installed.

Parameters:

Name Type Description Default
timestamp_str str

The string representation of the timestamp (ISO 8601 format).

required
tz str

Target timezone identifier (e.g., 'UTC', '+02:00', 'Europe/Paris'). If provided, the output datetime/time will be localized or converted to this timezone. Defaults to None.

None
naive bool

If True, return a naive datetime/time (no timezone info), even if the input string or tz parameter specifies one. Defaults to False.

False

Returns:

Type Description
Union[datetime, date, time]

Union[dt.datetime, dt.date, dt.time]: The parsed datetime, date, or time object.

Raises:

Type Description
ValueError

If the timestamp string format is invalid or the timezone is invalid/unsupported.

Source code in src/fsspeckit/common/datetime.py
@lru_cache(maxsize=128)
def timestamp_from_string(
    timestamp_str: str,
    tz: Union[str, None] = None,
    naive: bool = False,
) -> Union[dt.datetime, dt.date, dt.time]:
    """
    Converts a timestamp string (ISO 8601 format) into a datetime, date, or time object
    using only standard Python libraries.

    Handles strings with or without timezone information (e.g., '2023-01-01T10:00:00+02:00',
    '2023-01-01', '10:00:00'). Supports timezone offsets like '+HH:MM' or '+HHMM'.
    For named timezones (e.g., 'Europe/Paris'), requires Python 3.9+ and the 'tzdata'
    package to be installed.

    Args:
        timestamp_str (str): The string representation of the timestamp (ISO 8601 format).
        tz (str, optional): Target timezone identifier (e.g., 'UTC', '+02:00', 'Europe/Paris').
            If provided, the output datetime/time will be localized or converted to this timezone.
            Defaults to None.
        naive (bool, optional): If True, return a naive datetime/time (no timezone info),
            even if the input string or `tz` parameter specifies one. Defaults to False.

    Returns:
        Union[dt.datetime, dt.date, dt.time]: The parsed datetime, date, or time object.

    Raises:
        ValueError: If the timestamp string format is invalid or the timezone is
                    invalid/unsupported.
    """

    # Regex to parse timezone offsets like +HH:MM or +HHMM
    _TZ_OFFSET_REGEX = re.compile(r"([+-])(\d{2}):?(\d{2})")

    def _parse_tz_offset(tz_str: str) -> dt.tzinfo | None:
        """Parses a timezone offset string into a timezone object."""
        match = _TZ_OFFSET_REGEX.fullmatch(tz_str)
        if match:
            sign, hours, minutes = match.groups()
            offset_seconds = (int(hours) * 3600 + int(minutes) * 60) * (
                -1 if sign == "-" else 1
            )
            if abs(offset_seconds) >= 24 * 3600:
                raise ValueError(f"Invalid timezone offset: {tz_str}")
            return dt.timezone(dt.timedelta(seconds=offset_seconds), name=tz_str)
        return None

    def _get_tzinfo(tz_identifier: str | None) -> dt.tzinfo | None:
        """Gets a tzinfo object from a string (offset or IANA name)."""
        if tz_identifier is None:
            return None
        if tz_identifier.upper() == "UTC":
            return dt.timezone.utc

        # Try parsing as offset first
        offset_tz = _parse_tz_offset(tz_identifier)
        if offset_tz:
            return offset_tz

        # Try parsing as IANA name using zoneinfo (if available)
        if ZoneInfo:
            try:
                return ZoneInfo(tz_identifier)
            except ZoneInfoNotFoundError:
                raise ValueError(
                    f"Timezone '{tz_identifier}' not found. Install 'tzdata' or use offset format."
                )
            except Exception as e:  # Catch other potential zoneinfo errors
                raise ValueError(f"Error loading timezone '{tz_identifier}': {e}")
        else:
            # zoneinfo not available
            raise ValueError(
                f"Invalid timezone: '{tz_identifier}'. Use offset format (e.g., '+02:00') "
                "or run Python 3.9+ with 'tzdata' installed for named timezones."
            )

    target_tz: dt.tzinfo | None = _get_tzinfo(tz)
    parsed_obj: dt.datetime | dt.date | dt.time | None = None

    # Preprocess: Replace space separator, strip whitespace
    processed_str = timestamp_str.strip().replace(" ", "T")

    # Attempt parsing (datetime, date, time) using fromisoformat
    try:
        # Python < 3.11 fromisoformat has limitations (e.g., no Z, no +HHMM offset)
        # This implementation assumes Python 3.11+ for full ISO 8601 support via fromisoformat
        # or that input strings use formats compatible with older versions (e.g., +HH:MM)
        parsed_obj = dt.datetime.fromisoformat(processed_str)
    except ValueError:
        try:
            parsed_obj = dt.date.fromisoformat(processed_str)
        except ValueError:
            try:
                # Time parsing needs care, especially with offsets in older Python
                parsed_obj = dt.time.fromisoformat(processed_str)
            except ValueError:
                # Add fallback for simple HH:MM:SS if needed, though less robust
                # try:
                #     parsed_obj = dt.datetime.strptime(processed_str, "%H:%M:%S").time()
                # except ValueError:
                raise ValueError(f"Invalid timestamp format: '{timestamp_str}'")

    # Apply timezone logic if we have a datetime or time object
    if isinstance(parsed_obj, (dt.datetime, dt.time)):
        is_aware = (
            parsed_obj.tzinfo is not None
            and parsed_obj.tzinfo.utcoffset(
                parsed_obj if isinstance(parsed_obj, dt.datetime) else None
            )
            is not None
        )

        if target_tz:
            if is_aware:
                # Convert existing aware object to target timezone (only for datetime)
                if isinstance(parsed_obj, dt.datetime):
                    parsed_obj = parsed_obj.astimezone(target_tz)
                # else: dt.time cannot be converted without a date context. Keep original tz.
            else:
                # Localize naive object to target timezone
                parsed_obj = parsed_obj.replace(tzinfo=target_tz)
            is_aware = True  # Object is now considered aware

        # Handle naive flag: remove tzinfo if requested
        if naive and is_aware:
            parsed_obj = parsed_obj.replace(tzinfo=None)

    # If it's a date object, tz/naive flags are ignored
    elif isinstance(parsed_obj, dt.date):
        pass

    return parsed_obj
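
A short usage sketch (the exact object returned depends on the input format, as described above):

from fsspeckit.common.datetime import timestamp_from_string

# An offset-aware timestamp is parsed into an aware datetime.
timestamp_from_string("2023-01-01T10:00:00+02:00")

# A naive timestamp can be localized to a target timezone...
timestamp_from_string("2023-01-01T10:00:00", tz="UTC")

# ...or any timezone information can be stripped with naive=True.
timestamp_from_string("2023-01-01T10:00:00+02:00", naive=True)

# Time-of-day strings yield a time object.
timestamp_from_string("10:00:00")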

fsspeckit.common.logging

Logging configuration utilities for fsspeckit.

Functions
fsspeckit.common.logging.get_logger
get_logger(name: str = 'fsspeckit') -> logger

Get a logger instance for the given name.

Parameters:

Name Type Description Default
name str

Logger name, typically the module name.

'fsspeckit'

Returns:

Type Description
logger

Configured logger instance.

Example
logger = get_logger(__name__)
logger.info("This is a log message")
Source code in src/fsspeckit/common/logging.py
def get_logger(name: str = "fsspeckit") -> "logger":
    """Get a logger instance for the given name.

    Args:
        name: Logger name, typically the module name.

    Returns:
        Configured logger instance.

    Example:
        ```python
        logger = get_logger(__name__)
        logger.info("This is a log message")
        ```
    """
    return logger.bind(name=name)
fsspeckit.common.logging.setup_logging
setup_logging(
    level: Optional[str] = None,
    disable: bool = False,
    format_string: Optional[str] = None,
) -> None

Configure the Loguru logger for fsspeckit.

Removes the default handler and adds a new one targeting stderr with customizable level and format.

Parameters:

Name Type Description Default
level Optional[str]

Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). If None, uses fsspeckit_LOG_LEVEL environment variable or defaults to "INFO".

None
disable bool

Whether to disable logging for fsspeckit package.

False
format_string Optional[str]

Custom format string for log messages. If None, uses a default comprehensive format.

None
Example
# Basic setup
setup_logging()

# Custom level and format
setup_logging(level="DEBUG", format_string="{time} | {level} | {message}")

# Disable logging
setup_logging(disable=True)
Source code in src/fsspeckit/common/logging.py
def setup_logging(
    level: Optional[str] = None,
    disable: bool = False,
    format_string: Optional[str] = None,
) -> None:
    """Configure the Loguru logger for fsspeckit.

    Removes the default handler and adds a new one targeting stderr
    with customizable level and format.

    Args:
        level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
               If None, uses fsspeckit_LOG_LEVEL environment variable
               or defaults to "INFO".
        disable: Whether to disable logging for fsspeckit package.
        format_string: Custom format string for log messages.
                      If None, uses a default comprehensive format.

    Example:
        ```python
        # Basic setup
        setup_logging()

        # Custom level and format
        setup_logging(level="DEBUG", format_string="{time} | {level} | {message}")

        # Disable logging
        setup_logging(disable=True)
        ```
    """
    # Determine log level
    if level is None:
        level = os.getenv("fsspeckit_LOG_LEVEL", "INFO")

    # Default format if none provided
    if format_string is None:
        format_string = (
            "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
            "<level>{message}</level>"
        )

    # Remove the default handler added by Loguru
    logger.remove()

    # Add new handler with custom configuration
    logger.add(
        sys.stderr,
        level=level.upper(),
        format=format_string,
    )

    # Optionally disable logging for this package
    if disable:
        logger.disable("fsspeckit")

fsspeckit.common.logging_config

Logging configuration utilities for fsspeckit using Python's standard logging module.

Functions
fsspeckit.common.logging_config.get_logger
get_logger(name: str) -> Logger

Get a properly configured logger for a module.

Parameters:

Name Type Description Default
name str

Module name (typically __name__)

required

Returns:

Type Description
Logger

Configured logger instance

Source code in src/fsspeckit/common/logging_config.py
def get_logger(name: str) -> logging.Logger:
    """
    Get a properly configured logger for a module.

    Args:
        name: Module name (typically __name__)

    Returns:
        Configured logger instance
    """
    # Auto-configure if not already done
    if not _configured:
        setup_logging()

    return logging.getLogger(f"fsspeckit.{name}")
fsspeckit.common.logging_config.setup_logging
setup_logging(
    level: str = "INFO",
    format_string: Optional[str] = None,
    include_timestamp: bool = True,
    enable_console: bool = True,
    enable_file: bool = False,
    file_path: Optional[str] = None,
) -> None

Configure logging for the fsspeckit package.

This should be called once at application startup.

Parameters:

Name Type Description Default
level str

Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)

'INFO'
format_string Optional[str]

Custom log format string

None
include_timestamp bool

Whether to include timestamp in logs

True
enable_console bool

Whether to output to console

True
enable_file bool

Whether to output to file

False
file_path Optional[str]

Path for log file output

None
Source code in src/fsspeckit/common/logging_config.py
def setup_logging(
    level: str = "INFO",
    format_string: Optional[str] = None,
    include_timestamp: bool = True,
    enable_console: bool = True,
    enable_file: bool = False,
    file_path: Optional[str] = None
) -> None:
    """
    Configure logging for the fsspeckit package.

    This should be called once at application startup.

    Args:
        level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        format_string: Custom log format string
        include_timestamp: Whether to include timestamp in logs
        enable_console: Whether to output to console
        enable_file: Whether to output to file
        file_path: Path for log file output
    """
    global _configured

    if _configured:
        return

    # Parse level from environment if not provided
    if not level:
        level = os.getenv('FSSPECKIT_LOG_LEVEL', 'INFO')

    # Set default format
    if not format_string:
        timestamp_part = "%(asctime)s - " if include_timestamp else ""
        format_string = f"{timestamp_part}%(name)s - %(levelname)s - %(message)s"

    # Configure root logger
    root_logger = logging.getLogger('fsspeckit')
    root_logger.setLevel(getattr(logging, level.upper()))

    # Clear existing handlers
    root_logger.handlers.clear()

    # Console handler
    if enable_console:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(logging.Formatter(format_string))
        root_logger.addHandler(console_handler)

    # File handler
    if enable_file and file_path:
        file_handler = logging.FileHandler(file_path)
        file_handler.setFormatter(logging.Formatter(format_string))
        root_logger.addHandler(file_handler)

    _configured = True
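
A minimal configuration sketch (the log file path is illustrative):

from fsspeckit.common.logging_config import setup_logging, get_logger

# Configure once at application startup: console output plus a log file.
setup_logging(level="DEBUG", enable_file=True, file_path="/tmp/fsspeckit.log")

log = get_logger(__name__)
log.debug("standard-library logging configured for fsspeckit")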

fsspeckit.common.misc

Miscellaneous utility functions for fsspeckit.

Functions
fsspeckit.common.misc.check_fs_identical
check_fs_identical(
    fs1: AbstractFileSystem, fs2: AbstractFileSystem
) -> bool

Check if two fsspec filesystems are identical.

Parameters:

Name Type Description Default
fs1 AbstractFileSystem

First filesystem (fsspec AbstractFileSystem)

required
fs2 AbstractFileSystem

Second filesystem (fsspec AbstractFileSystem)

required

Returns:

Name Type Description
bool bool

True if filesystems are identical, False otherwise

Source code in src/fsspeckit/common/misc.py
def check_fs_identical(fs1: AbstractFileSystem, fs2: AbstractFileSystem) -> bool:
    """Check if two fsspec filesystems are identical.

    Args:
        fs1: First filesystem (fsspec AbstractFileSystem)
        fs2: Second filesystem (fsspec AbstractFileSystem)

    Returns:
        bool: True if filesystems are identical, False otherwise
    """

    def _get_root_fs(fs: AbstractFileSystem) -> AbstractFileSystem:
        while hasattr(fs, "fs"):
            fs = fs.fs
        return fs

    fs1 = _get_root_fs(fs1)
    fs2 = _get_root_fs(fs2)
    return fs1 == fs2
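
A quick sketch using fsspec's built-in memory and local filesystems (equality semantics come from fsspec itself):

import fsspec
from fsspeckit.common.misc import check_fs_identical

mem_a = fsspec.filesystem("memory")
mem_b = fsspec.filesystem("memory")
local = fsspec.filesystem("file")

check_fs_identical(mem_a, mem_b)   # True - same cached memory filesystem instance
check_fs_identical(mem_a, local)   # False - different filesystems
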
fsspeckit.common.misc.get_partitions_from_path
get_partitions_from_path(
    path: str, partitioning: Union[str, list, None] = None
) -> Dict[str, str]

Extract dataset partitions from a file path.

Parses file paths to extract partition information based on different partitioning schemes. By default, uses Hive-style partitioning.

Parameters:

Name Type Description Default
path str

File path potentially containing partition information.

required
partitioning Union[str, list, None]

Partitioning scheme: - "hive": Hive-style partitioning (key=value) - str: Single partition column name - list[str]: Multiple partition column names - None: Default to Hive-style partitioning

None

Returns:

Type Description
Dict[str, str]

Dictionary mapping partition keys to their values.

Examples:

>>> # Default Hive-style partitioning
>>> get_partitions_from_path("data/year=2023/month=01/file.parquet")
{'year': '2023', 'month': '01'}

>>> # Explicit Hive-style partitioning
>>> get_partitions_from_path("data/year=2023/month=01/file.parquet", "hive")
{'year': '2023', 'month': '01'}

>>> # Single partition column
>>> get_partitions_from_path("data/2023/01/file.parquet", "year")
{'year': '2023'}

>>> # Multiple partition columns
>>> get_partitions_from_path("data/2023/01/file.parquet", ["year", "month"])
{'year': '2023', 'month': '01'}
Source code in src/fsspeckit/common/misc.py
def get_partitions_from_path(
    path: str, partitioning: Union[str, list, None] = None
) -> Dict[str, str]:
    """Extract dataset partitions from a file path.

    Parses file paths to extract partition information based on
    different partitioning schemes. By default, uses Hive-style partitioning.

    Args:
        path: File path potentially containing partition information.
        partitioning: Partitioning scheme:
            - "hive": Hive-style partitioning (key=value)
            - str: Single partition column name
            - list[str]: Multiple partition column names
            - None: Default to Hive-style partitioning

    Returns:
        Dictionary mapping partition keys to their values.

    Examples:
        >>> # Default Hive-style partitioning
        >>> get_partitions_from_path("data/year=2023/month=01/file.parquet")
        {'year': '2023', 'month': '01'}

        >>> # Explicit Hive-style partitioning
        >>> get_partitions_from_path("data/year=2023/month=01/file.parquet", "hive")
        {'year': '2023', 'month': '01'}

        >>> # Single partition column
        >>> get_partitions_from_path("data/2023/01/file.parquet", "year")
        {'year': '2023'}

        >>> # Multiple partition columns
        >>> get_partitions_from_path("data/2023/01/file.parquet", ["year", "month"])
        {'year': '2023', 'month': '01'}
    """
    # Normalize path to handle Windows and relative paths
    normalized_path = Path(path).as_posix().replace("\\", "/")

    # Remove filename if present
    if "." in normalized_path:
        normalized_path = str(Path(normalized_path).parent)

    parts = normalized_path.split("/")

    # Default to Hive-style partitioning when partitioning is None
    if partitioning is None or partitioning == "hive":
        partitions = {}
        for part in parts:
            if "=" in part:
                key, value = part.split("=", 1)  # Split only on first =
                partitions[key] = value
        return partitions
    elif isinstance(partitioning, str):
        # Single partition column
        return {partitioning: parts[0]} if parts else {}
    elif isinstance(partitioning, list):
        # Multiple partition columns
        result = {}
        for i, col_name in enumerate(partitioning):
            if i < len(parts):
                result[col_name] = parts[-(len(partitioning) - i)]
        return result
    else:
        return {}
fsspeckit.common.misc.path_to_glob
path_to_glob(
    path: str, format: Union[str, None] = None
) -> str

Convert a path to a glob pattern for file matching.

Intelligently converts paths to glob patterns that match files of the specified format, handling various directory and wildcard patterns.

Parameters:

Name Type Description Default
path str

Base path to convert. Can include wildcards (* or **). Examples: "data/", "data/*.json", "data/**"

required
format Union[str, None]

File format to match (without dot). If None, inferred from path. Examples: "json", "csv", "parquet"

None

Returns:

Name Type Description
str str

Glob pattern that matches files of specified format. Examples: "data/**/*.json", "data/*.csv"

Example
# Basic directory
print(path_to_glob("data", "json"))
# 'data/**/*.json'

# With wildcards
print(path_to_glob("data/**", "csv"))
# 'data/**/*.csv'

# Format inference
print(path_to_glob("data/file.parquet"))
# 'data/file.parquet'
Source code in src/fsspeckit/common/misc.py
def path_to_glob(path: str, format: Union[str, None] = None) -> str:
    """Convert a path to a glob pattern for file matching.

    Intelligently converts paths to glob patterns that match files of the specified
    format, handling various directory and wildcard patterns.

    Args:
        path: Base path to convert. Can include wildcards (* or **).
            Examples: "data/", "data/*.json", "data/**"
        format: File format to match (without dot). If None, inferred from path.
            Examples: "json", "csv", "parquet"

    Returns:
        str: Glob pattern that matches files of specified format.
            Examples: "data/**/*.json", "data/*.csv"

    Example:
        ```python
        # Basic directory
        print(path_to_glob("data", "json"))
        # 'data/**/*.json'

        # With wildcards
        print(path_to_glob("data/**", "csv"))
        # 'data/**/*.csv'

        # Format inference
        print(path_to_glob("data/file.parquet"))
        # 'data/file.parquet'
        ```
    """
    path = path.rstrip("/")
    if format is None:
        if ".json" in path:
            format = "json"
        elif ".csv" in path:
            format = "csv"
        elif ".parquet" in path:
            format = "parquet"

    if format is not None and format in path:
        return path
    else:
        if path.endswith("**"):
            return posixpath.join(path, f"*.{format}")
        elif path.endswith("*"):
            if path.endswith("*/*"):
                return path + f".{format}"
            return posixpath.join(path.rstrip("/*"), f"*.{format}")
        return posixpath.join(path, f"**/*.{format}")
fsspeckit.common.misc.sync_dir
sync_dir(
    src_fs: AbstractFileSystem,
    dst_fs: AbstractFileSystem,
    src_path: str = "",
    dst_path: str = "",
    server_side: bool = True,
    chunk_size: int = 8 * 1024 * 1024,
    parallel: bool = False,
    n_jobs: int = -1,
    verbose: bool = True,
) -> dict[str, list[str]]

Sync two directories between different filesystems.

Compares file listings in the source and destination directories, copies files that exist only in the source to the destination, and deletes stale files from the destination.

Parameters:

Name Type Description Default
src_fs AbstractFileSystem

Source filesystem (fsspec AbstractFileSystem)

required
dst_fs AbstractFileSystem

Destination filesystem (fsspec AbstractFileSystem)

required
src_path str

Path in source filesystem to sync. Default is root ('').

''
dst_path str

Path in destination filesystem to sync. Default is root ('').

''
server_side bool

Whether to use server-side copy if supported. Default is True.

True
chunk_size int

Size of chunks to read/write files (in bytes). Default is 8MB.

8 * 1024 * 1024
parallel bool

Whether to perform copy/delete operations in parallel. Default is False.

False
n_jobs int

Number of parallel jobs if parallel=True. Default is -1 (all cores).

-1
verbose bool

Whether to show progress bars. Default is True.

True

Returns:

Name Type Description
dict dict[str, list[str]]

Summary of added and deleted files

Source code in src/fsspeckit/common/misc.py
def sync_dir(
    src_fs: AbstractFileSystem,
    dst_fs: AbstractFileSystem,
    src_path: str = "",
    dst_path: str = "",
    server_side: bool = True,
    chunk_size: int = 8 * 1024 * 1024,
    parallel: bool = False,
    n_jobs: int = -1,
    verbose: bool = True,
) -> dict[str, list[str]]:
    """Sync two directories between different filesystems.

    Compares file listings in the source and destination directories, copies files that exist only in the source
    to the destination, and deletes stale files from the destination.

    Args:
        src_fs: Source filesystem (fsspec AbstractFileSystem)
        dst_fs: Destination filesystem (fsspec AbstractFileSystem)
        src_path: Path in source filesystem to sync. Default is root ('').
        dst_path: Path in destination filesystem to sync. Default is root ('').
        server_side: Whether to use server-side copy if supported. Default is True.
        chunk_size: Size of chunks to read/write files (in bytes). Default is 8MB.
        parallel: Whether to perform copy/delete operations in parallel. Default is False.
        n_jobs: Number of parallel jobs if parallel=True. Default is -1 (all cores).
        verbose: Whether to show progress bars. Default is True.

    Returns:
        dict: Summary of added and deleted files
    """

    src_mapper = src_fs.get_mapper(src_path)
    dst_mapper = dst_fs.get_mapper(dst_path)

    add_files = sorted(src_mapper.keys() - dst_mapper.keys())
    delete_files = sorted(dst_mapper.keys() - src_mapper.keys())

    return sync_files(
        add_files=add_files,
        delete_files=delete_files,
        src_fs=src_fs,
        dst_fs=dst_fs,
        src_path=src_path,
        dst_path=dst_path,
        chunk_size=chunk_size,
        server_side=server_side,
        parallel=parallel,
        n_jobs=n_jobs,
        verbose=verbose,
    )
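
A hedged end-to-end sketch (the source directory shown is hypothetical):

import fsspec
from fsspeckit.common.misc import sync_dir

src_fs = fsspec.filesystem("file")
dst_fs = fsspec.filesystem("memory")

result = sync_dir(
    src_fs,
    dst_fs,
    src_path="/data/exports",  # hypothetical local directory
    dst_path="/mirror",
    parallel=False,
    verbose=False,
)
print(result["added_files"], result["deleted_files"])
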
fsspeckit.common.misc.sync_files
sync_files(
    add_files: list[str],
    delete_files: list[str],
    src_fs: AbstractFileSystem,
    dst_fs: AbstractFileSystem,
    src_path: str = "",
    dst_path: str = "",
    server_side: bool = False,
    chunk_size: int = 8 * 1024 * 1024,
    parallel: bool = False,
    n_jobs: int = -1,
    verbose: bool = True,
) -> dict[str, list[str]]

Sync files between two filesystems by copying new files and deleting old ones.

Parameters:

Name Type Description Default
add_files list[str]

List of file paths to add (copy from source to destination)

required
delete_files list[str]

List of file paths to delete from destination

required
src_fs AbstractFileSystem

Source filesystem (fsspec AbstractFileSystem)

required
dst_fs AbstractFileSystem

Destination filesystem (fsspec AbstractFileSystem)

required
src_path str

Base path in source filesystem. Default is root ('').

''
dst_path str

Base path in destination filesystem. Default is root ('').

''
server_side bool

Whether to use server-side copy if supported. Default is False.

False
chunk_size int

Size of chunks to read/write files (in bytes). Default is 8MB.

8 * 1024 * 1024
parallel bool

Whether to perform copy/delete operations in parallel. Default is False.

False
n_jobs int

Number of parallel jobs if parallel=True. Default is -1 (all cores).

-1
verbose bool

Whether to show progress bars. Default is True.

True

Returns:

Name Type Description
dict dict[str, list[str]]

Summary of added and deleted files

Source code in src/fsspeckit/common/misc.py
def sync_files(
    add_files: list[str],
    delete_files: list[str],
    src_fs: AbstractFileSystem,
    dst_fs: AbstractFileSystem,
    src_path: str = "",
    dst_path: str = "",
    server_side: bool = False,
    chunk_size: int = 8 * 1024 * 1024,
    parallel: bool = False,
    n_jobs: int = -1,
    verbose: bool = True,
) -> dict[str, list[str]]:
    """Sync files between two filesystems by copying new files and deleting old ones.

    Args:
        add_files: List of file paths to add (copy from source to destination)
        delete_files: List of file paths to delete from destination
        src_fs: Source filesystem (fsspec AbstractFileSystem)
        dst_fs: Destination filesystem (fsspec AbstractFileSystem)
        src_path: Base path in source filesystem. Default is root ('').
        dst_path: Base path in destination filesystem. Default is root ('').
        server_side: Whether to use server-side copy if supported. Default is False.
        chunk_size: Size of chunks to read/write files (in bytes). Default is 8MB.
        parallel: Whether to perform copy/delete operations in parallel. Default is False.
        n_jobs: Number of parallel jobs if parallel=True. Default is -1 (all cores).
        verbose: Whether to show progress bars. Default is True.

    Returns:
        dict: Summary of added and deleted files
    """
    CHUNK = chunk_size
    RETRIES = 3

    server_side = check_fs_identical(src_fs, dst_fs) and server_side

    src_mapper = src_fs.get_mapper(src_path)
    dst_mapper = dst_fs.get_mapper(dst_path)

    def server_side_copy_file(key, src_mapper, dst_mapper, RETRIES):
        last_exc = None
        for attempt in range(1, RETRIES + 1):
            try:
                dst_mapper[key] = src_mapper[key]
                break
            except (OSError, IOError) as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Failed to copy file %s after %d attempts: %s",
                        key,
                        RETRIES,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Failed to copy file {key} after {RETRIES} attempts") from e
            except Exception as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Unexpected error copying file %s: %s",
                        key,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Unexpected error copying file {key}: {e}") from e

    def copy_file(key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES):
        last_exc = None
        for attempt in range(1, RETRIES + 1):
            try:
                with (
                    src_fs.open(posixpath.join(src_path, key), "rb") as r,
                    dst_fs.open(posixpath.join(dst_path, key), "wb") as w,
                ):
                    while True:
                        chunk = r.read(CHUNK)
                        if not chunk:
                            break
                        w.write(chunk)
                break
            except (OSError, IOError) as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Failed to copy file %s after %d attempts: %s",
                        key,
                        RETRIES,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Failed to copy file {key} after {RETRIES} attempts") from e
            except Exception as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Unexpected error copying file %s: %s",
                        key,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Unexpected error copying file {key}: {e}") from e

    def delete_file(key, dst_fs, dst_path, RETRIES):
        last_exc = None
        for attempt in range(1, RETRIES + 1):
            try:
                dst_fs.rm(posixpath.join(dst_path, key))
                break
            except (OSError, IOError) as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Failed to delete file %s after %d attempts: %s",
                        key,
                        RETRIES,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Failed to delete file {key} after {RETRIES} attempts") from e
            except Exception as e:
                last_exc = e
                if attempt == RETRIES:
                    logger.error(
                        "Unexpected error deleting file %s: %s",
                        key,
                        str(e),
                        exc_info=True,
                    )
                    raise RuntimeError(f"Unexpected error deleting file {key}: {e}") from e

    if len(add_files):
        # Copy new files
        if parallel:
            if server_side:
                try:
                    run_parallel(
                        server_side_copy_file,
                        add_files,
                        src_mapper=src_mapper,
                        dst_mapper=dst_mapper,
                        RETRIES=RETRIES,
                        n_jobs=n_jobs,
                        verbose=verbose,
                    )
                except (RuntimeError, OSError) as e:
                    logger.warning(
                        "Server-side copy failed for some files, falling back to client-side: %s",
                        str(e),
                    )
                    # Fallback to client-side copy if server-side fails
                    run_parallel(
                        copy_file,
                        add_files,
                        src_fs=src_fs,
                        dst_fs=dst_fs,
                        src_path=src_path,
                        dst_path=dst_path,
                        CHUNK=CHUNK,
                        RETRIES=RETRIES,
                        n_jobs=n_jobs,
                        verbose=verbose,
                    )

            else:
                run_parallel(
                    copy_file,
                    add_files,
                    src_fs=src_fs,
                    dst_fs=dst_fs,
                    src_path=src_path,
                    dst_path=dst_path,
                    CHUNK=CHUNK,
                    RETRIES=RETRIES,
                    n_jobs=n_jobs,
                    verbose=verbose,
                )
        else:
            if verbose:
                from rich.progress import track

                for key in track(
                    add_files,
                    description="Copying new files...",
                    total=len(add_files),
                ):
                    if server_side:
                        try:
                            server_side_copy_file(key, src_mapper, dst_mapper, RETRIES)
                        except (RuntimeError, OSError):
                            copy_file(
                                key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES
                            )
                    else:
                        copy_file(
                            key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES
                        )
            else:
                for key in add_files:
                    if server_side:
                        try:
                            server_side_copy_file(key, src_mapper, dst_mapper, RETRIES)
                        except (RuntimeError, OSError):
                            copy_file(
                                key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES
                            )
                    else:
                        copy_file(
                            key, src_fs, dst_fs, src_path, dst_path, CHUNK, RETRIES
                        )

    if len(delete_files):
        # Delete old files from destination
        if parallel:
            run_parallel(
                delete_file,
                delete_files,
                dst_fs=dst_fs,
                dst_path=dst_path,
                RETRIES=RETRIES,
                n_jobs=n_jobs,
                verbose=verbose,
            )
        else:
            if verbose:
                from rich.progress import track

                for key in track(
                    delete_files,
                    description="Deleting stale files...",
                    total=len(delete_files),
                ):
                    delete_file(key, dst_fs, dst_path, RETRIES)
            else:
                for key in delete_files:
                    delete_file(key, dst_fs, dst_path, RETRIES)

    return {"added_files": add_files, "deleted_files": delete_files}

fsspeckit.common.optional

Optional dependency management utilities.

This module provides utilities for managing optional dependencies in fsspeckit, implementing lazy loading patterns that allow core functionality to work without requiring all optional dependencies to be installed.

Functions
fsspeckit.common.optional.check_optional_dependency
check_optional_dependency(
    package_name: str, feature_name: Optional[str] = None
) -> None

Check if an optional dependency is available and raise helpful error if not.

Parameters:

Name Type Description Default
package_name str

Name of the required package

required
feature_name Optional[str]

Name of the feature that requires this package (optional)

None

Raises:

Type Description
ImportError

If the package is not available

Source code in src/fsspeckit/common/optional.py
def check_optional_dependency(
    package_name: str, feature_name: Optional[str] = None
) -> None:
    """Check if an optional dependency is available and raise helpful error if not.

    Args:
        package_name: Name of the required package
        feature_name: Name of the feature that requires this package (optional)

    Raises:
        ImportError: If the package is not available
    """
    if not importlib.util.find_spec(package_name):
        extra = _get_install_extra(package_name)
        feature_msg = f" for {feature_name}" if feature_name else ""
        raise ImportError(
            f"{package_name} is required{feature_msg}. "
            f"Install with: pip install fsspeckit[{extra}]"
        )
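
Typical use before importing an optional backend (a sketch; the feature name is free-form text used only in the error message):

from fsspeckit.common.optional import check_optional_dependency

# Raises ImportError with an install hint if polars is not installed;
# otherwise returns None and the import below is safe.
check_optional_dependency("polars", feature_name="DataFrame conversion")
import polars as pl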

fsspeckit.common.partitions

Shared partition utilities for fsspeckit.

This module provides canonical implementations for partition parsing and related operations across all backends. It consolidates partition-related logic that was previously scattered across different modules.

Key responsibilities: 1. Partition extraction from file paths 2. Support for multiple partitioning schemes (Hive, directory-based) 3. Partition validation and normalization 4. Path manipulation for partitioned datasets

Architecture: - Functions are designed to work with string paths and fsspec filesystems - Support for common partitioning patterns used in data lakes - Consistent behavior across all backends - Extensible design for custom partitioning schemes

Usage: Backend implementations should delegate to this module rather than implementing their own partition parsing logic. This ensures consistent behavior across DuckDB, PyArrow, and future backends.

Functions
fsspeckit.common.partitions.apply_partition_pruning
apply_partition_pruning(
    paths: list[str],
    partition_filters: dict[str, Any],
    partitioning: str | list[str] | None = None,
) -> list[str]

Apply partition pruning to reduce the set of files to scan.

This is an optimization that eliminates files based on partition values before any I/O operations.

Parameters:

Name Type Description Default
paths list[str]

List of all file paths.

required
partition_filters dict[str, Any]

Dictionary of partition filters to apply.

required
partitioning str | list[str] | None

Partitioning scheme.

None

Returns:

Type Description
list[str]

Pruned list of paths.

Source code in src/fsspeckit/common/partitions.py
def apply_partition_pruning(
    paths: list[str],
    partition_filters: dict[str, Any],
    partitioning: str | list[str] | None = None,
) -> list[str]:
    """
    Apply partition pruning to reduce the set of files to scan.

    This is an optimization that eliminates files based on partition
    values before any I/O operations.

    Args:
        paths: List of all file paths.
        partition_filters: Dictionary of partition filters to apply.
        partitioning: Partitioning scheme.

    Returns:
        Pruned list of paths.
    """
    if not partition_filters:
        return paths

    return filter_paths_by_partitions(paths, partition_filters, partitioning)
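
A small pruning sketch with illustrative Hive-style paths:

from fsspeckit.common.partitions import apply_partition_pruning

paths = [
    "data/year=2023/month=01/part-0.parquet",
    "data/year=2023/month=02/part-0.parquet",
    "data/year=2022/month=12/part-0.parquet",
]

# Single values and lists of allowed values can be mixed per column.
apply_partition_pruning(paths, {"year": "2023", "month": ["01"]}, partitioning="hive")
# ['data/year=2023/month=01/part-0.parquet']
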
fsspeckit.common.partitions.build_partition_path
build_partition_path(
    base_path: str,
    partitions: list[tuple[str, str]],
    partitioning: str = "hive",
) -> str

Build a file path with partition directories.

Parameters:

Name Type Description Default
base_path str

Base directory path.

required
partitions list[tuple[str, str]]

List of (column, value) tuples.

required
partitioning str

Partitioning scheme ("hive" or "directory").

'hive'

Returns:

Type Description
str

Path string with partition directories.

Source code in src/fsspeckit/common/partitions.py
def build_partition_path(
    base_path: str, partitions: list[tuple[str, str]], partitioning: str = "hive"
) -> str:
    """
    Build a file path with partition directories.

    Args:
        base_path: Base directory path.
        partitions: List of (column, value) tuples.
        partitioning: Partitioning scheme ("hive" or "directory").

    Returns:
        Path string with partition directories.
    """
    if not partitions:
        return base_path

    if partitioning == "hive":
        # Hive-style: column=value/column=value
        partition_dirs = [f"{col}={val}" for col, val in partitions]
    else:
        # Directory-style: value/value (order matters)
        partition_dirs = [val for _, val in partitions]

    return "/".join([base_path.rstrip("/")] + partition_dirs)
fsspeckit.common.partitions.create_partition_expression
create_partition_expression(
    partitions: list[tuple[str, str]],
    backend: str = "pyarrow",
) -> Any

Create a partition filter expression for different backends.

Parameters:

Name Type Description Default
partitions list[tuple[str, str]]

List of (column, value) tuples.

required
backend str

Target backend ("pyarrow", "duckdb").

'pyarrow'

Returns:

Type Description
Any

Backend-specific filter expression.

Source code in src/fsspeckit/common/partitions.py
def create_partition_expression(
    partitions: list[tuple[str, str]], backend: str = "pyarrow"
) -> Any:
    """
    Create a partition filter expression for different backends.

    Args:
        partitions: List of (column, value) tuples.
        backend: Target backend ("pyarrow", "duckdb").

    Returns:
        Backend-specific filter expression.
    """
    if not partitions:
        return None

    if backend == "pyarrow":
        import pyarrow.dataset as ds

        # Build PyArrow dataset filter expression
        expressions = []
        for col, val in partitions:
            expressions.append(ds.field(col) == val)

        # Combine with AND logic
        result = expressions[0]
        for expr in expressions[1:]:
            result = result & expr
        return result

    elif backend == "duckdb":
        # Build DuckDB WHERE clause
        conditions = []
        for col, val in partitions:
            if isinstance(val, str):
                conditions.append(f"\"{col}\" = '{val}'")
            else:
                conditions.append(f'"{col}" = {val}')

        return " AND ".join(conditions)

    else:
        raise ValueError(f"Unsupported backend: {backend}")
fsspeckit.common.partitions.extract_partition_filters
extract_partition_filters(
    paths: list[str],
    partitioning: str | list[str] | None = None,
) -> dict[str, set[str]]

Extract unique partition values from a list of paths.

Parameters:

Name Type Description Default
paths list[str]

List of file paths.

required
partitioning str | list[str] | None

Partitioning scheme.

None

Returns:

Type Description
dict[str, set[str]]

Dictionary mapping column names to sets of unique values.

Source code in src/fsspeckit/common/partitions.py
def extract_partition_filters(
    paths: list[str], partitioning: str | list[str] | None = None
) -> dict[str, set[str]]:
    """
    Extract unique partition values from a list of paths.

    Args:
        paths: List of file paths.
        partitioning: Partitioning scheme.

    Returns:
        Dictionary mapping column names to sets of unique values.
    """
    partition_values = {}

    for path in paths:
        partitions = get_partitions_from_path(path, partitioning)
        for col, val in partitions:
            if col not in partition_values:
                partition_values[col] = set()
            partition_values[col].add(val)

    return partition_values
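
A small sketch with illustrative Hive-style paths:

from fsspeckit.common.partitions import extract_partition_filters

paths = [
    "data/year=2023/month=01/part-0.parquet",
    "data/year=2023/month=02/part-0.parquet",
]

extract_partition_filters(paths, partitioning="hive")
# {'year': {'2023'}, 'month': {'01', '02'}}
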
fsspeckit.common.partitions.filter_paths_by_partitions
filter_paths_by_partitions(
    paths: list[str],
    partition_filters: dict[str, str | list[str]],
    partitioning: str | list[str] | None = None,
) -> list[str]

Filter paths based on partition values.

Parameters:

Name Type Description Default
paths list[str]

List of file paths to filter.

required
partition_filters dict[str, str | list[str]]

Dictionary mapping column names to filter values.

required
partitioning str | list[str] | None

Partitioning scheme.

None

Returns:

Type Description
list[str]

Filtered list of paths.

Source code in src/fsspeckit/common/partitions.py
def filter_paths_by_partitions(
    paths: list[str],
    partition_filters: dict[str, str | list[str]],
    partitioning: str | list[str] | None = None,
) -> list[str]:
    """
    Filter paths based on partition values.

    Args:
        paths: List of file paths to filter.
        partition_filters: Dictionary mapping column names to filter values.
        partitioning: Partitioning scheme.

    Returns:
        Filtered list of paths.
    """
    filtered_paths = []

    for path in paths:
        partitions = dict(get_partitions_from_path(path, partitioning))

        # Check if path matches all filters
        matches = True
        for col, filter_val in partition_filters.items():
            if col not in partitions:
                matches = False
                break

            path_val = partitions[col]

            # Handle list of allowed values
            if isinstance(filter_val, list):
                if path_val not in filter_val:
                    matches = False
                    break
            else:
                # Single value comparison
                if path_val != filter_val:
                    matches = False
                    break

        if matches:
            filtered_paths.append(path)

    return filtered_paths
fsspeckit.common.partitions.get_partition_columns_from_paths
get_partition_columns_from_paths(
    paths: list[str],
    partitioning: str | list[str] | None = None,
) -> list[str]

Get all unique partition column names from a list of paths.

Parameters:

Name Type Description Default
paths list[str]

List of file paths.

required
partitioning str | list[str] | None

Partitioning scheme.

None

Returns:

Type Description
list[str]

List of unique partition column names.
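
Example (an illustrative sketch; the hive-style paths below are made up):

>>> paths = [
...     "data/year=2023/month=01/a.parquet",
...     "data/year=2023/month=02/b.parquet",
... ]
>>> get_partition_columns_from_paths(paths, "hive")
['month', 'year']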

Source code in src/fsspeckit/common/partitions.py
def get_partition_columns_from_paths(
    paths: list[str], partitioning: str | list[str] | None = None
) -> list[str]:
    """
    Get all unique partition column names from a list of paths.

    Args:
        paths: List of file paths.
        partitioning: Partitioning scheme.

    Returns:
        List of unique partition column names.
    """
    columns = set()

    for path in paths:
        partitions = get_partitions_from_path(path, partitioning)
        for col, _ in partitions:
            columns.add(col)

    return sorted(list(columns))
fsspeckit.common.partitions.get_partitions_from_path
get_partitions_from_path(
    path: str,
    partitioning: Union[str, list[str], None] = None,
) -> list[tuple]

Extract dataset partitions from a file path.

Parses file paths to extract partition information based on different partitioning schemes. This is the canonical implementation used across all fsspeckit backends.

Parameters:

Name Type Description Default
path str

File path potentially containing partition information.

required
partitioning Union[str, list[str], None]

Partitioning scheme: - "hive": Hive-style partitioning (key=value) - str: Single partition column name - list[str]: Multiple partition column names - None: Return empty list

None

Returns:

Type Description
list[tuple]

List of tuples containing (column, value) pairs.

Examples:

>>> # Hive-style partitioning
>>> get_partitions_from_path("data/year=2023/month=01/file.parquet", "hive")
[('year', '2023'), ('month', '01')]
>>> # Single partition column
>>> get_partitions_from_path("data/2023/01/file.parquet", "year")
[('year', '2023')]
>>> # Multiple partition columns
>>> get_partitions_from_path("data/2023/01/file.parquet", ["year", "month"])
[('year', '2023'), ('month', '01')]
>>> # No partitioning
>>> get_partitions_from_path("data/file.parquet", None)
[]
Source code in src/fsspeckit/common/partitions.py
def get_partitions_from_path(
    path: str, partitioning: Union[str, list[str], None] = None
) -> list[tuple]:
    """
    Extract dataset partitions from a file path.

    Parses file paths to extract partition information based on
    different partitioning schemes. This is the canonical implementation
    used across all fsspeckit backends.

    Args:
        path: File path potentially containing partition information.
        partitioning: Partitioning scheme:
            - "hive": Hive-style partitioning (key=value)
            - str: Single partition column name
            - list[str]: Multiple partition column names
            - None: Return empty list

    Returns:
        List of tuples containing (column, value) pairs.

    Examples:
        >>> # Hive-style partitioning
        >>> get_partitions_from_path("data/year=2023/month=01/file.parquet", "hive")
        [('year', '2023'), ('month', '01')]

        >>> # Single partition column
        >>> get_partitions_from_path("data/2023/01/file.parquet", "year")
        [('year', '2023')]

        >>> # Multiple partition columns
        >>> get_partitions_from_path("data/2023/01/file.parquet", ["year", "month"])
        [('year', '2023'), ('month', '01')]

        >>> # No partitioning
        >>> get_partitions_from_path("data/file.parquet", None)
        []
    """
    if "." in path:
        path = os.path.dirname(path)

    parts = path.split("/")

    if isinstance(partitioning, str):
        if partitioning == "hive":
            return [tuple(p.split("=")) for p in parts if "=" in p]
        else:
            # Single partition column - take the first directory that looks like a value
            # This is a simple heuristic for cases like data/2023/file.parquet
            if parts:
                return [(partitioning, parts[0])]
            return []
    elif isinstance(partitioning, list):
        # Multiple partition columns - map column names to path parts from right to left
        if not parts:
            return []

        # Take the last N parts where N is the number of partition columns
        partition_parts = (
            parts[-len(partitioning) :] if len(parts) >= len(partitioning) else parts
        )
        return list(zip(partitioning, partition_parts))
    else:
        return []
fsspeckit.common.partitions.infer_partitioning_scheme
infer_partitioning_scheme(
    paths: list[str], max_samples: int = 100
) -> dict[str, Any]

Infer the partitioning scheme from a sample of paths.

Parameters:

Name Type Description Default
paths list[str]

List of file paths to analyze.

required
max_samples int

Maximum number of paths to sample.

100

Returns:

Type Description
dict[str, Any]

Dictionary with inferred scheme information.
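
Example (an illustrative sketch with a made-up hive-style path; the confidence score is a heuristic and is omitted here):

>>> info = infer_partitioning_scheme(["data/year=2023/month=01/a.parquet"])
>>> info["scheme"]
'hive'
>>> info["avg_partitions"]
2.0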

Source code in src/fsspeckit/common/partitions.py
def infer_partitioning_scheme(
    paths: list[str], max_samples: int = 100
) -> dict[str, Any]:
    """
    Infer the partitioning scheme from a sample of paths.

    Args:
        paths: List of file paths to analyze.
        max_samples: Maximum number of paths to sample.

    Returns:
        Dictionary with inferred scheme information.
    """
    if not paths:
        return {"scheme": None, "confidence": 0.0}

    # Sample paths for analysis
    sample_paths = paths[:max_samples] if len(paths) > max_samples else paths

    # Check for Hive-style partitioning
    hive_partitions = []
    directory_partitions = []

    for path in sample_paths:
        # Remove filename and get directory parts
        dir_path = os.path.dirname(path)
        parts = dir_path.split("/")

        # Look for key=value patterns
        hive_parts = [p for p in parts if "=" in p and p.split("=")[0].strip()]
        if hive_parts:
            hive_partitions.append(len(hive_parts))

        # Look for directory-style partitions (numeric dates, etc.)
        dir_parts = [
            p
            for p in parts
            if p.replace("/", "").replace("-", "").replace("_", "").isdigit()
        ]
        if dir_parts:
            directory_partitions.append(len(dir_parts))

    # Determine most likely scheme
    result = {"scheme": None, "confidence": 0.0}

    if hive_partitions:
        avg_hive_parts = sum(hive_partitions) / len(hive_partitions)
        if avg_hive_parts >= 1:  # At least 1 partition level on average
            result["scheme"] = "hive"
            result["confidence"] = min(
                1.0, avg_hive_parts / 3.0
            )  # Normalize by expected max
            result["avg_partitions"] = avg_hive_parts
            return result

    if directory_partitions:
        avg_dir_parts = sum(directory_partitions) / len(directory_partitions)
        if avg_dir_parts >= 1:  # At least 1 partition level on average
            result["scheme"] = "directory"
            result["confidence"] = min(
                1.0, avg_dir_parts / 3.0
            )  # Normalize by expected max
            result["avg_partitions"] = avg_dir_parts
            return result

    # No clear partitioning detected
    return result
fsspeckit.common.partitions.normalize_partition_value
normalize_partition_value(value: str) -> str

Normalize a partition value for consistent comparison.

Parameters:

Name Type Description Default
value str

Raw partition value from path.

required

Returns:

Type Description
str

Normalized partition value.
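
Example (illustrative values):

>>> normalize_partition_value(" '2023' ")
'2023'
>>> normalize_partition_value('"us-east"')
'us-east'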

Source code in src/fsspeckit/common/partitions.py
def normalize_partition_value(value: str) -> str:
    """
    Normalize a partition value for consistent comparison.

    Args:
        value: Raw partition value from path.

    Returns:
        Normalized partition value.
    """
    return value.strip().strip("\"'").replace("\\", "")
fsspeckit.common.partitions.validate_partition_columns
validate_partition_columns(
    partitions: list[tuple[str, str]],
    expected_columns: list[str] | None = None,
) -> bool

Validate partition columns against expected schema.

Parameters:

Name Type Description Default
partitions list[tuple[str, str]]

List of (column, value) tuples.

required
expected_columns list[str] | None

Optional list of expected column names.

None

Returns:

Type Description
bool

True if partitions are valid, False otherwise.
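
Example (illustrative column names and values):

>>> validate_partition_columns([("year", "2023")], ["year", "month"])
True
>>> validate_partition_columns([("region", "eu")], ["year", "month"])
False
>>> validate_partition_columns([("", "2023")])
False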

Source code in src/fsspeckit/common/partitions.py
def validate_partition_columns(
    partitions: list[tuple[str, str]], expected_columns: list[str] | None = None
) -> bool:
    """
    Validate partition columns against expected schema.

    Args:
        partitions: List of (column, value) tuples.
        expected_columns: Optional list of expected column names.

    Returns:
        True if partitions are valid, False otherwise.
    """
    if not partitions:
        return True

    if expected_columns is not None:
        partition_columns = {col for col, _ in partitions}
        expected_set = set(expected_columns)

        # Check if all partition columns are expected
        if not partition_columns.issubset(expected_set):
            return False

        # Check if all expected columns are present (if strict validation needed)
        # This is optional - some datasets might have missing partitions
        # return partition_columns == expected_set

    # Validate that no column names are empty
    for col, val in partitions:
        if not col or not col.strip():
            return False

    return True

fsspeckit.common.polars

Functions
fsspeckit.common.polars.drop_null_columns
drop_null_columns(
    df: DataFrame | LazyFrame,
) -> DataFrame | LazyFrame

Remove columns with all null values from the DataFrame.
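
A minimal usage sketch (illustrative data; column "b" is entirely null and is dropped):

>>> import polars as pl
>>> df = pl.DataFrame({"a": [1, 2], "b": [None, None]})
>>> drop_null_columns(df).columns
['a']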

Source code in src/fsspeckit/common/polars.py
def drop_null_columns(df: pl.DataFrame | pl.LazyFrame) -> pl.DataFrame | pl.LazyFrame:
    """Remove columns with all null values from the DataFrame."""
    return df.select([col for col in df.columns if df[col].null_count() < df.height])
fsspeckit.common.polars.opt_dtype
opt_dtype(
    df: DataFrame,
    include: str | list[str] | None = None,
    exclude: str | list[str] | None = None,
    time_zone: str | None = None,
    shrink_numerics: bool = False,
    allow_unsigned: bool = True,
    allow_null: bool = True,
    sample_size: int | None = 1024,
    sample_method: SampleMethod = "first",
    strict: bool = False,
    *,
    force_timezone: str | None = None,
) -> DataFrame

Optimize data types of a Polars DataFrame for performance and memory efficiency.

This function analyzes each column and converts it to the most appropriate data type based on content, handling string-to-type conversions and numeric type downcasting.

Parameters:

Name Type Description Default
df DataFrame

The Polars DataFrame to optimize.

required
include str | list[str] | None

Column(s) to include in optimization (default: all columns).

None
exclude str | list[str] | None

Column(s) to exclude from optimization.

None
time_zone str | None

Optional time zone hint during datetime parsing.

None
shrink_numerics bool

Whether to downcast numeric types when possible.

False
allow_unsigned bool

Whether to allow unsigned integer types.

True
allow_null bool

Whether to allow columns with all null values to be cast to Null type.

True
sample_size int | None

Maximum number of cleaned values to inspect for regex-based inference. Use None to inspect the entire column.

1024
sample_method SampleMethod

Which subset to inspect ("first" or "random").

'first'
strict bool

If True, will raise an error if any column cannot be optimized.

False
force_timezone str | None

If set, ensure all parsed datetime columns end up with this timezone.

None

Returns:

Type Description
DataFrame

DataFrame with optimized data types.
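
A usage sketch (illustrative data; the exact dtypes that come back depend on the column contents and the sampling options):

>>> import polars as pl
>>> df = pl.DataFrame({
...     "id": ["1", "2", "3"],
...     "price": ["1.5", "2.0", "2.5"],
...     "comment": ["a", "b", "c"],
... })
>>> optimized = opt_dtype(df, exclude=["comment"], shrink_numerics=True)
>>> # "id" and "price" are candidates for numeric dtypes; "comment" keeps its
>>> # original Utf8 dtype because it is excluded from optimization.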

Source code in src/fsspeckit/common/polars.py
def opt_dtype(
    df: pl.DataFrame,
    include: str | list[str] | None = None,
    exclude: str | list[str] | None = None,
    time_zone: str | None = None,
    shrink_numerics: bool = False,
    allow_unsigned: bool = True,
    allow_null: bool = True,
    sample_size: int | None = 1024,
    sample_method: SampleMethod = "first",
    strict: bool = False,
    *,
    force_timezone: str | None = None,
) -> pl.DataFrame:
    """
    Optimize data types of a Polars DataFrame for performance and memory efficiency.

    This function analyzes each column and converts it to the most appropriate
    data type based on content, handling string-to-type conversions and
    numeric type downcasting.

    Args:
        df: The Polars DataFrame to optimize.
        include: Column(s) to include in optimization (default: all columns).
        exclude: Column(s) to exclude from optimization.
        time_zone: Optional time zone hint during datetime parsing.
        shrink_numerics: Whether to downcast numeric types when possible.
        allow_unsigned: Whether to allow unsigned integer types.
        allow_null: Whether to allow columns with all null values to be cast to Null type.
        sample_size: Maximum number of cleaned values to inspect for regex-based inference. Use None to inspect the entire column.
        sample_method: Which subset to inspect (`"first"` or `"random"`).
        strict: If True, will raise an error if any column cannot be optimized.
        force_timezone: If set, ensure all parsed datetime columns end up with this timezone.

    Returns:
        DataFrame with optimized data types.
    """
    if sample_method not in ("first", "random"):
        raise ValueError("sample_method must be 'first' or 'random'")

    if isinstance(df, pl.LazyFrame):
        return opt_dtype(
            df.collect(),
            include=include,
            exclude=exclude,
            time_zone=time_zone,
            shrink_numerics=shrink_numerics,
            allow_unsigned=allow_unsigned,
            allow_null=allow_null,
            sample_size=sample_size,
            sample_method=sample_method,
            strict=strict,
            force_timezone=force_timezone,
        ).lazy()

    # Normalize include/exclude parameters
    if isinstance(include, str):
        include = [include]
    if isinstance(exclude, str):
        exclude = [exclude]

    # Determine columns to process
    cols_to_process = df.columns
    if include:
        cols_to_process = [col for col in include if col in df.columns]
    if exclude:
        cols_to_process = [col for col in cols_to_process if col not in exclude]

    # Generate optimization expressions for all columns
    expressions = []
    for col_name in cols_to_process:
        try:
            expressions.append(
                _get_column_expr(
                    df,
                    col_name,
                    shrink_numerics,
                    allow_unsigned,
                    allow_null,
                    time_zone,
                    force_timezone,
                    sample_size,
                    sample_method,
                    strict,
                )
            )
        except Exception as e:
            if strict:
                raise e
            # If strict mode is off, just keep the original column
            continue

    # Apply all transformations at once if any exist
    return df if not expressions else df.with_columns(expressions)

fsspeckit.common.schema

Shared schema utilities for fsspeckit.

This module provides canonical implementations for schema compatibility, unification, timezone handling, and type optimization across all backends. It consolidates schema-related logic that was previously scattered across dataset-specific modules.

Key responsibilities: 1. Schema unification with intelligent conflict resolution 2. Timezone standardization and detection 3. Large type conversion to standard types 4. Schema casting and validation 5. Data type optimization 6. Empty column handling

Architecture: - Functions are designed to work with PyArrow schemas and tables - All operations preserve metadata when possible - Timezone handling supports multiple strategies (auto, explicit, removal) - Type optimization includes safety checks and fallback strategies - Schema unification uses multiple fallback strategies for maximum compatibility

Usage: Backend implementations should delegate to this module rather than implementing their own schema logic. This ensures consistent behavior across DuckDB, PyArrow, and future backends.

Functions
fsspeckit.common.schema.cast_schema
cast_schema(table: Table, schema: Schema) -> Table

Cast a PyArrow table to a given schema, adding any schema columns missing from the table as null columns before casting.

Parameters:

Name Type Description Default
table Table

The PyArrow table to cast.

required
schema Schema

The target schema to cast table to.

required

Returns:

Type Description
Table

pa.Table: A new PyArrow table with the specified schema.
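
A usage sketch (illustrative data; the "b" column is absent from the table and is filled with nulls):

>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2]})
>>> target = pa.schema([("a", pa.int64()), ("b", pa.string())])
>>> result = cast_schema(table, target)
>>> result.column_names
['a', 'b']
>>> result.column("b").null_count
2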

Source code in src/fsspeckit/common/schema.py
def cast_schema(table: pa.Table, schema: pa.Schema) -> pa.Table:
    """
    Cast a PyArrow table to a given schema, adding any schema columns missing from the table as null columns before casting.

    Args:
        table (pa.Table): The PyArrow table to cast.
        schema (pa.Schema): The target schema to cast table to.

    Returns:
        pa.Table: A new PyArrow table with the specified schema.
    """
    table_columns = set(table.schema.names)
    for field in schema:
        if field.name not in table_columns:
            table = table.append_column(
                field.name, pa.nulls(table.num_rows, type=field.type)
            )
    return table.cast(schema)
fsspeckit.common.schema.convert_large_types_to_normal
convert_large_types_to_normal(schema: Schema) -> Schema

Convert large types in a PyArrow schema to their standard types.

Parameters:

Name Type Description Default
schema Schema

The PyArrow schema to convert.

required

Returns:

Type Description
Schema

pa.Schema: A new PyArrow schema with large types converted to standard types.
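
A usage sketch (illustrative field names):

>>> import pyarrow as pa
>>> schema = pa.schema([
...     ("name", pa.large_string()),
...     ("values", pa.large_list(pa.int64())),
... ])
>>> normal = convert_large_types_to_normal(schema)
>>> normal.field("name").type == pa.string()
True
>>> normal.field("values").type == pa.list_(pa.int64())
True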

Source code in src/fsspeckit/common/schema.py
def convert_large_types_to_normal(schema: pa.Schema) -> pa.Schema:
    """
    Convert large types in a PyArrow schema to their standard types.

    Args:
        schema (pa.Schema): The PyArrow schema to convert.

    Returns:
        pa.Schema: A new PyArrow schema with large types converted to standard types.
    """
    # Define mapping of large types to standard types
    type_mapping = {
        pa.large_string(): pa.string(),
        pa.large_binary(): pa.binary(),
        pa.large_utf8(): pa.utf8(),
        pa.large_list(pa.null()): pa.list_(pa.null()),
        pa.large_list_view(pa.null()): pa.list_view(pa.null()),
    }
    # Convert fields
    new_fields = []
    for field in schema:
        field_type = field.type
        # Check if type exists in mapping
        if field_type in type_mapping:
            new_field = pa.field(
                name=field.name,
                type=type_mapping[field_type],
                nullable=field.nullable,
                metadata=field.metadata,
            )
            new_fields.append(new_field)
        # Handle large lists with nested types
        elif isinstance(field_type, pa.LargeListType):
            new_field = pa.field(
                name=field.name,
                type=pa.list_(
                    type_mapping[field_type.value_type]
                    if field_type.value_type in type_mapping
                    else field_type.value_type
                ),
                nullable=field.nullable,
                metadata=field.metadata,
            )
            new_fields.append(new_field)
        # Handle dictionary with large_string, large_utf8, or large_binary values
        elif isinstance(field_type, pa.DictionaryType):
            new_field = pa.field(
                name=field.name,
                type=pa.dictionary(
                    field_type.index_type,
                    type_mapping[field_type.value_type]
                    if field_type.value_type in type_mapping
                    else field_type.value_type,
                    field_type.ordered,
                ),
                metadata=field.metadata,
            )
            new_fields.append(new_field)
        else:
            new_fields.append(field)

    return pa.schema(new_fields)
fsspeckit.common.schema.dominant_timezone_per_column
dominant_timezone_per_column(
    schemas: list[Schema],
) -> dict[str, tuple[str | None, str | None]]

For each timestamp column (by name) across all schemas, detect the most frequent timezone (including None). If None and a timezone are tied, prefer the timezone. Returns a dict mapping each column name to a (unit, dominant_timezone) tuple.
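
A usage sketch (illustrative; one schema is timezone-aware and the other is naive, so the tie resolves to the timezone):

>>> import pyarrow as pa
>>> schemas = [
...     pa.schema([("ts", pa.timestamp("us", tz="UTC"))]),
...     pa.schema([("ts", pa.timestamp("us"))]),
... ]
>>> dominant_timezone_per_column(schemas)
{'ts': ('us', 'UTC')}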

Source code in src/fsspeckit/common/schema.py
def dominant_timezone_per_column(
    schemas: list[pa.Schema],
) -> dict[str, tuple[str | None, str | None]]:
    """
    For each timestamp column (by name) across all schemas, detect the most frequent timezone (including None).
    If None and a timezone are tied, prefer the timezone.
    Returns a dict: {column_name: (unit, dominant_timezone)}
    """
    tz_counts: defaultdict[str, Counter[str | None]] = defaultdict(Counter)
    units: dict[str, str | None] = {}

    for schema in schemas:
        for field in schema:
            if pa.types.is_timestamp(field.type):
                tz = field.type.tz
                name = field.name
                tz_counts[name][tz] += 1
                # Track unit for each column (assume consistent)
                if name not in units:
                    units[name] = field.type.unit

    dominant = {}
    for name, counter in tz_counts.items():
        most_common = counter.most_common()
        if not most_common:
            continue
        top_count = most_common[0][1]
        # Find all with top_count
        top_tzs = [tz for tz, cnt in most_common if cnt == top_count]
        # If tie and one is not None, prefer not-None
        if len(top_tzs) > 1 and any(tz is not None for tz in top_tzs):
            tz = next(tz for tz in top_tzs if tz is not None)
        else:
            tz = most_common[0][0]
        dominant[name] = (units[name], tz)
    return dominant
fsspeckit.common.schema.remove_empty_columns
remove_empty_columns(table: Table) -> Table

Remove columns that are entirely empty from a PyArrow table.

Parameters:

Name Type Description Default
table Table

The PyArrow table to process.

required

Returns:

Type Description
Table

pa.Table: A new PyArrow table with empty columns removed.
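
A usage sketch (illustrative data; column "b" is entirely null and is dropped):

>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2], "b": pa.array([None, None], type=pa.string())})
>>> remove_empty_columns(table).column_names
['a']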

Source code in src/fsspeckit/common/schema.py
def remove_empty_columns(table: pa.Table) -> pa.Table:
    """Remove columns that are entirely empty from a PyArrow table.

    Args:
        table (pa.Table): The PyArrow table to process.

    Returns:
        pa.Table: A new PyArrow table with empty columns removed.
    """
    if table.num_rows == 0:
        return table

    empty_cols = []
    for col_name in table.column_names:
        column = table.column(col_name)
        if column.null_count == table.num_rows:
            empty_cols.append(col_name)

    if not empty_cols:
        return table
    return table.drop(empty_cols)
fsspeckit.common.schema.standardize_schema_timezones
standardize_schema_timezones(
    schemas: Schema | list[Schema],
    timezone: str | None = None,
) -> Schema | list[Schema]

Standardize timezone info for all timestamp columns in a list of PyArrow schemas.

Parameters:

Name Type Description Default
schemas Schema | list[Schema]

A single PyArrow schema or a list of schemas.

required
timezone str or None

If None, remove timezone from all timestamp columns. If str, set this timezone for all timestamp columns. If "auto", use the most frequent timezone across schemas.

None

Returns:

Type Description
Schema | list[Schema]

pa.Schema | list[pa.Schema]: Schema(s) with standardized timezone info, matching the shape of the input.
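
A usage sketch (illustrative data; Europe/Berlin is an arbitrary example timezone):

>>> import pyarrow as pa
>>> schema = pa.schema([("ts", pa.timestamp("us", tz="UTC")), ("x", pa.int64())])
>>> stripped = standardize_schema_timezones(schema, timezone=None)
>>> stripped.field("ts").type == pa.timestamp("us")
True
>>> localized = standardize_schema_timezones([schema], timezone="Europe/Berlin")
>>> localized[0].field("ts").type.tz
'Europe/Berlin'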

Source code in src/fsspeckit/common/schema.py
def standardize_schema_timezones(
    schemas: pa.Schema | list[pa.Schema], timezone: str | None = None
) -> pa.Schema | list[pa.Schema]:
    """
    Standardize timezone info for all timestamp columns in a list of PyArrow schemas.

    Args:
        schemas (list of pa.Schema): List of PyArrow schemas.
        timezone (str or None): If None, remove timezone from all timestamp columns.
                                If str, set this timezone for all timestamp columns.
                                If "auto", use the most frequent timezone across schemas.

    Returns:
        list of pa.Schema: New schemas with standardized timezone info.
    """
    if isinstance(schemas, pa.Schema):
        single_input = True
        schema_list: list[pa.Schema] = [schemas]
    else:
        single_input = False
        schema_list = list(schemas)
    if timezone == "auto":
        majority_schema = standardize_schema_timezones_by_majority(schema_list)
        result_list = [majority_schema for _ in schema_list]
        return majority_schema if single_input else result_list
    new_schemas = []
    for schema in schema_list:
        fields = []
        for field in schema:
            if pa.types.is_timestamp(field.type):
                fields.append(
                    pa.field(
                        field.name,
                        pa.timestamp(field.type.unit, timezone),
                        field.nullable,
                        field.metadata,
                    )
                )
            else:
                fields.append(field)
        new_schemas.append(pa.schema(fields, schema.metadata))
    return new_schemas[0] if single_input else new_schemas
fsspeckit.common.schema.standardize_schema_timezones_by_majority
standardize_schema_timezones_by_majority(
    schemas: list[Schema],
) -> Schema

For each timestamp column (by name) across all schemas, set the timezone to the most frequent (with tie-breaking). Returns a single unified schema with updated timestamp timezones.
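
A usage sketch (illustrative data; the UTC field wins the tie against the naive one):

>>> import pyarrow as pa
>>> schemas = [
...     pa.schema([("ts", pa.timestamp("us", tz="UTC"))]),
...     pa.schema([("ts", pa.timestamp("us"))]),
... ]
>>> unified = standardize_schema_timezones_by_majority(schemas)
>>> unified.field("ts").type.tz
'UTC'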

Source code in src/fsspeckit/common/schema.py
def standardize_schema_timezones_by_majority(
    schemas: list[pa.Schema],
) -> pa.Schema:
    """
    For each timestamp column (by name) across all schemas, set the timezone to the most frequent (with tie-breaking).
    Returns a single schema with updated timestamp timezones.
    """
    dom = dominant_timezone_per_column(schemas)
    if not schemas:
        return pa.schema([])

    seen: set[str] = set()
    fields: list[pa.Field] = []
    for schema in schemas:
        for field in schema:
            if field.name in seen:
                continue
            seen.add(field.name)
            if pa.types.is_timestamp(field.type) and field.name in dom:
                unit, tz = dom[field.name]
                fields.append(
                    pa.field(
                        field.name,
                        pa.timestamp(unit, tz),
                        field.nullable,
                        field.metadata,
                    )
                )
            else:
                fields.append(field)
    return pa.schema(fields, schemas[0].metadata)
fsspeckit.common.schema.unify_schemas
unify_schemas(
    schemas: list[Schema],
    use_large_dtypes: bool = False,
    timezone: str | None = None,
    standardize_timezones: bool = True,
    verbose: bool = False,
    remove_conflicting_columns: bool = False,
) -> Schema

Unify a list of PyArrow schemas into a single schema using intelligent conflict resolution.

Parameters:

Name Type Description Default
schemas list[Schema]

List of PyArrow schemas to unify.

required
use_large_dtypes bool

If True, keep large types like large_string.

False
timezone str | None

If specified, standardize all timestamp columns to this timezone. If "auto", use the most frequent timezone across schemas. If None, remove timezone from all timestamp columns.

None
standardize_timezones bool

If True, standardize all timestamp columns to the most frequent timezone.

True
verbose bool

If True, print conflict resolution details for debugging.

False
remove_conflicting_columns bool

If True, allows removal of columns with type conflicts as a fallback strategy instead of converting them. Defaults to False.

False

Returns:

Type Description
Schema

pa.Schema: A unified PyArrow schema.

Raises:

Type Description
ValueError

If no schemas are provided.
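
A usage sketch (illustrative schemas; assumes the shared column types do not conflict, so no fallback strategy is needed):

>>> import pyarrow as pa
>>> schemas = [
...     pa.schema([("id", pa.int64())]),
...     pa.schema([("id", pa.int64()), ("name", pa.string())]),
... ]
>>> unified = unify_schemas(schemas)
>>> unified.names
['id', 'name']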

Source code in src/fsspeckit/common/schema.py
def unify_schemas(
    schemas: list[pa.Schema],
    use_large_dtypes: bool = False,
    timezone: str | None = None,
    standardize_timezones: bool = True,
    verbose: bool = False,
    remove_conflicting_columns: bool = False,
) -> pa.Schema:
    """
    Unify a list of PyArrow schemas into a single schema using intelligent conflict resolution.

    Args:
        schemas (list[pa.Schema]): List of PyArrow schemas to unify.
        use_large_dtypes (bool): If True, keep large types like large_string.
        timezone (str | None): If specified, standardize all timestamp columns to this timezone.
            If "auto", use the most frequent timezone across schemas.
            If None, remove timezone from all timestamp columns.
        standardize_timezones (bool): If True, standardize all timestamp columns to the most frequent timezone.
        verbose (bool): If True, print conflict resolution details for debugging.
        remove_conflicting_columns (bool): If True, allows removal of columns with type conflicts as a fallback
            strategy instead of converting them. Defaults to False.

    Returns:
        pa.Schema: A unified PyArrow schema.

    Raises:
        ValueError: If no schemas are provided.
    """
    if not schemas:
        raise ValueError("At least one schema must be provided for unification")

    # Early exit for single schema
    unique_schemas = _unique_schemas(schemas)
    if len(unique_schemas) == 1:
        result_schema = unique_schemas[0]
        if standardize_timezones:
            result_schema = standardize_schema_timezones([result_schema], timezone)[0]
        return (
            result_schema
            if use_large_dtypes
            else convert_large_types_to_normal(result_schema)
        )

    # Step 1: Find and resolve conflicts first
    conflicts = _find_conflicting_fields(unique_schemas)
    if conflicts and verbose:
        _log_conflict_summary(conflicts, verbose)

    if conflicts:
        # Normalize schemas using intelligent promotion rules
        unique_schemas = _normalize_schema_types(unique_schemas, conflicts)

    # Step 2: Attempt unification with conflict-resolved schemas
    try:
        unified_schema = pa.unify_schemas(unique_schemas, promote_options="permissive")

        # Step 3: Apply timezone standardization to the unified result
        if standardize_timezones:
            unified_schema = standardize_schema_timezones([unified_schema], timezone)[0]

        return (
            unified_schema
            if use_large_dtypes
            else convert_large_types_to_normal(unified_schema)
        )

    except (pa.ArrowInvalid, pa.ArrowTypeError) as e:
        # Step 4: Intelligent fallback strategies
        if verbose:
            logger.debug("Primary unification failed: %s", e)
            logger.debug("Attempting fallback strategies...")

        # Fallback 1: Try aggressive string conversion for remaining conflicts
        try:
            fallback_schema = _aggressive_fallback_unification(unique_schemas)
            if standardize_timezones:
                fallback_schema = standardize_schema_timezones(
                    [fallback_schema], timezone
                )[0]
            if verbose:
                logger.debug("✓ Aggressive fallback succeeded")
            return (
                fallback_schema
                if use_large_dtypes
                else convert_large_types_to_normal(fallback_schema)
            )

        except (pa.ArrowInvalid, pa.ArrowTypeError, ValueError) as e:
            if verbose:
                logger.debug("✗ Aggressive fallback failed: %s", str(e), exc_info=True)

        # Fallback 2: Remove conflicting fields (if enabled)
        if remove_conflicting_columns:
            try:
                non_conflicting_schema = _remove_conflicting_fields(unique_schemas)
                if standardize_timezones:
                    non_conflicting_schema = standardize_schema_timezones(
                        [non_conflicting_schema], timezone
                    )[0]
                if verbose:
                    logger.debug("✓ Remove conflicting fields fallback succeeded")
                return (
                    non_conflicting_schema
                    if use_large_dtypes
                    else convert_large_types_to_normal(non_conflicting_schema)
                )

            except (pa.ArrowInvalid, pa.ArrowTypeError, ValueError) as e:
                if verbose:
                    logger.debug("✗ Remove conflicting fields fallback failed: %s", str(e), exc_info=True)

        # Fallback 3: Remove problematic fields that can't be unified
        try:
            minimal_schema = _remove_problematic_fields(unique_schemas)
            if standardize_timezones:
                minimal_schema = standardize_schema_timezones(
                    [minimal_schema], timezone
                )[0]
            if verbose:
                logger.debug("✓ Minimal schema (removed problematic fields) succeeded")
            return (
                minimal_schema
                if use_large_dtypes
                else convert_large_types_to_normal(minimal_schema)
            )

        except (pa.ArrowInvalid, pa.ArrowTypeError, ValueError) as e:
            if verbose:
                logger.debug("✗ Minimal schema fallback failed: %s", str(e), exc_info=True)

        # Fallback 4: Return first schema as last resort
        if verbose:
            logger.debug("✗ All fallback strategies failed, returning first schema")

        first_schema = unique_schemas[0]
        if standardize_timezones:
            first_schema = standardize_schema_timezones([first_schema], timezone)[0]
        return (
            first_schema
            if use_large_dtypes
            else convert_large_types_to_normal(first_schema)
        )

fsspeckit.common.security

Security validation helpers for fsspeckit.

This module provides basic validation for paths, codecs, and credential scrubbing to prevent common security issues like path traversal and credential leakage in logs.

Functions
fsspeckit.common.security.safe_format_error
safe_format_error(
    operation: str,
    path: str | None = None,
    error: BaseException | None = None,
    **context: Any,
) -> str

Format an error message with credentials scrubbed.

Parameters:

Name Type Description Default
operation str

Description of the operation that failed.

required
path str | None

Optional path involved in the operation.

None
error BaseException | None

Optional exception that occurred.

None
**context Any

Additional context key-value pairs.

{}

Returns:

Type Description
str

A formatted, credential-scrubbed error message.
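
A usage sketch (illustrative path, error, and context values; any credential-like substrings in the error or context are scrubbed before the message is returned):

>>> msg = safe_format_error(
...     "read dataset",
...     path="s3://bucket/data",
...     error=OSError("connection reset"),
...     endpoint="https://storage.example.com",
... )
>>> msg.startswith("Failed to read dataset at 's3://bucket/data'")
True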

Source code in src/fsspeckit/common/security.py
def safe_format_error(
    operation: str,
    path: str | None = None,
    error: BaseException | None = None,
    **context: Any,
) -> str:
    """Format an error message with credentials scrubbed.

    Args:
        operation: Description of the operation that failed.
        path: Optional path involved in the operation.
        error: Optional exception that occurred.
        **context: Additional context key-value pairs.

    Returns:
        A formatted, credential-scrubbed error message.
    """
    parts = [f"Failed to {operation}"]

    if path:
        parts.append(f"at '{path}'")

    if error:
        parts.append(f": {scrub_exception(error)}")

    if context:
        context_str = ", ".join(f"{k}={scrub_credentials(str(v))}" for k, v in context.items())
        parts.append(f" ({context_str})")

    return " ".join(parts)
fsspeckit.common.security.scrub_credentials
scrub_credentials(message: str) -> str

Remove or mask credential-like values from a string.

This is intended for use before logging error messages that might contain sensitive information like access keys or tokens.

Parameters:

Name Type Description Default
message str

The string to scrub.

required

Returns:

Type Description
str

The string with credential-like values replaced with [REDACTED].

Examples:

>>> scrub_credentials("Error: access_key_id=AKIAIOSFODNN7EXAMPLE")
'Error: access_key_id=[REDACTED]'
>>> scrub_credentials("Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...")
'[REDACTED]'
Source code in src/fsspeckit/common/security.py
def scrub_credentials(message: str) -> str:
    """Remove or mask credential-like values from a string.

    This is intended for use before logging error messages that might
    contain sensitive information like access keys or tokens.

    Args:
        message: The string to scrub.

    Returns:
        The string with credential-like values replaced with [REDACTED].

    Examples:
        >>> scrub_credentials("Error: access_key_id=AKIAIOSFODNN7EXAMPLE")
        'Error: access_key_id=[REDACTED]'

        >>> scrub_credentials("Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...")
        '[REDACTED]'
    """
    if not message:
        return message

    result = message

    for pattern in _CREDENTIAL_PATTERNS:
        # Replace matched groups with [REDACTED]
        def redact_match(match: re.Match) -> str:
            groups = match.groups()
            if len(groups) >= 2:
                # Pattern with key=value format - keep the key, redact the value
                return match.group(0).replace(groups[-1], "[REDACTED]")
            else:
                # Single match - redact entire thing
                return "[REDACTED]"

        result = pattern.sub(redact_match, result)

    return result
fsspeckit.common.security.scrub_exception
scrub_exception(exc: BaseException) -> str

Scrub credentials from an exception's string representation.

Parameters:

Name Type Description Default
exc BaseException

The exception to scrub.

required

Returns:

Type Description
str

A scrubbed string representation of the exception.

Source code in src/fsspeckit/common/security.py
def scrub_exception(exc: BaseException) -> str:
    """Scrub credentials from an exception's string representation.

    Args:
        exc: The exception to scrub.

    Returns:
        A scrubbed string representation of the exception.
    """
    return scrub_credentials(str(exc))
fsspeckit.common.security.validate_columns
validate_columns(
    columns: list[str] | None, valid_columns: list[str]
) -> list[str] | None

Validate that requested columns exist in the schema.

This is a helper to prevent column injection in SQL-like operations.

Parameters:

Name Type Description Default
columns list[str] | None

List of column names to validate, or None.

required
valid_columns list[str]

List of valid column names from the schema.

required

Returns:

Type Description
list[str] | None

The validated columns list, or None if columns was None.

Raises:

Type Description
ValueError

If any column is not in the valid set.
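
A usage sketch (illustrative column names):

>>> validate_columns(["id", "name"], ["id", "name", "created_at"])
['id', 'name']
>>> validate_columns(None, ["id", "name"]) is None
True
>>> # validate_columns(["id; DROP TABLE"], ["id"]) raises ValueError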

Source code in src/fsspeckit/common/security.py
def validate_columns(columns: list[str] | None, valid_columns: list[str]) -> list[str] | None:
    """Validate that requested columns exist in the schema.

    This is a helper to prevent column injection in SQL-like operations.

    Args:
        columns: List of column names to validate, or None.
        valid_columns: List of valid column names from the schema.

    Returns:
        The validated columns list, or None if columns was None.

    Raises:
        ValueError: If any column is not in the valid set.
    """
    if columns is None:
        return None

    valid_set = set(valid_columns)
    invalid = [col for col in columns if col not in valid_set]

    if invalid:
        raise ValueError(
            f"Invalid column(s): {', '.join(invalid)}. "
            f"Valid columns are: {', '.join(sorted(valid_set))}"
        )

    return columns
fsspeckit.common.security.validate_compression_codec
validate_compression_codec(codec: str) -> str

Validate that a compression codec is in the allowed set.

This prevents injection of arbitrary values into SQL queries or filesystem operations that accept codec parameters.

Parameters:

Name Type Description Default
codec str

The compression codec name to validate.

required

Returns:

Type Description
str

The validated codec name (lowercased).

Raises:

Type Description
ValueError

If the codec is not in the allowed set.

Examples:

>>> validate_compression_codec("snappy")
'snappy'
>>> validate_compression_codec("GZIP")
'gzip'
>>> validate_compression_codec("malicious; DROP TABLE")
ValueError: Invalid compression codec
Source code in src/fsspeckit/common/security.py
def validate_compression_codec(codec: str) -> str:
    """Validate that a compression codec is in the allowed set.

    This prevents injection of arbitrary values into SQL queries or
    filesystem operations that accept codec parameters.

    Args:
        codec: The compression codec name to validate.

    Returns:
        The validated codec name (lowercased).

    Raises:
        ValueError: If the codec is not in the allowed set.

    Examples:
        >>> validate_compression_codec("snappy")
        'snappy'

        >>> validate_compression_codec("GZIP")
        'gzip'

        >>> validate_compression_codec("malicious; DROP TABLE")
        ValueError: Invalid compression codec
    """
    if not codec or not isinstance(codec, str):
        raise ValueError("Compression codec must be a non-empty string")

    normalized = codec.lower().strip()

    if normalized not in VALID_COMPRESSION_CODECS:
        valid_list = ", ".join(sorted(VALID_COMPRESSION_CODECS - {"none"}))
        raise ValueError(
            f"Invalid compression codec: '{codec}'. "
            f"Must be one of: {valid_list}"
        )

    return normalized
fsspeckit.common.security.validate_path
validate_path(
    path: str, base_dir: str | None = None
) -> str

Validate a filesystem path for security issues.

Checks for: - Embedded null bytes and control characters - Path traversal attempts (../ sequences escaping base_dir) - Empty or whitespace-only paths

Parameters:

Name Type Description Default
path str

The path to validate.

required
base_dir str | None

Optional base directory. If provided, the path must resolve to a location within this directory (prevents path traversal).

None

Returns:

Type Description
str

The validated path (unchanged if valid).

Raises:

Type Description
ValueError

If the path contains forbidden characters, is empty, or escapes the base directory.

Examples:

>>> validate_path("/data/file.parquet")
'/data/file.parquet'
>>> validate_path("../../../etc/passwd", base_dir="/data")
ValueError: Path escapes base directory
>>> validate_path("file\x00.parquet")
ValueError: Path contains forbidden characters
Source code in src/fsspeckit/common/security.py
def validate_path(path: str, base_dir: str | None = None) -> str:
    """Validate a filesystem path for security issues.

    Checks for:
    - Embedded null bytes and control characters
    - Path traversal attempts (../ sequences escaping base_dir)
    - Empty or whitespace-only paths

    Args:
        path: The path to validate.
        base_dir: Optional base directory. If provided, the path must resolve
            to a location within this directory (prevents path traversal).

    Returns:
        The validated path (unchanged if valid).

    Raises:
        ValueError: If the path contains forbidden characters, is empty,
            or escapes the base directory.

    Examples:
        >>> validate_path("/data/file.parquet")
        '/data/file.parquet'

        >>> validate_path("../../../etc/passwd", base_dir="/data")
        ValueError: Path escapes base directory

        >>> validate_path("file\\x00.parquet")
        ValueError: Path contains forbidden characters
    """
    if not path or not path.strip():
        raise ValueError("Path cannot be empty or whitespace-only")

    # Check for forbidden control characters
    for char in path:
        if char in _FORBIDDEN_PATH_CHARS:
            raise ValueError(
                f"Path contains forbidden control character: {repr(char)}"
            )

    # Check for path traversal when base_dir is specified
    if base_dir is not None:
        import os

        # Normalize both paths for comparison
        base_resolved = os.path.normpath(os.path.abspath(base_dir))

        # Handle relative paths by joining with base
        if not os.path.isabs(path):
            full_path = os.path.join(base_dir, path)
        else:
            full_path = path

        path_resolved = os.path.normpath(os.path.abspath(full_path))

        # Check if resolved path starts with base directory
        if not path_resolved.startswith(base_resolved + os.sep) and path_resolved != base_resolved:
            raise ValueError(
                f"Path '{path}' escapes base directory '{base_dir}'"
            )

    return path

fsspeckit.common.types

Type conversion and data transformation utilities.

Functions
fsspeckit.common.types.dict_to_dataframe
dict_to_dataframe(
    data: Union[dict, list[dict]],
    unique: Union[bool, list[str], str] = False,
) -> Any

Convert a dictionary or list of dictionaries to a Polars DataFrame.

Handles various input formats: - Single dict with list values → DataFrame rows - Single dict with scalar values → Single row DataFrame - List of dicts with scalar values → Multi-row DataFrame - List of dicts with list values → DataFrame with list columns

Parameters:

Name Type Description Default
data Union[dict, list[dict]]

Dictionary or list of dictionaries to convert.

required
unique Union[bool, list[str], str]

If True, remove duplicate rows. Can also specify columns.

False

Returns:

Type Description
Any

Polars DataFrame containing the converted data.

Examples:

>>> # Single dict with list values
>>> data = {'a': [1, 2, 3], 'b': [4, 5, 6]}
>>> dict_to_dataframe(data)
shape: (3, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1   ┆ 4   │
│ 2   ┆ 5   │
│ 3   ┆ 6   │
└─────┴─────┘
>>> # Single dict with scalar values
>>> data = {'a': 1, 'b': 2}
>>> dict_to_dataframe(data)
shape: (1, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1   ┆ 2   │
└─────┴─────┘
>>> # List of dicts with scalar values
>>> data = [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
>>> dict_to_dataframe(data)
shape: (2, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1   ┆ 2   │
│ 3   ┆ 4   │
└─────┴─────┘
Source code in src/fsspeckit/common/types.py
def dict_to_dataframe(
    data: Union[dict, list[dict]], unique: Union[bool, list[str], str] = False
) -> Any:
    """Convert a dictionary or list of dictionaries to a Polars DataFrame.

    Handles various input formats:
    - Single dict with list values → DataFrame rows
    - Single dict with scalar values → Single row DataFrame
    - List of dicts with scalar values → Multi-row DataFrame
    - List of dicts with list values → DataFrame with list columns

    Args:
        data: Dictionary or list of dictionaries to convert.
        unique: If True, remove duplicate rows. Can also specify columns.

    Returns:
        Polars DataFrame containing the converted data.

    Examples:
        >>> # Single dict with list values
        >>> data = {'a': [1, 2, 3], 'b': [4, 5, 6]}
        >>> dict_to_dataframe(data)
        shape: (3, 2)
        ┌─────┬─────┐
        │ a   ┆ b   │
        │ --- ┆ --- │
        │ i64 ┆ i64 │
        ╞═════╪═════╡
        │ 1   ┆ 4   │
        │ 2   ┆ 5   │
        │ 3   ┆ 6   │
        └─────┴─────┘

        >>> # Single dict with scalar values
        >>> data = {'a': 1, 'b': 2}
        >>> dict_to_dataframe(data)
        shape: (1, 2)
        ┌─────┬─────┐
        │ a   ┆ b   │
        │ --- ┆ --- │
        │ i64 ┆ i64 │
        ╞═════╪═════╡
        │ 1   ┆ 2   │
        └─────┴─────┘

        >>> # List of dicts with scalar values
        >>> data = [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
        >>> dict_to_dataframe(data)
        shape: (2, 2)
        ┌─────┬─────┐
        │ a   ┆ b   │
        │ --- ┆ --- │
        │ i64 ┆ i64 │
        ╞═════╪═════╡
        │ 1   ┆ 2   │
        │ 3   ┆ 4   │
        └─────┴─────┘
    """
    from fsspeckit.common.optional import _import_polars

    pl = _import_polars()

    if isinstance(data, list):
        # If it's a single-element list, just use the first element
        if len(data) == 1:
            data = data[0]
        # If it's a list of dicts
        else:
            first_item = data[0]
            # Check if the dict values are lists/tuples
            if any(isinstance(v, (list, tuple)) for v in first_item.values()):
                # Each dict becomes a row with list/tuple values
                data = pl.DataFrame(data)
            else:
                # If values are scalars, convert list of dicts to DataFrame
                data = pl.DataFrame(data)

            if unique:
                data = data.unique(
                    subset=None if not isinstance(unique, (str, list)) else unique,
                    maintain_order=True,
                )
            return data

    # If it's a single dict
    if isinstance(data, dict):
        # Check if values are lists/tuples
        if any(isinstance(v, (list, tuple)) for v in data.values()):
            # Get the length of any list value (assuming all lists have same length)
            length = len(next(v for v in data.values() if isinstance(v, (list, tuple))))
            # Convert to DataFrame where each list element becomes a row
            data = pl.DataFrame(
                {
                    k: v if isinstance(v, (list, tuple)) else [v] * length
                    for k, v in data.items()
                }
            )
        else:
            # If values are scalars, wrap them in a list to create a single row
            data = pl.DataFrame({k: [v] for k, v in data.items()})

        if unique:
            data = data.unique(
                subset=None if not isinstance(unique, (str, list)) else unique,
                maintain_order=True,
            )
        return data

    raise ValueError("Input must be a dictionary or list of dictionaries")
fsspeckit.common.types.to_pyarrow_table
to_pyarrow_table(
    data: Union[Any, dict, list[Any]],
    concat: bool = False,
    unique: Union[bool, list[str], str] = False,
) -> Any

Convert various data formats to PyArrow Table.

Handles conversion from Polars DataFrames, Pandas DataFrames, dictionaries, and lists of these types to PyArrow Tables.

Parameters:

Name Type Description Default
data Union[Any, dict, list[Any]]

Input data to convert.

required
concat bool

Whether to concatenate multiple inputs into single table.

False
unique Union[bool, list[str], str]

Whether to remove duplicates. Can specify columns.

False

Returns:

Type Description
Any

PyArrow Table containing the converted data.

Example
df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
table = to_pyarrow_table(df)
print(table.schema)
# a: int64
# b: int64
Source code in src/fsspeckit/common/types.py
def to_pyarrow_table(
    data: Union[
        Any,  # pl.DataFrame, pl.LazyFrame, pd.DataFrame
        dict,
        list[Any],  # list of DataFrames or dicts
    ],
    concat: bool = False,
    unique: Union[bool, list[str], str] = False,
) -> Any:
    """Convert various data formats to PyArrow Table.

    Handles conversion from Polars DataFrames, Pandas DataFrames,
    dictionaries, and lists of these types to PyArrow Tables.

    Args:
        data: Input data to convert.
        concat: Whether to concatenate multiple inputs into single table.
        unique: Whether to remove duplicates. Can specify columns.

    Returns:
        PyArrow Table containing the converted data.

    Example:
        ```python
        df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
        table = to_pyarrow_table(df)
        print(table.schema)
        # a: int64
        # b: int64
        ```
    """
    from fsspeckit.common.optional import (
        _import_pandas,
        _import_polars,
        _import_pyarrow,
    )
    from fsspeckit.datasets.pyarrow import convert_large_types_to_normal

    pl = _import_polars()
    pd = _import_pandas()
    pa = _import_pyarrow()

    # Convert dict to DataFrame first
    if isinstance(data, dict):
        data = dict_to_dataframe(data)
    if isinstance(data, list):
        if isinstance(data[0], dict):
            data = dict_to_dataframe(data, unique=unique)

    # Ensure data is a list for uniform processing
    if not isinstance(data, list):
        data = [data]

    # Collect lazy frames
    if isinstance(data[0], pl.LazyFrame):
        data = [dd.collect() for dd in data]

    # Convert based on the first item's type
    if isinstance(data[0], pl.DataFrame):
        if concat:
            data = pl.concat(data, how="diagonal_relaxed")
            if unique:
                data = data.unique(
                    subset=None if not isinstance(unique, (str, list)) else unique,
                    maintain_order=True,
                )
            data = data.to_arrow()
            data = data.cast(convert_large_types_to_normal(data.schema))
        else:
            data = [dd.to_arrow() for dd in data]
            data = [dd.cast(convert_large_types_to_normal(dd.schema)) for dd in data]

    elif isinstance(data[0], pd.DataFrame):
        data = [pa.Table.from_pandas(dd, preserve_index=False) for dd in data]
        if concat:
            data = pa.concat_tables(data, promote_options="permissive")
            if unique:
                data = (
                    pl.from_arrow(data)
                    .unique(
                        subset=None if not isinstance(unique, (str, list)) else unique,
                        maintain_order=True,
                    )
                    .to_arrow()
                )
                data = data.cast(convert_large_types_to_normal(data.schema))

    elif isinstance(data[0], (pa.RecordBatch, Generator)):
        if concat:
            data = pa.Table.from_batches(data)
            if unique:
                data = (
                    pl.from_arrow(data)
                    .unique(
                        subset=None if not isinstance(unique, (str, list)) else unique,
                        maintain_order=True,
                    )
                    .to_arrow()
                )
                data = data.cast(convert_large_types_to_normal(data.schema))
        else:
            data = [pa.Table.from_batches([dd]) for dd in data]

    return data