Streaming - WIP!

Large-scale parallel processing with incremental accumulation
from pathlib import Path
import pandas as pd
import numpy as np

# Get test data directory
pkg_root = Path().absolute()
if pkg_root.name == 'nbs':
    pkg_root = pkg_root.parent

test_data_dir = pkg_root / 'test_data'
batches_dir = test_data_dir / 'batches'
validation_dir = test_data_dir / 'validation'

Step 1: Load First Batch

Let’s inspect the first batch to understand the data structure:

batch_001 = pd.read_parquet(batches_dir / 'batch_001.parquet')

print(f"Batch 001 Info:")
print(f"  Observations: {len(batch_001):,}")
print(f"  Columns: {len(batch_001.columns)}")
print(f"  Memory: {batch_001.memory_usage(deep=True).sum() / 1024 / 1024:.1f} MB")
print(f"\n  Lon range: {batch_001['lon_center'].min():.2f}° to {batch_001['lon_center'].max():.2f}°")
print(f"  Lat range: {batch_001['lat_center'].min():.2f}° to {batch_001['lat_center'].max():.2f}°")

# Show spectral columns
spectral_cols = [col for col in batch_001.columns if col.startswith('r') and col[1:4].isdigit()]
print(f"\n  Spectral columns: {spectral_cols}")

batch_001.head(3)
Batch 001 Info:
  Observations: 3,029
  Columns: 61
  Memory: 4.5 MB

  Lon range: 175.00° to 176.00°
  Lat range: -74.63° to 74.38°

  Spectral columns: ['r310', 'r390', 'r750', 'r950', 'r1050', 'r1400', 'r415', 'r433_2', 'r479_9', 'r556_9', 'r628_8', 'r748_7', 'r828_4', 'r898_8', 'r996_2']
ref_id a b c d e f g h i ... lat_center lon_center surface width length ang_incidence ang_emission ang_phase azimuth geometry
0 0801419125400568 0 0 0 0 0 1 0 0 0 ... 1.069630 175.06512 54194396.0 5599.9224 12322.036 9.834717 73.333954 83.013830 154.51570 b'\x01\x03\x00\x00\x00\x01\x00\x00\x00\x05\x00...
1 0801419125400569 0 0 0 0 0 1 0 0 0 ... 1.121209 175.14010 54753300.0 5660.6910 12315.469 9.769003 73.381120 82.985110 154.42958 b'\x01\x03\x00\x00\x00\x01\x00\x00\x00\x05\x00...
2 0801419125400570 0 0 0 0 0 1 0 0 0 ... 1.173109 175.21364 54823684.0 5736.7520 12167.806 9.704690 73.427300 82.956314 154.33447 b'\x01\x03\x00\x00\x00\x01\x00\x00\x00\x05\x00...

3 rows × 61 columns

Step 2: Generate HEALPix Sidecar

For this example, we’ll use the existing healpix_sidecar.py script (it will be converted to a module later):

# Once healpix_sidecar is converted to a module, this will be:
# from healpyxel import sidecar
# sidecar_df = sidecar.generate(batch_001, nside=64, mode='fuzzy')

# For now, run the CLI:
print("Generate sidecar using CLI:")
print(f"  python healpix_sidecar.py --input {batches_dir / 'batch_001.parquet'} --nside 64 --mode fuzzy")
print("\nThis will create: batch_001.cell-healpix_assignment-fuzzy_nside-64_order-nested.parquet")
Generate sidecar using CLI:
  python healpix_sidecar.py --input /home/mariodamore/Documents/work/MESSENGER/MASCS_gn_dlr_processed/healpyxel/test_data/batches/batch_001.parquet --nside 64 --mode fuzzy

This will create: batch_001.cell-healpix_assignment-fuzzy_nside-64_order-nested.parquet
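
Until the Python API lands, the underlying assignment is easy to sketch with healpy directly. The helper below is illustrative only, not healpyxel’s API: it does a plain center-point assignment, whereas the fuzzy mode assigns each observation’s footprint polygon to every HEALPix cell it overlaps, which is more involved. It does, however, give us a stand-in sidecar_001 for the conceptual workflow in the next step:

import healpy as hp
import pandas as pd

def center_sidecar(df, nside=64):
    """Illustrative stand-in: map each observation's center coordinates
    to the containing HEALPix cell (nested ordering)."""
    pix = hp.ang2pix(nside,
                     df['lon_center'].to_numpy(),
                     df['lat_center'].to_numpy(),
                     nest=True, lonlat=True)
    return pd.DataFrame({'ref_id': df['ref_id'], 'healpix': pix})

sidecar_001 = center_sidecar(batch_001, nside=64)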

Step 3: Streaming Accumulation Workflow

Process multiple batches incrementally:

Conceptual Workflow

# Conceptual only: accumulator is the planned API, and
# batch_002 / sidecar_002 below are placeholder inputs.

# Day 1: Initialize
from healpyxel import accumulator

state = accumulator.accumulate_batch(
    new_data=batch_001,
    sidecar=sidecar_001,
    value_columns=['r750', 'r950'],
    existing_state=None,  # First batch
    use_tdigest=True
)

# Day 2: Incremental update
state = accumulator.accumulate_batch(
    new_data=batch_002,
    sidecar=sidecar_002,
    value_columns=['r750', 'r950'],
    existing_state=state,  # Reuse previous state
    use_tdigest=True
)

# Finalize: Convert to statistics
from healpyxel import finalize

results = finalize.finalize_statistics(
    state=state,
    percentiles=[25, 50, 75],
    min_count=2
)
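
The state returned by accumulate_batch is what makes this incremental: it only needs to hold mergeable quantities per HEALPix cell, not the raw observations. healpyxel’s actual state format is not documented here, but the core idea is the classic mergeable-moments trick, sketched below with plain pandas on synthetic data: per pixel, count, sum, and sum of squares add across batches, and mean/std fall out at finalize time. Percentiles cannot be merged this way, which is what the t-digest option (use_tdigest=True) is for.

import numpy as np
import pandas as pd

def accumulate(df, value_col, state=None):
    """Merge one batch into a running per-pixel (count, sum, sumsq) state."""
    part = df.groupby('healpix')[value_col].agg(
        count='count',
        total='sum',
        sumsq=lambda s: float(np.sum(s.to_numpy() ** 2)),
    )
    # Mergeable moments: aligned addition over the union of pixel indices
    return part if state is None else state.add(part, fill_value=0.0)

# Two tiny synthetic batches sharing pixels
rng = np.random.default_rng(0)
b1 = pd.DataFrame({'healpix': rng.integers(0, 4, 100), 'r750': rng.normal(size=100)})
b2 = pd.DataFrame({'healpix': rng.integers(0, 4, 100), 'r750': rng.normal(size=100)})

state = accumulate(b1, 'r750')           # day 1
state = accumulate(b2, 'r750', state)    # day 2: merge by addition

# Finalize: recover per-pixel mean and (population) std from the moments
mean = state['total'] / state['count']
std = np.sqrt(state['sumsq'] / state['count'] - mean ** 2)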

Step 4: Validation

Compare streaming results with batch processing:

# Load combined validation file
combined_file = validation_dir / 'combined_batch_001_003.parquet'

if combined_file.exists():
    combined = pd.read_parquet(combined_file)
    
    print("Validation Dataset:")
    print(f"  Combined (batches 1-3): {len(combined):,} obs")
    print(f"  Lon range: {combined['lon_center'].min():.2f}° to {combined['lon_center'].max():.2f}°")
    
    # Load individual batches
    batch_001_len = len(pd.read_parquet(batches_dir / 'batch_001.parquet'))
    batch_002_len = len(pd.read_parquet(batches_dir / 'batch_002.parquet'))
    batch_003_len = len(pd.read_parquet(batches_dir / 'batch_003.parquet'))
    
    total_individual = batch_001_len + batch_002_len + batch_003_len
    
    print(f"\n  Individual batches sum: {total_individual:,} obs")
    print(f"  Difference: {abs(len(combined) - total_individual)} obs")
    
    if len(combined) == total_individual:
        print("  ✓ Counts match perfectly!")
    else:
        print("  ⚠️  Count mismatch (may be due to filtering)")
else:
    print(f"⚠️  Validation file not found: {combined_file}")
Validation Dataset:
  Combined (batches 1-3): 10,890 obs
  Lon range: 175.00° to 178.00°

  Individual batches sum: 10,890 obs
  Difference: 0 obs
  ✓ Counts match perfectly!
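
Row counts are a necessary check but not sufficient: the statistics themselves must agree. Merging per-batch (count, sum) states by addition gives exactly the same per-pixel means as a single pass over the combined data, which a few lines of synthetic-data pandas can demonstrate (again, a sketch of the property being validated, not healpyxel’s API):

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
batches = [pd.DataFrame({'healpix': rng.integers(0, 8, 500),
                         'r750': rng.normal(size=500)})
           for _ in range(3)]

# Batch processing: one groupby over all observations at once
all_obs = pd.concat(batches, ignore_index=True)
batch_mean = all_obs.groupby('healpix')['r750'].mean()

# Streaming: accumulate (count, sum) per batch, merge by addition, finalize
state = None
for b in batches:
    part = b.groupby('healpix')['r750'].agg(count='count', total='sum')
    state = part if state is None else state.add(part, fill_value=0.0)
stream_mean = state['total'] / state['count']

assert np.allclose(batch_mean.sort_index(), stream_mean.sort_index())
print("Streaming and batch per-pixel means agree.")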

Step 5: Performance Metrics

Compare memory usage between approaches:

# Memory for batch processing (loading all data at once)
if combined_file.exists():
    combined = pd.read_parquet(combined_file)
    batch_memory = combined.memory_usage(deep=True).sum() / 1024 / 1024

    # Rough streaming estimate: per-batch working set plus state file
    streaming_memory = 20  # MB, upper end of the ~10-20 MB state estimate

    print("Memory Comparison:")
    print(f"  Batch processing: {batch_memory:.1f} MB (load all data)")
    print(f"  Streaming: ~5-10 MB per batch + state file (~10-20 MB)")
    print(f"\n  Memory savings: ~{batch_memory - streaming_memory:.1f} MB")
    print(f"  Efficiency: {streaming_memory / batch_memory * 100:.1f}% of batch memory")
Memory Comparison:
  Batch processing: 16.0 MB (load all data)
  Streaming: ~5-10 MB per batch + state file (~10-20 MB)

  Memory savings: ~-4.0 MB
  Efficiency: 124.7% of batch memory

Note that on this small three-batch test set the streaming state overhead actually exceeds the 16 MB needed to load everything at once, hence the negative “savings”. Streaming pays off only when the combined dataset grows much larger than the per-batch working set plus state.
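
For real measurements rather than the hardcoded estimate above, the standard-library tracemalloc module can report the peak allocation of the all-at-once load, which is the figure streaming is meant to bound:

import tracemalloc
import pandas as pd

tracemalloc.start()
combined = pd.read_parquet(combined_file)   # the all-at-once load
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"Peak allocation during batch load: {peak / 1024 / 1024:.1f} MB")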

Summary

This notebook demonstrates:

  1. ✅ Loading test batch data
  2. ✅ Understanding data structure
  3. ⏳ Sidecar generation (to be implemented)
  4. ⏳ Streaming accumulation (to be implemented)
  5. ⏳ Finalization to statistics (to be implemented)
  6. ✅ Validation approach
  7. ✅ Performance considerations (memory benefits emerge at scale)

Next steps:

  • Convert healpix_*.py scripts to nbdev modules
  • Implement Python API for all functions
  • Add comprehensive tests
  • Create more example notebooks
