Skip to content

API Patterns

This guide covers the new API patterns in Duckalog's modular architecture, including dependency injection, request-scoped caching, and advanced customization options.

Overview

The new Duckalog architecture provides:

  • Dependency Injection: Customizable components for configuration loading
  • Request-Scoped Caching: Performance optimization for batch operations
  • Modular Design: Clean separation of concerns with extensible interfaces
  • Backward Compatibility: All existing patterns continue to work

Basic vs Advanced Patterns

The basic pattern uses the high-level convenience functions exported from the top-level `duckalog` package; code written this way works the same as before:

from duckalog import load_config, build_catalog, generate_sql

# Load configuration (same as before)
config = load_config("catalog.yaml")

# Build catalog (same as before)  
build_catalog("catalog.yaml")

# Generate SQL (same as before)
sql = generate_sql("catalog.yaml")

Advanced Pattern (For Custom Implementations)

The advanced pattern provides fine-grained control over the loading process:

from duckalog.config.api import load_config
from duckalog.config.resolution.imports import request_cache_scope
import fsspec

# Custom filesystem for remote access
# NOTE: "access_key" / "secret_key" are placeholder strings, not real
# credentials; supply your own values here.
filesystem = fsspec.filesystem("s3", key="access_key", secret="secret_key")

# Enhanced configuration loading
# NOTE: `custom_loader` is not defined in this snippet. It stands for a
# user-supplied SQL file loader object (presumably something like the
# CustomSQLFileLoader shown under "Custom SQL File Loader" — confirm).
config = load_config(
    "s3://bucket/config.yaml",
    filesystem=filesystem,
    load_dotenv=False,
    sql_file_loader=custom_loader
)

# Batch processing with caching
# NOTE: `config_files` is not defined in this snippet; it stands for an
# iterable of configuration paths provided by the caller.
configs = []
with request_cache_scope() as context:
    for config_file in config_files:
        # Rebinds `config` from the single-file example above on each pass.
        config = load_config(config_file)
        configs.append(config)

Dependency Injection Patterns

1. Custom Filesystem Implementation

Create custom filesystem backends for specialized storage systems:

import fsspec
from duckalog.config.api import load_config

# Custom filesystem with special configuration
custom_fs = fsspec.filesystem(
    "myprotocol",
    endpoint="https://my-storage.example.com",
    auth_token="secret-token",
    timeout=30
)

# Use custom filesystem for configuration loading
config = load_config(
    "myprotocol://configs/analytics.yaml",
    filesystem=custom_fs
)

2. Enhanced SQL API Usage

The enhanced SQL API provides better organization and discoverability for SQL-related functionality:

# Option 1: Direct imports (backward compatible)
from duckalog import generate_view_sql, quote_ident, quote_literal, SQLFileLoader, ViewConfig, DuckDBConfig, Config

# Option 2: Convenience groups for better organization
from duckalog import sql, utils, sql_files

# Generate SQL using convenience groups
view_config = ViewConfig(name="users", source="parquet", uri="data/users.parquet")
view_sql = sql.generate_view_sql(view_config)
quoted_identifier = utils.quote_ident("users table")
safe_literal = utils.quote_literal("path/to/file.parquet")

# Option 3: Unified SQL namespace (not yet available)
# A future `SQL` namespace may consolidate SQL functionality.
# For now, use the direct imports shown in Options 1 and 2.

3. Custom SQL File Loader

Implement custom SQL file loading for specialized template processing:

from typing import Dict, Any
from duckalog.config.api import load_config

class CustomSQLFileLoader:
    """Custom SQL loader with template processing.

    Placeholders are written as ``{{name}}`` in the SQL file and are replaced
    with ``str(context[name])``.
    """

    def load_sql_file(self, file_path: str, context: Dict[str, Any]) -> str:
        """Read ``file_path`` and substitute ``{{key}}`` placeholders.

        Args:
            file_path: Path of the SQL template on the local filesystem.
            context: Mapping of placeholder names to replacement values;
                every value is converted with ``str()``.

        Returns:
            The file content with every known placeholder replaced.
            Placeholders whose name is not in ``context`` are left untouched.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        import re  # local import keeps the example self-contained

        # Explicit encoding so the result does not depend on the platform default.
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        if not context:
            return content

        replacements = {
            f"{{{{{key}}}}}": str(value) for key, value in context.items()
        }

        # Substitute in a single pass. Chained ``str.replace`` calls would
        # re-expand placeholder-looking text that an earlier replacement
        # value introduced, making the output depend on dict order.
        pattern = re.compile(
            "|".join(
                re.escape(placeholder)
                # Longest first so a longer placeholder is never cut short
                # by a shorter alternative.
                for placeholder in sorted(replacements, key=len, reverse=True)
            )
        )
        return pattern.sub(lambda match: replacements[match.group(0)], content)

# Use with load_config
sql_loader = CustomSQLFileLoader()
config = load_config(
    "catalog.yaml",
    sql_file_loader=sql_loader
)

3. Custom Environment Processing

Implement custom environment variable resolution:

from duckalog.config.resolution.env import EnvProcessor, DefaultEnvProcessor
from duckalog.config.resolution.base import ImportContext

class VaultEnvProcessor(EnvProcessor):
    """Environment processor that resolves vault secrets before env vars."""

    def process(self, config_data: dict, load_dotenv: bool = True) -> dict:
        """Resolve vault secrets, then delegate to ``DefaultEnvProcessor``.

        Args:
            config_data: Raw configuration mapping.
            load_dotenv: Forwarded unchanged to the default processor.

        Returns:
            Whatever ``DefaultEnvProcessor.process`` returns for the
            vault-resolved data.
        """
        with_secrets = self._resolve_vault_secrets(config_data)
        # Standard environment processing runs on the vault-resolved data.
        return DefaultEnvProcessor().process(with_secrets, load_dotenv)

    def _resolve_vault_secrets(self, config_data: dict) -> dict:
        """Hook for vault lookups; this stub returns ``config_data`` as-is.

        A real implementation would replace references such as
        ``${vault:secret/path:key}`` with the actual secret value.
        """
        return config_data

# Use with the configuration loading system
env_processor = VaultEnvProcessor()

4. Custom Import Resolution

Implement specialized import resolution logic:

from duckalog.config.resolution.base import ImportResolver, ImportContext
from duckalog.config.resolution.imports import DefaultImportResolver

class DatabaseImportResolver(ImportResolver):
    """Load configuration imports from database instead of files.

    Import entries of the form ``db://table_name/id`` are fetched from the
    database and merged into the configuration. All other entries are left
    in ``imports`` and handed to ``DefaultImportResolver``.
    """

    def __init__(self, db_connection):
        # Assumes a connection whose ``execute`` returns a cursor and which
        # uses ``?`` placeholders (e.g. sqlite3) — TODO confirm for your driver.
        self.db = db_connection

    def resolve(self, config_data: dict, context: ImportContext) -> dict:
        """Resolve ``db://`` imports, then delegate the rest.

        Args:
            config_data: Raw configuration mapping, possibly with ``imports``.
            context: Import context passed through to the default resolver.

        Returns:
            The result of ``DefaultImportResolver.resolve`` on the merged data.

        Raises:
            ValueError: If a ``db://`` entry is malformed or names an
                invalid table.
        """
        imports = config_data.get('imports', [])
        file_imports = []
        resolved_from_db = False

        for import_spec in imports:
            if isinstance(import_spec, str) and import_spec.startswith('db://'):
                # Load from database: db://table_name/id
                table_name, config_id = self._parse_db_import(import_spec)
                imported_config = self._load_from_db(table_name, config_id)

                # Merge imported configuration
                config_data = self._merge_configs(config_data, imported_config)
                resolved_from_db = True
            else:
                file_imports.append(import_spec)

        if resolved_from_db:
            # Drop the entries handled here so the default resolver does not
            # try to open "db://..." as a file.
            config_data = {**config_data, 'imports': file_imports}

        # Handle regular file imports with default resolver
        # NOTE(review): `context` is annotated as ImportContext yet
        # `.import_context` is read from it; confirm which type
        # DefaultImportResolver's constructor expects.
        default_resolver = DefaultImportResolver(context.import_context)
        return default_resolver.resolve(config_data, context)

    @staticmethod
    def _parse_db_import(import_spec: str) -> tuple:
        """Split ``db://table_name/id`` into ``(table_name, id)``."""
        table_name, separator, config_id = import_spec[len('db://'):].partition('/')
        if not (separator and table_name and config_id) or '/' in config_id:
            raise ValueError(
                f"Invalid database import {import_spec!r}; "
                "expected 'db://table_name/id'"
            )
        return table_name, config_id

    def _load_from_db(self, table_name: str, config_id: str) -> dict:
        """Fetch one JSON configuration row; return ``{}`` when not found."""
        import json  # not imported by the surrounding example

        # Identifiers cannot be bound as parameters, and the table name comes
        # from configuration text, so restrict it to (dotted) identifiers
        # before interpolating it into the statement.
        if not all(part.isidentifier() for part in table_name.split('.')):
            raise ValueError(f"Invalid table name in database import: {table_name!r}")

        query = f"SELECT config_json FROM {table_name} WHERE id = ?"
        cursor = self.db.execute(query, (config_id,))
        result = cursor.fetchone()
        return json.loads(result[0]) if result else {}

    def _merge_configs(self, base: dict, imported: dict) -> dict:
        """Shallow merge in which keys from ``base`` override ``imported``."""
        return {**imported, **base}

# Use with request context
from duckalog.config.resolution.imports import request_cache_scope

with request_cache_scope() as context:
    db_resolver = DatabaseImportResolver(db_connection)
    # Integration would happen through custom loading logic

Request-Scoped Caching Patterns

1. Batch Configuration Loading

Efficiently load multiple related configurations:

from duckalog.config.api import load_config
from duckalog.config.resolution.imports import request_cache_scope

def load_catalog_configs(config_files: list[str]) -> list[Config]:
    """Load several configurations inside one request cache scope.

    Files that fail to load are reported on stdout and skipped.

    Args:
        config_files: Paths of the configuration files to load, in order.

    Returns:
        The successfully loaded configurations, in input order.
    """
    loaded = []

    with request_cache_scope():
        for path in config_files:
            try:
                loaded.append(load_config(path))
            except Exception as exc:
                print(f"Failed to load {path}: {exc}")
            else:
                print(f"Loaded: {path}")

    return loaded

# Usage
config_files = [
    "base.yaml",
    "analytics/views.yaml", 
    "analytics/attachments.yaml",
    "reports/views.yaml"
]

configs = load_catalog_configs(config_files)

2. Performance Monitoring

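Monitor caching effectiveness (the cache figures below are rough estimates derived from the number of files the import context has visited):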

import time
from duckalog.config.resolution.imports import request_cache_scope, RequestContext

def timed_config_loading(config_files: list[str]) -> dict:
    """Load configurations with performance metrics.

    Args:
        config_files: Paths of the configuration files to load.

    Returns:
        A dict with ``total_time`` (seconds), ``configs_loaded``,
        ``cache_hits`` and ``cache_misses``. The two cache figures are
        estimates computed from the number of visited files.
    """
    metrics = {
        'total_time': 0,
        'configs_loaded': 0,
        'cache_hits': 0,
        'cache_misses': 0
    }

    # perf_counter is monotonic, so measured durations cannot be skewed by
    # system clock adjustments the way time.time() differences can.
    start_time = time.perf_counter()

    with request_cache_scope() as context:
        # Track cache statistics
        initial_imports = len(context.import_context.visited_files)

        for config_file in config_files:
            file_start = time.perf_counter()
            try:
                load_config(config_file)
            except Exception as e:
                print(f"Error loading {config_file}: {e}")
            else:
                file_time = time.perf_counter() - file_start
                metrics['configs_loaded'] += 1
                print(f"{config_file}: {file_time:.3f}s")

        # Count only files visited during this run.
        # NOTE(review): this treats every visited file beyond the requested
        # ones as a "hit"; confirm what visited_files actually records.
        newly_visited = len(context.import_context.visited_files) - initial_imports
        # Clamp at zero: failed loads can leave fewer visited files than
        # requested files, which would otherwise report a negative count.
        metrics['cache_hits'] = max(0, newly_visited - len(config_files))
        metrics['cache_misses'] = len(config_files)

    metrics['total_time'] = time.perf_counter() - start_time
    return metrics

# Usage
metrics = timed_config_loading(config_files)
print(f"Total time: {metrics['total_time']:.3f}s")
print(f"Cache hits: {metrics['cache_hits']}")

3. Conditional Cache Usage

Use caching strategically based on configuration relationships:

from pathlib import Path
from duckalog.config.api import load_config
from duckalog.config.resolution.imports import request_cache_scope

def load_config_smart(config_path: str, use_cache: bool = True) -> Config:
    """Load a configuration, using a cache scope only for complex files.

    A file counts as complex when its text contains ``imports:`` or is longer
    than 10000 characters.

    Args:
        config_path: Local path of the configuration file.
        use_cache: When False, the cache scope is never used.

    Returns:
        The loaded configuration.
    """
    with open(config_path, 'r') as handle:
        text = handle.read()

    # Complexity heuristic: declares imports, or exceeds the 10KB threshold.
    is_complex = 'imports:' in text or len(text) > 10000

    if not (use_cache and is_complex):
        print(f"Loading {config_path} without caching (simple config)")
        return load_config(config_path)

    with request_cache_scope():
        print(f"Loading {config_path} with caching enabled")
        return load_config(config_path)

# Usage
config1 = load_config_smart("simple.yaml", use_cache=False)
config2 = load_config_smart("complex.yaml", use_cache=True)

Custom Implementation Patterns

1. Configuration Validation Pipeline

Create custom validation pipelines:

from duckalog.config.api import load_config
from duckalog.config.resolution.base import ImportResolver, ImportContext
from typing import List, Callable

class ValidationPipeline(ImportResolver):
    """Import resolver that runs validators before the default resolution."""

    def __init__(self, validators: List[Callable], context: ImportContext):
        """Store the validators and build the default resolver to delegate to."""
        self.context = context
        self.validators = validators
        self.default_resolver = DefaultImportResolver(context)

    def resolve(self, config_data: dict, context: ImportContext) -> dict:
        """Pass ``config_data`` through each validator, then resolve imports.

        Each validator receives the previous validator's return value, so a
        validator may raise or return modified data.
        """
        validated = config_data
        for check in self.validators:
            validated = check(validated)

        # Standard resolution runs on the validated data.
        return self.default_resolver.resolve(validated, context)

def validate_naming_conventions(config_data: dict) -> dict:
    """Validate that every view name is lowercase.

    Args:
        config_data: Configuration mapping; its ``views`` entry (if any) is
            a list of view dicts.

    Returns:
        ``config_data`` unchanged when every view name passes.

    Raises:
        ValueError: If a view name contains uppercase characters.
    """
    for view in config_data.get('views', []):
        view_name = view.get('name', '')
        # Compare against the lowercased form instead of ``str.islower()``:
        # ``islower()`` is False for names without any cased character
        # (e.g. "2024" or a missing name), which would be rejected with a
        # misleading "should be lowercase" error.
        if view_name != view_name.lower():
            raise ValueError(f"View name '{view_name}' should be lowercase")

    return config_data

def validate_required_secrets(config_data: dict) -> dict:
    """Require a ``secrets`` section when any view reads from cloud storage.

    Args:
        config_data: Configuration mapping to check.

    Returns:
        ``config_data`` unchanged when the check passes.

    Raises:
        ValueError: If ``secrets`` is absent and at least one view has an
            ``s3``, ``gcs`` or ``azure`` source.
    """
    if 'secrets' in config_data:
        return config_data

    cloud_sources = ['s3', 'gcs', 'azure']
    uses_cloud = [
        entry.get('source') in cloud_sources
        for entry in config_data.get('views', [])
    ]
    if any(uses_cloud):
        raise ValueError("Secrets configuration required for cloud storage views")

    return config_data

# Usage
validators = [
    validate_naming_conventions,
    validate_required_secrets
]

with request_cache_scope() as context:
    pipeline = ValidationPipeline(validators, context.import_context)
    # Integration would need custom loading logic

2. Configuration Transformation Pipeline

Apply transformations during loading:

from typing import Dict, Any

class ConfigTransformer:
    """Runs registered transformations whose pattern matches the config."""

    def __init__(self, transformations: Dict[str, Callable]):
        """Store the mapping of pattern string to transformation callable."""
        self.transformations = transformations

    def transform(self, config_data: dict) -> dict:
        """Apply, in registration order, each transformation that matches.

        Matching is re-evaluated against the current (possibly already
        transformed) data before each transformation.
        """
        current = config_data
        for needle, apply in self.transformations.items():
            if not self._matches_pattern(current, needle):
                continue
            current = apply(current)

        return current

    def _matches_pattern(self, config_data: dict, pattern: str) -> bool:
        """Return True if ``pattern`` occurs in the lowercased ``str()`` of the data."""
        # Plain substring search over the textual form of the whole mapping.
        haystack = str(config_data).lower()
        return pattern in haystack

# Example transformations
def add_environment_prefix(config_data: dict) -> dict:
    """Prefix every view name with the deployment environment.

    The environment comes from ``DEPLOYMENT_ENV`` and defaults to ``dev``.
    View dicts are modified in place; a view without a name gets ``<env>_``.

    Returns:
        The same ``config_data`` object.
    """
    import os

    prefix = os.getenv('DEPLOYMENT_ENV', 'dev')
    for entry in config_data.get('views', []):
        entry['name'] = "{}_{}".format(prefix, entry.get('name', ''))

    return config_data

def normalize_paths(config_data: dict) -> dict:
    """Rewrite backslashes in view ``uri`` values to forward slashes.

    Only views that have a ``uri`` key are touched, and falsy values
    (``None``, empty string) are kept as they are. View dicts are modified
    in place.

    Returns:
        The same ``config_data`` object.
    """
    for entry in config_data.get('views', []):
        if 'uri' not in entry:
            continue
        uri = entry['uri']
        entry['uri'] = uri.replace('\\', '/') if uri else uri

    return config_data

# Usage
transformations = {
    'environment': add_environment_prefix,
    'windows': normalize_paths
}

transformer = ConfigTransformer(transformations)
# Apply during loading process

Error Handling Patterns

1. Graceful Degradation

Handle configuration errors gracefully:

from typing import Optional

from duckalog.config.api import load_config
# Aliased so that duckalog's ImportError does not shadow the built-in
# ImportError, which `except ImportError:` clauses elsewhere rely on.
from duckalog.errors import ConfigError, ImportError as DuckalogImportError


def load_config_with_fallback(primary_path: str, fallback_path: Optional[str] = None) -> Config:
    """Load configuration with fallback options.

    Args:
        primary_path: Configuration to try first.
        fallback_path: Optional configuration to try when the primary one
            fails with ``ConfigError``.

    Returns:
        The configuration loaded from the primary or the fallback path.

    Raises:
        ConfigError: The primary error when no fallback is given, or a
            combined error when both loads fail.
    """
    try:
        return load_config(primary_path)
    except ConfigError as e:
        print(f"Failed to load primary config {primary_path}: {e}")

        if not fallback_path:
            raise

        try:
            print(f"Attempting fallback config: {fallback_path}")
            return load_config(fallback_path)
        except ConfigError as fallback_error:
            print(f"Fallback config also failed: {fallback_error}")
            # Chain explicitly so the traceback shows the fallback failure
            # as the direct cause of the combined error.
            raise ConfigError(
                f"Both primary and fallback configs failed: {e}, {fallback_error}"
            ) from fallback_error

def load_partial_configs(config_paths: list[str]) -> dict:
    """Load every path that can be loaded and record the ones that cannot.

    Args:
        config_paths: Paths of the configuration files to load.

    Returns:
        A dict with ``successful`` (paths), ``failed`` (``(path, error
        message)`` tuples) and ``configs`` (path -> loaded configuration).
    """
    successful, failed, configs = [], [], {}

    for path in config_paths:
        try:
            loaded = load_config(path)
            successful.append(path)
            configs[path] = loaded
        except Exception as exc:
            failed.append((path, str(exc)))
            print(f"Failed to load {path}: {exc}")

    return {'successful': successful, 'failed': failed, 'configs': configs}

# Usage
try:
    config = load_config_with_fallback("production.yaml", "default.yaml")
except ConfigError as e:
    print(f"Configuration loading failed: {e}")
    # Handle appropriately

partial_results = load_partial_configs(config_files)
print(f"Loaded {len(partial_results['successful'])}/{len(config_files)} configs")

2. Comprehensive Error Reporting

Provide detailed error context:

import sys
import traceback
from typing import Any, Dict, Optional

class ConfigLoadError(Exception):
    """Enhanced configuration loading error with context.

    Attributes are set in ``__init__``: ``config_path`` is the path that
    failed, ``context`` is the supplied dict (or an empty one), and
    ``traceback`` is the formatted traceback of the exception being handled
    at construction time, or ``""`` when there is none.
    """

    def __init__(self, message: str, config_path: str, context: Optional[Dict[str, Any]] = None):
        super().__init__(message)
        self.config_path = config_path
        self.context = context or {}
        # format_exc() returns the truthy string "NoneType: None\n" when no
        # exception is being handled, which would make `if e.traceback:`
        # print a meaningless traceback; store an empty string instead.
        self.traceback = traceback.format_exc() if sys.exc_info()[0] is not None else ""

def load_config_with_context(config_path: str, **kwargs) -> Config:
    """Load configuration with enhanced error context.

    Args:
        config_path: Path of the configuration to load.
        **kwargs: Forwarded unchanged to ``load_config``.

    Returns:
        The loaded configuration.

    Raises:
        ConfigLoadError: On any load failure, chained to the original
            exception and carrying a context dict for debugging.
    """
    # Imported here because the surrounding example never imports them.
    import os
    import time

    context = {
        'config_path': config_path,
        'load_time': time.time(),
        'kwargs': kwargs,
        'working_directory': os.getcwd()
    }

    try:
        return load_config(config_path, **kwargs)

    except Exception as e:
        context['load_success'] = False
        context['error_type'] = type(e).__name__
        context['error_message'] = str(e)

        # Add additional context for debugging. stat() is attempted directly
        # rather than after an exists() check, so a file that vanishes in
        # between cannot raise here and mask the original error.
        try:
            stat = os.stat(config_path)
        except (OSError, ValueError):
            stat = None
        if stat is not None:
            context['file_size'] = stat.st_size
            context['file_modified'] = stat.st_mtime

        raise ConfigLoadError(
            f"Failed to load configuration: {e}",
            config_path,
            context
        ) from e

# Usage
try:
    config = load_config_with_context("complex.yaml", filesystem=custom_fs)
except ConfigLoadError as e:
    print(f"Configuration error: {e}")
    print(f"Config path: {e.config_path}")
    print(f"Context: {e.context}")
    if e.traceback:
        print(f"Traceback:\n{e.traceback}")

Testing Patterns

1. Mock Configuration Loading

Create mock configurations for testing:

from unittest.mock import Mock, patch
from tempfile import NamedTemporaryFile
import yaml
import json

def create_test_config(config_data: dict, format: str = 'yaml') -> str:
    """Create temporary configuration file for testing.

    Args:
        config_data: Data to serialize into the file.
        format: ``'yaml'`` or ``'json'``; also used as the file suffix.

    Returns:
        Path of the created file. The file is not deleted automatically;
        the caller is responsible for removing it.

    Raises:
        ValueError: If ``format`` is not supported.
    """
    # Validate before creating the file: with delete=False, raising inside
    # the ``with`` block would leave an orphaned temporary file behind.
    if format not in ('yaml', 'json'):
        raise ValueError(f"Unsupported format: {format}")

    suffix = f'.{format}'

    with NamedTemporaryFile(mode='w', suffix=suffix, delete=False) as f:
        if format == 'yaml':
            yaml.dump(config_data, f)
        else:
            json.dump(config_data, f)

    return f.name

def test_config_loading():
    """Round-trip a minimal configuration through a temporary file.

    Writes the test data with ``create_test_config``, loads it with
    ``load_config``, checks the version and the single view, and always
    removes the temporary file afterwards.
    """
    import os

    only_view = {'name': 'test_view', 'sql': 'SELECT 1 as test_column'}
    test_config = {
        'version': 1,
        'duckdb': {'database': ':memory:'},
        'views': [only_view],
    }

    config_path = create_test_config(test_config)

    try:
        config = load_config(config_path)

        assert config.version == 1
        assert len(config.views) == 1
        assert config.views[0].name == 'test_view'

        print("Configuration loading test passed")
    finally:
        # The temporary file is created with delete=False, so remove it here.
        os.unlink(config_path)

# Usage
test_config_loading()

2. Mock External Dependencies

Test with mocked filesystem and other dependencies:

from unittest.mock import Mock, patch
import pytest

def test_config_with_mock_filesystem():
    """Test configuration loading with mocked filesystem.

    Patches ``duckalog.config.api._load_config_from_local_file`` and checks
    that ``load_config`` forwards the ``filesystem`` keyword to it.
    """

    # Mock filesystem
    # Makes `with mock_fs.open(...) as f: f.read()` return the YAML text below.
    mock_fs = Mock()
    mock_fs.open.return_value.__enter__.return_value.read.return_value = """
    version: 1
    duckdb:
      database: :memory:
    views: []
    """

    # Mock config content
    test_config = {
        'version': 1,
        'duckdb': {'database': ':memory:'},
        'views': []
    }

    # NOTE(review): this patches a private function and assumes a load of an
    # "s3://" path is routed through it with `filesystem` passed by keyword —
    # confirm against the duckalog.config.api implementation.
    with patch('duckalog.config.api._load_config_from_local_file') as mock_load:
        # The patched loader returns a Mock exposing the dict keys as attributes.
        mock_load.return_value = Mock(**test_config)

        config = load_config(
            "s3://bucket/config.yaml",
            filesystem=mock_fs
        )

        # Verify the mock was called with correct parameters
        mock_load.assert_called_once()
        call_args = mock_load.call_args
        assert 'filesystem' in call_args.kwargs
        assert call_args.kwargs['filesystem'] == mock_fs

# Usage
test_config_with_mock_filesystem()

Migration Patterns

1. Gradual Migration from Legacy Patterns

Migrate existing code gradually while maintaining compatibility:

# Legacy code pattern (to be migrated)
def load_and_build_legacy(config_path: str):
    """Load the config and build the catalog via the top-level imports.

    Returns:
        The configuration returned by ``load_config``.
    """
    from duckalog import load_config, build_catalog

    loaded = load_config(config_path)
    # build_catalog takes the path, not the loaded object.
    build_catalog(config_path)
    return loaded

# Migrated code with new patterns
def load_and_build_modern(config_path: str, use_new_features: bool = False):
    """Load the config (optionally with the new loader) and build the catalog.

    Args:
        config_path: Path of the configuration file.
        use_new_features: When True, load through
            ``duckalog.config.api.load_config`` with ``resolve_paths`` and
            ``load_dotenv`` enabled; otherwise use the top-level loader.

    Returns:
        The loaded configuration.
    """
    from duckalog.config.api import load_config as new_load_config
    from duckalog import build_catalog

    if not use_new_features:
        # Legacy-compatible loading
        from duckalog import load_config as legacy_load_config
        config = legacy_load_config(config_path)
    else:
        # New enhanced loading
        config = new_load_config(config_path, resolve_paths=True, load_dotenv=True)

    build_catalog(config_path)
    return config

# Wrapper for backward compatibility
def load_and_build(config_path: str, enable_new_features: bool = False):
    """Dispatch to the legacy or the modern load-and-build implementation.

    Args:
        config_path: Path of the configuration file.
        enable_new_features: Selects ``load_and_build_modern`` (with new
            features on) when True, ``load_and_build_legacy`` otherwise.
    """
    if not enable_new_features:
        return load_and_build_legacy(config_path)
    return load_and_build_modern(config_path, use_new_features=True)

# Usage
# Legacy usage (unchanged)
config = load_and_build("catalog.yaml")

# New features usage
config = load_and_build("catalog.yaml", enable_new_features=True)

2. Feature Detection

Detect available features and adapt accordingly:

def detect_new_features() -> dict:
    """Detect which new features are available.

    Each feature is probed by importing a module and checking that it
    exposes the expected name.

    Returns:
        A dict mapping ``dependency_injection``, ``request_scoped_caching``,
        ``custom_resolvers`` and ``enhanced_errors`` to a bool.
    """
    import importlib

    # feature name -> (module to import, attribute that must exist)
    # NOTE(review): this guide defines its own ConfigLoadError; confirm that
    # duckalog.errors actually exports one, otherwise 'enhanced_errors' is
    # always False.
    probes = {
        'dependency_injection': ('duckalog.config.api', 'load_config'),
        'request_scoped_caching': ('duckalog.config.resolution.imports', 'request_cache_scope'),
        'custom_resolvers': ('duckalog.config.resolution.base', 'ImportResolver'),
        'enhanced_errors': ('duckalog.errors', 'ConfigLoadError'),
    }

    features = {}
    for feature, (module_name, attribute) in probes.items():
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            features[feature] = False
        else:
            features[feature] = hasattr(module, attribute)

    return features

def adaptive_config_loading(config_path: str):
    """Load a configuration with the most capable loader that is available.

    Uses the new loader inside a request cache scope when both dependency
    injection and request-scoped caching are detected; otherwise falls back
    to the top-level ``duckalog.load_config``.
    """
    available = detect_new_features()

    if not (available['dependency_injection'] and available['request_scoped_caching']):
        # Legacy pattern
        from duckalog import load_config
        return load_config(config_path)

    # New advanced pattern
    from duckalog.config.api import load_config
    from duckalog.config.resolution.imports import request_cache_scope

    with request_cache_scope():
        return load_config(config_path)

# Usage
features = detect_new_features()
print(f"Available features: {features}")

config = adaptive_config_loading("catalog.yaml")

Best Practices

1. Configuration Organization

Organize configurations for maintainability:

# Configuration loading utilities
class ConfigManager:
    """Centralized configuration management with a per-instance cache."""

    def __init__(self, base_config_path: str):
        """Remember the base path and start with an empty cache."""
        self._config_cache = {}
        self.base_config_path = base_config_path

    def load_config_with_cache(self, config_path: str, use_cache: bool = True) -> Config:
        """Return the configuration for ``config_path``.

        With ``use_cache`` the configuration is loaded at most once per path
        and stored; without it the file is always loaded and nothing is
        stored.
        """
        if not use_cache:
            return load_config(config_path)

        if config_path not in self._config_cache:
            self._config_cache[config_path] = load_config(config_path)

        return self._config_cache[config_path]

    def reload_config(self, config_path: str) -> Config:
        """Discard any cached entry for ``config_path`` and load it again."""
        self._config_cache.pop(config_path, None)
        return self.load_config_with_cache(config_path, use_cache=True)

# Usage
config_manager = ConfigManager("base.yaml")
config = config_manager.load_config_with_cache("analytics.yaml")

2. Resource Management

Proper resource cleanup for advanced usage:

from contextlib import contextmanager

@contextmanager
def config_session(config_paths: list[str]):
    """Load all ``config_paths`` inside one request cache scope.

    Yields:
        A ``(configs, context)`` tuple: ``configs`` maps each path to its
        loaded configuration and ``context`` is the object produced by
        ``request_cache_scope``. Leaving the ``with`` block exits that scope.
    """
    with request_cache_scope() as context:
        configs = {path: load_config(path) for path in config_paths}
        yield configs, context

# Usage
with config_session(config_files) as (configs, context):
    # Use configurations
    for path, config in configs.items():
        print(f"Loaded {path}: {len(config.views)} views")

    # Context automatically cleaned up

3. Performance Optimization

Optimize configuration loading for performance:

import concurrent.futures
from typing import List

def parallel_config_loading(config_paths: List[str], max_workers: int = 4) -> List[Config]:
    """Load multiple configurations in parallel when appropriate.

    Configurations without imports are loaded concurrently; the ones with
    imports are loaded sequentially inside a request cache scope.

    Args:
        config_paths: Paths of the configuration files to load.
        max_workers: Maximum number of worker threads.

    Returns:
        The loaded configurations in the order of ``config_paths``. When at
        least one configuration is independent, paths that fail to load are
        reported on stdout and omitted.
    """
    # Split by a simple heuristic (no shared imports) - can be enhanced.
    independent, dependent = [], []
    for index, path in enumerate(config_paths):
        (dependent if _has_imports(path) else independent).append((index, path))

    if not independent:
        # Use sequential loading with caching
        with request_cache_scope():
            return [load_config(path) for path in config_paths]

    loaded = {}  # position in config_paths -> loaded configuration

    # Parallel loading for independent configs
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_item = {
            executor.submit(load_config, path): (index, path)
            for index, path in independent
        }

        for future in concurrent.futures.as_completed(future_to_item):
            index, path = future_to_item[future]
            try:
                loaded[index] = future.result()
            except Exception as e:
                print(f"Failed to load {path}: {e}")

    # Configs with imports must still be loaded; do that sequentially under
    # one cache scope instead of dropping them from the result.
    if dependent:
        with request_cache_scope():
            for index, path in dependent:
                try:
                    loaded[index] = load_config(path)
                except Exception as e:
                    print(f"Failed to load {path}: {e}")

    # Return in input order rather than thread completion order.
    return [loaded[index] for index in sorted(loaded)]

def _has_imports(config_path: str) -> bool:
    """Check if configuration has imports."""
    try:
        with open(config_path, 'r') as f:
            content = f.read()
            return 'imports:' in content
    except:
        return True  # Assume has imports if can't check

# Usage
configs = parallel_config_loading(config_files, max_workers=8)

These patterns provide comprehensive guidance for using both the basic and advanced features of Duckalog's new architecture while maintaining backward compatibility with existing code.