# Report the installed healpyxel version.
# (Original cell output was "healpyxel version: 0.1.0".)
print(f"healpyxel version: {healpyxel.__version__}")
Find available batch data files:
# Look for pre-generated test data batches under ../test_data/batches.
# Sets `batch_files` to a sorted list of parquet paths, or [] when the
# directory is missing.
test_data_dir = Path('../test_data')
batches_dir = test_data_dir / 'batches'
if batches_dir.exists():
    batch_files = sorted(batches_dir.glob('batch_*.parquet'))
    print(f"Found {len(batch_files)} batch files!")
    print(f"\nFirst few batches:")
    for f in batch_files:
        size_mb = f.stat().st_size / 1024 / 1024
        print(f" {f.name}: {size_mb:.2f} MB")
else:
    print("Batch data not found. Run create_test_data.sh to generate test data.")
    batch_files = []
from healpyxel.geospatial import is_geometry_valid

# Load each batch parquet lazily and keep only rows whose geometry passes
# the project's validity check.
batch_files_loaded = [
    gdf[gdf['geometry'].apply(is_geometry_valid)]
    for gdf in (gpd.read_parquet(f) for f in batch_files)
]

import rich

# Per-batch geometry counts, keyed by the file stem (e.g. "batch_001").
print("Number of geometries in each batch:")
rich.print({k.stem: len(gdf) for k, gdf in zip(batch_files, batch_files_loaded)})
# Plot every batch on a single shared axis, one color per batch.
fig, ax = plt.subplots(figsize=(20, 10))
colors = plt.cm.tab10.colors  # Use a colormap for distinct colors
for i, gdf in enumerate(batch_files_loaded):
    gdf.plot(ax=ax, color=colors[i % len(colors)], linewidth=3)
    # axs[i].set_xlim(150, 180)
ax.set_aspect(0.25)  # Ensure the aspect ratio of the figure is respected

# Alternative layout (kept for reference): one subplot per batch.
# fig, axs = plt.subplots(nrows=2, ncols=5, figsize=(20, 10))
# colors = plt.cm.tab10.colors  # Use a colormap for distinct colors
# axs = axs.flatten()  # Flatten the 2D array of axes for easy iteration
# for i, gdf in enumerate(batch_files_loaded):
#     gdf.plot(ax=axs[i], color=colors[i % len(colors)], linewidth=3)
#     # axs[i].set_xlim(150, 180)
#     axs[i].set_aspect(0.25)  # Ensure the aspect ratio of the figure is respected
#     axs[i].set_title(f"{batch_files[i].stem} (n={len(gdf)})")
# plt.tight_layout()

Process each batch file to create HEALPix assignments:
# Configuration
nside = 32                # HEALPix resolution
mode = 'fuzzy'            # Assignment mode
lon_convention = '0_360'  # Longitude range of input data

# Process first few batches as demonstration (adjust as needed)
n_batches_to_process = len(batch_files_loaded)

# Build one sidecar (source_id -> healpix_id mapping) per batch.
sidecars = []
for i, gdf in enumerate(batch_files_loaded[:n_batches_to_process]):
    print(f"\n[{i+1}/{n_batches_to_process}] Processing batch {i+1}...")
    # Ensure the GeoDataFrame has valid geometries
    if gdf.empty or 'geometry' not in gdf.columns:
        print(f" ⚠️ No valid geometry column found in batch {i+1}, skipping...")
        continue
    # Create sidecar
    sidecar = process_partition(
        gdf=gdf,
        nside=nside,
        mode=mode,
        base_index=0,
        lon_convention=lon_convention  # Use lon_convention instead of lat_min/lat_max
    )
    # Store with batch info
    sidecar['batch'] = i
    sidecars.append(sidecar)
    print(f" ✓ Created {len(sidecar)} assignments from {len(gdf)} geometries")
    print(f" Unique cells: {sidecar['healpix_id'].nunique()}")
[1/10] Processing batch 1...
✓ Created 3539 assignments from 3027 geometries
Unique cells: 157
[2/10] Processing batch 2...
✓ Created 4782 assignments from 4124 geometries
Unique cells: 140
[3/10] Processing batch 3...
✓ Created 4304 assignments from 3734 geometries
Unique cells: 151
[4/10] Processing batch 4...
✓ Created 4565 assignments from 3989 geometries
Unique cells: 156
[5/10] Processing batch 5...
✓ Created 5308 assignments from 4501 geometries
Unique cells: 160
[6/10] Processing batch 6...
✓ Created 4552 assignments from 3885 geometries
Unique cells: 160
[7/10] Processing batch 7...
✓ Created 4874 assignments from 4174 geometries
Unique cells: 160
[8/10] Processing batch 8...
✓ Created 5205 assignments from 4479 geometries
Unique cells: 147
[9/10] Processing batch 9...
✓ Created 5134 assignments from 4416 geometries
Unique cells: 143
[10/10] Processing batch 10...
✓ Created 5632 assignments from 4931 geometries
Unique cells: 155
# Combine all sidecars into a single DataFrame
if not sidecars:
    raise RuntimeError("No sidecars created. Ensure test_data/batches/ exists with batch_*.parquet files.")
combined_sidecar = pd.concat(sidecars, ignore_index=True)
print(f"\nCombined sidecar assignments: {len(combined_sidecar)} rows")
print(f"Unique cells in combined sidecar: {combined_sidecar['healpix_id'].nunique()}")
print(f"Columns in combined sidecar: {list(combined_sidecar.columns)}")
Combined sidecar assignments: 47895 rows
Unique cells in combined sidecar: 458
Columns in combined sidecar: ['source_id', 'healpix_id', 'weight', 'batch']
Now aggregate the original data values by HEALPix cell using the sidecar:
# Reload and combine data from processed batches (drop geometry for aggregation)
combined_data_list = []
for i, gdf in enumerate(batch_files_loaded[:n_batches_to_process]):
    print(f"\n[{i+1}/{n_batches_to_process}] Processing batch {i+1}...")
    # Drop geometry column if present (not needed for aggregation)
    if 'geometry' in gdf.columns:
        gdf = gdf.drop(columns=['geometry'])
    # Add source_id matching the sidecar creation
    gdf['source_id'] = gdf.index.astype(np.int64)
    gdf['batch'] = i
    combined_data_list.append(gdf)

combined_data = pd.concat(combined_data_list, ignore_index=True)
print(f"\nCombined data from {len(combined_data_list)} batches: {len(combined_data)} records")
print(f"Columns in combined data: {list(combined_data.columns)}")
# Choose a value column to aggregate (adjust based on your data)
# Common columns in MESSENGER MASCS data: r1050, r750, etc.
value_columns = ['r1050'] if 'r1050' in combined_data.columns else []
if not value_columns:
    # Try to find numeric columns
    numeric_cols = combined_data.select_dtypes(include=[np.number]).columns.tolist()
    # Exclude id and coordinate columns
    exclude = ['source_id', 'batch', 'lat_center', 'lon_center', 'lat', 'lon', 'latitude', 'longitude']
    value_columns = [c for c in numeric_cols if c not in exclude][:1]  # Take first suitable column

print(f"Aggregating column(s): {value_columns}")

if value_columns and sidecars:
    # Perform aggregation
    aggregated = aggregate_by_sidecar(
        original=combined_data,
        sidecar=combined_sidecar,
        value_columns=value_columns,
        aggs=['mean', 'median', 'std', 'mad', 'robust_std'],
        source_id_col='source_id',
        healpix_col='healpix_id',
        min_count=1,
        sentinel_threshold=1e30
    )
    print(f"\nAggregated {len(aggregated)} HEALPix cells")
    print(f"Columns: {list(aggregated.columns)}")
    display(aggregated.head(10))
else:
    print("Cannot aggregate: no suitable value columns or no sidecars created")
# Step 5a: Enrich Aggregated Data with HEALPix Cell Geometries
from healpyxel.geospatial import healpix_to_geodataframe

print("=" * 70)
print("STEP 5a: Enriching Aggregated Data with Cell Geometries")
print("=" * 70)

# Extract unique HEALPix cell IDs from aggregated results (data-driven, not hardcoded)
print("\n1. Extracting HEALPix cell IDs from aggregated data...")
cell_ids_in_aggregated = aggregated.index.unique()  # healpix_id is the index
print(f" ✓ Found {len(cell_ids_in_aggregated)} unique cells in aggregated data")

# Fetch cell geometries from HEALPix grid
print(f"\n2. Fetching cell geometries (nside={nside}, order='nested')...")
gdf_cells = healpix_to_geodataframe(
    nside=nside,
    order='nested',
    lon_convention=lon_convention,
    pixels=np.array(cell_ids_in_aggregated, dtype=np.int64),  # ← Actual cells from data
    fix_antimeridian=True,
    cache_mode='use'
).reset_index(drop=False)
print(f" ✓ Retrieved {len(gdf_cells)} cell geometries")
print(f" Columns: {list(gdf_cells.columns)}")
print(f" Example: {gdf_cells.iloc[0][['healpix_id', 'geometry']]}")

# Merge aggregated statistics with cell geometries
print(f"\n3. Merging aggregated statistics with cell geometries...")
aggregated_with_geoms = aggregated.reset_index(drop=False).merge(
    gdf_cells[['healpix_id', 'geometry']],
    on='healpix_id',
    how='left'
)
print(f" ✓ Merged {len(aggregated_with_geoms)} rows")
print(f" Columns: {list(aggregated_with_geoms.columns)}")

# Verify no NaN geometries (sanity check)
nan_geoms = aggregated_with_geoms['geometry'].isna().sum()
if nan_geoms > 0:
    print(f" ⚠️ Warning: {nan_geoms} rows have missing geometries (cells not found)")
else:
    print(f" ✓ All cell geometries present")

# Convert to GeoDataFrame for spatial operations
aggregated_gdf = gpd.GeoDataFrame(
    aggregated_with_geoms,
    geometry='geometry',
    crs='EPSG:4326'  # WGS84, standard for lon/lat
)
print(f"\n4. Result as GeoDataFrame:")
print(f" Shape: {aggregated_gdf.shape}")
print(f" Geometry type: {aggregated_gdf.geometry.type.unique()}")
print(f" CRS: {aggregated_gdf.crs}")
print(f"\n First row:")
print(aggregated_gdf.iloc[0])

# Optional: Save enriched GeoDataFrame
enriched_output_path = Path("/tmp/aggregated_with_cells.parquet")
aggregated_gdf.to_parquet(enriched_output_path)
print(f"\n ✓ Saved enriched data to {enriched_output_path}")
# DIAGNOSTIC: Validate pre-requisites for Step 5a
print("=" * 70)
print("DIAGNOSTIC: Verifying prerequisites for Step 5a")
print("=" * 70)

# Check 1: Does aggregated exist?
print("\n1. Checking if aggregated exists...")
try:
    assert 'aggregated' in dir(), "aggregated not defined"
    print(f" ✓ aggregated exists: {type(aggregated)}")
except AssertionError as e:
    print(f" ✗ FAILED: {e}")
    print(f" → Did you run the aggregation cell (Step 2 or Step 4)?")
    raise

# Check 2: Is it a DataFrame?
print(f"\n2. Checking aggregated type...")
try:
    assert isinstance(aggregated, pd.DataFrame), f"Expected DataFrame, got {type(aggregated)}"
    print(f" ✓ Is DataFrame: shape={aggregated.shape}")
except AssertionError as e:
    print(f" ✗ FAILED: {e}")
    raise

# Check 3: Does it have healpix_id as index?
# (An earlier revision checked for a healpix_id *column*; the aggregation
# returns healpix_id as the index, so that is what is validated here.
# The stale duplicate header has been removed.)
print(f"\n3. Checking for healpix_id index...")
try:
    assert aggregated.index.name == 'healpix_id', f"Index name is '{aggregated.index.name}', expected 'healpix_id'"
    print(f" ✓ healpix_id is index: dtype={aggregated.index.dtype}")
except AssertionError as e:
    print(f" ✗ FAILED: {e}")
    raise

# Check 4: Required value columns
print(f"\n4. Checking for value column statistics...")
for col in value_columns:
    for stat in ['mean', 'std']:
        stat_col = f'{col}_{stat}'
        try:
            assert stat_col in aggregated.columns, f"{stat_col} missing"
        except AssertionError as e:
            print(f" ✗ FAILED: {e}")
            print(f" Available columns: {list(aggregated.columns)}")
            raise
print(f" ✓ All required statistic columns present")

# Check 5: Data sanity
print(f"\n5. Checking data sanity...")
print(f" Rows: {len(aggregated)}")
print(f" Non-null healpix_id: {aggregated.index.notna().sum()}")
print(f" Unique cells: {aggregated.index.nunique()}")
print(f" ✓ Data looks reasonable")

print("\n" + "=" * 70)
print("✓ ALL PREREQUISITES VALIDATED")
print("=" * 70)
# Step 3: Apply healpyxel.accumulate on Raw Data
from healpyxel.accumulator import accumulate_batch, save_state, load_state
from healpyxel.metadata import HEALPyxelxMetadata

# Configuration
state_output_dir = Path("/tmp/state/")
# parents=True so a missing parent directory does not raise FileNotFoundError.
state_output_dir.mkdir(parents=True, exist_ok=True)
value_columns = ['r1050']  # Adjust based on your data
use_tdigest = True         # Enable T-Digest for percentiles

# Initialize metadata for HEALPix grid
meta = HEALPyxelxMetadata(
    nside=nside,
    mode="nested",
    order="nested",
    npix=12 * nside**2,
    lon_convention=lon_convention
)

# Initialize accumulator state
accumulated_state = None
state_history = []  # Track growth of accumulation

# Process each BATCH OF RAW DATA incrementally
for i, gdf in enumerate(batch_files_loaded[:n_batches_to_process]):
    print(f"\n[{i+1}/{n_batches_to_process}] Processing batch {i+1}...")
    # Get the sidecar for THIS batch
    batch_sidecar = sidecars[i]  # Sidecar mapping for this batch
    # Get the raw DATA for THIS batch
    # IMPORTANT: Set index to source_id BEFORE passing to accumulate_batch
    batch_data = gdf.drop(columns=['geometry']).copy()
    batch_data.index = batch_data.index.astype(np.int64)  # ← Set index, don't add column
    batch_data.index.name = 'source_id'  # ← Name the index for clarity
    # Accumulate statistics from raw data
    accumulated_state = accumulate_batch(
        new_data=batch_data,                 # ← Index will be converted to source_id column
        sidecar=batch_sidecar,               # ← Sidecar mapping for this batch
        value_columns=value_columns,
        existing_state=accumulated_state,    # ← Previous state (None for first batch)
        use_tdigest=use_tdigest
    )
    # Track state growth
    state_history.append({
        'batch': i + 1,
        'n_cells': len(accumulated_state),
        'observations': sum(
            acc.stats_by_column[value_columns[0]].n
            for acc in accumulated_state.values()
            if value_columns[0] in acc.stats_by_column
        )
    })
    # Save intermediate state
    state_output_path = state_output_dir / f"state_v{i+1:03d}.parquet"
    save_state(
        state=accumulated_state,
        output_path=state_output_path,
        meta=meta,
        processing_metadata={
            "batch_id": f"batch_{i+1}",
            "columns": value_columns,
            "n_cells": len(accumulated_state)
        }
    )
    print(f" ✓ Accumulated state: {len(accumulated_state)} cells, "
          f"{state_history[-1]['observations']:,} observations")
    print(f" ✓ Saved state to {state_output_path}")

# Print accumulation growth
print("\n" + "="*60)
print("Accumulation Growth Summary:")
print("="*60)
for record in state_history:
    print(f"Batch {record['batch']:2d}: {record['n_cells']:5d} cells, "
          f"{record['observations']:8,d} observations")
print(f"\nFinal accumulated state: {len(accumulated_state)} cells")
# Step 4: Load accumulated state and extract statistics for comparison
print("=" * 70)
print("STEP 4: Approximate (Streaming) vs Full Aggregation Comparison")
print("=" * 70)

# Reload the final accumulated state from disk
state_path = state_output_dir / f"state_v{n_batches_to_process:03d}.parquet"
print(f"\n1. Loading accumulated state from {state_path.name}...")
accumulated_state_loaded, state_meta = load_state(state_path, use_tdigest=True)
print(f" ✓ Loaded {len(accumulated_state_loaded)} cells")

# Convert accumulated state (Dict[int, CellAccumulator]) → DataFrame for comparison
print(f"\n2. Extracting statistics from StreamingStats...")
accumulated_stats_rows = []
for healpix_id, accumulator in accumulated_state_loaded.items():
    row = {'healpix_id': int(healpix_id)}
    # Extract per-column statistics
    for col in value_columns:
        if col in accumulator.stats_by_column:
            stats = accumulator.stats_by_column[col]
            row[f'{col}_mean'] = stats.mean
            row[f'{col}_std'] = stats.std
            row[f'{col}_n'] = stats.n
            row[f'{col}_min'] = stats.min_val if np.isfinite(stats.min_val) else np.nan
            row[f'{col}_max'] = stats.max_val if np.isfinite(stats.max_val) else np.nan
    accumulated_stats_rows.append(row)
accumulated_df = pd.DataFrame(accumulated_stats_rows)
print(f" ✓ Extracted {len(accumulated_df)} cells with statistics")
print(f" Columns: {list(accumulated_df.columns)}")

# Perform full batch aggregation for ground truth
print(f"\n3. Computing full batch aggregation (ground truth)...")
full_aggregated = aggregate_by_sidecar(
    original=combined_data,
    sidecar=combined_sidecar,
    value_columns=value_columns,
    aggs=['mean', 'median', 'std', 'min', 'max', 'mad', 'robust_std'],
    source_id_col='source_id',
    healpix_col='healpix_id',
    min_count=1,
    sentinel_threshold=1e30
)
print(f" ✓ Aggregated {len(full_aggregated)} cells")
print(f" Columns: {list(full_aggregated.columns)}")

# Merge for comparison (inner join on healpix_id)
print(f"\n4. Merging for comparison...")
# Rename accumulated columns for clarity
accumulated_df_renamed = accumulated_df.copy()
for col in value_columns:
    accumulated_df_renamed = accumulated_df_renamed.rename(columns={
        f'{col}_mean': f'{col}_approx_mean',
        f'{col}_std': f'{col}_approx_std',
        f'{col}_n': f'{col}_approx_n',
    })
# Inner join on healpix_id
comparison = accumulated_df_renamed.merge(
    full_aggregated,
    on='healpix_id',
    how='inner',
    suffixes=('_approx', '_full')
)
print(f" ✓ {len(comparison)} cells matched between methods")

# Statistical comparison
print(f"\n" + "=" * 70)
print("COMPARISON RESULTS")
print("=" * 70)
for col in value_columns:
    approx_mean_col = f'{col}_approx_mean'
    full_mean_col = f'{col}_mean'
    approx_n_col = f'{col}_approx_n'
    if approx_mean_col not in comparison.columns or full_mean_col not in comparison.columns:
        print(f"\n⚠️ Column {col}: Missing in comparison (skipped)")
        continue
    print(f"\n{col}:")
    # Observation count validation
    print(f" Observation count:")
    print(f" Approx (StreamingStats): {comparison[approx_n_col].sum():.0f} total")
    print(f" Full (n_sources): {comparison['n_sources'].sum():.0f} total")
    count_match = np.isclose(comparison[approx_n_col].sum(), comparison['n_sources'].sum())
    print(f" {'✓ Counts match' if count_match else '✗ MISMATCH DETECTED'}")
    # Mean comparison
    print(f" Mean comparison:")
    print(f" Approximate mean: {comparison[approx_mean_col].mean():.6f} (std: {comparison[approx_mean_col].std():.6f})")
    print(f" Full aggregation: {comparison[full_mean_col].mean():.6f} (std: {comparison[full_mean_col].std():.6f})")
    # Compute per-cell differences
    mean_diff = comparison[approx_mean_col] - comparison[full_mean_col]
    mean_pct_diff = np.abs(mean_diff) / np.abs(comparison[full_mean_col] + 1e-10) * 100
    print(f" Absolute difference: {mean_diff.mean():.6e} (max: {mean_diff.abs().max():.6e})")
    print(f" % difference: {mean_pct_diff.mean():.4f}% (max: {mean_pct_diff.max():.4f}%)")
    # Show top discrepancies
    top_disc = mean_pct_diff.nlargest(3)
    if len(top_disc) > 0:
        print(f" Top 3 discrepancies:")
        for idx in top_disc.index:
            # top_disc.index holds index LABELS, so use .loc (not .iloc).
            # They coincide here because `comparison` has a fresh RangeIndex
            # after the merge, but .loc stays correct if that ever changes.
            cell_id = comparison.loc[idx, 'healpix_id']
            approx_val = comparison.loc[idx, approx_mean_col]
            full_val = comparison.loc[idx, full_mean_col]
            n_obs = comparison.loc[idx, 'n_sources']
            pct = mean_pct_diff.loc[idx]
            print(f" Cell {cell_id}: approx={approx_val:.6f}, full={full_val:.6f}, "
                  f"diff={pct:.4f}% (n_sources={n_obs:.0f})")

print("\n" + "=" * 70)
print("✓ COMPARISON COMPLETE")
print("=" * 70)
# Step 5: Visualize Results (Approximate vs Full)
import healpy as hp
import matplotlib.pyplot as plt

print("=" * 70)
print("STEP 5: Visualization - Approximate vs Full Aggregation")
print("=" * 70)

# Ensure healpix_id is a column (not index) in both DataFrames
accumulated_df_clean = accumulated_df_renamed.reset_index(drop=True)
full_agg_clean = comparison.reset_index(drop=True)

print(f"\n1. Verifying schema...")
if 'healpix_id' not in accumulated_df_clean.columns:
    raise ValueError("healpix_id missing from accumulated_df_clean")
if 'healpix_id' not in full_agg_clean.columns:
    raise ValueError("healpix_id missing from full_agg_clean")
print(f" ✓ Schema validated")

# Create HEALPix maps for both methods
nside = state_meta.nside
npix = hp.nside2npix(nside)
print(f"\n2. Creating HEALPix maps (nside={nside}, npix={npix})...")

# Method 1: Approximate (StreamingStats); only the first available value
# column is mapped (the `break` exits after one column).
approx_map = np.full(npix, hp.UNSEEN, dtype=np.float64)
for col in value_columns:
    approx_col = f'{col}_approx_mean'
    if approx_col in accumulated_df_clean.columns:
        healpix_ids = accumulated_df_clean['healpix_id'].values.astype(int)
        means = accumulated_df_clean[approx_col].values
        # Validate indices
        if np.any((healpix_ids < 0) | (healpix_ids >= npix)):
            bad_indices = healpix_ids[(healpix_ids < 0) | (healpix_ids >= npix)]
            raise ValueError(f"Invalid healpix_id values (out of range [0, {npix})): {bad_indices[:5]}")
        approx_map[healpix_ids] = means
        print(f" ✓ Populated {np.sum(approx_map != hp.UNSEEN):.0f} cells in approx_map")
        break

# Method 2: Full Aggregation
full_map = np.full(npix, hp.UNSEEN, dtype=np.float64)
for col in value_columns:
    full_col = f'{col}_mean'
    if full_col in full_agg_clean.columns:
        healpix_ids = full_agg_clean['healpix_id'].values.astype(int)
        means = full_agg_clean[full_col].values
        # Validate indices
        if np.any((healpix_ids < 0) | (healpix_ids >= npix)):
            bad_indices = healpix_ids[(healpix_ids < 0) | (healpix_ids >= npix)]
            raise ValueError(f"Invalid healpix_id values (out of range [0, {npix})): {bad_indices[:5]}")
        full_map[healpix_ids] = means
        print(f" ✓ Populated {np.sum(full_map != hp.UNSEEN):.0f} cells in full_map")
        break

# Compute difference map (only where BOTH maps have data)
print(f"\n3. Computing difference map...")
diff_map = approx_map.copy()
valid_mask = (approx_map != hp.UNSEEN) & (full_map != hp.UNSEEN)
diff_map[~valid_mask] = hp.UNSEEN
diff_map[valid_mask] = approx_map[valid_mask] - full_map[valid_mask]
print(f" ✓ Difference map has {np.sum(diff_map != hp.UNSEEN):.0f} valid cells")

# Render three separate figures (healpy limitation workaround)
print(f"\n4. Rendering visualizations...")
# Approximate map
print(" Rendering approx_map...")
hp.mollview(approx_map, title="Approximate (StreamingStats)", unit=f"{value_columns[0]}")
# plt.savefig('comparison_approx.png', dpi=150, bbox_inches='tight')
# plt.close()
# Full aggregation map
print(" Rendering full_map...")
hp.mollview(full_map, title="Full Aggregation", unit=f"{value_columns[0]}")
# plt.savefig('comparison_full.png', dpi=150, bbox_inches='tight')
# plt.close()
# Difference map
print(" Rendering diff_map...")
hp.mollview(diff_map, title="Difference (Approx - Full)", unit=f"Δ{value_columns[0]}")
# plt.savefig('comparison_diff.png', dpi=150, bbox_inches='tight')
# plt.close()

# Optional saved-figure workflow (kept for reference; requires the savefig
# calls above to be re-enabled first).
# print("\n✓ Saved comparison visualizations:")
# print(" - comparison_approx.png")
# print(" - comparison_full.png")
# print(" - comparison_diff.png")
# try:
#     from PIL import Image
#     print(f"\n5. Creating composite image...")
#     approx_img = Image.open('comparison_approx.png')
#     full_img = Image.open('comparison_full.png')
#     diff_img = Image.open('comparison_diff.png')
#     # Resize to same height for horizontal concat
#     target_height = 600
#     approx_img = approx_img.resize((int(approx_img.width * target_height / approx_img.height), target_height))
#     full_img = full_img.resize((int(full_img.width * target_height / full_img.height), target_height))
#     diff_img = diff_img.resize((int(diff_img.width * target_height / diff_img.height), target_height))
#     # Concatenate horizontally
#     total_width = approx_img.width + full_img.width + diff_img.width
#     composite = Image.new('RGB', (total_width, target_height))
#     composite.paste(approx_img, (0, 0))
#     composite.paste(full_img, (approx_img.width, 0))
#     composite.paste(diff_img, (approx_img.width + full_img.width, 0))
#     composite.save('comparison_maps_composite.png', dpi=(150, 150))
#     print(f" ✓ Saved composite: comparison_maps_composite.png")
# except ImportError:
#     print(f" ⚠️ PIL not available; skipping composite image creation")
# print("\n" + "=" * 70)
# print("✓ VISUALIZATION COMPLETE")
# print("=" * 70)

This notebook demonstrated the complete healpyxel workflow:
`process_partition()` and `aggregate_by_sidecar()`.

For processing many batch files efficiently, use the CLI tools:
# Process all batches with sidecar creation
# NOTE(review): "--nside 32 64" appears to request two resolutions per
# batch — confirm against `healpyxel_sidecar --help`.
for batch in test_data/batches/batch_*.parquet; do
healpyxel_sidecar --input "$batch" \
--nside 32 64 \
--mode fuzzy \
--lon-convention 0_360 \
--ncores 4
done
# Aggregate all data (process each batch with its sidecar)
# Each batch is paired with the sidecars found in --sidecar-dir.
for batch in test_data/batches/batch_*.parquet; do
healpyxel_aggregate --input "$batch" \
--sidecar-dir test_data/batches/ \
--sidecar-index all \
--aggregate \
--columns r1050 \
--aggs mean median std
done
# Finalize maps (from accumulator state, not aggregate output)
healpyxel_finalize --state state_v010.parquet \
--output final_mosaic.parquet \
--nside 32

Try different `nside` values for different resolutions, and use `finalize_maps()` to create production-ready map products.