"""Identify critical CNECs from 24-month SPARSE data (Phase 1). Analyzes binding patterns across 24 months to identify the 200 most critical CNECs: - Tier 1: Top 50 CNECs (full feature treatment) - Tier 2: Next 150 CNECs (reduced features) Outputs: - data/processed/cnec_ranking_full.csv: All CNECs ranked by importance - data/processed/critical_cnecs_tier1.csv: Top 50 CNEC EIC codes - data/processed/critical_cnecs_tier2.csv: Next 150 CNEC EIC codes - data/processed/critical_cnecs_all.csv: Combined 200 EIC codes for Phase 2 Usage: python scripts/identify_critical_cnecs.py --input data/raw/phase1_24month/jao_cnec_ptdf.parquet """ import sys from pathlib import Path import polars as pl import argparse # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent / 'src')) def calculate_cnec_importance( df: pl.DataFrame, total_hours: int ) -> pl.DataFrame: """Calculate importance score for each CNEC. Importance Score Formula: importance = binding_freq × avg_shadow_price × (1 - avg_margin_ratio) Where: - binding_freq: Fraction of hours CNEC appears in active constraints - avg_shadow_price: Average shadow price when binding (economic impact) - avg_margin_ratio: Average ram/fmax (proximity to limit, lower = more critical) Args: df: SPARSE CNEC data (active constraints only) total_hours: Total hours in dataset (for binding frequency calculation) Returns: DataFrame with CNEC rankings and statistics """ cnec_stats = ( df .group_by('cnec_eic', 'cnec_name', 'tso') .agg([ # Occurrence count: how many hours this CNEC was active pl.len().alias('active_hours'), # Shadow price statistics (economic impact) pl.col('shadow_price').mean().alias('avg_shadow_price'), pl.col('shadow_price').max().alias('max_shadow_price'), pl.col('shadow_price').quantile(0.95).alias('p95_shadow_price'), # RAM statistics (capacity utilization) pl.col('ram').mean().alias('avg_ram'), pl.col('fmax').mean().alias('avg_fmax'), (pl.col('ram') / pl.col('fmax')).mean().alias('avg_margin_ratio'), # Binding severity: fraction of active hours where shadow_price > 0 (pl.col('shadow_price') > 0).mean().alias('binding_severity'), # PTDF volatility: average absolute PTDF across zones (network impact) pl.concat_list([ pl.col('ptdf_AT').abs(), pl.col('ptdf_BE').abs(), pl.col('ptdf_CZ').abs(), pl.col('ptdf_DE').abs(), pl.col('ptdf_FR').abs(), pl.col('ptdf_HR').abs(), pl.col('ptdf_HU').abs(), pl.col('ptdf_NL').abs(), pl.col('ptdf_PL').abs(), pl.col('ptdf_RO').abs(), pl.col('ptdf_SI').abs(), pl.col('ptdf_SK').abs(), ]).list.mean().alias('avg_abs_ptdf') ]) .with_columns([ # Binding frequency: fraction of total hours CNEC was active (pl.col('active_hours') / total_hours).alias('binding_freq'), # Importance score (primary ranking metric) ( (pl.col('active_hours') / total_hours) * # binding_freq pl.col('avg_shadow_price') * # economic impact (1 - pl.col('avg_margin_ratio')) # criticality (1 - ram/fmax) ).alias('importance_score') ]) .sort('importance_score', descending=True) ) return cnec_stats def export_tier_eic_codes( cnec_stats: pl.DataFrame, tier_name: str, start_idx: int, count: int, output_path: Path ) -> pl.DataFrame: """Export EIC codes for a specific tier. Args: cnec_stats: DataFrame with CNEC rankings tier_name: Tier label (e.g., "Tier 1", "Tier 2") start_idx: Starting index in ranking (0-based) count: Number of CNECs to include output_path: Path to save CSV Returns: DataFrame with selected CNECs """ tier_cnecs = cnec_stats.slice(start_idx, count) # Create export DataFrame with essential info export_df = tier_cnecs.select([ pl.col('cnec_eic'), pl.col('cnec_name'), pl.col('tso'), pl.lit(tier_name).alias('tier'), pl.col('importance_score'), pl.col('binding_freq'), pl.col('avg_shadow_price'), pl.col('active_hours') ]) # Save to CSV output_path.parent.mkdir(parents=True, exist_ok=True) export_df.write_csv(output_path) print(f"\n{tier_name} CNECs ({count}):") print(f" EIC codes saved to: {output_path}") print(f" Importance score range: [{tier_cnecs['importance_score'].min():.2f}, {tier_cnecs['importance_score'].max():.2f}]") print(f" Binding frequency range: [{tier_cnecs['binding_freq'].min():.2%}, {tier_cnecs['binding_freq'].max():.2%}]") return export_df def main(): """Identify critical CNECs from 24-month SPARSE data.""" parser = argparse.ArgumentParser( description="Identify critical CNECs for Phase 2 feature engineering" ) parser.add_argument( '--input', type=Path, required=True, help='Path to 24-month SPARSE CNEC data (jao_cnec_ptdf.parquet)' ) parser.add_argument( '--tier1-count', type=int, default=50, help='Number of Tier 1 CNECs (default: 50)' ) parser.add_argument( '--tier2-count', type=int, default=150, help='Number of Tier 2 CNECs (default: 150)' ) parser.add_argument( '--output-dir', type=Path, default=Path('data/processed'), help='Output directory for results (default: data/processed)' ) args = parser.parse_args() print("=" * 80) print("CRITICAL CNEC IDENTIFICATION (Phase 1 Analysis)") print("=" * 80) print() # Load 24-month SPARSE CNEC data print(f"Loading SPARSE CNEC data from: {args.input}") if not args.input.exists(): print(f"[ERROR] Input file not found: {args.input}") print(" Please run Phase 1 data collection first:") print(" python scripts/collect_jao_complete.py --start-date 2023-10-01 --end-date 2025-09-30 --output-dir data/raw/phase1_24month") sys.exit(1) cnec_df = pl.read_parquet(args.input) print(f"[OK] Loaded {cnec_df.shape[0]:,} records") print(f" Columns: {cnec_df.shape[1]}") print() # Filter out CNECs without EIC codes (needed for Phase 2 collection) null_eic_count = cnec_df.filter(pl.col('cnec_eic').is_null()).shape[0] if null_eic_count > 0: print(f"[WARNING] Filtering out {null_eic_count:,} records with null EIC codes") cnec_df = cnec_df.filter(pl.col('cnec_eic').is_not_null()) print(f"[OK] Remaining records: {cnec_df.shape[0]:,}") print() # Calculate total hours in dataset if 'collection_date' in cnec_df.columns: unique_dates = cnec_df['collection_date'].n_unique() total_hours = unique_dates * 24 # Approximate (handles DST) else: # Fallback: estimate from data total_hours = len(cnec_df) // cnec_df['cnec_eic'].n_unique() print(f"Dataset coverage:") print(f" Unique dates: {unique_dates if 'collection_date' in cnec_df.columns else 'Unknown'}") print(f" Estimated total hours: {total_hours:,}") print(f" Unique CNECs: {cnec_df['cnec_eic'].n_unique()}") print() # Calculate CNEC importance scores print("Calculating CNEC importance scores...") cnec_stats = calculate_cnec_importance(cnec_df, total_hours) print(f"[OK] Analyzed {cnec_stats.shape[0]} unique CNECs") print() # Display top 10 CNECs print("=" * 80) print("TOP 10 MOST CRITICAL CNECs") print("=" * 80) top10 = cnec_stats.head(10) for i, row in enumerate(top10.iter_rows(named=True), 1): print(f"\n{i}. {row['cnec_name'][:60]}") eic_display = row['cnec_eic'][:16] + "..." if row['cnec_eic'] else "N/A" print(f" TSO: {row['tso']:<15s} | EIC: {eic_display}") print(f" Importance Score: {row['importance_score']:>8.2f}") print(f" Binding Frequency: {row['binding_freq']:>6.2%} ({row['active_hours']:,} hours)") print(f" Avg Shadow Price: €{row['avg_shadow_price']:>6.2f}/MW (max: €{row['max_shadow_price']:.2f})") print(f" Avg Margin Ratio: {row['avg_margin_ratio']:>6.2%} (RAM/Fmax)") print() print("=" * 80) # Export Tier 1 CNECs (Top 50) tier1_df = export_tier_eic_codes( cnec_stats, tier_name="Tier 1", start_idx=0, count=args.tier1_count, output_path=args.output_dir / "critical_cnecs_tier1.csv" ) # Export Tier 2 CNECs (Next 150) tier2_df = export_tier_eic_codes( cnec_stats, tier_name="Tier 2", start_idx=args.tier1_count, count=args.tier2_count, output_path=args.output_dir / "critical_cnecs_tier2.csv" ) # Export combined list (all 200) combined_df = pl.concat([tier1_df, tier2_df]) combined_path = args.output_dir / "critical_cnecs_all.csv" combined_df.write_csv(combined_path) print(f"\nCombined list (all 200 CNECs):") print(f" EIC codes saved to: {combined_path}") # Export full ranking with detailed statistics full_ranking_path = args.output_dir / "cnec_ranking_full.csv" # Drop any nested columns that CSV cannot handle export_cols = [c for c in cnec_stats.columns if cnec_stats[c].dtype != pl.List] cnec_stats.select(export_cols).write_csv(full_ranking_path) print(f"\nFull CNEC ranking:") print(f" All {cnec_stats.shape[0]} CNECs saved to: {full_ranking_path}") # Summary statistics print() print("=" * 80) print("SUMMARY") print("=" * 80) print(f"\nTotal CNECs analyzed: {cnec_stats.shape[0]}") print(f"Critical CNECs selected: {args.tier1_count + args.tier2_count}") print(f" - Tier 1 (full features): {args.tier1_count}") print(f" - Tier 2 (reduced features): {args.tier2_count}") print(f"\nImportance score distribution:") print(f" Min: {cnec_stats['importance_score'].min():.2f}") print(f" Max: {cnec_stats['importance_score'].max():.2f}") print(f" Median: {cnec_stats['importance_score'].median():.2f}") print(f" Tier 1 cutoff: {cnec_stats['importance_score'][args.tier1_count]:.2f}") print(f" Tier 2 cutoff: {cnec_stats['importance_score'][args.tier1_count + args.tier2_count]:.2f}") print(f"\nBinding frequency distribution (all CNECs):") print(f" Min: {cnec_stats['binding_freq'].min():.2%}") print(f" Max: {cnec_stats['binding_freq'].max():.2%}") print(f" Median: {cnec_stats['binding_freq'].median():.2%}") print(f"\nTier 1 binding frequency:") print(f" Range: [{tier1_df['binding_freq'].min():.2%}, {tier1_df['binding_freq'].max():.2%}]") print(f" Mean: {tier1_df['binding_freq'].mean():.2%}") print(f"\nTier 2 binding frequency:") print(f" Range: [{tier2_df['binding_freq'].min():.2%}, {tier2_df['binding_freq'].max():.2%}]") print(f" Mean: {tier2_df['binding_freq'].mean():.2%}") # TSO distribution print(f"\nTier 1 TSO distribution:") tier1_tsos = tier1_df.group_by('tso').agg(pl.len().alias('count')).sort('count', descending=True) for row in tier1_tsos.iter_rows(named=True): print(f" {row['tso']:<15s}: {row['count']:>3d} CNECs ({row['count']/args.tier1_count*100:.1f}%)") print(f"\nPhase 2 Data Collection:") print(f" Use EIC codes from: {combined_path}") print(f" Expected records: {args.tier1_count + args.tier2_count} CNECs × {total_hours:,} hours = {(args.tier1_count + args.tier2_count) * total_hours:,}") print(f" Estimated file size: ~100-150 MB (compressed parquet)") print() print("=" * 80) print("IDENTIFICATION COMPLETE") print("=" * 80) print() print("[NEXT STEP] Collect DENSE CNEC data for Phase 2 feature engineering:") print(" See: doc/final_domain_research.md for collection methods") if __name__ == "__main__": main()