"""Recover October 27-31, 2023 LTA data using day-by-day collection.
October 2023 has DST transition on Sunday, Oct 29 at 03:00 CET.
This script collects each day individually to avoid any DST ambiguity.
"""
import sys
from pathlib import Path
from datetime import datetime, timedelta
import polars as pl
import time
from requests.exceptions import HTTPError
# Add src to path
sys.path.insert(0, str(Path.cwd() / 'src'))
from data_collection.collect_jao import JAOCollector
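
# JAOCollector is project-local (src/data_collection/collect_jao.py); the only
# interface this script relies on is that it exposes a pandas-based JAO client
# as `.client` with a `query_lta(start, end)` method.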


def collect_single_day(collector, date_str: str):
    """Collect LTA data for a single day.

    Args:
        collector: JAOCollector instance
        date_str: Date in YYYY-MM-DD format

    Returns:
        Polars DataFrame with the day's LTA data, or None if collection failed
    """
    import pandas as pd

    print(f" Day {date_str}...", end=" ", flush=True)

    # Retry logic with exponential backoff: waits of 60s, 120s, 240s, 480s
    # between the five attempts
    max_retries = 5
    base_delay = 60

    for attempt in range(max_retries):
        try:
            # Rate limiting: 1 second between requests
            time.sleep(1)

            # Convert to a pandas Timestamp with UTC timezone
            pd_date = pd.Timestamp(date_str, tz='UTC')

            # Query LTA for this single day
            df = collector.client.query_lta(pd_date, pd_date)

            if df is not None and not df.empty:
                print(f"{len(df):,} records")
                # CRITICAL: reset_index() turns the datetime index (mtu) into a
                # regular column; pl.from_pandas drops the pandas index by default
                return pl.from_pandas(df.reset_index())
            else:
                print("No data")
                return None

        except HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                if attempt < max_retries - 1:
                    wait_time = base_delay * (2 ** attempt)
                    print(f"Rate limited, waiting {wait_time}s... ", end="", flush=True)
                    time.sleep(wait_time)
                    print("Retrying... ", end="", flush=True)
                else:
                    # No point sleeping before reporting the final failure
                    print(f"Failed after {max_retries} attempts")
                    return None
            else:
                print(f"Failed: {e}")
                return None
        except Exception as e:
            print(f"Failed: {e}")
            return None
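
# Example usage (assuming JAO network access is available):
#   day_df = collect_single_day(JAOCollector(), "2023-10-29")
#   -> polars DataFrame with that day's LTA records, or None on failure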


def main():
    """Recover October 27-31, 2023 LTA data day by day."""
    print("\n" + "=" * 80)
    print("OCTOBER 27-31, 2023 LTA RECOVERY - DAY-BY-DAY")
    print("=" * 80)
    print("Strategy: Collect each day individually to avoid DST issues")
    print("=" * 80)

    # Initialize collector
    collector = JAOCollector()
    start_time = datetime.now()

    # Days to recover
    days = [
        "2023-10-27",
        "2023-10-28",
        "2023-10-29",  # DST transition day
        "2023-10-30",
        "2023-10-31",
    ]
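    # Equivalent construction with the imported timedelta (the explicit list is
    # kept above for readability):
    #   days = [(datetime(2023, 10, 27) + timedelta(days=i)).strftime("%Y-%m-%d")
    #           for i in range(5)]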
print(f"\nCollecting {len(days)} days:")
all_data = []
for day in days:
day_df = collect_single_day(collector, day)
if day_df is not None:
all_data.append(day_df)
# Combine daily data
if not all_data:
print("\n[ERROR] No data collected for any day")
return
combined = pl.concat(all_data, how='vertical')
print(f"\nCombined Oct 27-31, 2023: {len(combined):,} records")

    # =========================================================================
    # MERGE WITH EXISTING DATA
    # =========================================================================
    print("\n" + "=" * 80)
    print("MERGING WITH EXISTING LTA DATA")
    print("=" * 80)

    existing_path = Path('data/raw/phase1_24month/jao_lta.parquet')
    if not existing_path.exists():
        print(f"[ERROR] Existing LTA file not found: {existing_path}")
        return

    # Read existing data
    existing_df = pl.read_parquet(existing_path)
    print(f"\nExisting data: {len(existing_df):,} records")

    # Backup existing file (create new backup)
    backup_path = existing_path.with_name('jao_lta.parquet.backup2')
    existing_df.write_parquet(backup_path)
    print(f"Backup created: {backup_path}")

    # Merge
    merged_df = pl.concat([existing_df, combined], how='vertical')

    # Deduplicate if needed ('mtu' is the datetime column after reset_index;
    # 'datetime'/'timestamp' are also checked in case an earlier schema used them)
    if any(c in merged_df.columns for c in ('mtu', 'datetime', 'timestamp')):
        initial_count = len(merged_df)
        merged_df = merged_df.unique()  # drops rows identical across all columns
        deduped = initial_count - len(merged_df)
        if deduped > 0:
            print(f"\nRemoved {deduped} duplicate records")

    # Save
    merged_df.write_parquet(existing_path)

    print("\n" + "=" * 80)
    print("RECOVERY COMPLETE")
    print("=" * 80)
    print(f"Original records: {len(existing_df):,}")
    print(f"Recovered records: {len(combined):,}")
    print(f"Total records: {len(merged_df):,}")
    print(f"File: {existing_path}")
    print(f"Size: {existing_path.stat().st_size / (1024**2):.2f} MB")
    print(f"Backup: {backup_path}")

    elapsed = datetime.now() - start_time
    print(f"\nTotal time: {elapsed}")
    print("=" * 80)


if __name__ == '__main__':
    main()