# fbmc-chronos2: src/data_collection/collect_entsoe.py
"""ENTSO-E Transparency Platform Data Collection with Rate Limiting
Collects generation, load, and cross-border flow data from the ENTSO-E API,
with rate limiting to avoid temporary bans.

ENTSO-E rate limits (official):
- 60 requests per 60 seconds (hard limit; exceeding it triggers a 10-minute ban)
- Screen scraping at more than 60 requests/minute leads to a temporary IP ban

Strategy:
- 27 requests/minute (45% of the 60/min limit)
- 1 request every ~2.2 seconds
- Request data in monthly chunks to minimize the number of API calls
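
Example usage (assumes a valid ENTSOE_API_KEY in .env):
    from pathlib import Path
    collector = EntsoECollector(requests_per_minute=27)
    collector.collect_all('2024-10-01', '2025-09-30', Path('data/raw'))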
"""
import polars as pl
from pathlib import Path
from dotenv import load_dotenv
import os
import time
from typing import List, Tuple
from tqdm import tqdm
from entsoe import EntsoePandasClient
import pandas as pd
# Load environment variables
load_dotenv()
# FBMC Bidding Zones (12 zones from project plan)
BIDDING_ZONES = {
'AT': 'Austria',
'BE': 'Belgium',
'HR': 'Croatia',
'CZ': 'Czech Republic',
'FR': 'France',
'DE_LU': 'Germany-Luxembourg',
'HU': 'Hungary',
'NL': 'Netherlands',
'PL': 'Poland',
'RO': 'Romania',
'SK': 'Slovakia',
'SI': 'Slovenia',
}
# FBMC cross-border flows (21 directed zone pairs; entsoe-py's
# query_crossborder_flows returns physical flow in the from->to direction)
BORDERS = [
('DE_LU', 'NL'),
('DE_LU', 'FR'),
('DE_LU', 'BE'),
('DE_LU', 'AT'),
('DE_LU', 'CZ'),
('DE_LU', 'PL'),
('FR', 'BE'),
('FR', 'ES'), # External but affects FBMC
('FR', 'CH'), # External but affects FBMC
('AT', 'CZ'),
('AT', 'HU'),
('AT', 'SI'),
('AT', 'CH'), # External but affects FBMC
('CZ', 'SK'),
('CZ', 'PL'),
('HU', 'SK'),
('HU', 'RO'),
('HU', 'HR'),
('SI', 'HR'),
('PL', 'SK'),
('PL', 'CZ'),
]
class EntsoECollector:
"""Collect ENTSO-E data with proper rate limiting."""
def __init__(self, requests_per_minute: int = 27):
"""Initialize collector with rate limiting.
Args:
requests_per_minute: Max requests per minute (default: 27 = 45% of 60 limit)
"""
api_key = os.getenv('ENTSOE_API_KEY')
if not api_key or 'your_entsoe' in api_key.lower():
raise ValueError("ENTSO-E API key not configured in .env file")
self.client = EntsoePandasClient(api_key=api_key)
self.requests_per_minute = requests_per_minute
self.delay_seconds = 60.0 / requests_per_minute
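        # e.g. 27 requests/minute -> 60 / 27 ~= 2.22 s between requests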
self.request_count = 0
print(f"ENTSO-E Collector initialized")
print(f"Rate limit: {self.requests_per_minute} requests/minute")
print(f"Delay between requests: {self.delay_seconds:.2f}s")
def _rate_limit(self):
"""Apply rate limiting delay."""
time.sleep(self.delay_seconds)
self.request_count += 1
def _generate_monthly_chunks(
self,
start_date: str,
end_date: str
) -> List[Tuple[pd.Timestamp, pd.Timestamp]]:
"""Generate monthly date chunks for API requests.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
List of (start, end) timestamp tuples
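
        Example:
            For '2024-10-01' to '2024-11-15' this yields two chunks:
            (2024-10-01, 2024-11-01) and (2024-11-01, 2024-11-15),
            all tz-aware UTC timestamps.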
"""
start_dt = pd.Timestamp(start_date, tz='UTC')
end_dt = pd.Timestamp(end_date, tz='UTC')
chunks = []
current = start_dt
        while current < end_dt:
            # Chunk end = first instant of the next month (treated as an
            # exclusive upper bound), or end_dt if that comes first;
            # consecutive chunks are contiguous, so no hours are skipped
            # at month boundaries
            next_month_start = current + pd.offsets.MonthBegin(1)
            chunk_end = min(next_month_start, end_dt)
            chunks.append((current, chunk_end))
            current = chunk_end
return chunks
def collect_generation_per_type(
self,
zone: str,
start_date: str,
end_date: str
) -> pl.DataFrame:
"""Collect generation by production type for a bidding zone.
Args:
zone: Bidding zone code (e.g., 'DE_LU', 'FR')
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Polars DataFrame with generation data
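            (long format; columns: timestamp, production_type, generation_mw, zone)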
"""
chunks = self._generate_monthly_chunks(start_date, end_date)
all_data = []
for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} generation", leave=False):
try:
# Fetch generation data
df = self.client.query_generation(
zone,
start=start_chunk,
end=end_chunk,
psr_type=None # Get all production types
)
                if df is not None and not df.empty:
                    # query_generation with psr_type=None can return MultiIndex
                    # columns (production type, aggregation kind) - flatten
                    # them so the melt below yields one production_type column
                    if isinstance(df.columns, pd.MultiIndex):
                        df.columns = [
                            '_'.join(str(level) for level in col).strip('_')
                            for col in df.columns
                        ]
                    # Convert wide production-type columns to long format
                    df_reset = df.reset_index()
                    time_col = df_reset.columns[0]
                    df_melted = df_reset.melt(
                        id_vars=[time_col],
                        var_name='production_type',
                        value_name='generation_mw'
                    )
                    df_melted = df_melted.rename(columns={time_col: 'timestamp'})
                    df_melted['zone'] = zone
                    # Convert to Polars
                    pl_df = pl.from_pandas(df_melted)
                    all_data.append(pl_df)
self._rate_limit()
except Exception as e:
print(f" ❌ Failed {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
self._rate_limit()
continue
if all_data:
return pl.concat(all_data)
else:
return pl.DataFrame()
def collect_load(
self,
zone: str,
start_date: str,
end_date: str
) -> pl.DataFrame:
"""Collect load (demand) data for a bidding zone.
Args:
zone: Bidding zone code
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Polars DataFrame with load data
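            (columns: timestamp, load_mw, zone)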
"""
chunks = self._generate_monthly_chunks(start_date, end_date)
all_data = []
for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} load", leave=False):
try:
# Fetch load data
series = self.client.query_load(
zone,
start=start_chunk,
end=end_chunk
)
if series is not None and not series.empty:
df = pd.DataFrame({
'timestamp': series.index,
'load_mw': series.values,
'zone': zone
})
pl_df = pl.from_pandas(df)
all_data.append(pl_df)
self._rate_limit()
except Exception as e:
print(f" ❌ Failed {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
self._rate_limit()
continue
if all_data:
return pl.concat(all_data)
else:
return pl.DataFrame()
def collect_cross_border_flows(
self,
from_zone: str,
to_zone: str,
start_date: str,
end_date: str
) -> pl.DataFrame:
"""Collect cross-border flow data between two zones.
Args:
from_zone: From bidding zone
to_zone: To bidding zone
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Polars DataFrame with flow data
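            (columns: timestamp, flow_mw, from_zone, to_zone, border)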
"""
chunks = self._generate_monthly_chunks(start_date, end_date)
all_data = []
border_id = f"{from_zone}_{to_zone}"
for start_chunk, end_chunk in tqdm(chunks, desc=f" {border_id}", leave=False):
try:
# Fetch cross-border flow
series = self.client.query_crossborder_flows(
from_zone,
to_zone,
start=start_chunk,
end=end_chunk
)
if series is not None and not series.empty:
df = pd.DataFrame({
'timestamp': series.index,
'flow_mw': series.values,
'from_zone': from_zone,
'to_zone': to_zone,
'border': border_id
})
pl_df = pl.from_pandas(df)
all_data.append(pl_df)
self._rate_limit()
except Exception as e:
print(f" ❌ Failed {border_id} {start_chunk.date()} to {end_chunk.date()}: {e}")
self._rate_limit()
continue
if all_data:
return pl.concat(all_data)
else:
return pl.DataFrame()
def collect_all(
self,
start_date: str,
end_date: str,
output_dir: Path
) -> dict:
"""Collect all ENTSO-E data with rate limiting.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
output_dir: Directory to save Parquet files
Returns:
Dictionary with paths to saved files
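            e.g. {'generation': Path(...), 'load': Path(...), 'flows': Path(...)}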
"""
output_dir.mkdir(parents=True, exist_ok=True)
# Calculate total requests
months = len(self._generate_monthly_chunks(start_date, end_date))
total_requests = (
len(BIDDING_ZONES) * months * 2 + # Generation + load
len(BORDERS) * months # Flows
)
estimated_minutes = total_requests / self.requests_per_minute
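        # With the defaults (12 zones, 21 borders, 12 monthly chunks):
        # 12*12*2 + 21*12 = 540 requests -> ~20 minutes at 27 requests/minute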
print("=" * 70)
print("ENTSO-E Data Collection")
print("=" * 70)
print(f"Date range: {start_date} to {end_date}")
print(f"Bidding zones: {len(BIDDING_ZONES)}")
print(f"Cross-border flows: {len(BORDERS)}")
print(f"Monthly chunks: {months}")
print(f"Total requests: ~{total_requests}")
print(f"Rate limit: {self.requests_per_minute} requests/minute (45% of 60 max)")
print(f"Estimated time: {estimated_minutes:.1f} minutes")
print()
results = {}
# 1. Collect Generation Data
print("[1/3] Collecting generation data by production type...")
generation_data = []
for zone in tqdm(BIDDING_ZONES.keys(), desc="Generation"):
df = self.collect_generation_per_type(zone, start_date, end_date)
if not df.is_empty():
generation_data.append(df)
if generation_data:
generation_df = pl.concat(generation_data)
            gen_path = output_dir / f"entsoe_generation_{start_date[:4]}_{end_date[:4]}.parquet"
generation_df.write_parquet(gen_path)
results['generation'] = gen_path
print(f"✅ Generation: {generation_df.shape[0]:,} records → {gen_path}")
# 2. Collect Load Data
print("\n[2/3] Collecting load (demand) data...")
load_data = []
for zone in tqdm(BIDDING_ZONES.keys(), desc="Load"):
df = self.collect_load(zone, start_date, end_date)
if not df.is_empty():
load_data.append(df)
if load_data:
load_df = pl.concat(load_data)
            load_path = output_dir / f"entsoe_load_{start_date[:4]}_{end_date[:4]}.parquet"
load_df.write_parquet(load_path)
results['load'] = load_path
print(f"✅ Load: {load_df.shape[0]:,} records → {load_path}")
# 3. Collect Cross-Border Flows
print("\n[3/3] Collecting cross-border flows...")
flow_data = []
for from_zone, to_zone in tqdm(BORDERS, desc="Flows"):
df = self.collect_cross_border_flows(from_zone, to_zone, start_date, end_date)
if not df.is_empty():
flow_data.append(df)
if flow_data:
flow_df = pl.concat(flow_data)
            flow_path = output_dir / f"entsoe_flows_{start_date[:4]}_{end_date[:4]}.parquet"
flow_df.write_parquet(flow_path)
results['flows'] = flow_path
print(f"✅ Flows: {flow_df.shape[0]:,} records → {flow_path}")
print()
print("=" * 70)
print("ENTSO-E Collection Complete")
print("=" * 70)
print(f"Total API requests made: {self.request_count}")
print(f"Files created: {len(results)}")
for data_type, path in results.items():
file_size = path.stat().st_size / (1024**2)
print(f" - {data_type}: {file_size:.1f} MB")
return results
if __name__ == "__main__":
import argparse
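    # Example invocation:
    #   python src/data_collection/collect_entsoe.py --start-date 2024-10-01 \
    #       --end-date 2025-09-30 --output-dir data/raw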
parser = argparse.ArgumentParser(description="Collect ENTSO-E data with proper rate limiting")
parser.add_argument(
'--start-date',
default='2024-10-01',
help='Start date (YYYY-MM-DD)'
)
parser.add_argument(
'--end-date',
default='2025-09-30',
help='End date (YYYY-MM-DD)'
)
parser.add_argument(
'--output-dir',
type=Path,
default=Path('data/raw'),
help='Output directory for Parquet files'
)
parser.add_argument(
'--requests-per-minute',
type=int,
default=27,
help='Requests per minute (default: 27 = 45%% of 60 limit)'
)
args = parser.parse_args()
# Initialize collector and run
collector = EntsoECollector(requests_per_minute=args.requests_per_minute)
collector.collect_all(
start_date=args.start_date,
end_date=args.end_date,
output_dir=args.output_dir
)