
fsspeckit.core.ext API Documentation

This module extends fsspec.AbstractFileSystem with methods for reading and writing common file formats (JSON, CSV, Parquet), with options for batch processing, parallel reads, and data type optimization. It also includes functions for creating PyArrow datasets.


path_to_glob()

Convert a path to a glob pattern for file matching.

Intelligently converts paths to glob patterns that match files of the specified format, handling various directory and wildcard patterns.

Parameter Type Description
path str Base path to convert. Can include wildcards (* or **). Examples: "data/", "data/*.json", "data/**"
format str | None File format to match (without dot). If None, inferred from path. Examples: "json", "csv", "parquet"
Returns Type Description
str str Glob pattern that matches files of the specified format. Examples: "data/**/*.json", "data/*.csv"

Example:

# Basic directory
path_to_glob("data", "json")
# 'data/**/*.json'

# With wildcards
path_to_glob("data/**", "csv")
# 'data/**/*.csv'

# Format inference
path_to_glob("data/file.parquet")
# 'data/file.parquet'
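
The conversion rules shown in the example can be approximated in a few lines. The following is a minimal sketch, not fsspeckit's actual implementation: the function name sketch_path_to_glob is hypothetical, and the handling of a single trailing `*` is an assumption that the documentation above does not confirm.

```python
import posixpath
from typing import Optional


def sketch_path_to_glob(path: str, format: Optional[str] = None) -> str:
    """Hypothetical stand-in for path_to_glob; not fsspeckit's actual code."""
    if format is None:
        # Infer the format from the file extension, if there is one.
        format = posixpath.splitext(path)[1][1:] or None
    if format is None or path.endswith(f".{format}"):
        # Nothing to infer, or the path already targets this format.
        return path
    if path.endswith("**"):
        # Recursive wildcard: only the file pattern is missing.
        return f"{path}/*.{format}"
    if path.endswith("*"):
        # Assumption: a single trailing wildcard just gains the extension.
        return f"{path}.{format}"
    # Plain directory: search it recursively.
    return f"{path.rstrip('/')}/**/*.{format}"


print(sketch_path_to_glob("data", "json"))
print(sketch_path_to_glob("data/**", "csv"))
print(sketch_path_to_glob("data/file.parquet"))
```

The three calls mirror the documented examples, so the sketch can be compared against the real function's output.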

read_json_file()

Read a single JSON file from any filesystem.

A public wrapper around _read_json_file providing a clean interface for reading individual JSON files.

Parameter Type Description
self AbstractFileSystem Filesystem instance to use for reading
path str Path to JSON file to read
include_file_path bool Whether to return dict with filepath as key
jsonlines bool Whether to read as JSON Lines format
Returns Type Description
dict or list[dict] dict or list[dict] Parsed JSON data. For regular JSON, returns a dict. For JSON Lines, returns a list of dicts. If include_file_path=True, returns {filepath: data}.

Example:

from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
# Read regular JSON
data = fs.read_json_file("config.json")
print(data["setting"])
# 'value'

# Read JSON Lines with filepath
data = fs.read_json_file(
    "logs.jsonl",
    include_file_path=True,
    jsonlines=True
)
print(list(data.keys())[0])
# 'logs.jsonl'
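
The difference between the two input formats is easiest to see on raw text. This sketch uses only the standard library; the helper parse_json_text is hypothetical and only illustrates the return shapes described above, not how fsspeckit parses files internally.

```python
import json


def parse_json_text(text: str, jsonlines: bool = False):
    """Hypothetical helper mirroring the documented return shapes."""
    if jsonlines:
        # JSON Lines: one JSON document per non-empty line -> list of dicts.
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    # Regular JSON: the whole text is one document.
    return json.loads(text)


regular = parse_json_text('{"setting": "value"}')
lines = parse_json_text('{"id": 1}\n{"id": 2}\n', jsonlines=True)

# Shape described for include_file_path=True: {filepath: data}.
with_path = {"logs.jsonl": lines}

print(regular["setting"])
print(len(lines))
print(list(with_path.keys())[0])
```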

read_json()

Read JSON data from one or more files with powerful options.

Provides a flexible interface for reading JSON data with support for:

  • Single file or multiple files
  • Regular JSON or JSON Lines format
  • Batch processing for large datasets
  • Parallel processing
  • DataFrame conversion
  • File path tracking
Parameter Type Description
path str or list[str] Path(s) to JSON file(s). Can be: - Single path string (globs supported) - List of path strings
batch_size int | None If set, enables batch reading with this many files per batch
include_file_path bool Include source filepath in output
jsonlines bool Whether to read as JSON Lines format
as_dataframe bool Convert output to Polars DataFrame(s)
concat bool Combine multiple files/batches into single result
use_threads bool Enable parallel file reading
verbose bool Print progress information
opt_dtypes bool Optimize DataFrame dtypes for performance
**kwargs Any Additional arguments passed to DataFrame conversion
Returns Type Description
dict or list[dict] or pl.DataFrame or list[pl.DataFrame] or Generator Various types depending on arguments: - dict: Single JSON file as dictionary - list[dict]: Multiple JSON files as list of dictionaries - pl.DataFrame: Single or concatenated DataFrame - list[pl.DataFrame]: List of DataFrames (if concat=False) - Generator: If batch_size set, yields batches of above types

Example:

from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
# Read all JSON files in directory
df = fs.read_json(
    "data/*.json",
    as_dataframe=True,
    concat=True
)
print(df.shape)
# (1000, 5)  # Combined data from all files

# Batch process large dataset
for batch_df in fs.read_json(
    "logs/*.jsonl",
    batch_size=100,
    jsonlines=True,
    include_file_path=True
):
    print(f"Processing {len(batch_df)} records")

# Parallel read with custom options
dfs = fs.read_json(
    ["file1.json", "file2.json"],
    use_threads=True,
    concat=False,
    verbose=True
)
print(f"Read {len(dfs)} files")
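
Since batch_size counts files per batch, batch reading can be pictured as slicing the resolved file list into groups and yielding one group at a time. This is a minimal standard-library sketch; iter_batches is a hypothetical name and the real method may group files differently.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def iter_batches(paths: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive groups of at most batch_size paths (hypothetical helper)."""
    it = iter(paths)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


files = ["a.json", "b.json", "c.json", "d.json", "e.json"]
for batch in iter_batches(files, batch_size=2):
    # A reader would load only these files before moving on to the next group.
    print(batch)
```

Because the generator holds one group at a time, peak memory depends on batch_size rather than on the total number of files.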

read_csv_file()

Read a single CSV file from any filesystem.

Internal function that handles reading individual CSV files and optionally adds the source filepath as a column.

Parameter Type Description
self AbstractFileSystem Filesystem instance to use for reading
path str Path to CSV file
include_file_path bool Add source filepath as a column
opt_dtypes bool Optimize DataFrame dtypes
**kwargs Any Additional arguments passed to pl.read_csv()
Returns Type Description
pl.DataFrame pl.DataFrame DataFrame containing CSV data

Example:

from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
# read_csv_file is described above as an internal helper, so the call is shown commented out.
# For public use, you would typically call fs.read_csv().
# df = fs.read_csv_file(
#     "data.csv",
#     include_file_path=True,
#     delimiter="|"
# )
# print("file_path" in df.columns)
# True

read_csv()

Read CSV data from one or more files with powerful options.

Provides a flexible interface for reading CSV files with support for:

  • Single file or multiple files
  • Batch processing for large datasets
  • Parallel processing
  • File path tracking
  • Polars DataFrame output
Parameter Type Description
path str or list[str] Path(s) to CSV file(s). Can be: - Single path string (globs supported) - List of path strings
batch_size int | None If set, enables batch reading with this many files per batch
include_file_path bool Add source filepath as a column
concat bool Combine multiple files/batches into single DataFrame
use_threads bool Enable parallel file reading
verbose bool Print progress information
**kwargs Any Additional arguments passed to pl.read_csv()
Returns Type Description
pl.DataFrame or list[pl.DataFrame] or Generator Various types depending on arguments: - pl.DataFrame: Single or concatenated DataFrame - list[pl.DataFrame]: List of DataFrames (if concat=False) - Generator: If batch_size set, yields batches of above types

Example:

from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
# Read all CSVs in directory
df = fs.read_csv(
    "data/*.csv",
    include_file_path=True
)
print(df.columns)
# ['file_path', 'col1', 'col2', ...]

# Batch process large dataset
for batch_df in fs.read_csv(
    "logs/*.csv",
    batch_size=100,
    use_threads=True,
    verbose=True
):
    print(f"Processing {len(batch_df)} rows")

# Multiple files without concatenation
dfs = fs.read_csv(
    ["file1.csv", "file2.csv"],
    concat=False,
    use_threads=True
)
print(f"Read {len(dfs)} files")
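
How use_threads, concat, and include_file_path interact can be illustrated without Polars. The sketch below is a standard-library approximation that uses plain dict rows instead of DataFrames; read_rows and read_many are hypothetical names, and the real implementation may schedule reads differently.

```python
import csv
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List


def read_rows(path: str) -> List[Dict[str, str]]:
    """Read one CSV file and tag every row with its source path."""
    with open(path, newline="") as f:
        return [dict(row, file_path=path) for row in csv.DictReader(f)]


def read_many(paths: List[str], use_threads: bool = True, concat: bool = True):
    """Hypothetical multi-file reader: optional thread pool, optional concatenation."""
    if use_threads:
        with ThreadPoolExecutor() as pool:
            # pool.map returns results in input order, not completion order.
            per_file = list(pool.map(read_rows, paths))
    else:
        per_file = [read_rows(p) for p in paths]
    if concat:
        return [row for rows in per_file for row in rows]
    return per_file


with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for i in range(2):
        p = Path(tmp) / f"file{i}.csv"
        p.write_text(f"col1,col2\n{i},x\n")
        paths.append(str(p))
    combined = read_many(paths)                 # one flat list of rows
    separate = read_many(paths, concat=False)   # one list per file

print(len(combined), len(separate))
```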

read_parquet_file()

Read a single Parquet file from any filesystem.

Internal function that handles reading individual Parquet files and optionally adds the source filepath as a column.

Parameter Type Description
self AbstractFileSystem Filesystem instance to use for reading
path str Path to Parquet file
include_file_path bool Add source filepath as a column
opt_dtypes bool Optimize DataFrame dtypes
**kwargs Any Additional arguments passed to pq.read_table()
Returns Type Description
pa.Table pa.Table PyArrow Table containing Parquet data

Example:

from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
# read_parquet_file is described above as an internal helper, so the call is shown commented out.
# For public use, you would typically call fs.read_parquet().
# table = fs.read_parquet_file(
#     "data.parquet",
#     include_file_path=True,
#     use_threads=True
# )
# print("file_path" in table.column_names)
# True

read_parquet()

Read Parquet data with advanced features and optimizations.

Provides a high-performance interface for reading Parquet files with support for:

  • Single file or multiple files
  • Batch processing for large datasets
  • Parallel processing
  • File path tracking
  • Automatic concatenation
  • PyArrow Table output

The function automatically uses optimal reading strategies:

  • Direct dataset reading for simple cases
  • Parallel processing for multiple files
  • Batched reading for memory efficiency
Parameter Type Description
path str or list[str] Path(s) to Parquet file(s). Can be: - Single path string (globs supported) - List of path strings - Directory containing _metadata file
batch_size int | None If set, enables batch reading with this many files per batch
include_file_path bool Add source filepath as a column
concat bool Combine multiple files/batches into single Table
use_threads bool Enable parallel file reading
verbose bool Print progress information
opt_dtypes bool Optimize Table dtypes for performance
**kwargs Any Additional arguments passed to pq.read_table()
Returns Type Description
pa.Table or list[pa.Table] or Generator Various types depending on arguments: - pa.Table: Single or concatenated Table - list[pa.Table]: List of Tables (if concat=False) - Generator: If batch_size set, yields batches of above types

Example:

from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
# Read all Parquet files in directory
table = fs.read_parquet(
    "data/*.parquet",
    include_file_path=True
)
print(table.column_names)
# ['file_path', 'col1', 'col2', ...]

# Batch process large dataset
for batch in fs.read_parquet(
    "data/*.parquet",
    batch_size=100,
    use_threads=True
):
    print(f"Processing {batch.num_rows} rows")

# Read from directory with metadata
table = fs.read_parquet(
    "data/",  # Contains _metadata
    use_threads=True
)
print(f"Total rows: {table.num_rows}")

read_files()

Universal interface for reading data files of any supported format.

A unified API that automatically delegates to the appropriate reading function based on file format, while preserving all advanced features like:

  • Batch processing
  • Parallel reading
  • File path tracking
  • Format-specific optimizations
Parameter Type Description
path str or list[str] Path(s) to data file(s). Can be: - Single path string (globs supported) - List of path strings
format str File format to read. Supported values: - "json": Regular JSON or JSON Lines - "csv": CSV files - "parquet": Parquet files
batch_size int | None If set, enables batch reading with this many files per batch
include_file_path bool Add source filepath as column/field
concat bool Combine multiple files/batches into single result
jsonlines bool For JSON format, whether to read as JSON Lines
use_threads bool Enable parallel file reading
verbose bool Print progress information
opt_dtypes bool Optimize DataFrame/Arrow Table dtypes for performance
**kwargs Any Additional format-specific arguments
Returns Type Description
pl.DataFrame or pa.Table or list[pl.DataFrame] or list[pa.Table] or Generator Various types depending on format and arguments: - pl.DataFrame: For CSV and optionally JSON - pa.Table: For Parquet - list[pl.DataFrame or pa.Table]: Without concatenation - Generator: If batch_size set, yields batches

Example:

from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
# Read CSV files
df = fs.read_files(
    "data/*.csv",
    format="csv",
    include_file_path=True
)
print(type(df))
# <class 'polars.dataframe.frame.DataFrame'>

# Batch process Parquet files
for batch in fs.read_files(
    "data/*.parquet",
    format="parquet",
    batch_size=100,
    use_threads=True
):
    print(f"Batch type: {type(batch)}")

# Read JSON Lines
df = fs.read_files(
    "logs/*.jsonl",
    format="json",
    jsonlines=True,
    concat=True
)
print(df.columns)
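
Delegation by format can be pictured as a lookup table from format name to reader function. The sketch below uses stub readers so that it runs without any dependencies; make_read_files and the stub functions are hypothetical and are not how fsspeckit is known to be structured.

```python
from typing import Any, Callable, Dict


def make_read_files(readers: Dict[str, Callable[..., Any]]) -> Callable[..., Any]:
    """Build a dispatcher that forwards to the reader registered for a format."""
    def read_files(path: str, format: str, **kwargs: Any) -> Any:
        try:
            reader = readers[format]
        except KeyError:
            raise ValueError(f"Unsupported format: {format!r}") from None
        # Format-specific options travel through **kwargs unchanged.
        return reader(path, **kwargs)
    return read_files


# Stub readers that only record what they were asked to do.
def fake_json(path: str, **kwargs: Any):
    return ("json", path, kwargs)


def fake_csv(path: str, **kwargs: Any):
    return ("csv", path, kwargs)


read_files = make_read_files({"json": fake_json, "csv": fake_csv})
result = read_files("logs/*.jsonl", format="json", jsonlines=True)
print(result)
```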

pyarrow_dataset()

Create a PyArrow dataset from files in any supported format.

Creates a dataset that provides optimized reading and querying capabilities including:

  • Schema inference and enforcement
  • Partition discovery and pruning
  • Predicate pushdown
  • Column projection
Parameter Type Description
path str Base path to dataset files
format str File format. Currently supports: - "parquet" (default) - "csv" - "json" (experimental)
schema pa.Schema | None Optional schema to enforce. If None, inferred from data.
partitioning str or list[str] or pds.Partitioning How the dataset is partitioned. Can be: - str: Single partition field - list[str]: Multiple partition fields - pds.Partitioning: Custom partitioning scheme
**kwargs Any Additional arguments for dataset creation
Returns Type Description
pds.Dataset pds.Dataset PyArrow dataset instance

Example:

import pyarrow as pa
import pyarrow.dataset as pds
from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
# Simple Parquet dataset
ds = fs.pyarrow_dataset("data/")
print(ds.schema)

# Partitioned dataset
ds = fs.pyarrow_dataset(
    "events/",
    partitioning=["year", "month"]
)
# Query with partition pruning; pds.field() builds the filter expression
table = ds.to_table(
    filter=(pds.field("year") == 2024)
)

# CSV with schema
ds = fs.pyarrow_dataset(
    "logs/",
    format="csv",
    schema=pa.schema([
        ("timestamp", pa.timestamp("s")),
        ("level", pa.string()),
        ("message", pa.string())
    ])
)
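
Partition pruning means that files whose directory names cannot match the filter are never opened. The sketch below illustrates the idea with plain strings, assuming a layout in which path segments map positionally to the partition fields (for example events/2024/01/part-0.parquet). That layout, and the helper names partition_values and prune, are assumptions made for illustration only.

```python
from typing import Dict, List


def partition_values(path: str, base: str, fields: List[str]) -> Dict[str, str]:
    """Map directory segments under base to partition field names, by position."""
    relative = path[len(base):].strip("/")
    segments = relative.split("/")[:-1]  # drop the file name
    return dict(zip(fields, segments))


def prune(paths: List[str], base: str, fields: List[str], **wanted) -> List[str]:
    """Keep only the files whose partition values match every requested value."""
    kept = []
    for p in paths:
        values = partition_values(p, base, fields)
        if all(values.get(k) == str(v) for k, v in wanted.items()):
            kept.append(p)
    return kept


files = [
    "events/2023/12/part-0.parquet",
    "events/2024/01/part-0.parquet",
    "events/2024/02/part-0.parquet",
]
selected = prune(files, "events/", ["year", "month"], year=2024)
print(selected)
```

Only the two 2024 files survive the filter, so a reader built on this idea would skip the 2023 file entirely.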

pyarrow_parquet_dataset()

Create a PyArrow dataset optimized for Parquet files.

Creates a dataset specifically for Parquet data, automatically handling _metadata files for optimized reading.

This function is particularly useful for:

  • Datasets with existing _metadata files
  • Multi-file datasets that should be treated as one
  • Partitioned Parquet datasets
Parameter Type Description
path str Path to dataset directory or _metadata file
schema pa.Schema | None Optional schema to enforce. If None, inferred from data.
partitioning str or list[str] or pds.Partitioning How the dataset is partitioned. Can be: - str: Single partition field - list[str]: Multiple partition fields - pds.Partitioning: Custom partitioning scheme
**kwargs Any Additional dataset arguments
Returns Type Description
pds.Dataset pds.Dataset PyArrow dataset instance

Example:

import pyarrow.dataset as pds
from fsspec.implementations.local import LocalFileSystem

fs = LocalFileSystem()
# Dataset with _metadata
ds = fs.pyarrow_parquet_dataset("data/_metadata")
print(ds.files)  # Shows all data files

# Partitioned dataset directory
ds = fs.pyarrow_parquet_dataset(
    "sales/",
    partitioning=["year", "region"]
)
# Query with partition pruning; pds.field() builds the filter expression
table = ds.to_table(
    filter=(
        (pds.field("year") == 2024) &
        (pds.field("region") == "EMEA")
    )
)