| """ENTSO-E Transparency Platform Data Collection with Rate Limiting | |
| Collects generation, load, and cross-border flow data from ENTSO-E API. | |
| Implements proper rate limiting to avoid temporary bans. | |
| ENTSO-E Rate Limits (OFFICIAL): | |
| - 60 requests per 60 seconds (hard limit - exceeding triggers 10-min ban) | |
| - Screen scraping >60 requests/min leads to temporary IP ban | |
| Strategy: | |
| - 27 requests/minute (45% of 60 limit - safe) | |
| - 1 request every ~2.2 seconds | |
| - Request data in monthly chunks to minimize API calls | |
| """ | |
import os
import time
from pathlib import Path
from typing import List, Tuple

import pandas as pd
import polars as pl
from dotenv import load_dotenv
from entsoe import EntsoePandasClient
from tqdm import tqdm

# Load environment variables (expects ENTSOE_API_KEY in .env)
load_dotenv()
# FBMC bidding zones (12 zones from the project plan)
BIDDING_ZONES = {
    'AT': 'Austria',
    'BE': 'Belgium',
    'HR': 'Croatia',
    'CZ': 'Czech Republic',
    'FR': 'France',
    'DE_LU': 'Germany-Luxembourg',
    'HU': 'Hungary',
    'NL': 'Netherlands',
    'PL': 'Poland',
    'RO': 'Romania',
    'SK': 'Slovakia',
    'SI': 'Slovenia',
}
# FBMC cross-border flows (21 major borders)
BORDERS = [
    ('DE_LU', 'NL'),
    ('DE_LU', 'FR'),
    ('DE_LU', 'BE'),
    ('DE_LU', 'AT'),
    ('DE_LU', 'CZ'),
    ('DE_LU', 'PL'),
    ('FR', 'BE'),
    ('FR', 'ES'),  # External but affects FBMC
    ('FR', 'CH'),  # External but affects FBMC
    ('AT', 'CZ'),
    ('AT', 'HU'),
    ('AT', 'SI'),
    ('AT', 'CH'),  # External but affects FBMC
    ('CZ', 'SK'),
    ('CZ', 'PL'),
    ('HU', 'SK'),
    ('HU', 'RO'),
    ('HU', 'HR'),
    ('SI', 'HR'),
    ('PL', 'SK'),
    ('PL', 'CZ'),
]
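# Note: query_crossborder_flows() is directional, so each tuple above yields
# the physical flow from the first zone to the second only. Most borders are
# listed in one direction; ('CZ', 'PL') and ('PL', 'CZ') are both present, so
# that border is collected in both directions. Add the reverse tuple for any
# other border where both directions are needed.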
class EntsoECollector:
    """Collect ENTSO-E data with proper rate limiting."""

    def __init__(self, requests_per_minute: int = 27):
        """Initialize the collector with rate limiting.

        Args:
            requests_per_minute: Max requests per minute (default: 27 = 45% of the 60 limit)
        """
        api_key = os.getenv('ENTSOE_API_KEY')
        if not api_key or 'your_entsoe' in api_key.lower():
            raise ValueError("ENTSO-E API key not configured in .env file")
        self.client = EntsoePandasClient(api_key=api_key)
        self.requests_per_minute = requests_per_minute
        self.delay_seconds = 60.0 / requests_per_minute
        self.request_count = 0
        print("ENTSO-E Collector initialized")
        print(f"Rate limit: {self.requests_per_minute} requests/minute")
        print(f"Delay between requests: {self.delay_seconds:.2f}s")

    def _rate_limit(self):
        """Apply the rate-limiting delay and count the request just made."""
        time.sleep(self.delay_seconds)
        self.request_count += 1
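    # Design note: this is a simple fixed-delay limiter, not a token bucket.
    # It is deliberately conservative: the delay is applied after every request,
    # including failed ones, so bursts are impossible even when responses
    # return instantly.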
    def _generate_monthly_chunks(
        self,
        start_date: str,
        end_date: str,
    ) -> List[Tuple[pd.Timestamp, pd.Timestamp]]:
        """Generate monthly date chunks for API requests.

        Chunks are half-open and adjacent ([start, start of next month)), so
        month boundaries are covered exactly once regardless of the data
        resolution (hourly or 15-minute, depending on the zone).

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            List of (start, end) timestamp tuples
        """
        start_dt = pd.Timestamp(start_date, tz='UTC')
        end_dt = pd.Timestamp(end_date, tz='UTC')
        chunks = []
        current = start_dt
        while current < end_dt:
            # Start of the next month, or end_date, whichever is earlier
            next_month = current + pd.offsets.MonthBegin(1)
            chunk_end = min(next_month, end_dt)
            chunks.append((current, chunk_end))
            current = chunk_end
        return chunks
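    # Example (hypothetical dates): _generate_monthly_chunks('2024-10-01', '2024-12-15')
    # returns three half-open chunks:
    #   (2024-10-01, 2024-11-01), (2024-11-01, 2024-12-01), (2024-12-01, 2024-12-15)
    # entsoe-py generally treats `end` as exclusive; if your version also returns
    # the boundary timestamp, deduplicate on (zone, timestamp) downstream.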
    def collect_generation_per_type(
        self,
        zone: str,
        start_date: str,
        end_date: str,
    ) -> pl.DataFrame:
        """Collect generation by production type for a bidding zone.

        Args:
            zone: Bidding zone code (e.g., 'DE_LU', 'FR')
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with generation data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []
        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} generation", leave=False):
            try:
                # Fetch generation data
                df = self.client.query_generation(
                    zone,
                    start=start_chunk,
                    end=end_chunk,
                    psr_type=None,  # None = all production types
                )
                if df is not None and not df.empty:
                    # query_generation may return MultiIndex columns
                    # (production type, 'Actual Aggregated'/'Actual Consumption');
                    # flatten them so melt() yields one string per type
                    if isinstance(df.columns, pd.MultiIndex):
                        df.columns = [' - '.join(map(str, col)).strip() for col in df.columns]
                    # Convert wide (one column per type) to long format
                    df_reset = df.reset_index()
                    df_melted = df_reset.melt(
                        id_vars=['index'],
                        var_name='production_type',
                        value_name='generation_mw',
                    )
                    df_melted = df_melted.rename(columns={'index': 'timestamp'})
                    df_melted['zone'] = zone
                    # Convert to Polars
                    all_data.append(pl.from_pandas(df_melted))
                self._rate_limit()
            except Exception as e:
                print(f" ❌ Failed {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue
        if all_data:
            return pl.concat(all_data)
        return pl.DataFrame()
    def collect_load(
        self,
        zone: str,
        start_date: str,
        end_date: str,
    ) -> pl.DataFrame:
        """Collect load (demand) data for a bidding zone.

        Args:
            zone: Bidding zone code
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with load data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []
        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} load", leave=False):
            try:
                # Fetch load data
                series = self.client.query_load(
                    zone,
                    start=start_chunk,
                    end=end_chunk,
                )
                # Recent entsoe-py versions return a single-column DataFrame
                # ('Actual Load') instead of a Series; coerce to a Series
                if isinstance(series, pd.DataFrame):
                    series = series.iloc[:, 0]
                if series is not None and not series.empty:
                    df = pd.DataFrame({
                        'timestamp': series.index,
                        'load_mw': series.values,
                        'zone': zone,
                    })
                    all_data.append(pl.from_pandas(df))
                self._rate_limit()
            except Exception as e:
                print(f" ❌ Failed {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue
        if all_data:
            return pl.concat(all_data)
        return pl.DataFrame()
    def collect_cross_border_flows(
        self,
        from_zone: str,
        to_zone: str,
        start_date: str,
        end_date: str,
    ) -> pl.DataFrame:
        """Collect cross-border flow data between two zones.

        Args:
            from_zone: From bidding zone
            to_zone: To bidding zone
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with flow data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []
        border_id = f"{from_zone}_{to_zone}"
        for start_chunk, end_chunk in tqdm(chunks, desc=f" {border_id}", leave=False):
            try:
                # Fetch cross-border physical flow (directional)
                series = self.client.query_crossborder_flows(
                    from_zone,
                    to_zone,
                    start=start_chunk,
                    end=end_chunk,
                )
                if series is not None and not series.empty:
                    df = pd.DataFrame({
                        'timestamp': series.index,
                        'flow_mw': series.values,
                        'from_zone': from_zone,
                        'to_zone': to_zone,
                        'border': border_id,
                    })
                    all_data.append(pl.from_pandas(df))
                self._rate_limit()
            except Exception as e:
                print(f" ❌ Failed {border_id} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue
        if all_data:
            return pl.concat(all_data)
        return pl.DataFrame()
    def collect_all(
        self,
        start_date: str,
        end_date: str,
        output_dir: Path,
    ) -> dict:
        """Collect all ENTSO-E data with rate limiting.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_dir: Directory to save Parquet files

        Returns:
            Dictionary with paths to saved files
        """
        output_dir.mkdir(parents=True, exist_ok=True)
        # Estimate total requests: one per zone-month for generation and for
        # load, plus one per border-month for flows
        months = len(self._generate_monthly_chunks(start_date, end_date))
        total_requests = (
            len(BIDDING_ZONES) * months * 2  # generation + load
            + len(BORDERS) * months  # flows
        )
        estimated_minutes = total_requests / self.requests_per_minute
        print("=" * 70)
        print("ENTSO-E Data Collection")
        print("=" * 70)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Bidding zones: {len(BIDDING_ZONES)}")
        print(f"Cross-border flows: {len(BORDERS)}")
        print(f"Monthly chunks: {months}")
        print(f"Total requests: ~{total_requests}")
        print(
            f"Rate limit: {self.requests_per_minute} requests/minute "
            f"({self.requests_per_minute / 60:.0%} of the 60-request limit)"
        )
        print(f"Estimated time: {estimated_minutes:.1f} minutes")
        print()
        results = {}
        # Tag output files with the requested year range (e.g., 2024_2025)
        file_tag = f"{start_date[:4]}_{end_date[:4]}"
        # 1. Collect generation data
        print("[1/3] Collecting generation data by production type...")
        generation_data = []
        for zone in tqdm(BIDDING_ZONES.keys(), desc="Generation"):
            df = self.collect_generation_per_type(zone, start_date, end_date)
            if not df.is_empty():
                generation_data.append(df)
        if generation_data:
            generation_df = pl.concat(generation_data)
            gen_path = output_dir / f"entsoe_generation_{file_tag}.parquet"
            generation_df.write_parquet(gen_path)
            results['generation'] = gen_path
            print(f"✅ Generation: {generation_df.shape[0]:,} records → {gen_path}")
        # 2. Collect load data
        print("\n[2/3] Collecting load (demand) data...")
        load_data = []
        for zone in tqdm(BIDDING_ZONES.keys(), desc="Load"):
            df = self.collect_load(zone, start_date, end_date)
            if not df.is_empty():
                load_data.append(df)
        if load_data:
            load_df = pl.concat(load_data)
            load_path = output_dir / f"entsoe_load_{file_tag}.parquet"
            load_df.write_parquet(load_path)
            results['load'] = load_path
            print(f"✅ Load: {load_df.shape[0]:,} records → {load_path}")
        # 3. Collect cross-border flows
        print("\n[3/3] Collecting cross-border flows...")
        flow_data = []
        for from_zone, to_zone in tqdm(BORDERS, desc="Flows"):
            df = self.collect_cross_border_flows(from_zone, to_zone, start_date, end_date)
            if not df.is_empty():
                flow_data.append(df)
        if flow_data:
            flow_df = pl.concat(flow_data)
            flow_path = output_dir / f"entsoe_flows_{file_tag}.parquet"
            flow_df.write_parquet(flow_path)
            results['flows'] = flow_path
            print(f"✅ Flows: {flow_df.shape[0]:,} records → {flow_path}")
        print()
        print("=" * 70)
        print("ENTSO-E Collection Complete")
        print("=" * 70)
        print(f"Total API requests made: {self.request_count}")
        print(f"Files created: {len(results)}")
        for data_type, path in results.items():
            file_size = path.stat().st_size / (1024 ** 2)
            print(f"  - {data_type}: {file_size:.1f} MB")
        return results
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Collect ENTSO-E data with proper rate limiting"
    )
    parser.add_argument(
        '--start-date',
        default='2024-10-01',
        help='Start date (YYYY-MM-DD)',
    )
    parser.add_argument(
        '--end-date',
        default='2025-09-30',
        help='End date (YYYY-MM-DD)',
    )
    parser.add_argument(
        '--output-dir',
        type=Path,
        default=Path('data/raw'),
        help='Output directory for Parquet files',
    )
    parser.add_argument(
        '--requests-per-minute',
        type=int,
        default=27,
        help='Requests per minute (default: 27 = 45%% of the 60 limit)',
    )
    args = parser.parse_args()

    # Initialize collector and run
    collector = EntsoECollector(requests_per_minute=args.requests_per_minute)
    collector.collect_all(
        start_date=args.start_date,
        end_date=args.end_date,
        output_dir=args.output_dir,
    )
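# Example invocation (script filename assumed; adjust to the actual name):
#   python collect_entsoe.py --start-date 2024-10-01 --end-date 2025-09-30 \
#       --output-dir data/raw --requests-per-minute 27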