"""
Convert manually exported Alegro outages to standardized parquet format.

After manually exporting from the ENTSO-E web UI, run this script to convert
the CSV/Excel file to our standard schema.

Usage:
    python scripts/convert_alegro_manual_export.py data/raw/alegro_manual_export.csv

Expected columns in the manual export (names may vary):
- Asset Name / Resource Name
- Asset EIC / mRID
- Start Time / Unavailability Start
- End Time / Unavailability End
- Business Type / Type (A53=Planned, A54=Forced)
- Available Capacity / Unavailable Capacity (MW)
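
A hypothetical export might start like this (header names vary by UI
version; the data row is illustrative only, not a real outage record):

    Asset Name,Asset EIC,Start Time,End Time,Business Type,Available Capacity
    Alegro,<EIC code>,2025-11-15 00:00,2025-11-16 23:59,A54,0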

Author: Claude + Evgueni Poloukarov
Date: 2025-11-09
"""
import sys
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd
import polars as pl


def convert_alegro_export(input_file: Path, output_path: Path) -> pl.DataFrame:
    """
    Convert manually exported Alegro outages to standard schema.

    Args:
        input_file: Path to downloaded CSV/Excel file
        output_path: Path to save standardized parquet

    Returns:
        Standardized outages DataFrame
    """
    print("=" * 80)
    print("CONVERTING MANUAL ALEGRO OUTAGE EXPORT")
    print("=" * 80)
    print(f"\nInput: {input_file}")
    print()

    # Read file (supports both CSV and Excel)
    if input_file.suffix.lower() in ['.csv', '.txt']:
        print("Reading CSV file...")
        df = pl.read_csv(input_file)
    elif input_file.suffix.lower() in ['.xlsx', '.xls']:
        print("Reading Excel file...")
        df_pandas = pd.read_excel(input_file)
        df = pl.from_pandas(df_pandas)
    else:
        raise ValueError(f"Unsupported file format: {input_file.suffix}")

    print(f"  Loaded {len(df)} rows, {len(df.columns)} columns")
    print(f"  Columns: {df.columns}")
    print()

    # Show first few rows to help identify column names
    print("Sample data:")
    print(df.head(3))
    print()

    # Map columns to standard schema (flexible mapping)
    column_mapping = {}
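    # Matching below is a case-insensitive substring search over the export
    # headers (e.g. "Unavailability Start" maps to start_time via 'start');
    # the first matching column wins.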

    # Find asset EIC column
    eic_candidates = [c for c in df.columns if any(x in c.lower() for x in ['eic', 'mrid', 'code', 'id'])]
    if eic_candidates:
        column_mapping['asset_eic'] = eic_candidates[0]
        print(f"Mapped asset_eic <- {eic_candidates[0]}")

    # Find asset name column
    name_candidates = [c for c in df.columns if any(x in c.lower() for x in ['name', 'resource', 'asset'])]
    if name_candidates:
        column_mapping['asset_name'] = name_candidates[0]
        print(f"Mapped asset_name <- {name_candidates[0]}")

    # Find start time column
    start_candidates = [c for c in df.columns if any(x in c.lower() for x in ['start', 'begin', 'from'])]
    if start_candidates:
        column_mapping['start_time'] = start_candidates[0]
        print(f"Mapped start_time <- {start_candidates[0]}")

    # Find end time column
    end_candidates = [c for c in df.columns if any(x in c.lower() for x in ['end', 'to', 'until'])]
    if end_candidates:
        column_mapping['end_time'] = end_candidates[0]
        print(f"Mapped end_time <- {end_candidates[0]}")

    # Find business type column
    type_candidates = [c for c in df.columns if any(x in c.lower() for x in ['type', 'business', 'category'])]
    if type_candidates:
        column_mapping['businesstype'] = type_candidates[0]
        print(f"Mapped businesstype <- {type_candidates[0]}")

    # Find capacity column (if available)
    capacity_candidates = [c for c in df.columns if any(x in c.lower() for x in ['capacity', 'mw', 'power'])]
    if capacity_candidates:
        column_mapping['capacity_mw'] = capacity_candidates[0]
        print(f"Mapped capacity_mw <- {capacity_candidates[0]}")

    print()

    if not column_mapping:
        print("[ERROR] Could not automatically map columns!")
        print("Please manually map columns in the script.")
        print()
        print("Available columns:")
        for i, col in enumerate(df.columns, 1):
            print(f"  {i}. {col}")
        sys.exit(1)

    # Rename mapped columns to the standard schema. column_mapping is
    # standard_name -> original_name, and every original came from df.columns,
    # so we can select directly; unmapped source columns are dropped.
    df_renamed = df.select([
        pl.col(original).alias(standard)
        for standard, original in column_mapping.items()
    ])

    # Add missing columns with defaults
    required_columns = {
        'asset_eic': pl.Utf8,
        'asset_name': pl.Utf8,
        'start_time': pl.Datetime,
        'end_time': pl.Datetime,
        'businesstype': pl.Utf8,
        'from_zone': pl.Utf8,
        'to_zone': pl.Utf8,
        'border': pl.Utf8
    }

    for col, dtype in required_columns.items():
        if col not in df_renamed.columns:
            df_renamed = df_renamed.with_columns(pl.lit(None).cast(dtype).alias(col))

    # Set known values for Alegro
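    # (Alegro is the single BE-DE HVDC interconnector, so these values are
    # constant for every row)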
    df_renamed = df_renamed.with_columns([
        pl.lit('BE').alias('from_zone'),
        pl.lit('DE').alias('to_zone'),
        pl.lit('BE_DE').alias('border')
    ])

    # Parse timestamps if they arrived as strings (polars infers the format;
    # the export is assumed to use an unambiguous ISO-like layout)
    for ts_col in ('start_time', 'end_time'):
        if df_renamed[ts_col].dtype == pl.Utf8:
            df_renamed = df_renamed.with_columns(
                pl.col(ts_col).str.to_datetime().alias(ts_col)
            )

    # Filter to only future outages (forward-looking for forecasting). Use a
    # tz-naive UTC "now": str.to_datetime() yields tz-naive values for
    # offset-free strings, and comparing a tz-naive column to a tz-aware
    # literal can raise in polars. Export timestamps are assumed to be UTC.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    df_future = df_renamed.filter(pl.col('end_time') > now)

    print("=" * 80)
    print("CONVERSION SUMMARY")
    print("=" * 80)
    print(f"Total outages in export: {len(df_renamed)}")
    print(f"Future outages (for forecasting): {len(df_future)}")
    print()

    # Show business type breakdown
    if 'businesstype' in df_renamed.columns:
        type_counts = df_renamed.group_by('businesstype').agg(pl.len().alias('count'))
        print("Business Type breakdown:")
        for row in type_counts.iter_rows(named=True):
            print(f"  {row['businesstype']}: {row['count']} outages")
        print()

    # Save both full and future-only versions
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Save all outages
    df_renamed.write_parquet(output_path)
    print(f"[SAVED ALL] {output_path} ({len(df_renamed)} outages)")

    # Save future outages separately
    future_path = output_path.parent / output_path.name.replace('.parquet', '_future.parquet')
    df_future.write_parquet(future_path)
    print(f"[SAVED FUTURE] {future_path} ({len(df_future)} outages)")

    print()
    print("=" * 80)
    print("[SUCCESS] Alegro outages converted successfully!")
    print("=" * 80)
    print()
    print("Next steps:")
    print("1. Verify the data looks correct:")
    print(f"   python -c \"import polars as pl; print(pl.read_parquet('{output_path}'))\"")
    print("2. Integrate into feature engineering pipeline")
    print()

    return df_renamed
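
# Example programmatic use (paths as in the Usage example and the default
# output in main(); main() below is the intended CLI entry point):
#
#     from pathlib import Path
#     df = convert_alegro_export(
#         Path('data/raw/alegro_manual_export.csv'),
#         Path('data/raw/alegro_hvdc_outages_24month.parquet'),
#     )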


def main():
    """Main execution."""
    if len(sys.argv) < 2:
        print("Usage: python scripts/convert_alegro_manual_export.py <input_file>")
        print()
        print("Example:")
        print("  python scripts/convert_alegro_manual_export.py data/raw/alegro_manual_export.csv")
        print()
        sys.exit(1)

    input_file = Path(sys.argv[1])
    if not input_file.exists():
        print(f"[ERROR] File not found: {input_file}")
        sys.exit(1)

    # Output path
    base_dir = Path.cwd()
    output_path = base_dir / 'data' / 'raw' / 'alegro_hvdc_outages_24month.parquet'

    # Convert (the standardized DataFrame is also written to parquet inside)
    convert_alegro_export(input_file, output_path)


if __name__ == '__main__':
    main()