Accumulator

This module accumulates streaming planetary science data into HEALPix cells. It updates aggregated statistics incrementally, so large datasets can be processed batch by batch without reloading earlier observations. It computes running statistics with Welford's algorithm, tracks approximate percentiles with an optional T-Digest, and persists its state as Parquet files.

Key Components:

  • StreamingStats: A class for maintaining running statistics (mean, std, min, max) without storing raw data.
  • CellAccumulator: Manages statistics for a single HEALPix cell, including optional T-Digest for percentiles.
  • accumulate_batch: Processes a batch of data and updates the accumulator state.
  • save_state: Saves the accumulator state to a Parquet file with metadata.
  • load_state: Loads the accumulator state from a Parquet file.
  • validate_accumulator_sidecar_compatibility: Ensures compatibility between state and sidecar metadata.
  • find_sidecar: Locates the appropriate sidecar file for input data.

Features:

  • Streaming Statistics: Uses Welford’s algorithm for mean and standard deviation, avoiding the need to store raw data.
  • Percentile Tracking: Supports approximate median and percentiles via T-Digest (optional).
  • State Management: Saves and loads accumulator state as Parquet files with embedded HEALPix metadata.
  • Sidecar Integration: Maps observations to HEALPix cells using a sidecar file.
  • Filter Support: Allows filtering of input data using pandas query expressions.
  • Parallel Processing: Optional Dask support for distributed or parallel computation.

Example Usage:

Initialize State from First Batch:

healpix_accumulator \
    --input observations_day001.parquet \
    --sidecar sidecars/day001_nside-512.parquet \
    --columns r750 r950 vis_slope \
    --state-output state/state_v001.parquet

Incremental Update with Subsequent Batch:

healpix_accumulator \
    --input observations_day002.parquet \
    --sidecar sidecars/day002_nside-512.parquet \
    --columns r750 r950 vis_slope \
    --state-input state/state_v001.parquet \
    --state-output state/state_v002.parquet

With Data Filtering:

healpix_accumulator \
    --input observations_day003.parquet \
    --sidecar sidecars/day003_nside-512.parquet \
    --columns r750 r950 vis_slope \
    --state-input state/state_v002.parquet \
    --state-output state/state_v003.parquet \
    --filter "quality > 0.5 and solar_zenith < 80"
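
The filter string is described as a pandas query expression, so its effect can be previewed on a small DataFrame before running the CLI. The sketch below is illustrative only: the data values are made up, and it assumes the expression is evaluated the way `DataFrame.query` evaluates it, which this page does not confirm in detail.

```python
import pandas as pd

# Made-up observations; column names follow the CLI example above.
observations = pd.DataFrame(
    {
        "r750": [0.11, 0.12, 0.13, 0.14],
        "quality": [0.9, 0.4, 0.8, 0.7],
        "solar_zenith": [35.0, 20.0, 85.0, 79.9],
    }
)

# Same expression as passed to --filter in the example above.
filter_expr = "quality > 0.5 and solar_zenith < 80"

# Rows failing either condition are dropped before accumulation.
kept = observations.query(filter_expr)
print(kept)
```

Only rows that satisfy both conditions survive, so a row with good quality but a solar zenith angle of 85 is excluded.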

StreamingStats

 StreamingStats ()

*Container for streaming statistics using Welford’s algorithm.

Maintains running statistics (mean, std, min, max) without storing raw data.*
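
As a rough illustration of how running statistics can be kept without storing raw data, here is a minimal sketch of Welford's update. `RunningStats`, its field names, and the choice of the sample standard deviation (ddof=1) are assumptions made for this example; they are not the library's `StreamingStats` implementation.

```python
import math
from dataclasses import dataclass


@dataclass
class RunningStats:
    """Hypothetical stand-in for StreamingStats (not the library's class)."""
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0               # sum of squared deviations from the running mean
    minimum: float = math.inf
    maximum: float = -math.inf

    def update(self, x: float) -> None:
        """Fold one value into the running statistics (Welford's update)."""
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)   # uses the already-updated mean
        self.minimum = min(self.minimum, x)
        self.maximum = max(self.maximum, x)

    @property
    def std(self) -> float:
        """Sample standard deviation (assumed ddof=1); NaN below two values."""
        return math.sqrt(self.m2 / (self.n - 1)) if self.n > 1 else math.nan


# The same object keeps absorbing values across two made-up batches.
stats = RunningStats()
for batch in ([2.0, 4.0, 4.0, 4.0], [5.0, 5.0, 7.0, 9.0]):
    for value in batch:
        stats.update(value)

print(stats.n, stats.mean, stats.std, stats.minimum, stats.maximum)
```

Because only `n`, `mean`, `m2`, and the extrema are carried forward, the state after the second batch matches what a single pass over all eight values would produce.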


CellAccumulator

 CellAccumulator (use_tdigest:bool=True)

*Accumulator for a single HEALPix cell.

Maintains streaming statistics for multiple columns plus optional T-Digest for approximate percentile computation.*


accumulate_batch

 accumulate_batch (new_data:pandas.core.frame.DataFrame,
                   sidecar:pandas.core.frame.DataFrame,
                   value_columns:List[str],
                   existing_state:Optional[Dict[int,__main__.CellAccumulator]]=None,
                   use_tdigest:bool=True, filter_expr:Optional[str]=None)

*Process one batch of data and update accumulator state.

Args:
    new_data: DataFrame with observations (index = source_id, implicit or explicit)
    sidecar: HEALPix mapping with columns ['source_id', 'healpix_id'] (in fuzzy mode, source_id may be duplicated, one row per cell)
    value_columns: Columns to accumulate
    existing_state: Previous accumulator state (None for first batch)
    use_tdigest: Enable T-Digest for approximate percentiles
    filter_expr: Optional pandas query expression to filter data

Returns: Updated state dictionary {healpix_id: CellAccumulator}

Note: In fuzzy mode, sidecar has multiple rows per source_id (one per touched HEALPix cell). This function handles the many-to-one relationship correctly by grouping on healpix_id.*
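
The fuzzy-mode note above can be pictured with a small pandas join. This is a sketch under assumptions: the data is invented, and joining on `source_id` followed by a `groupby` on `healpix_id` is one plausible way to realise the described behaviour, not necessarily how `accumulate_batch` is written.

```python
import pandas as pd

# Observations indexed by source_id (made-up values).
new_data = pd.DataFrame(
    {"r750": [0.10, 0.20, 0.30]},
    index=pd.Index([1, 2, 3], name="source_id"),
)

# Fuzzy-mode sidecar: source 2 touches two cells, so it appears twice.
sidecar = pd.DataFrame(
    {"source_id": [1, 2, 2, 3], "healpix_id": [100, 100, 101, 101]}
)

# Attach the observed values to every (source_id, healpix_id) pair.
joined = sidecar.merge(new_data.reset_index(), on="source_id", how="inner")

# Group per cell: each cell sees every observation that touches it.
per_cell = joined.groupby("healpix_id")["r750"].agg(["count", "mean"])
print(per_cell)
```

Source 2 contributes to both cells here, which is why each cell ends up with two observations even though only three sources exist.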


save_state

 save_state (state, output_path, meta, processing_metadata=None)

*Serialize accumulated state dict to Parquet with metadata.

Args:
    state: Dict[int, CellAccumulator] from accumulate_batch()
    output_path: Path to write the Parquet file
    meta: HEALPyxelxMetadata instance (provides nside, order, lon_convention)
    processing_metadata: Optional dict with batch info*


load_state

 load_state (input_path:pathlib.Path, use_tdigest:bool=True)

*Load accumulator state and HEALPix metadata from parquet file.

Attempts to load embedded or companion HEALPyxelxMetadata to validate state file consistency.

Args:
    input_path: Path to the state Parquet file
    use_tdigest: Whether to restore T-Digest data

Returns: Tuple of (state dict, metadata) where metadata may be None if not found

Raises: FileNotFoundError: If state file does not exist*


validate_accumulator_sidecar_compatibility

 validate_accumulator_sidecar_compatibility
                (state_meta:healpyxel.metadata.HEALPyxelxMetadata,
                 sidecar_meta:healpyxel.metadata.HEALPyxelxMetadata)

*Validate that accumulator state is compatible with sidecar file.

Checks that nside, mode, and order match to prevent silent corruption from mixing incompatible files.

Args:
    state_meta: Metadata from the loaded state file
    sidecar_meta: Metadata from the sidecar file

Returns: dict with validation results

Raises: AssertionError: If critical parameters mismatch*
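
To make the compatibility check concrete, the following sketch compares the three parameters named above and raises `AssertionError` on a mismatch. `GridMeta` and `check_compatible` are hypothetical names for this example, the `order` values shown are assumed, and the shape of the returned dict is a guess; the real `HEALPyxelxMetadata` class and the real return value may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GridMeta:
    """Hypothetical stand-in for HEALPyxelxMetadata; only three fields modelled."""
    nside: int
    mode: str     # e.g. "fuzzy" or "strict"
    order: str    # assumed values such as "nested" or "ring"


def check_compatible(state_meta: GridMeta, sidecar_meta: GridMeta) -> dict:
    """Compare critical grid parameters; raise AssertionError on any mismatch."""
    results = {
        name: getattr(state_meta, name) == getattr(sidecar_meta, name)
        for name in ("nside", "mode", "order")
    }
    mismatched = [name for name, same in results.items() if not same]
    if mismatched:
        raise AssertionError(f"state/sidecar mismatch in: {', '.join(mismatched)}")
    return results


state_meta = GridMeta(nside=512, mode="fuzzy", order="nested")

# Matching metadata passes and reports each check.
ok = check_compatible(state_meta, GridMeta(nside=512, mode="fuzzy", order="nested"))

# A sidecar built at a different nside is rejected before any data is mixed.
try:
    check_compatible(state_meta, GridMeta(nside=256, mode="fuzzy", order="nested"))
    error_message = None
except AssertionError as exc:
    error_message = str(exc)

print(ok, error_message)
```

Failing loudly here is the point of the check: cell ids from grids with different nside values would otherwise be merged as if they referred to the same regions.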


find_sidecar

 find_sidecar (input_path:pathlib.Path, nside:Optional[int]=None,
               mode:str='fuzzy')

*Attempt to find matching sidecar file for input data.

Args:
    input_path: Path to the input Parquet file
    nside: Desired nside (if None, finds any matching sidecar)
    mode: Assignment mode ('fuzzy' or 'strict')

Returns: Path to matching sidecar file, or None if not found*


main

 main (argv=None)

Usage Example

See the main() function for CLI usage, or import functions directly for programmatic use.