# Source code for adamops.data.loaders

"""
AdamOps Data Loaders Module

Provides comprehensive data loading capabilities from various sources:
- CSV files with auto-encoding detection
- Excel files (.xlsx, .xls)
- JSON files
- SQL databases (SQLite, PostgreSQL, MySQL)
- API/URL endpoints
- Compressed files (.zip, .gz, .bz2, .xz)
- Parquet, Feather and pickle files (via load_auto)
"""

import gzip
import io
import json
import os
import zipfile
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Union
from urllib.parse import urlparse

import numpy as np
import pandas as pd

# Optional third-party dependencies. Loaders that need one of these check the
# matching *_AVAILABLE flag when called (detect_encoding -> chardet,
# load_url/load_api -> requests, load_sql/load_sql_table -> sqlalchemy), so
# this module still imports when a dependency is missing.
try:
    import chardet
    CHARDET_AVAILABLE = True
except ImportError:
    CHARDET_AVAILABLE = False

try:
    import requests
    REQUESTS_AVAILABLE = True
except ImportError:
    REQUESTS_AVAILABLE = False

try:
    from sqlalchemy import create_engine, text
    SQLALCHEMY_AVAILABLE = True
except ImportError:
    SQLALCHEMY_AVAILABLE = False

from adamops.utils.logging import get_logger
from adamops.utils.helpers import ensure_dir

# Module-level logger used by every loader/saver in this file.
logger = get_logger(__name__)


# =============================================================================
# Encoding Detection
# =============================================================================

def detect_encoding(filepath: Union[str, Path], sample_size: int = 10000) -> str:
    """
    Detect the encoding of a file.

    Only the first ``sample_size`` bytes are inspected. A sample that is pure
    ASCII says nothing about the bytes that follow it, so an "ascii" verdict is
    widened to "utf-8" (which decodes every ASCII byte identically). A UTF-8
    file whose first non-ASCII byte lies beyond the sample therefore still
    decodes correctly.

    Args:
        filepath: Path to the file.
        sample_size: Number of bytes to sample for detection.

    Returns:
        str: Detected encoding (e.g., 'utf-8', 'latin-1'). Returns 'utf-8'
        when chardet is unavailable, when no encoding is detected, or when
        the detector's confidence is below 0.5.

    Example:
        >>> detect_encoding("data.csv")
        'utf-8'
    """
    if not CHARDET_AVAILABLE:
        logger.warning("chardet not available, defaulting to utf-8")
        return "utf-8"

    with open(filepath, "rb") as f:
        raw_data = f.read(sample_size)

    result = chardet.detect(raw_data)
    encoding = result.get("encoding") or "utf-8"
    # Treat a missing/None confidence as 0 so the format spec below is safe.
    confidence = result.get("confidence") or 0.0

    logger.debug(f"Detected encoding: {encoding} (confidence: {confidence:.2%})")

    # Fall back to utf-8 if detection is uncertain
    if confidence < 0.5:
        return "utf-8"

    # An ASCII-only sample cannot rule out UTF-8 content later in the file;
    # UTF-8 is a superset of ASCII, so it is the safer choice.
    if encoding.lower() == "ascii":
        return "utf-8"

    return encoding
# =============================================================================
# CSV Loading
# =============================================================================

# Encodings retried, in order, when the primary encoding raises
# UnicodeDecodeError. UTF-8 comes first, for files misdetected from a narrow
# sample. cp1252 comes next for common Windows text; it leaves a few byte
# values undefined, so it can still fail. latin-1 maps every byte value and is
# therefore the always-succeeding last resort.
_CSV_FALLBACK_ENCODINGS = ("utf-8", "cp1252", "latin-1")


def _same_codec(first: str, second: str) -> bool:
    """Return True if two encoding names resolve to the same Python codec."""
    import codecs

    try:
        return codecs.lookup(first).name == codecs.lookup(second).name
    except LookupError:
        return first.lower() == second.lower()


def load_csv(
    filepath: Union[str, Path],
    encoding: Optional[str] = None,
    auto_detect_encoding: bool = True,
    sep: str = ",",
    header: Union[int, List[int], str] = "infer",
    index_col: Optional[Union[int, str, List]] = None,
    usecols: Optional[List] = None,
    dtype: Optional[Dict] = None,
    parse_dates: Optional[Union[bool, List]] = None,
    na_values: Optional[List] = None,
    nrows: Optional[int] = None,
    skiprows: Optional[Union[int, List]] = None,
    low_memory: bool = True,
    **kwargs
) -> pd.DataFrame:
    """
    Load data from a CSV file with auto-encoding detection.

    If reading with the chosen encoding raises ``UnicodeDecodeError``, the
    file is re-read with utf-8, then cp1252, then latin-1, skipping whichever
    of those was already tried. Because latin-1 accepts every byte, the last
    retry always decodes.

    Args:
        filepath: Path to the CSV file.
        encoding: File encoding. If None and auto_detect_encoding is True,
            encoding will be detected automatically.
        auto_detect_encoding: Whether to auto-detect encoding.
        sep: Column separator.
        header: Row number(s) to use as column names.
        index_col: Column(s) to use as index.
        usecols: Columns to load.
        dtype: Data types for columns.
        parse_dates: Columns to parse as dates.
        na_values: Additional values to treat as NA.
        nrows: Number of rows to read.
        skiprows: Rows to skip.
        low_memory: Use low memory mode.
        **kwargs: Additional arguments passed to pd.read_csv.

    Returns:
        pd.DataFrame: Loaded data. When ``chunksize`` or ``iterator=True`` is
        passed through ``kwargs``, the lazy reader returned by pandas is
        returned instead.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
        UnicodeDecodeError: If no candidate encoding can decode the file.

    Example:
        >>> df = load_csv("data.csv")
        >>> df = load_csv("data.csv", usecols=["id", "name", "value"])
        >>> df = load_csv("data.csv", parse_dates=["date_column"])
    """
    filepath = Path(filepath)
    if not filepath.exists():
        raise FileNotFoundError(f"File not found: {filepath}")

    # Detect encoding if not specified
    if encoding is None and auto_detect_encoding:
        encoding = detect_encoding(filepath)
    elif encoding is None:
        encoding = "utf-8"

    logger.info(f"Loading CSV: {filepath} (encoding: {encoding})")

    # Every option except the encoding is identical across retries.
    read_options = dict(
        sep=sep,
        header=header,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
        na_values=na_values,
        nrows=nrows,
        skiprows=skiprows,
        low_memory=low_memory,
        **kwargs,
    )

    try:
        df = pd.read_csv(filepath, encoding=encoding, **read_options)
    except UnicodeDecodeError as original_error:
        fallbacks = [
            candidate
            for candidate in _CSV_FALLBACK_ENCODINGS
            if not _same_codec(candidate, encoding)
        ]
        for fallback_encoding in fallbacks:
            logger.warning(f"Retrying with {fallback_encoding} encoding")
            try:
                df = pd.read_csv(filepath, encoding=fallback_encoding, **read_options)
                break
            except UnicodeDecodeError:
                continue
        else:
            raise original_error

    # With chunksize=/iterator=True pandas returns a lazy reader that has no
    # len() or .columns; only report the shape of a materialised frame.
    if isinstance(df, pd.DataFrame):
        logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    return df
# =============================================================================
# Excel Loading
# =============================================================================

def load_excel(
    filepath: Union[str, Path],
    sheet_name: Union[str, int, List, None] = 0,
    header: Union[int, List[int], None] = 0,
    index_col: Optional[Union[int, str, List]] = None,
    usecols: Optional[Union[str, List]] = None,
    dtype: Optional[Dict] = None,
    parse_dates: Optional[Union[bool, List]] = None,
    na_values: Optional[List] = None,
    nrows: Optional[int] = None,
    skiprows: Optional[Union[int, List]] = None,
    **kwargs
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """
    Read one or more sheets of an Excel workbook (.xlsx, .xls).

    Args:
        filepath: Location of the workbook.
        sheet_name: Sheet name or position, a list of them, or None for
            every sheet in the workbook.
        header: Row number(s) holding the column names.
        index_col: Column(s) to use as the index.
        usecols: Subset of columns to read.
        dtype: Per-column data types.
        parse_dates: Columns to parse as dates.
        na_values: Extra values to treat as missing.
        nrows: Maximum number of rows to read.
        skiprows: Rows to skip at the start.
        **kwargs: Forwarded unchanged to ``pd.read_excel``.

    Returns:
        A single DataFrame, or a mapping of sheet name to DataFrame when
        several sheets are requested.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.

    Example:
        >>> df = load_excel("data.xlsx")
        >>> df = load_excel("data.xlsx", sheet_name="Sheet1")
        >>> sheets = load_excel("data.xlsx", sheet_name=None)  # All sheets
    """
    workbook = Path(filepath)
    if not workbook.exists():
        raise FileNotFoundError(f"File not found: {workbook}")

    logger.info(f"Loading Excel: {workbook}")

    loaded = pd.read_excel(
        workbook,
        sheet_name=sheet_name,
        header=header,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
        na_values=na_values,
        nrows=nrows,
        skiprows=skiprows,
        **kwargs
    )

    # A single sheet comes back as one frame; report it and return early.
    if not isinstance(loaded, dict):
        logger.info(f"Loaded {len(loaded)} rows, {len(loaded.columns)} columns")
        return loaded

    for sheet, frame in loaded.items():
        logger.info(f"Sheet '{sheet}': {len(frame)} rows, {len(frame.columns)} columns")
    return loaded
def get_excel_sheet_names(filepath: Union[str, Path]) -> List[str]:
    """
    Get sheet names from an Excel file.

    The workbook is opened with ``pd.ExcelFile`` as a context manager, so the
    underlying file handle is closed before returning.

    Args:
        filepath: Path to the Excel file.

    Returns:
        List[str]: List of sheet names.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
    """
    filepath = Path(filepath)
    if not filepath.exists():
        raise FileNotFoundError(f"File not found: {filepath}")

    with pd.ExcelFile(filepath) as excel_file:
        return list(excel_file.sheet_names)
# =============================================================================
# JSON Loading
# =============================================================================

def load_json(
    filepath: Union[str, Path],
    orient: Optional[str] = None,
    lines: bool = False,
    encoding: str = "utf-8",
    **kwargs
) -> pd.DataFrame:
    """
    Load data from a JSON file.

    Args:
        filepath: Path to the JSON file.
        orient: JSON structure orientation.
            Options: 'split', 'records', 'index', 'columns', 'values', 'table'
        lines: Read file as line-delimited JSON.
        encoding: File encoding.
        **kwargs: Additional arguments passed to pd.read_json.

    Returns:
        pd.DataFrame: Loaded data. Whatever ``pd.read_json`` produces is
        returned as-is, e.g. a Series for ``typ="series"`` or a lazy reader
        for ``lines=True`` with ``chunksize``.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.

    Example:
        >>> df = load_json("data.json")
        >>> df = load_json("data.jsonl", lines=True)
    """
    filepath = Path(filepath)
    if not filepath.exists():
        raise FileNotFoundError(f"File not found: {filepath}")

    logger.info(f"Loading JSON: {filepath}")

    df = pd.read_json(
        filepath,
        orient=orient,
        lines=lines,
        encoding=encoding,
        **kwargs
    )

    # Only a DataFrame has both len() and .columns; Series and chunked
    # readers would crash the shape log.
    if isinstance(df, pd.DataFrame):
        logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    return df
def load_json_nested(
    filepath: Union[str, Path],
    record_path: Optional[Union[str, List[str]]] = None,
    meta: Optional[List[str]] = None,
    max_level: Optional[int] = None,
    encoding: str = "utf-8",
) -> pd.DataFrame:
    """
    Read a nested JSON document and flatten it with ``pd.json_normalize``.

    Args:
        filepath: Location of the JSON document.
        record_path: Path within the document to the list of records.
        meta: Higher-level fields to copy onto each record.
        max_level: Maximum depth to normalize.
        encoding: Text encoding of the file.

    Returns:
        pd.DataFrame: The flattened records.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.

    Example:
        >>> # For JSON like: {"data": [{"id": 1, "info": {"name": "A"}}]}
        >>> df = load_json_nested("data.json", record_path="data")
    """
    document = Path(filepath)
    if not document.exists():
        raise FileNotFoundError(f"File not found: {document}")

    logger.info(f"Loading nested JSON: {document}")

    payload = json.loads(document.read_text(encoding=encoding))

    flat = pd.json_normalize(
        payload,
        record_path=record_path,
        meta=meta,
        max_level=max_level,
    )

    logger.info(f"Loaded {len(flat)} rows, {len(flat.columns)} columns")
    return flat
# =============================================================================
# SQL Loading
# =============================================================================

def load_sql(
    query: str,
    connection_string: str,
    params: Optional[Dict] = None,
    index_col: Optional[Union[str, List[str]]] = None,
    parse_dates: Optional[Union[List[str], Dict]] = None,
    chunksize: Optional[int] = None,
    **kwargs
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
    """
    Load data from a SQL database.

    Supports SQLite, PostgreSQL, MySQL, and other SQLAlchemy-compatible
    databases. A fresh engine is created for every call:

    - For a plain read, the engine's connection pool is disposed once the
      DataFrame has been built.
    - For a chunked read, the engine is left alive, because the returned
      iterator still reads through it. It is disposed only if the read itself
      fails.

    Args:
        query: SQL query to execute.
        connection_string: Database connection string. Examples:
            - SQLite: "sqlite:///database.db"
            - PostgreSQL: "postgresql://user:pass@host:port/db"
            - MySQL: "mysql+pymysql://user:pass@host:port/db"
        params: Query parameters.
        index_col: Column(s) to use as index.
        parse_dates: Columns to parse as dates.
        chunksize: Number of rows per chunk (for large datasets).
        **kwargs: Additional arguments passed to pd.read_sql.

    Returns:
        pd.DataFrame, or an iterator of DataFrames when ``chunksize`` is set.

    Raises:
        ImportError: If SQLAlchemy is not installed.

    Example:
        >>> df = load_sql("SELECT * FROM users", "sqlite:///app.db")
        >>> df = load_sql(
        ...     "SELECT * FROM orders WHERE date > :date",
        ...     "postgresql://user:pass@localhost:5432/shop",
        ...     params={"date": "2023-01-01"}
        ... )
    """
    if not SQLALCHEMY_AVAILABLE:
        raise ImportError("SQLAlchemy is required for SQL loading. Install with: pip install sqlalchemy")

    logger.info("Loading from SQL database")

    engine = create_engine(connection_string)

    # Use text() for raw SQL queries with params (e.g. ":date" placeholders)
    if params:
        query = text(query)

    read_options = dict(
        params=params,
        index_col=index_col,
        parse_dates=parse_dates,
        chunksize=chunksize,
        **kwargs
    )

    if chunksize is not None:
        # The returned iterator keeps reading through this engine, so it must
        # not be disposed here unless creating the reader failed.
        try:
            chunks = pd.read_sql(query, engine, **read_options)
        except Exception:
            engine.dispose()
            raise
        logger.info(f"Created chunked reader with chunksize={chunksize}")
        return chunks

    try:
        df = pd.read_sql(query, engine, **read_options)
    finally:
        # The result is fully materialised; release the per-call pool.
        engine.dispose()

    logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    return df
def load_sql_table(
    table_name: str,
    connection_string: str,
    schema: Optional[str] = None,
    columns: Optional[List[str]] = None,
    index_col: Optional[Union[str, List[str]]] = None,
    chunksize: Optional[int] = None,
    **kwargs
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
    """
    Load an entire table from a SQL database.

    A fresh engine is created for every call:

    - For a plain read, the engine's connection pool is disposed once the
      DataFrame has been built.
    - For a chunked read, the engine is left alive for the returned iterator.

    Args:
        table_name: Name of the table to load.
        connection_string: Database connection string.
        schema: Database schema.
        columns: Columns to load (None for all).
        index_col: Column(s) to use as index.
        chunksize: Number of rows per chunk.
        **kwargs: Additional arguments passed to pd.read_sql_table.

    Returns:
        pd.DataFrame, or an iterator of DataFrames when ``chunksize`` is set.

    Raises:
        ImportError: If SQLAlchemy is not installed.
    """
    if not SQLALCHEMY_AVAILABLE:
        raise ImportError("SQLAlchemy is required for SQL loading. Install with: pip install sqlalchemy")

    logger.info(f"Loading table: {table_name}")

    engine = create_engine(connection_string)

    read_options = dict(
        schema=schema,
        columns=columns,
        index_col=index_col,
        chunksize=chunksize,
        **kwargs
    )

    if chunksize is not None:
        # The returned iterator keeps reading through this engine.
        try:
            chunks = pd.read_sql_table(table_name, engine, **read_options)
        except Exception:
            engine.dispose()
            raise
        logger.info(f"Created chunked reader with chunksize={chunksize}")
        return chunks

    try:
        df = pd.read_sql_table(table_name, engine, **read_options)
    finally:
        # The result is fully materialised; release the per-call pool.
        engine.dispose()

    logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    return df
# =============================================================================
# API/URL Loading
# =============================================================================

def load_url(
    url: str,
    format: str = "csv",
    params: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    auth: Optional[tuple] = None,
    timeout: int = 30,
    **kwargs
) -> pd.DataFrame:
    """
    Load data from a URL.

    The ``format`` is validated before any network request is made.

    Args:
        url: URL to load data from.
        format: Data format ('csv', 'json', 'excel').
        params: Query parameters.
        headers: HTTP headers.
        auth: Authentication tuple (username, password).
        timeout: Request timeout in seconds.
        **kwargs: Additional arguments for the format loader.

    Returns:
        pd.DataFrame: Loaded data. The pandas reader's result is returned
        as-is, e.g. a lazy reader when ``chunksize`` is passed.

    Raises:
        ImportError: If requests is not installed.
        ValueError: If ``format`` is not supported.
        requests.HTTPError: If the server returns an error status.

    Example:
        >>> df = load_url("https://example.com/data.csv")
        >>> df = load_url(
        ...     "https://api.example.com/data",
        ...     format="json",
        ...     headers={"Authorization": "Bearer token"}
        ... )
    """
    if not REQUESTS_AVAILABLE:
        raise ImportError("requests is required for URL loading. Install with: pip install requests")

    readers = {
        "csv": pd.read_csv,
        "json": pd.read_json,
        "excel": pd.read_excel,
    }
    # Fail fast on a bad format instead of after downloading the payload.
    if format not in readers:
        raise ValueError(f"Unsupported format: {format}")
    reader = readers[format]

    logger.info(f"Loading from URL: {url}")

    response = requests.get(
        url,
        params=params,
        headers=headers,
        auth=auth,
        timeout=timeout,
    )
    response.raise_for_status()

    df = reader(io.BytesIO(response.content), **kwargs)

    # Chunked readers have no len()/.columns; only log a real frame's shape.
    if isinstance(df, pd.DataFrame):
        logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    return df
def _extract_records(payload: Any, data_key: Optional[str]) -> List[Any]:
    """
    Normalize one decoded API response page into a list of records.

    Args:
        payload: Decoded JSON body of a single response.
        data_key: Key under which the records live, or None when the body
            itself is the record list (or a single record).

    Returns:
        A list of records. A list is used as-is, a single object is wrapped
        in a one-element list, and ``None`` (missing key or JSON null)
        yields an empty list.

    Raises:
        ValueError: If ``data_key`` is given but the body is not a JSON object.
    """
    if data_key:
        if not isinstance(payload, dict):
            raise ValueError(
                f"Expected a JSON object with key '{data_key}' in the API response, "
                f"got {type(payload).__name__}"
            )
        payload = payload.get(data_key)
    if payload is None:
        return []
    if isinstance(payload, list):
        return payload
    # A single object is one record; extending with a dict would add its keys.
    return [payload]


def load_api(
    url: str,
    method: str = "GET",
    params: Optional[Dict] = None,
    data: Optional[Dict] = None,
    json_data: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    auth: Optional[tuple] = None,
    timeout: int = 30,
    data_key: Optional[str] = None,
    paginate: bool = False,
    page_key: str = "page",
    limit_key: str = "limit",
    limit: int = 100,
    max_pages: int = 100,
) -> pd.DataFrame:
    """
    Load data from a REST API with pagination support.

    When ``paginate`` is True, pages are requested starting at page 1 until
    one of these happens:

    - a page returns fewer than ``limit`` records;
    - ``max_pages`` pages have been fetched.

    Args:
        url: API endpoint URL.
        method: HTTP method.
        params: Query parameters.
        data: Form data.
        json_data: JSON body data.
        headers: HTTP headers.
        auth: Authentication tuple.
        timeout: Request timeout.
        data_key: Key in response containing the data array.
        paginate: Whether to paginate through results.
        page_key: Parameter name for page number.
        limit_key: Parameter name for page size.
        limit: Number of items per page.
        max_pages: Maximum number of pages to fetch.

    Returns:
        pd.DataFrame: Loaded data.

    Raises:
        ImportError: If requests is not installed.
        ValueError: If ``data_key`` is set but a response is not a JSON object.
        requests.HTTPError: If the server returns an error status.

    Example:
        >>> df = load_api(
        ...     "https://api.example.com/users",
        ...     headers={"Authorization": "Bearer token"},
        ...     data_key="users",
        ...     paginate=True
        ... )
    """
    if not REQUESTS_AVAILABLE:
        raise ImportError("requests is required for API loading. Install with: pip install requests")

    logger.info(f"Loading from API: {url}")

    all_data = []
    page = 1

    while True:
        # Build params for this request without mutating the caller's dict
        request_params = dict(params or {})
        if paginate:
            request_params[page_key] = page
            request_params[limit_key] = limit

        response = requests.request(
            method=method,
            url=url,
            params=request_params,
            data=data,
            json=json_data,
            headers=headers,
            auth=auth,
            timeout=timeout,
        )
        response.raise_for_status()

        page_data = _extract_records(response.json(), data_key)
        all_data.extend(page_data)

        # Stop when not paginating, on a short (final) page, or at the page cap
        if not paginate or len(page_data) < limit or page >= max_pages:
            break

        page += 1
        logger.debug(f"Fetching page {page}...")

    df = pd.DataFrame(all_data)
    logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    return df
# =============================================================================
# Compressed Files
# =============================================================================

def load_compressed(
    filepath: Union[str, Path],
    format: str = "csv",
    compression: Optional[str] = None,
    **kwargs
) -> pd.DataFrame:
    """
    Load data from a compressed file (.zip, .gz, .bz2, .xz).

    ZIP archives are delegated to ``_load_from_zip``. Other compressions are
    handed to pandas via its ``compression`` argument.

    Args:
        filepath: Path to the compressed file.
        format: Data format inside the archive ('csv', 'json', 'excel').
            'excel' is only supported inside ZIP archives.
        compression: Compression type. Auto-detected from the file suffix if
            None; unknown suffixes use pandas' "infer".
        **kwargs: Additional arguments for the format loader.

    Returns:
        pd.DataFrame: Loaded data. The pandas reader's result is returned
        as-is, e.g. a lazy reader when ``chunksize`` is passed.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
        ValueError: If ``format`` is unsupported for the compression type.

    Example:
        >>> df = load_compressed("data.csv.gz")
        >>> df = load_compressed("archive.zip", format="csv")
    """
    filepath = Path(filepath)
    if not filepath.exists():
        raise FileNotFoundError(f"File not found: {filepath}")

    # Auto-detect compression type
    if compression is None:
        suffix_to_compression = {
            ".gz": "gzip",
            ".bz2": "bz2",
            ".xz": "xz",
            ".zip": "zip",
        }
        compression = suffix_to_compression.get(filepath.suffix.lower(), "infer")

    logger.info(f"Loading compressed file: {filepath} ({compression})")

    if compression == "zip":
        return _load_from_zip(filepath, format, **kwargs)

    if format == "csv":
        df = pd.read_csv(filepath, compression=compression, **kwargs)
    elif format == "json":
        df = pd.read_json(filepath, compression=compression, **kwargs)
    else:
        raise ValueError(f"Unsupported format for compression: {format}")

    # Chunked readers have no len()/.columns; only log a real frame's shape.
    if isinstance(df, pd.DataFrame):
        logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    return df
def _load_from_zip( filepath: Union[str, Path], format: str = "csv", file_pattern: Optional[str] = None, **kwargs ) -> pd.DataFrame: """Load data from within a ZIP archive.""" with zipfile.ZipFile(filepath, "r") as z: file_list = z.namelist() # Filter files by pattern or extension if file_pattern: import fnmatch matching_files = [f for f in file_list if fnmatch.fnmatch(f, file_pattern)] else: ext = f".{format}" matching_files = [f for f in file_list if f.endswith(ext)] if not matching_files: raise ValueError(f"No {format} files found in archive") # Load the first matching file (or concatenate all) if len(matching_files) == 1: with z.open(matching_files[0]) as f: content = io.BytesIO(f.read()) if format == "csv": return pd.read_csv(content, **kwargs) elif format == "json": return pd.read_json(content, **kwargs) elif format == "excel": return pd.read_excel(content, **kwargs) else: # Concatenate all matching files dfs = [] for filename in matching_files: with z.open(filename) as f: content = io.BytesIO(f.read()) if format == "csv": df = pd.read_csv(content, **kwargs) elif format == "json": df = pd.read_json(content, **kwargs) dfs.append(df) return pd.concat(dfs, ignore_index=True) # ============================================================================= # Auto Loader # =============================================================================
def load_auto(
    source: Union[str, Path],
    **kwargs
) -> pd.DataFrame:
    """
    Automatically detect and load data from a file path or an http(s) URL.

    Supports CSV/TSV, Excel, JSON/JSON Lines, Parquet, Feather, Pickle and
    compressed (.gz, .bz2, .xz, .zip) files. The format is chosen from the file
    extension (or the URL path). Unknown local extensions are attempted as
    CSV, and unknown URL paths as JSON.

    Format-implied options are only applied when the caller has not supplied
    them in ``kwargs``:

    - ``lines=True`` for ``.jsonl``;
    - ``sep="\\t"`` for ``.tsv``.

    Args:
        source: Path to a file, or an http(s) URL.
        **kwargs: Additional arguments passed to the appropriate loader.

    Returns:
        pd.DataFrame: Loaded data.

    Warning:
        ``.pkl``/``.pickle`` files are read with ``pd.read_pickle``, which can
        execute arbitrary code while unpickling. Only load trusted files.

    Example:
        >>> df = load_auto("data.csv")
        >>> df = load_auto("https://example.com/data.json")
        >>> df = load_auto("data.xlsx")
    """
    source_str = str(source)

    # Check if it's a URL
    if source_str.startswith(("http://", "https://")):
        path = urlparse(source_str).path.lower()
        if path.endswith(".csv"):
            return load_url(source_str, format="csv", **kwargs)
        if path.endswith(".tsv"):
            kwargs.setdefault("sep", "\t")
            return load_url(source_str, format="csv", **kwargs)
        if path.endswith(".jsonl"):
            # JSON Lines must be parsed with lines=True
            kwargs.setdefault("lines", True)
            return load_url(source_str, format="json", **kwargs)
        if path.endswith(".json"):
            return load_url(source_str, format="json", **kwargs)
        if path.endswith((".xlsx", ".xls")):
            return load_url(source_str, format="excel", **kwargs)
        # Try JSON by default for API endpoints
        return load_url(source_str, format="json", **kwargs)

    # It's a file path
    filepath = Path(source)
    suffix = filepath.suffix.lower()

    # Compressed file: the data format is the second-to-last suffix
    if suffix in (".gz", ".bz2", ".xz", ".zip"):
        if suffix == ".zip":
            return load_compressed(filepath, **kwargs)
        inner_suffix = Path(filepath.stem).suffix.lower()
        if inner_suffix == ".json":
            return load_compressed(filepath, format="json", **kwargs)
        if inner_suffix == ".jsonl":
            kwargs.setdefault("lines", True)
            return load_compressed(filepath, format="json", **kwargs)
        if inner_suffix == ".tsv":
            kwargs.setdefault("sep", "\t")
        # .csv, .tsv and anything unrecognised are read as delimited text
        return load_compressed(filepath, format="csv", **kwargs)

    # Standard file types
    if suffix == ".csv":
        return load_csv(filepath, **kwargs)
    if suffix == ".tsv":
        kwargs.setdefault("sep", "\t")
        return load_csv(filepath, **kwargs)
    if suffix in (".xlsx", ".xls"):
        return load_excel(filepath, **kwargs)
    if suffix in (".json", ".jsonl"):
        # setdefault lets an explicit lines= override instead of colliding
        kwargs.setdefault("lines", suffix == ".jsonl")
        return load_json(filepath, **kwargs)
    if suffix == ".parquet":
        return pd.read_parquet(filepath, **kwargs)
    if suffix == ".feather":
        return pd.read_feather(filepath, **kwargs)
    if suffix in (".pickle", ".pkl"):
        # SECURITY: unpickling can execute arbitrary code; trusted files only.
        return pd.read_pickle(filepath, **kwargs)

    # Try CSV as default
    logger.warning(f"Unknown file format: {suffix}, trying CSV")
    return load_csv(filepath, **kwargs)
# =============================================================================
# Data Saving
# =============================================================================

def save_csv(
    df: pd.DataFrame,
    filepath: Union[str, Path],
    index: bool = False,
    encoding: str = "utf-8",
    **kwargs
) -> None:
    """
    Write a DataFrame to a CSV file, creating the parent directory first.

    Args:
        df: Frame to write.
        filepath: Destination path.
        index: Whether the index is written as a column.
        encoding: Text encoding of the output file.
        **kwargs: Forwarded unchanged to ``DataFrame.to_csv``.
    """
    target = Path(filepath)
    ensure_dir(target.parent)

    df.to_csv(target, index=index, encoding=encoding, **kwargs)

    logger.info(f"Saved {len(df)} rows to {target}")
def save_excel(
    df: pd.DataFrame,
    filepath: Union[str, Path],
    sheet_name: str = "Sheet1",
    index: bool = False,
    **kwargs
) -> None:
    """
    Write a DataFrame to a single sheet of an Excel workbook.

    The parent directory is created first if needed.

    Args:
        df: Frame to write.
        filepath: Destination workbook path.
        sheet_name: Name given to the written sheet.
        index: Whether the index is written as a column.
        **kwargs: Forwarded unchanged to ``DataFrame.to_excel``.
    """
    workbook = Path(filepath)
    ensure_dir(workbook.parent)

    df.to_excel(workbook, sheet_name=sheet_name, index=index, **kwargs)

    logger.info(f"Saved {len(df)} rows to {workbook}")
def save_json(
    df: pd.DataFrame,
    filepath: Union[str, Path],
    orient: str = "records",
    indent: int = 2,
    **kwargs
) -> None:
    """
    Write a DataFrame to a JSON file, creating the parent directory first.

    Args:
        df: Frame to write.
        filepath: Destination path.
        orient: JSON layout passed to ``DataFrame.to_json``.
        indent: Number of spaces used to indent the output.
        **kwargs: Forwarded unchanged to ``DataFrame.to_json``.
    """
    destination = Path(filepath)
    ensure_dir(destination.parent)

    df.to_json(destination, orient=orient, indent=indent, **kwargs)

    logger.info(f"Saved {len(df)} rows to {destination}")