
API Guide

This guide provides a comprehensive overview of the fsspeckit public API and how to use its main components.

Core Filesystem Factory

filesystem()

The main entry point for creating configured filesystems:

from fsspeckit import filesystem

# Basic local filesystem
fs = filesystem("file")

# S3 with caching
fs = filesystem("s3://my-bucket/", cached=True)

# With storage options
fs = filesystem("s3", storage_options={"region": "us-west-2"})

# With base filesystem (DirFileSystem hierarchy)
fs = filesystem("/data", dirfs=True, base_fs=parent_fs)

Parameters:

- protocol_or_path - Protocol (e.g., 's3', 'gs') or path
- storage_options - Dict or StorageOptions object
- cached - Enable caching (default: False)
- cache_storage - Cache directory location
- verbose - Log cache operations (default: False)
- dirfs - Wrap in DirFileSystem (default: True)
- base_fs - Parent DirFileSystem for hierarchy
- use_listings_cache - Use listings cache (default: True)
- skip_instance_cache - Skip instance cache (default: False)
- **kwargs - Protocol-specific options
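
These parameters can be combined; a minimal sketch using only the options documented above (the cache directory path is illustrative):

from fsspeckit import filesystem

# Cached S3 filesystem with explicit storage options and cache logging
fs = filesystem(
    "s3://my-bucket/",
    storage_options={"region": "us-west-2"},
    cached=True,
    cache_storage="/tmp/fsspeckit-cache",
    verbose=True,
)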

Storage Options Classes

Storage options provide structured configuration for different providers:

from fsspeckit.storage_options import (
    AwsStorageOptions,
    GcsStorageOptions,
    AzureStorageOptions,
    GitHubStorageOptions,
    GitLabStorageOptions,
    LocalStorageOptions
)

# AWS S3
aws = AwsStorageOptions(
    region="us-east-1",
    access_key_id="key",
    secret_access_key="secret"
)
fs = aws.to_filesystem()

# Google Cloud Storage
gcs = GcsStorageOptions(
    project="my-project",
    token="path/to/service-account.json"
)
fs = gcs.to_filesystem()

# Azure
azure = AzureStorageOptions(
    account_name="myaccount",
    account_key="key..."
)
fs = azure.to_filesystem()

# GitHub Repository
github = GitHubStorageOptions(
    org="microsoft",
    repo="vscode",
    token="ghp_xxx"
)
fs = github.to_filesystem()

# GitLab Repository
gitlab = GitLabStorageOptions(
    project_id=12345,
    token="glpat_xxx"
)
fs = gitlab.to_filesystem()

# Local
local = LocalStorageOptions(auto_mkdir=True)
fs = local.to_filesystem()

StorageOptions Factories

Create storage options from various sources:

from fsspeckit.storage_options import (
    from_dict,
    from_env,
    merge_storage_options,
    infer_protocol_from_uri,
    storage_options_from_uri
)

# From dictionary
opts = from_dict("s3", {"region": "us-west-2"})

# From environment variables
opts = AwsStorageOptions.from_env()

# Merge multiple options
merged = merge_storage_options(opts1, opts2, overwrite=True)

# Infer protocol from URI
protocol = infer_protocol_from_uri("s3://bucket/path")

# Create from URI
opts = storage_options_from_uri("s3://my-bucket/data")
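
These helpers compose with filesystem(); a minimal sketch, assuming the returned options object is accepted by the storage_options parameter described above:

from fsspeckit import filesystem
from fsspeckit.storage_options import infer_protocol_from_uri, storage_options_from_uri

uri = "s3://my-bucket/data"
protocol = infer_protocol_from_uri(uri)  # "s3"
opts = storage_options_from_uri(uri)
fs = filesystem(protocol, storage_options=opts)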

Extended I/O Operations

fsspeckit adds rich I/O methods to all fsspec filesystems through monkey-patching.
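
A quick way to confirm this is to check a filesystem returned by the factory for the extended methods used throughout this guide (a minimal sketch; the method names are those shown in the sections below):

from fsspeckit import filesystem

fs = filesystem(".")
# Extended methods such as read_parquet and write_parquet are attached to the instance
print(hasattr(fs, "read_parquet"), hasattr(fs, "write_parquet"))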

Reading Operations

JSON Operations

from fsspeckit import filesystem

fs = filesystem(".")

# Read single JSON file
data = fs.read_json_file("data.json")  # Returns dict
df = fs.read_json_file("data.json", as_dataframe=True)  # Returns Polars DF

# Read multiple JSON files with batching
for batch in fs.read_json("data/*.json", batch_size=5):
    # Process batch
    pass

# Read JSON Lines format
df = fs.read_json("data/lines.jsonl", as_dataframe=True)

# With threading
df = fs.read_json("data/*.json", use_threads=True, num_threads=4)

# Include source file path
df = fs.read_json("data/*.json", include_file_path=True)

CSV Operations

# Read single CSV
df = fs.read_csv_file("data.csv")

# Read multiple CSV files
df = fs.read_csv("data/*.csv", concat=True)

# Batch reading
for batch in fs.read_csv("data/*.csv", batch_size=10):
    pass

# Optimize data types
df = fs.read_csv("data/*.csv", opt_dtypes=True)

# With parallelism
df = fs.read_csv("data/*.csv", use_threads=True)

Parquet Operations

# Read single Parquet file
table = fs.read_parquet_file("data.parquet")

# Read multiple with schema unification
table = fs.read_parquet("data/*.parquet", concat=True)

# Batch reading
for batch in fs.read_parquet("data/*.parquet", batch_size=20):
    pass

# With partitioning support
table = fs.read_parquet("partitioned_data/**/*.parquet", concat=True)

# Include file path column
table = fs.read_parquet("data/*.parquet", include_file_path=True)

Universal Reader

# Auto-detect format from file extension
df = fs.read_files("data/mixed/*", format="auto")

# Explicit format
df = fs.read_files("data/*.csv", format="csv")

# Control result type
df_polars = fs.read_files("data/*.parquet", as_dataframe=True)
table_arrow = fs.read_files("data/*.parquet", as_dataframe=False)

Dataset Operations

import pyarrow.dataset as ds

# Create PyArrow dataset
dataset = fs.pyarrow_dataset("data/")

# Optimized for Parquet with metadata
dataset = fs.pyarrow_parquet_dataset("partitioned_data/")

# Query dataset with filtering
filtered = dataset.to_table(filter=ds.field("age") > 25)

# Schema inspection
print(dataset.schema)

# Statistics and metadata
print(dataset.count_rows())

Writing Operations

Parquet Writing

import polars as pl

# From Polars DataFrame
df = pl.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
fs.write_parquet(df, "output.parquet")

# From Pandas
import pandas as pd
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
fs.write_parquet(df, "output.parquet")

# Compression
fs.write_parquet(df, "output.parquet", compression="zstd")

# Append mode
fs.write_parquet(df, "output.parquet", mode="append")

CSV Writing

# Write a DataFrame via the filesystem
fs.write_csv(df, "output.csv")

# Append another DataFrame to the existing file
fs.write_csv(new_df, "output.csv", mode="append")

JSON Writing

# Write DataFrame to JSON
fs.write_json(df, "output.json")

# JSON Lines format
fs.write_json(df, "output.jsonl", format="json_lines")

# Include file path metadata
fs.write_json(df, "output.json", include_file_path=True)

PyArrow Dataset Writing

import pyarrow as pa

# Write partitioned dataset
table = pa.table({"year": [2023, 2023, 2024], "value": [10, 20, 30]})
fs.write_pyarrow_dataset(
    data=table,
    path="partitioned_data",
    partition_by=["year"],
    format="parquet",
    compression="zstd"
)

# Result structure: partitioned_data/year=2023/...parquet files
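
The partitioned output can be read back with the dataset helpers shown earlier; a sketch assuming the Hive-style partition directories (year=2023, year=2024) are discovered as a partition column:

import pyarrow.dataset as ds

dataset = fs.pyarrow_parquet_dataset("partitioned_data/")
table_2024 = dataset.to_table(filter=ds.field("year") == 2024)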

Cache Management

# Cache management methods are available on cached filesystems (cached=True)

# Clear all caches
fs.clear_cache()

# Check cache size
size = fs.get_cache_size()

# Sync cache (ensure data is written)
fs.sync_cache()

Helper Utilities

Parallel Processing

from fsspeckit.utils import run_parallel

def process(item):
    return len(item)

results = run_parallel(
    process,
    ["item1", "item2", "item3"],
    n_jobs=4,
    verbose=True
)

Type Conversions

from fsspeckit.utils import (
    dict_to_dataframe,
    to_pyarrow_table
)

# Dict to DataFrame
df = dict_to_dataframe({"col1": [1, 2], "col2": [3, 4]})

# Any to PyArrow
table = to_pyarrow_table(df)

Data Type Optimization

import polars as pl
from fsspeckit.utils import opt_dtype_pl

# Optimize DataFrame
df = pl.read_csv("data.csv")
df_opt = opt_dtype_pl(df)

# Or use extension
df_opt = df.opt_dtype

SQL Filtering

from fsspeckit.utils import sql2pyarrow_filter
import pyarrow as pa

schema = pa.schema([("age", pa.int32()), ("name", pa.string())])
expr = sql2pyarrow_filter("age > 25 AND name = 'Alice'", schema)

# Apply to a dataset created earlier, e.g. via fs.pyarrow_dataset("data/")
filtered_table = dataset.to_table(filter=expr)

File Synchronization

from fsspeckit.utils import sync_dir

# Sync directories between two filesystems
# (fs_source and fs_target are filesystem instances, e.g. from filesystem())
sync_dir(
    fs_source, "/source/",
    fs_target, "/target/",
    overwrite=False
)

Filesystem Classes

Custom Implementations

GitLabFileSystem

Read-only filesystem for GitLab repositories:

from fsspeckit.core import filesystem

fs = filesystem(
    "gitlab",
    storage_options={
        "project_name": "group/project",
        "token": "glpat_xxx",
        "ref": "main"
    }
)

# List files
files = fs.ls("/")

# Read file
content = fs.cat("README.md")

MonitoredSimpleCacheFileSystem

Enhanced cache with logging and monitoring:

# Automatically used when cached=True
fs = filesystem("s3", cached=True, verbose=True)

# Monitor cache operations
fs.sync_cache()
size = fs.get_cache_size()

Working with DirFileSystem

from fsspeckit import filesystem

# Create DirFileSystem wrapper
fs = filesystem("/data", dirfs=True)

# Access files within the base directory
fs.ls("/subdir")

# Create hierarchical filesystem (base_fs parameter)
parent_fs = filesystem("/datasets", dirfs=True)
child_fs = filesystem("/datasets/project1", dirfs=True, base_fs=parent_fs)

# Files are accessible only within the base directory
# Attempting to escape raises an error
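
A hedged sketch of that containment behaviour (the path is illustrative, and the exact exception type depends on the implementation):

try:
    child_fs.cat("../project2/data.csv")  # points outside the base directory
except Exception as exc:
    print(f"Rejected: {exc}")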

Configuration Methods

All storage option classes provide conversion methods:

opts = AwsStorageOptions(...)

# Convert to fsspec kwargs
kwargs = opts.to_fsspec_kwargs()

# Convert to filesystem
fs = opts.to_filesystem()

# Convert to object store kwargs (for deltalake, etc.)
obj_store_kwargs = opts.to_object_store_kwargs()

# Convert to YAML
yaml_str = opts.to_yaml()

# Load from YAML
opts = AwsStorageOptions.from_yaml(yaml_str)

# Convert to environment variables
env = opts.to_env()

# Load from environment
opts = AwsStorageOptions.from_env()
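
The object store kwargs are intended for libraries such as deltalake; a sketch, with an illustrative table URI:

from deltalake import DeltaTable

dt = DeltaTable(
    "s3://my-bucket/delta-table",
    storage_options=opts.to_object_store_kwargs(),
)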

Error Handling

from fsspeckit import filesystem
from fsspeckit.storage_options import AwsStorageOptions

try:
    fs = filesystem("s3", storage_options={"region": "invalid"})
    data = fs.cat("s3://bucket/file.txt")
except Exception as e:
    print(f"Error: {e}")

# Check optional dependencies
from fsspeckit.utils import check_optional_dependency
try:
    check_optional_dependency("deltalake")
except ImportError:
    print("Install with: pip install deltalake")

Best Practices

  1. Use StorageOptions - Type-safe and consistent configuration
  2. Enable Caching - For remote filesystems, always consider caching
  3. Batch Processing - Use batch_size for large datasets
  4. Parallel I/O - Enable threading for multi-file operations
  5. Type Optimization - Use opt_dtypes=True to reduce memory
  6. Error Handling - Wrap filesystem operations in try/except
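
A short sketch combining several of these practices, using only parameters shown earlier in this guide (paths are illustrative; the parameters are documented separately above and are assumed to compose):

from fsspeckit import filesystem

fs = filesystem("s3://my-bucket/", cached=True)

# Batched, threaded CSV reads with dtype optimization, written back as Parquet
for batch in fs.read_csv("raw/*.csv", batch_size=10, use_threads=True, opt_dtypes=True):
    fs.write_parquet(batch, "processed/data.parquet", mode="append")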

See Also