| """Recover October 27-31, 2023 LTA data using day-by-day collection. | |
| October 2023 has DST transition on Sunday, Oct 29 at 03:00 CET. | |
| This script collects each day individually to avoid any DST ambiguity. | |
| """ | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| import polars as pl | |
| import time | |
| from requests.exceptions import HTTPError | |

# Add src to path
sys.path.insert(0, str(Path.cwd() / 'src'))

from data_collection.collect_jao import JAOCollector
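

# NOTE: JAOCollector is this project's own wrapper. The code below assumes
# collector.client exposes a query_lta(start, end) method returning a pandas
# DataFrame indexed by the MTU timestamp; adjust if the client API differs.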
def collect_single_day(collector, date_str: str):
    """Collect LTA data for a single day.

    Args:
        collector: JAOCollector instance
        date_str: Date in YYYY-MM-DD format

    Returns:
        Polars DataFrame with the day's LTA data, or None if collection failed
    """
    import pandas as pd

    print(f" Day {date_str}...", end=" ", flush=True)

    # Retry with exponential backoff on rate limiting
    max_retries = 5
    base_delay = 60  # seconds

    for attempt in range(max_retries):
        try:
            # Rate limiting: 1 second between requests
            time.sleep(1)
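
            # A UTC timestamp sidesteps the Oct 29 clock change: UTC has no
            # DST, so the repeated local 02:00-03:00 hour maps to distinct
            # UTC instants.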
            # Convert to pandas Timestamp with UTC timezone
            pd_date = pd.Timestamp(date_str, tz='UTC')

            # Query LTA for this single day
            df = collector.client.query_lta(pd_date, pd_date)

            if df is not None and not df.empty:
                print(f"{len(df):,} records")
                # CRITICAL: Reset index to preserve datetime (mtu) as a column
                return pl.from_pandas(df.reset_index())
            else:
                print("No data")
                return None
        except HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                if attempt < max_retries - 1:
                    # Exponential backoff: 60s, 120s, 240s, ...
                    wait_time = base_delay * (2 ** attempt)
                    print(f"Rate limited, waiting {wait_time}s... Retrying...",
                          end=" ", flush=True)
                    time.sleep(wait_time)
                else:
                    print(f"Failed after {max_retries} attempts")
                    return None
            else:
                print(f"Failed: {e}")
                return None
        except Exception as e:
            print(f"Failed: {e}")
            return None


def main():
    """Recover October 27-31, 2023 LTA data day by day."""
    print("\n" + "=" * 80)
    print("OCTOBER 27-31, 2023 LTA RECOVERY - DAY-BY-DAY")
    print("=" * 80)
    print("Strategy: Collect each day individually to avoid DST issues")
    print("=" * 80)

    # Initialize collector
    collector = JAOCollector()
    start_time = datetime.now()

    # Days to recover
    days = [
        "2023-10-27",
        "2023-10-28",
        "2023-10-29",  # DST transition day
        "2023-10-30",
        "2023-10-31",
    ]
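
    # The transition day has 25 hourly MTUs in local market time (the
    # 02:00-03:00 hour occurs twice), which is presumably what made the
    # original multi-day range query ambiguous; single-day UTC queries
    # avoid it.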
| print(f"\nCollecting {len(days)} days:") | |
| all_data = [] | |
| for day in days: | |
| day_df = collect_single_day(collector, day) | |
| if day_df is not None: | |
| all_data.append(day_df) | |
| # Combine daily data | |
| if not all_data: | |
| print("\n[ERROR] No data collected for any day") | |
| return | |
| combined = pl.concat(all_data, how='vertical') | |
| print(f"\nCombined Oct 27-31, 2023: {len(combined):,} records") | |

    # =========================================================================
    # MERGE WITH EXISTING DATA
    # =========================================================================
    print("\n" + "=" * 80)
    print("MERGING WITH EXISTING LTA DATA")
    print("=" * 80)

    existing_path = Path('data/raw/phase1_24month/jao_lta.parquet')
    if not existing_path.exists():
        print(f"[ERROR] Existing LTA file not found: {existing_path}")
        return

    # Read existing data
    existing_df = pl.read_parquet(existing_path)
    print(f"\nExisting data: {len(existing_df):,} records")

    # Back up the existing file before overwriting it in place
    backup_path = existing_path.with_name('jao_lta.parquet.backup2')
    existing_df.write_parquet(backup_path)
    print(f"Backup created: {backup_path}")

    # Merge
    merged_df = pl.concat([existing_df, combined], how='vertical')
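
    # NOTE: pl.concat(how='vertical') requires identical schemas. If the
    # recovered frame's dtypes ever drift from the existing file's, polars'
    # how='vertical_relaxed' would coerce them; strict concat is kept here so
    # a mismatch fails loudly instead of casting silently.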

    # Deduplicate full rows in case the recovered days overlap existing data
    if 'datetime' in merged_df.columns or 'timestamp' in merged_df.columns:
        initial_count = len(merged_df)
        merged_df = merged_df.unique()
        deduped = initial_count - len(merged_df)
        if deduped > 0:
            print(f"\nRemoved {deduped} duplicate records")

    # Save (overwrites the original file; a backup was written above)
    merged_df.write_parquet(existing_path)
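
    # NOTE: unique() does not guarantee row order; if downstream code expects
    # chronological order, sort by the MTU/time column after loading.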
| print("\n" + "=" * 80) | |
| print("RECOVERY COMPLETE") | |
| print("=" * 80) | |
| print(f"Original records: {len(existing_df):,}") | |
| print(f"Recovered records: {len(combined):,}") | |
| print(f"Total records: {len(merged_df):,}") | |
| print(f"File: {existing_path}") | |
| print(f"Size: {existing_path.stat().st_size / (1024**2):.2f} MB") | |
| print(f"Backup: {backup_path}") | |
| elapsed = datetime.now() - start_time | |
| print(f"\nTotal time: {elapsed}") | |
| print("=" * 80) | |


if __name__ == '__main__':
    main()