fbmc-chronos2 / scripts /test_entsoe_phase1b_validate_solutions.py
Evgueni Poloukarov
feat: complete Phase 1 ENTSO-E asset-specific outage validation
27cb60a
raw
history blame
14.6 kB
"""
Phase 1B: Validate Asset-Specific Outages & Pumped Storage Consumption
========================================================================
Tests the two breakthrough solutions:
1. Asset-specific transmission outages using _query_unavailability(mRID=cnec_eic)
2. Pumped storage consumption via XML parsing (inBiddingZone vs outBiddingZone)
"""
import os
import sys
from pathlib import Path
from datetime import datetime, timedelta
import time
import pandas as pd
import polars as pl
import zipfile
from io import BytesIO
import xml.etree.ElementTree as ET
from dotenv import load_dotenv
from entsoe import EntsoePandasClient, EntsoeRawClient
# Add src to path
sys.path.append(str(Path(__file__).parent.parent))
# Load environment
load_dotenv()
API_KEY = os.getenv('ENTSOE_API_KEY')
if not API_KEY:
raise ValueError("ENTSOE_API_KEY not found in .env file")
# Initialize clients
pandas_client = EntsoePandasClient(api_key=API_KEY)
raw_client = EntsoeRawClient(api_key=API_KEY)
print("="*80)
print("PHASE 1B: VALIDATION OF BREAKTHROUGH SOLUTIONS")
print("="*80)
print()
# ============================================================================
# TEST 1: Asset-Specific Transmission Outages with mRID Parameter
# ============================================================================
print("-"*80)
print("TEST 1: ASSET-SPECIFIC TRANSMISSION OUTAGES (mRID PARAMETER)")
print("-"*80)
print()
# Load CNEC EIC codes
print("Loading CNEC EIC codes...")
try:
cnec_file = Path(__file__).parent.parent / 'data' / 'processed' / 'critical_cnecs_tier1.csv'
cnec_df = pl.read_csv(cnec_file)
cnec_eics = cnec_df.select('cnec_eic').to_series().to_list()
print(f"[OK] Loaded {len(cnec_eics)} Tier-1 CNEC EICs")
print()
# Test with first CNEC
test_cnec = cnec_eics[0]
test_cnec_name = cnec_df.filter(pl.col('cnec_eic') == test_cnec).select('cnec_name').item()
print(f"Test CNEC: {test_cnec}")
print(f"Name: {test_cnec_name}")
print()
print("Attempting asset-specific query using _query_unavailability()...")
print("Parameters:")
print(f" - doctype: A78 (transmission unavailability)")
print(f" - mRID: {test_cnec}")
print(f" - country_code: FR (France)")
print(f" - period: 2025-09-23 to 2025-09-30")
print()
start_time = time.time()
try:
# Use internal method with mRID parameter
outages_zip = pandas_client._query_unavailability(
country_code='FR',
start=pd.Timestamp('2025-09-23', tz='UTC'),
end=pd.Timestamp('2025-09-30', tz='UTC'),
doctype='A78', # Transmission unavailability
mRID=test_cnec, # Asset-specific filter!
docstatus=None
)
query_time = time.time() - start_time
print(f"[OK] Query successful! (took {query_time:.2f} seconds)")
print(f" Response type: {type(outages_zip)}")
print(f" Response size: {len(outages_zip)} bytes")
print()
# Parse ZIP to check contents
print("Parsing ZIP response...")
with zipfile.ZipFile(BytesIO(outages_zip), 'r') as zf:
xml_files = [f for f in zf.namelist() if f.endswith('.xml')]
print(f" XML files in ZIP: {len(xml_files)}")
if xml_files:
# Parse first XML file
with zf.open(xml_files[0]) as xml_file:
xml_content = xml_file.read()
root = ET.fromstring(xml_content)
# Check if CNEC EIC appears in XML
xml_str = xml_content.decode('utf-8')
cnec_in_xml = test_cnec in xml_str
print(f" CNEC EIC found in XML: {cnec_in_xml}")
# Extract some details
ns = {'ns': 'urn:iec62325.351:tc57wg16:451-6:transmissiondocument:3:0'}
# Try to find unavailability records
unavail_series = root.findall('.//ns:Unavailability_TimeSeries', ns)
print(f" Unavailability TimeSeries found: {len(unavail_series)}")
if unavail_series:
# Extract details from first record
first_series = unavail_series[0]
# Try to find registered resource
reg_resource = first_series.find('.//ns:registeredResource', ns)
if reg_resource is not None:
resource_mrid = reg_resource.find('.//ns:mRID', ns)
if resource_mrid is not None:
print(f" Registered resource mRID: {resource_mrid.text}")
print(f" Matches test CNEC: {resource_mrid.text == test_cnec}")
# Extract time period
period = first_series.find('.//ns:Period', ns)
if period is not None:
time_interval = period.find('.//ns:timeInterval', ns)
if time_interval is not None:
start = time_interval.find('.//ns:start', ns)
end = time_interval.find('.//ns:end', ns)
if start is not None and end is not None:
print(f" Outage period: {start.text} to {end.text}")
print()
print("[SUCCESS] Asset-specific outages with mRID parameter WORKS!")
print(f">> Can query all 208 CNECs individually")
print(f">> Estimated time for 208 CNECs: {query_time * 208 / 60:.1f} minutes per time period")
else:
print(" [WARN] No XML files in ZIP (may be no outages for this asset)")
print(" >> Try with different CNEC or time period")
except Exception as e:
print(f"[FAIL] Query with mRID failed: {e}")
print(" >> Asset-specific filtering may not be available")
print(" >> Fallback to border-level outages (20 features)")
except Exception as e:
print(f"[FAIL] Test 1 failed: {e}")
print()
# ============================================================================
# TEST 2: Pumped Storage Consumption via XML Parsing
# ============================================================================
print("-"*80)
print("TEST 2: PUMPED STORAGE CONSUMPTION (XML PARSING)")
print("-"*80)
print()
print("Testing pumped storage for Switzerland (CH)...")
print("Query: PSR type B10 (Hydro Pumped Storage)")
print("Period: 2025-09-23 00:00 to 2025-09-24 23:00 (48 hours)")
print()
try:
# Get raw XML response
print("Fetching raw XML from ENTSO-E API...")
xml_response = raw_client.query_generation(
country_code='CH',
start=pd.Timestamp('2025-09-23 00:00', tz='UTC'),
end=pd.Timestamp('2025-09-24 23:00', tz='UTC'),
psr_type='B10' # Hydro Pumped Storage
)
print(f"[OK] Received XML response ({len(xml_response)} bytes)")
print()
# Parse XML
print("Parsing XML to identify generation vs consumption...")
root = ET.fromstring(xml_response)
# Define namespace
ns = {'ns': 'urn:iec62325.351:tc57wg16:451-6:generationloaddocument:3:0'}
# Find all TimeSeries
timeseries_list = root.findall('.//ns:TimeSeries', ns)
print(f" TimeSeries elements found: {len(timeseries_list)}")
print()
generation_series = []
consumption_series = []
for ts in timeseries_list:
# Check for direction indicators
in_domain = ts.find('.//ns:inBiddingZone_Domain.mRID', ns)
out_domain = ts.find('.//ns:outBiddingZone_Domain.mRID', ns)
# Get PSR type
psr_type = ts.find('.//ns:MktPSRType', ns)
if psr_type is not None:
psr_type_code = psr_type.find('.//ns:psrType', ns)
psr_type_text = psr_type_code.text if psr_type_code is not None else 'Unknown'
else:
psr_type_text = 'Unknown'
if out_domain is not None:
# outBiddingZone = power going OUT of zone (consumption/pumping)
consumption_series.append(ts)
print(f" [CONSUMPTION] TimeSeries with outBiddingZone_Domain")
print(f" PSR Type: {psr_type_text}")
print(f" Domain: {out_domain.text}")
elif in_domain is not None:
# inBiddingZone = power coming INTO zone (generation)
generation_series.append(ts)
print(f" [GENERATION] TimeSeries with inBiddingZone_Domain")
print(f" PSR Type: {psr_type_text}")
print(f" Domain: {in_domain.text}")
print()
print(f"Summary:")
print(f" Generation TimeSeries: {len(generation_series)}")
print(f" Consumption TimeSeries: {len(consumption_series)}")
print()
if len(generation_series) > 0 and len(consumption_series) > 0:
print("[SUCCESS] Pumped storage consumption/generation SEPARATED!")
print(">> Can extract both generation and consumption from same query")
print(">> inBiddingZone_Domain = generation (power produced)")
print(">> outBiddingZone_Domain = consumption (power used for pumping)")
print()
# Extract sample values
print("Extracting sample hourly values...")
# Parse generation values
if generation_series:
gen_ts = generation_series[0]
period = gen_ts.find('.//ns:Period', ns)
if period is not None:
points = period.findall('.//ns:Point', ns)
print(f"\n Generation (first 10 hours):")
for point in points[:10]:
position = point.find('.//ns:position', ns)
quantity = point.find('.//ns:quantity', ns)
if position is not None and quantity is not None:
print(f" Hour {position.text}: {quantity.text} MW")
# Parse consumption values
if consumption_series:
cons_ts = consumption_series[0]
period = cons_ts.find('.//ns:Period', ns)
if period is not None:
points = period.findall('.//ns:Point', ns)
print(f"\n Consumption/Pumping (first 10 hours):")
for point in points[:10]:
position = point.find('.//ns:position', ns)
quantity = point.find('.//ns:quantity', ns)
if position is not None and quantity is not None:
print(f" Hour {position.text}: {quantity.text} MW")
print()
print(">> Implementation: Parse XML, separate by inBiddingZone vs outBiddingZone")
print(">> Result: 7 generation + 7 consumption + 7 net = 21 pumped storage features")
elif len(generation_series) > 0:
print("[PARTIAL SUCCESS] Only generation found, no consumption")
print(">> May need alternative query or accept generation-only")
print(">> Result: 7 pumped storage generation features only")
else:
print("[FAIL] No TimeSeries parsed correctly")
print(">> XML structure may be different than expected")
except Exception as e:
print(f"[FAIL] Test 2 failed: {e}")
import traceback
traceback.print_exc()
print()
# ============================================================================
# TEST 3: Multiple CNEC Performance Test
# ============================================================================
print("-"*80)
print("TEST 3: MULTIPLE CNEC PERFORMANCE TEST")
print("-"*80)
print()
print("Testing query time for multiple CNECs to estimate full collection time...")
print()
try:
# Test with 3 sample CNECs
sample_cnecs = cnec_eics[:3]
print(f"Testing {len(sample_cnecs)} CNECs:")
for cnec in sample_cnecs:
name = cnec_df.filter(pl.col('cnec_eic') == cnec).select('cnec_name').item()
print(f" - {cnec}: {name}")
print()
query_times = []
for i, cnec in enumerate(sample_cnecs, 1):
print(f"Query {i}/{len(sample_cnecs)}: {cnec}...")
start_time = time.time()
try:
outages_zip = pandas_client._query_unavailability(
country_code='FR',
start=pd.Timestamp('2025-09-23', tz='UTC'),
end=pd.Timestamp('2025-09-30', tz='UTC'),
doctype='A78',
mRID=cnec,
docstatus=None
)
query_time = time.time() - start_time
query_times.append(query_time)
print(f" [OK] {query_time:.2f}s (response: {len(outages_zip)} bytes)")
# Rate limiting: wait 2.2 seconds between queries (27 req/min)
if i < len(sample_cnecs):
time.sleep(2.2)
except Exception as e:
print(f" [FAIL] {e}")
print()
if query_times:
avg_time = sum(query_times) / len(query_times)
print(f"Average query time: {avg_time:.2f} seconds")
print()
# Estimate for all 208 CNECs
total_time = 208 * (avg_time + 2.2) # Query time + rate limit delay
print(f"Estimated time for 208 CNECs:")
print(f" Per time period: {total_time / 60:.1f} minutes")
print(f" For 24-month collection (24 months): {total_time * 24 / 3600:.1f} hours")
print()
print("[OK] Performance acceptable for full collection")
except Exception as e:
print(f"[FAIL] Performance test failed: {e}")
print()
# ============================================================================
# SUMMARY
# ============================================================================
print("="*80)
print("VALIDATION SUMMARY")
print("="*80)
print()
print("TEST 1: Asset-Specific Transmission Outages")
print(" Status: [Refer to test output above]")
print(" If SUCCESS: Implement 208-feature transmission outages")
print(" If FAIL: Fallback to 20-feature border-level outages")
print()
print("TEST 2: Pumped Storage Consumption")
print(" Status: [Refer to test output above]")
print(" If SUCCESS: Implement 21 pumped storage features (7 gen + 7 cons + 7 net)")
print(" If FAIL: Fallback to 7-feature generation-only")
print()
print("TEST 3: Performance")
print(" Status: [Refer to test output above]")
print(" Collection time estimate: [See above]")
print()
print("="*80)
print("NEXT STEPS:")
print("1. Review validation results above")
print("2. Update implementation plan based on outcomes")
print("3. Proceed to Phase 2 (extend collect_entsoe.py)")
print("="*80)