Streaming Accumulation - WIP!

from pathlib import Path

import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import healpyxel

print(f"healpyxel version: {healpyxel.__version__}")
healpyxel version: 0.1.0

Test Data Discovery

Find available batch data files:

# Look for test data
test_data_dir = Path('../test_data')
batches_dir = test_data_dir / 'batches'

if batches_dir.exists():
    batch_files = sorted(batches_dir.glob('batch_*.parquet'))
    print(f"Found {len(batch_files)} batch files!")
    print("\nBatch files:")
    for f in batch_files:
        size_mb = f.stat().st_size / 1024 / 1024
        print(f"  {f.name}: {size_mb:.2f} MB")
else:
    print("Batch data not found. Run create_test_data.sh to generate test data.")
    batch_files = []
Batch data not found. Run create_test_data.sh to generate test data.
from healpyxel.geospatial import is_geometry_valid

batch_files_loaded = [
    gdf[gdf['geometry'].apply(is_geometry_valid)]
    for gdf in (gpd.read_parquet(f) for f in batch_files)
]

import rich 
print("Number of geometries in each batch:")
rich.print({k.stem : len(gdf) for k, gdf in zip(batch_files, batch_files_loaded)})
Number of geometries in each batch:
{}
fig, ax = plt.subplots(figsize=(20, 10))
colors = plt.cm.tab10.colors  # Distinct color per batch
for i, gdf in enumerate(batch_files_loaded):
    gdf.plot(ax=ax, color=colors[i % len(colors)], linewidth=3)
ax.set_aspect(0.25)  # Compress the vertical scale for a wide lon/lat view

Step 1: Create Sidecar for Each Batch

Process each batch file to create HEALPix assignments:

# Configuration
nside = 32  # HEALPix resolution (npix = 12 * nside**2)
mode = 'fuzzy'  # Assignment mode: 'fuzzy' lets one geometry map to several cells
lon_convention = '0_360'  # Longitude range of the input data

# Process first few batches as demonstration (adjust as needed)
n_batches_to_process = len(batch_files_loaded)

sidecars = []
for i, gdf in enumerate(batch_files_loaded[:n_batches_to_process]):
    print(f"\n[{i+1}/{n_batches_to_process}] Processing batch {i+1}...")

    # Ensure the GeoDataFrame has valid geometries
    if gdf.empty or 'geometry' not in gdf.columns:
        print(f"  ⚠️  No valid geometry column found in batch {i+1}, skipping...")
        continue

    # Create sidecar
    sidecar = process_partition(
        gdf=gdf,
        nside=nside,
        mode=mode,
        base_index=0,
        lon_convention=lon_convention  # Match the longitude convention of the input data
    )
    
    # Store with batch info
    sidecar['batch'] = i
    sidecars.append(sidecar)
    
    print(f"  ✓ Created {len(sidecar)} assignments from {len(gdf)} geometries")
    print(f"    Unique cells: {sidecar['healpix_id'].nunique()}")

[1/10] Processing batch 1...
  ✓ Created 3539 assignments from 3027 geometries
    Unique cells: 157

[2/10] Processing batch 2...
  ✓ Created 4782 assignments from 4124 geometries
    Unique cells: 140

[3/10] Processing batch 3...
  ✓ Created 4304 assignments from 3734 geometries
    Unique cells: 151

[4/10] Processing batch 4...
  ✓ Created 4565 assignments from 3989 geometries
    Unique cells: 156

[5/10] Processing batch 5...
  ✓ Created 5308 assignments from 4501 geometries
    Unique cells: 160

[6/10] Processing batch 6...
  ✓ Created 4552 assignments from 3885 geometries
    Unique cells: 160

[7/10] Processing batch 7...
  ✓ Created 4874 assignments from 4174 geometries
    Unique cells: 160

[8/10] Processing batch 8...
  ✓ Created 5205 assignments from 4479 geometries
    Unique cells: 147

[9/10] Processing batch 9...
  ✓ Created 5134 assignments from 4416 geometries
    Unique cells: 143

[10/10] Processing batch 10...
  ✓ Created 5632 assignments from 4931 geometries
    Unique cells: 155
# Combine all sidecars into a single DataFrame
if not sidecars:
    raise RuntimeError("No sidecars created. Ensure test_data/batches/ exists with batch_*.parquet files.")

combined_sidecar = pd.concat(sidecars, ignore_index=True)
print(f"\nCombined sidecar assignments: {len(combined_sidecar)} rows")
print(f"Unique cells in combined sidecar: {combined_sidecar['healpix_id'].nunique()}")
print(f"Columns in combined sidecar: {list(combined_sidecar.columns)}")

Combined sidecar assignments: 47895 rows
Unique cells in combined sidecar: 458
Columns in combined sidecar: ['source_id', 'healpix_id', 'weight', 'batch']
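Before aggregating, it helps to see what the sidecar table encodes. Each row maps one source_id to one healpix_id with a weight; in fuzzy mode a geometry that straddles cell boundaries produces several rows, which is why the combined sidecar has 47,895 assignments for roughly 41,000 geometries. A toy sketch of that shape (illustration only, not real process_partition() output; the weights-sum-to-one convention per source is an assumption here):

```python
import pandas as pd

# Toy sidecar: source 0 sits entirely in cell 101; source 1 straddles two cells.
sidecar = pd.DataFrame({
    'source_id':  [0,   1,   1],
    'healpix_id': [101, 101, 102],
    'weight':     [1.0, 0.6, 0.4],
})

# One row per (source, cell) pair: more rows than sources in fuzzy mode
print(len(sidecar), sidecar['source_id'].nunique())           # 3 2
# Per-source weights total 1.0 (assumed convention)
print(sidecar.groupby('source_id')['weight'].sum().tolist())  # [1.0, 1.0]
```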

Step 2: Aggregate Data by HEALPix Cells

Now aggregate the original data values by HEALPix cell using the sidecar:

# Reload and combine data from processed batches (drop geometry for aggregation)
combined_data_list = []

for i, gdf in enumerate(batch_files_loaded[:n_batches_to_process]):
    print(f"\n[{i+1}/{n_batches_to_process}] Processing batch {i+1}...")

    # Drop geometry column if present (not needed for aggregation)
    if 'geometry' in gdf.columns:
        gdf = gdf.drop(columns=['geometry'])
    
    # Add source_id matching the sidecar creation
    gdf['source_id'] = gdf.index.astype(np.int64)
    gdf['batch'] = i
    combined_data_list.append(gdf)

combined_data = pd.concat(combined_data_list, ignore_index=True)
print(f"\nCombined data from {len(combined_data_list)} batches: {len(combined_data)} records")
print(f"Columns in combined data: {list(combined_data.columns)}")
# Choose a value column to aggregate (adjust based on your data)
# Common columns in MESSENGER MASCS data: r1050, r750, etc.
value_columns = ['r1050'] if 'r1050' in combined_data.columns else []

if not value_columns:
    # Try to find numeric columns
    numeric_cols = combined_data.select_dtypes(include=[np.number]).columns.tolist()
    # Exclude id and coordinate columns
    exclude = ['source_id', 'batch', 'lat_center', 'lon_center', 'lat', 'lon', 'latitude', 'longitude']
    value_columns = [c for c in numeric_cols if c not in exclude][:1]  # Take first suitable column

print(f"Aggregating column(s): {value_columns}")

if value_columns and sidecars:
    # Perform aggregation
    aggregated = aggregate_by_sidecar(
        original=combined_data,
        sidecar=combined_sidecar,
        value_columns=value_columns,
        aggs=['mean', 'median', 'std', 'mad', 'robust_std'],
        source_id_col='source_id',
        healpix_col='healpix_id',
        min_count=1,
        sentinel_threshold=1e30
    )
    
    print(f"\nAggregated {len(aggregated)} HEALPix cells")
    print(f"Columns: {list(aggregated.columns)}")
    display(aggregated.head(10))
else:
    print("Cannot aggregate: no suitable value columns or no sidecars created")
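Conceptually, aggregate_by_sidecar() is a join followed by a per-cell reduction: attach each source's value to its cell assignments, then group by cell and reduce. A simplified pure-pandas sketch of that idea (hypothetical data; it ignores weights, min_count, and sentinel filtering, which the real function handles):

```python
import pandas as pd

# Hypothetical raw values and cell assignments
data = pd.DataFrame({'source_id': [0, 1, 2], 'r1050': [10.0, 20.0, 30.0]})
sidecar = pd.DataFrame({'source_id': [0, 1, 1, 2],
                        'healpix_id': [5, 5, 6, 6]})

# Join values onto assignments, then reduce per HEALPix cell
joined = sidecar.merge(data, on='source_id', how='left')
agg = joined.groupby('healpix_id')['r1050'].agg(['mean', 'median', 'std', 'count'])
print(agg)  # cell 5: mean 15.0 over 2 sources; cell 6: mean 25.0 over 2 sources
```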
# Step 5a: Enrich Aggregated Data with HEALPix Cell Geometries

from healpyxel.geospatial import healpix_to_geodataframe

print("=" * 70)
print("STEP 5a: Enriching Aggregated Data with Cell Geometries")
print("=" * 70)

# Extract unique HEALPix cell IDs from aggregated results (data-driven, not hardcoded)
print("\n1. Extracting HEALPix cell IDs from aggregated data...")
cell_ids_in_aggregated = aggregated.index.unique() # healpix_id is the index
print(f"   ✓ Found {len(cell_ids_in_aggregated)} unique cells in aggregated data")

# Fetch cell geometries from HEALPix grid
print(f"\n2. Fetching cell geometries (nside={nside}, order='nested')...")
gdf_cells = healpix_to_geodataframe(
    nside=nside,
    order='nested',
    lon_convention=lon_convention,
    pixels=np.array(cell_ids_in_aggregated, dtype=np.int64),  # ← Actual cells from data
    fix_antimeridian=True,
    cache_mode='use'
).reset_index(drop=False)

print(f"   ✓ Retrieved {len(gdf_cells)} cell geometries")
print(f"   Columns: {list(gdf_cells.columns)}")
print(f"   Example: {gdf_cells.iloc[0][['healpix_id', 'geometry']]}")

# Merge aggregated statistics with cell geometries
print(f"\n3. Merging aggregated statistics with cell geometries...")
aggregated_with_geoms = aggregated.reset_index(drop=False).merge(
    gdf_cells[['healpix_id', 'geometry']],
    on='healpix_id',
    how='left'
)

print(f"   ✓ Merged {len(aggregated_with_geoms)} rows")
print(f"   Columns: {list(aggregated_with_geoms.columns)}")

# Verify no NaN geometries (sanity check)
nan_geoms = aggregated_with_geoms['geometry'].isna().sum()
if nan_geoms > 0:
    print(f"   ⚠️  Warning: {nan_geoms} rows have missing geometries (cells not found)")
else:
    print(f"   ✓ All cell geometries present")

# Convert to GeoDataFrame for spatial operations
aggregated_gdf = gpd.GeoDataFrame(
    aggregated_with_geoms,
    geometry='geometry',
    crs='EPSG:4326'  # WGS84, standard for lon/lat
)

print(f"\n4. Result as GeoDataFrame:")
print(f"   Shape: {aggregated_gdf.shape}")
print(f"   Geometry type: {aggregated_gdf.geometry.type.unique()}")
print(f"   CRS: {aggregated_gdf.crs}")
print(f"\n   First row:")
print(aggregated_gdf.iloc[0])

# Optional: Save enriched GeoDataFrame
enriched_output_path = Path("/tmp/aggregated_with_cells.parquet")
aggregated_gdf.to_parquet(enriched_output_path)
print(f"\n   ✓ Saved enriched data to {enriched_output_path}")
# DIAGNOSTIC: Validate pre-requisites for Step 5a

print("=" * 70)
print("DIAGNOSTIC: Verifying prerequisites for Step 5a")
print("=" * 70)

# Check 1: Does aggregated exist?
print("\n1. Checking if aggregated exists...")
try:
    assert 'aggregated' in dir(), "aggregated not defined"
    print(f"   ✓ aggregated exists: {type(aggregated)}")
except AssertionError as e:
    print(f"   ✗ FAILED: {e}")
    print(f"   → Did you run the aggregation cell (Step 2 or Step 4)?")
    raise

# Check 2: Is it a DataFrame?
print(f"\n2. Checking aggregated type...")
try:
    assert isinstance(aggregated, pd.DataFrame), f"Expected DataFrame, got {type(aggregated)}"
    print(f"   ✓ Is DataFrame: shape={aggregated.shape}")
except AssertionError as e:
    print(f"   ✗ FAILED: {e}")
    raise

# Check 3: Does it have healpix_id as index?
print(f"\n3. Checking for healpix_id index...")
try:
    assert aggregated.index.name == 'healpix_id', f"Index name is '{aggregated.index.name}', expected 'healpix_id'"
    print(f"   ✓ healpix_id is index: dtype={aggregated.index.dtype}")
except AssertionError as e:
    print(f"   ✗ FAILED: {e}")
    raise

# Check 4: Required value columns
print(f"\n4. Checking for value column statistics...")
for col in value_columns:
    for stat in ['mean', 'std']:
        stat_col = f'{col}_{stat}'
        try:
            assert stat_col in aggregated.columns, f"{stat_col} missing"
        except AssertionError as e:
            print(f"   ✗ FAILED: {e}")
            print(f"   Available columns: {list(aggregated.columns)}")
            raise
print(f"   ✓ All required statistic columns present")

# Check 5: Data sanity
print(f"\n5. Checking data sanity...")
print(f"   Rows: {len(aggregated)}")
print(f"   Non-null healpix_id: {aggregated.index.notna().sum()}")
print(f"   Unique cells: {aggregated.index.nunique()}")
print(f"   ✓ Data looks reasonable")

print("\n" + "=" * 70)
print("✓ ALL PREREQUISITES VALIDATED")
print("=" * 70)

Step 3: Apply healpyxel.accumulate on Raw Data

# Step 3: Apply healpyxel.accumulate on Raw Data

from healpyxel.accumulator import accumulate_batch, save_state, load_state
from healpyxel.metadata import HEALPyxelxMetadata

# Configuration
state_output_dir = Path("/tmp/state/")
state_output_dir.mkdir(exist_ok=True)
value_columns = ['r1050']  # Adjust based on your data
use_tdigest = True  # Enable T-Digest for percentiles

# Initialize metadata for HEALPix grid
meta = HEALPyxelxMetadata(
    nside=nside,
    mode="nested",
    order="nested",
    npix=12 * nside**2,
    lon_convention=lon_convention
)

# Initialize accumulator state
accumulated_state = None
state_history = []  # Track growth of accumulation

# Process each BATCH OF RAW DATA incrementally
for i, gdf in enumerate(batch_files_loaded[:n_batches_to_process]):
    print(f"\n[{i+1}/{n_batches_to_process}] Processing batch {i+1}...")
    
    # Get the sidecar for THIS batch
    batch_sidecar = sidecars[i]  # Sidecar mapping for this batch
    
    # Get the raw DATA for THIS batch
    # IMPORTANT: Set index to source_id BEFORE passing to accumulate_batch
    batch_data = gdf.drop(columns=['geometry']).copy()
    batch_data.index = batch_data.index.astype(np.int64)  # ← Set index, don't add column
    batch_data.index.name = 'source_id'  # ← Name the index for clarity
    
    # Accumulate statistics from raw data
    accumulated_state = accumulate_batch(
        new_data=batch_data,           # ← Index will be converted to source_id column
        sidecar=batch_sidecar,         # ← Sidecar mapping for this batch
        value_columns=value_columns,
        existing_state=accumulated_state,  # ← Previous state (None for first batch)
        use_tdigest=use_tdigest
    )
    
    # Track state growth
    state_history.append({
        'batch': i + 1,
        'n_cells': len(accumulated_state),
        'observations': sum(
            acc.stats_by_column[value_columns[0]].n 
            for acc in accumulated_state.values()
            if value_columns[0] in acc.stats_by_column
        )
    })
    
    # Save intermediate state
    state_output_path = state_output_dir / f"state_v{i+1:03d}.parquet"
    save_state(
        state=accumulated_state,
        output_path=state_output_path,
        meta=meta,
        processing_metadata={
            "batch_id": f"batch_{i+1}",
            "columns": value_columns,
            "n_cells": len(accumulated_state)
        }
    )
    print(f"  ✓ Accumulated state: {len(accumulated_state)} cells, "
          f"{state_history[-1]['observations']:,} observations")
    print(f"  ✓ Saved state to {state_output_path}")

# Print accumulation growth
print("\n" + "="*60)
print("Accumulation Growth Summary:")
print("="*60)
for record in state_history:
    print(f"Batch {record['batch']:2d}: {record['n_cells']:5d} cells, "
          f"{record['observations']:8,d} observations")

print(f"\nFinal accumulated state: {len(accumulated_state)} cells")
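The key property that makes accumulation streamable is that each cell keeps a small running state that can absorb new values without revisiting old data. A minimal sketch of the standard Welford recurrence for running mean and variance (healpyxel's internal state layout may differ; this only illustrates the technique):

```python
import math

def update(state, x):
    """Welford single-value update of (n, mean, M2)."""
    n, mean, m2 = state
    n += 1
    delta = x - mean
    mean += delta / n
    m2 += delta * (x - mean)
    return (n, mean, m2)

def std(state):
    """Sample standard deviation from the running state."""
    n, _, m2 = state
    return math.sqrt(m2 / (n - 1)) if n > 1 else float('nan')

# Stream two "batches" through one accumulator, no data retained
state = (0, 0.0, 0.0)
for batch in ([1.0, 2.0, 3.0], [4.0, 5.0]):
    for x in batch:
        state = update(state, x)

print(state[0], state[1])    # 5 3.0
print(round(std(state), 6))  # 1.581139 (exact sample std of 1..5)
```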

Step 4: Load accumulated state and extract statistics for comparison

# Step 4: Load accumulated state and extract statistics for comparison

print("=" * 70)
print("STEP 4: Approximate (Streaming) vs Full Aggregation Comparison")
print("=" * 70)

# Reload the final accumulated state from disk
state_path = state_output_dir / f"state_v{n_batches_to_process:03d}.parquet"
print(f"\n1. Loading accumulated state from {state_path.name}...")

accumulated_state_loaded, state_meta = load_state(state_path, use_tdigest=True)
print(f"   ✓ Loaded {len(accumulated_state_loaded)} cells")

# Convert accumulated state (Dict[int, CellAccumulator]) → DataFrame for comparison
print(f"\n2. Extracting statistics from StreamingStats...")

accumulated_stats_rows = []
for healpix_id, accumulator in accumulated_state_loaded.items():
    row = {'healpix_id': int(healpix_id)}
    
    # Extract per-column statistics
    for col in value_columns:
        if col in accumulator.stats_by_column:
            stats = accumulator.stats_by_column[col]
            row[f'{col}_mean'] = stats.mean
            row[f'{col}_std'] = stats.std
            row[f'{col}_n'] = stats.n
            row[f'{col}_min'] = stats.min_val if np.isfinite(stats.min_val) else np.nan
            row[f'{col}_max'] = stats.max_val if np.isfinite(stats.max_val) else np.nan
    
    accumulated_stats_rows.append(row)

accumulated_df = pd.DataFrame(accumulated_stats_rows)
print(f"   ✓ Extracted {len(accumulated_df)} cells with statistics")
print(f"   Columns: {list(accumulated_df.columns)}")

# Perform full batch aggregation for ground truth
print(f"\n3. Computing full batch aggregation (ground truth)...")

full_aggregated = aggregate_by_sidecar(
    original=combined_data,
    sidecar=combined_sidecar,
    value_columns=value_columns,
    aggs=['mean', 'median', 'std', 'min', 'max', 'mad', 'robust_std'],
    source_id_col='source_id',
    healpix_col='healpix_id',
    min_count=1,
    sentinel_threshold=1e30
)
print(f"   ✓ Aggregated {len(full_aggregated)} cells")
print(f"   Columns: {list(full_aggregated.columns)}")

# Merge for comparison (inner join on healpix_id)
print(f"\n4. Merging for comparison...")

# Rename accumulated columns for clarity
accumulated_df_renamed = accumulated_df.copy()
for col in value_columns:
    accumulated_df_renamed = accumulated_df_renamed.rename(columns={
        f'{col}_mean': f'{col}_approx_mean',
        f'{col}_std': f'{col}_approx_std',
        f'{col}_n': f'{col}_approx_n',
    })

# Inner join on healpix_id
comparison = accumulated_df_renamed.merge(
    full_aggregated,
    on='healpix_id',
    how='inner',
    suffixes=('_approx', '_full')
)
print(f"   ✓ {len(comparison)} cells matched between methods")

# Statistical comparison
print(f"\n" + "=" * 70)
print("COMPARISON RESULTS")
print("=" * 70)

for col in value_columns:
    approx_mean_col = f'{col}_approx_mean'
    full_mean_col = f'{col}_mean'
    approx_n_col = f'{col}_approx_n'
    
    if approx_mean_col not in comparison.columns or full_mean_col not in comparison.columns:
        print(f"\n⚠️  Column {col}: Missing in comparison (skipped)")
        continue
    
    print(f"\n{col}:")
    
    # Observation count validation
    print(f"  Observation count:")
    print(f"    Approx (StreamingStats): {comparison[approx_n_col].sum():.0f} total")
    print(f"    Full (n_sources):        {comparison['n_sources'].sum():.0f} total")
    count_match = np.isclose(comparison[approx_n_col].sum(), comparison['n_sources'].sum())
    print(f"    {'✓ Counts match' if count_match else '✗ MISMATCH DETECTED'}")
    
    # Mean comparison
    print(f"  Mean comparison:")
    print(f"    Approximate mean:  {comparison[approx_mean_col].mean():.6f} (std: {comparison[approx_mean_col].std():.6f})")
    print(f"    Full aggregation:  {comparison[full_mean_col].mean():.6f} (std: {comparison[full_mean_col].std():.6f})")
    
    # Compute per-cell differences
    mean_diff = comparison[approx_mean_col] - comparison[full_mean_col]
    mean_pct_diff = np.abs(mean_diff) / np.abs(comparison[full_mean_col] + 1e-10) * 100
    
    print(f"  Absolute difference: {mean_diff.mean():.6e} (max: {mean_diff.abs().max():.6e})")
    print(f"  % difference:        {mean_pct_diff.mean():.4f}% (max: {mean_pct_diff.max():.4f}%)")
    
    # Show top discrepancies
    top_disc = mean_pct_diff.nlargest(3)
    if len(top_disc) > 0:
        print(f"  Top 3 discrepancies:")
        for idx in top_disc.index:
            cell_id = comparison.loc[idx, 'healpix_id']
            approx_val = comparison.loc[idx, approx_mean_col]
            full_val = comparison.loc[idx, full_mean_col]
            n_obs = comparison.loc[idx, 'n_sources']
            pct = mean_pct_diff.loc[idx]
            print(f"    Cell {cell_id}: approx={approx_val:.6f}, full={full_val:.6f}, "
                  f"diff={pct:.4f}% (n_sources={n_obs:.0f})")

print("\n" + "=" * 70)
print("✓ COMPARISON COMPLETE")
print("=" * 70)
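For exact quantities (counts and means), the streaming and full results should typically agree to floating-point rounding, so a strict relative-tolerance check is reasonable; digest-backed percentiles are approximate and need looser bounds. A hedged sketch of such a check on hypothetical per-cell means:

```python
import numpy as np

# Hypothetical per-cell means from the two methods
approx = np.array([15.000000001, 25.0, 7.5])
full   = np.array([15.0,         25.0, 7.5])

# Relative difference, guarding against division by zero
rel = np.abs(approx - full) / np.maximum(np.abs(full), 1e-12)
print(np.all(rel < 1e-6))  # True
print(f"max relative diff: {rel.max():.2e}")
```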
# Step 5: Visualize Results (Approximate vs Full)

import healpy as hp
import matplotlib.pyplot as plt

print("=" * 70)
print("STEP 5: Visualization - Approximate vs Full Aggregation")
print("=" * 70)

# Ensure healpix_id is a column (not index) in both DataFrames
accumulated_df_clean = accumulated_df_renamed.reset_index(drop=True)
full_agg_clean = comparison.reset_index(drop=True)

print(f"\n1. Verifying schema...")
if 'healpix_id' not in accumulated_df_clean.columns:
    raise ValueError("healpix_id missing from accumulated_df_clean")
if 'healpix_id' not in full_agg_clean.columns:
    raise ValueError("healpix_id missing from full_agg_clean")
print(f"   ✓ Schema validated")

# Create HEALPix maps for both methods
nside = state_meta.nside
npix = hp.nside2npix(nside)

print(f"\n2. Creating HEALPix maps (nside={nside}, npix={npix})...")

# Method 1: Approximate (StreamingStats)
approx_map = np.full(npix, hp.UNSEEN, dtype=np.float64)
for col in value_columns:
    approx_col = f'{col}_approx_mean'
    if approx_col in accumulated_df_clean.columns:
        healpix_ids = accumulated_df_clean['healpix_id'].values.astype(int)
        means = accumulated_df_clean[approx_col].values
        
        # Validate indices
        if np.any((healpix_ids < 0) | (healpix_ids >= npix)):
            bad_indices = healpix_ids[(healpix_ids < 0) | (healpix_ids >= npix)]
            raise ValueError(f"Invalid healpix_id values (out of range [0, {npix})): {bad_indices[:5]}")
        
        approx_map[healpix_ids] = means
        print(f"   ✓ Populated {np.sum(approx_map != hp.UNSEEN):.0f} cells in approx_map")
        break

# Method 2: Full Aggregation
full_map = np.full(npix, hp.UNSEEN, dtype=np.float64)
for col in value_columns:
    full_col = f'{col}_mean'
    if full_col in full_agg_clean.columns:
        healpix_ids = full_agg_clean['healpix_id'].values.astype(int)
        means = full_agg_clean[full_col].values
        
        # Validate indices
        if np.any((healpix_ids < 0) | (healpix_ids >= npix)):
            bad_indices = healpix_ids[(healpix_ids < 0) | (healpix_ids >= npix)]
            raise ValueError(f"Invalid healpix_id values (out of range [0, {npix})): {bad_indices[:5]}")
        
        full_map[healpix_ids] = means
        print(f"   ✓ Populated {np.sum(full_map != hp.UNSEEN):.0f} cells in full_map")
        break

# Compute difference map
print(f"\n3. Computing difference map...")
diff_map = approx_map.copy()
valid_mask = (approx_map != hp.UNSEEN) & (full_map != hp.UNSEEN)
diff_map[~valid_mask] = hp.UNSEEN
diff_map[valid_mask] = approx_map[valid_mask] - full_map[valid_mask]
print(f"   ✓ Difference map has {np.sum(diff_map != hp.UNSEEN):.0f} valid cells")

# Render three separate figures (healpy limitation workaround)
print(f"\n4. Rendering visualizations...")

# Approximate map
print("   Rendering approx_map...")
hp.mollview(approx_map, title="Approximate (StreamingStats)", unit=f"{value_columns[0]}")
# plt.savefig('comparison_approx.png', dpi=150, bbox_inches='tight')
# plt.close()

# Full aggregation map
print("   Rendering full_map...")
hp.mollview(full_map, title="Full Aggregation", unit=f"{value_columns[0]}")
# plt.savefig('comparison_full.png', dpi=150, bbox_inches='tight')
# plt.close()

# Difference map
print("   Rendering diff_map...")
hp.mollview(diff_map, title="Difference (Approx - Full)", unit=f"Δ{value_columns[0]}")
# plt.savefig('comparison_diff.png', dpi=150, bbox_inches='tight')
# plt.close()

# print("\n✓ Saved comparison visualizations:")
# print("  - comparison_approx.png")
# print("  - comparison_full.png")
# print("  - comparison_diff.png")

# # Optional: Create a composite image with PIL (if available)
# try:
#     from PIL import Image
#     print(f"\n5. Creating composite image...")
    
#     approx_img = Image.open('comparison_approx.png')
#     full_img = Image.open('comparison_full.png')
#     diff_img = Image.open('comparison_diff.png')
    
#     # Resize to same height for horizontal concat
#     target_height = 600
#     approx_img = approx_img.resize((int(approx_img.width * target_height / approx_img.height), target_height))
#     full_img = full_img.resize((int(full_img.width * target_height / full_img.height), target_height))
#     diff_img = diff_img.resize((int(diff_img.width * target_height / diff_img.height), target_height))
    
#     # Concatenate horizontally
#     total_width = approx_img.width + full_img.width + diff_img.width
#     max_height = target_height
#     composite = Image.new('RGB', (total_width, max_height))
#     composite.paste(approx_img, (0, 0))
#     composite.paste(full_img, (approx_img.width, 0))
#     composite.paste(diff_img, (approx_img.width + full_img.width, 0))
    
#     composite.save('comparison_maps_composite.png', dpi=(150, 150))
#     print(f"   ✓ Saved composite: comparison_maps_composite.png")
    
# except ImportError:
#     print(f"   ⚠️  PIL not available; skipping composite image creation")

# print("\n" + "=" * 70)
# print("✓ VISUALIZATION COMPLETE")
# print("=" * 70)
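The map arrays above follow the HEALPix size rule used throughout this notebook: hp.nside2npix(nside) is just 12 * nside**2, and unpopulated pixels keep healpy's UNSEEN sentinel so mollview masks them. A dependency-free sketch of that bookkeeping:

```python
import numpy as np

nside = 32
npix = 12 * nside**2  # same value hp.nside2npix(32) returns
print(npix)           # 12288

UNSEEN = -1.6375e30   # healpy's sentinel for missing pixels
m = np.full(npix, UNSEEN)
m[[5, 6]] = [15.0, 25.0]  # populate two cells

valid = m != UNSEEN
print(valid.sum())    # 2 cells carry data; the rest stay masked
```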

Summary

This notebook demonstrated the streaming healpyxel workflow:

  1. Sidecar Creation: Assigned HEALPix cells to geometries with process_partition()
  2. Aggregation: Computed per-cell statistics with aggregate_by_sidecar()
  3. Streaming Accumulation: Built the same statistics incrementally with accumulate_batch(), saving state after each batch
  4. Comparison and Visualization: Validated the streaming results against the full aggregation and rendered the HEALPix maps

For Large Datasets

For processing many batch files efficiently, use the CLI tools:

# Process all batches with sidecar creation
for batch in test_data/batches/batch_*.parquet; do
    healpyxel_sidecar --input "$batch" \
                      --nside 32 64 \
                      --mode fuzzy \
                      --lon-convention 0_360 \
                      --ncores 4
done

# Aggregate all data (process each batch with its sidecar)
for batch in test_data/batches/batch_*.parquet; do
    healpyxel_aggregate --input "$batch" \
                        --sidecar-dir test_data/batches/ \
                        --sidecar-index all \
                        --aggregate \
                        --columns r1050 \
                        --aggs mean median std
done

# Finalize maps (from accumulator state, not aggregate output)
healpyxel_finalize --state state_v010.parquet \
                   --output final_mosaic.parquet \
                   --nside 32

Next Steps

  • Experiment with different nside values for different resolutions
  • Try both ‘strict’ and ‘fuzzy’ assignment modes
  • Aggregate multiple value columns simultaneously
  • Use finalize_maps() to create production-ready map products