"""ENTSO-E Transparency Platform Data Collection with Rate Limiting Collects generation, load, and cross-border flow data from ENTSO-E API. Implements proper rate limiting to avoid temporary bans. ENTSO-E Rate Limits (OFFICIAL): - 60 requests per 60 seconds (hard limit - exceeding triggers 10-min ban) - Screen scraping >60 requests/min leads to temporary IP ban Strategy: - 27 requests/minute (45% of 60 limit - safe) - 1 request every ~2.2 seconds - Request data in monthly chunks to minimize API calls """ import polars as pl from pathlib import Path from datetime import datetime, timedelta from dotenv import load_dotenv import os import time from typing import List, Tuple from tqdm import tqdm from entsoe import EntsoePandasClient import pandas as pd # Load environment variables load_dotenv() # FBMC Bidding Zones (12 zones from project plan) BIDDING_ZONES = { 'AT': 'Austria', 'BE': 'Belgium', 'HR': 'Croatia', 'CZ': 'Czech Republic', 'FR': 'France', 'DE_LU': 'Germany-Luxembourg', 'HU': 'Hungary', 'NL': 'Netherlands', 'PL': 'Poland', 'RO': 'Romania', 'SK': 'Slovakia', 'SI': 'Slovenia', } # FBMC Cross-Border Flows (~20 major borders) BORDERS = [ ('DE_LU', 'NL'), ('DE_LU', 'FR'), ('DE_LU', 'BE'), ('DE_LU', 'AT'), ('DE_LU', 'CZ'), ('DE_LU', 'PL'), ('FR', 'BE'), ('FR', 'ES'), # External but affects FBMC ('FR', 'CH'), # External but affects FBMC ('AT', 'CZ'), ('AT', 'HU'), ('AT', 'SI'), ('AT', 'CH'), # External but affects FBMC ('CZ', 'SK'), ('CZ', 'PL'), ('HU', 'SK'), ('HU', 'RO'), ('HU', 'HR'), ('SI', 'HR'), ('PL', 'SK'), ('PL', 'CZ'), ] class EntsoECollector: """Collect ENTSO-E data with proper rate limiting.""" def __init__(self, requests_per_minute: int = 27): """Initialize collector with rate limiting. Args: requests_per_minute: Max requests per minute (default: 27 = 45% of 60 limit) """ api_key = os.getenv('ENTSOE_API_KEY') if not api_key or 'your_entsoe' in api_key.lower(): raise ValueError("ENTSO-E API key not configured in .env file") self.client = EntsoePandasClient(api_key=api_key) self.requests_per_minute = requests_per_minute self.delay_seconds = 60.0 / requests_per_minute self.request_count = 0 print(f"ENTSO-E Collector initialized") print(f"Rate limit: {self.requests_per_minute} requests/minute") print(f"Delay between requests: {self.delay_seconds:.2f}s") def _rate_limit(self): """Apply rate limiting delay.""" time.sleep(self.delay_seconds) self.request_count += 1 def _generate_monthly_chunks( self, start_date: str, end_date: str ) -> List[Tuple[pd.Timestamp, pd.Timestamp]]: """Generate monthly date chunks for API requests. Args: start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: List of (start, end) timestamp tuples """ start_dt = pd.Timestamp(start_date, tz='UTC') end_dt = pd.Timestamp(end_date, tz='UTC') chunks = [] current = start_dt while current < end_dt: # Get end of month or end_date, whichever is earlier month_end = (current + pd.offsets.MonthEnd(0)) chunk_end = min(month_end, end_dt) chunks.append((current, chunk_end)) current = chunk_end + pd.Timedelta(hours=1) return chunks def collect_generation_per_type( self, zone: str, start_date: str, end_date: str ) -> pl.DataFrame: """Collect generation by production type for a bidding zone. 
    def collect_generation_per_type(
        self,
        zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect generation by production type for a bidding zone.

        Args:
            zone: Bidding zone code (e.g., 'DE_LU', 'FR')
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with generation data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []

        for start_chunk, end_chunk in tqdm(chunks, desc=f"  {zone} generation", leave=False):
            try:
                # Fetch generation data
                df = self.client.query_generation(
                    zone,
                    start=start_chunk,
                    end=end_chunk,
                    psr_type=None  # Get all production types
                )

                if df is not None and not df.empty:
                    # entsoe-py can return MultiIndex columns such as
                    # ('Solar', 'Actual Aggregated'); flatten them so melt()
                    # yields a single production_type column
                    if isinstance(df.columns, pd.MultiIndex):
                        df.columns = [
                            ' '.join(str(level) for level in col).strip()
                            for col in df.columns
                        ]

                    # Convert to long format
                    df_reset = df.reset_index()
                    df_melted = df_reset.melt(
                        id_vars=['index'],
                        var_name='production_type',
                        value_name='generation_mw'
                    )
                    df_melted = df_melted.rename(columns={'index': 'timestamp'})
                    df_melted['zone'] = zone

                    # Convert to Polars
                    pl_df = pl.from_pandas(df_melted)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f"  ❌ Failed {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()

    def collect_load(
        self,
        zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect load (demand) data for a bidding zone.

        Args:
            zone: Bidding zone code
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with load data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []

        for start_chunk, end_chunk in tqdm(chunks, desc=f"  {zone} load", leave=False):
            try:
                # Fetch load data
                series = self.client.query_load(
                    zone,
                    start=start_chunk,
                    end=end_chunk
                )

                if series is not None and not series.empty:
                    # Newer entsoe-py versions return a one-column DataFrame
                    # here rather than a Series; squeeze to a Series either way
                    if isinstance(series, pd.DataFrame):
                        series = series.squeeze(axis=1)

                    df = pd.DataFrame({
                        'timestamp': series.index,
                        'load_mw': series.values,
                        'zone': zone
                    })
                    pl_df = pl.from_pandas(df)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f"  ❌ Failed {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()

    def collect_cross_border_flows(
        self,
        from_zone: str,
        to_zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect cross-border flow data between two zones.

        Args:
            from_zone: From bidding zone
            to_zone: To bidding zone
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with flow data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []
        border_id = f"{from_zone}_{to_zone}"

        for start_chunk, end_chunk in tqdm(chunks, desc=f"  {border_id}", leave=False):
            try:
                # Fetch cross-border flow
                series = self.client.query_crossborder_flows(
                    from_zone,
                    to_zone,
                    start=start_chunk,
                    end=end_chunk
                )

                if series is not None and not series.empty:
                    df = pd.DataFrame({
                        'timestamp': series.index,
                        'flow_mw': series.values,
                        'from_zone': from_zone,
                        'to_zone': to_zone,
                        'border': border_id
                    })
                    pl_df = pl.from_pandas(df)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f"  ❌ Failed {border_id} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()
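    # Request-budget sanity check for the default CLI range (2024-10-01 to
    # 2025-09-30, i.e. 12 monthly chunks), matching the estimate printed by
    # collect_all() below:
    #   12 zones x 12 chunks x 2 series (generation + load) = 288 requests
    #   21 borders x 12 chunks                               = 252 requests
    #   total ~540 requests -> ~20 minutes at 27 requests/minute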
    def collect_all(
        self,
        start_date: str,
        end_date: str,
        output_dir: Path
    ) -> dict:
        """Collect all ENTSO-E data with rate limiting.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_dir: Directory to save Parquet files

        Returns:
            Dictionary with paths to saved files
        """
        output_dir.mkdir(parents=True, exist_ok=True)

        # Calculate total requests
        months = len(self._generate_monthly_chunks(start_date, end_date))
        total_requests = (
            len(BIDDING_ZONES) * months * 2 +  # Generation + load
            len(BORDERS) * months              # Flows
        )
        estimated_minutes = total_requests / self.requests_per_minute

        print("=" * 70)
        print("ENTSO-E Data Collection")
        print("=" * 70)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Bidding zones: {len(BIDDING_ZONES)}")
        print(f"Cross-border flows: {len(BORDERS)}")
        print(f"Monthly chunks: {months}")
        print(f"Total requests: ~{total_requests}")
        print(f"Rate limit: {self.requests_per_minute} requests/minute (45% of 60 max)")
        print(f"Estimated time: {estimated_minutes:.1f} minutes")
        print()

        results = {}

        # 1. Collect generation data
        print("[1/3] Collecting generation data by production type...")
        generation_data = []
        for zone in tqdm(BIDDING_ZONES.keys(), desc="Generation"):
            df = self.collect_generation_per_type(zone, start_date, end_date)
            if not df.is_empty():
                generation_data.append(df)

        if generation_data:
            generation_df = pl.concat(generation_data)
            gen_path = output_dir / "entsoe_generation_2024_2025.parquet"
            generation_df.write_parquet(gen_path)
            results['generation'] = gen_path
            print(f"✅ Generation: {generation_df.shape[0]:,} records → {gen_path}")

        # 2. Collect load data
        print("\n[2/3] Collecting load (demand) data...")
        load_data = []
        for zone in tqdm(BIDDING_ZONES.keys(), desc="Load"):
            df = self.collect_load(zone, start_date, end_date)
            if not df.is_empty():
                load_data.append(df)

        if load_data:
            load_df = pl.concat(load_data)
            load_path = output_dir / "entsoe_load_2024_2025.parquet"
            load_df.write_parquet(load_path)
            results['load'] = load_path
            print(f"✅ Load: {load_df.shape[0]:,} records → {load_path}")

        # 3. Collect cross-border flows
        print("\n[3/3] Collecting cross-border flows...")
        flow_data = []
        for from_zone, to_zone in tqdm(BORDERS, desc="Flows"):
            df = self.collect_cross_border_flows(from_zone, to_zone, start_date, end_date)
            if not df.is_empty():
                flow_data.append(df)

        if flow_data:
            flow_df = pl.concat(flow_data)
            flow_path = output_dir / "entsoe_flows_2024_2025.parquet"
            flow_df.write_parquet(flow_path)
            results['flows'] = flow_path
            print(f"✅ Flows: {flow_df.shape[0]:,} records → {flow_path}")

        print()
        print("=" * 70)
        print("ENTSO-E Collection Complete")
        print("=" * 70)
        print(f"Total API requests made: {self.request_count}")
        print(f"Files created: {len(results)}")
        for data_type, path in results.items():
            file_size = path.stat().st_size / (1024 ** 2)
            print(f"  - {data_type}: {file_size:.1f} MB")

        return results


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Collect ENTSO-E data with proper rate limiting"
    )
    parser.add_argument(
        '--start-date',
        default='2024-10-01',
        help='Start date (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--end-date',
        default='2025-09-30',
        help='End date (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--output-dir',
        type=Path,
        default=Path('data/raw'),
        help='Output directory for Parquet files'
    )
    parser.add_argument(
        '--requests-per-minute',
        type=int,
        default=27,
        help='Requests per minute (default: 27 = 45%% of 60 limit)'
    )

    args = parser.parse_args()

    # Initialize collector and run
    collector = EntsoECollector(requests_per_minute=args.requests_per_minute)
    collector.collect_all(
        start_date=args.start_date,
        end_date=args.end_date,
        output_dir=args.output_dir
    )
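# Example invocation (the script name is a placeholder for wherever this
# module lives in the repo; the flags match the argparse definitions above):
#   python entsoe_collector.py --start-date 2024-10-01 --end-date 2025-09-30 \
#       --output-dir data/raw --requests-per-minute 27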