"""Recover October 2023 & 2024 LTA data with DST-safe date ranges. The main collection failed for October due to DST transitions: - October 2023: DST transition on Sunday, Oct 29 - October 2024: DST transition on Sunday, Oct 27 This script collects October in 2 chunks to avoid DST hour ambiguity: - Chunk 1: Oct 1-26 (before DST weekend) - Chunk 2: Oct 27-31 (after/including DST transition) """ import sys from pathlib import Path from datetime import datetime import polars as pl import time from requests.exceptions import HTTPError # Add src to path sys.path.insert(0, str(Path.cwd() / 'src')) from data_collection.collect_jao import JAOCollector def collect_october_split(collector, year: int, month: int = 10): """Collect October LTA data in 2 chunks to avoid DST issues. Args: collector: JAOCollector instance year: Year to collect (2023 or 2024) month: Month (default 10 for October) Returns: Polars DataFrame with October LTA data, or None if failed """ import pandas as pd print(f"\n{'=' * 70}") print(f"COLLECTING OCTOBER {year} LTA (DST-Safe)") print(f"{'=' * 70}") all_data = [] # Define date chunks that avoid DST transition chunks = [ (f"{year}-10-01", f"{year}-10-26"), # Before DST weekend (f"{year}-10-27", f"{year}-10-31"), # After/including DST ] for chunk_num, (start_date, end_date) in enumerate(chunks, 1): print(f"\n Chunk {chunk_num}/2: {start_date} to {end_date}...", end=" ", flush=True) # Retry logic with exponential backoff max_retries = 5 base_delay = 60 success = False for attempt in range(max_retries): try: # Rate limiting: 1 second between requests time.sleep(1) # Convert to pandas Timestamps with UTC timezone pd_start = pd.Timestamp(start_date, tz='UTC') pd_end = pd.Timestamp(end_date, tz='UTC') # Query LTA for this chunk df = collector.client.query_lta(pd_start, pd_end) if df is not None and not df.empty: # CRITICAL: Reset index to preserve datetime (mtu) as column all_data.append(pl.from_pandas(df.reset_index())) print(f"{len(df):,} records") success = True break else: print("No data") success = True break except HTTPError as e: if e.response.status_code == 429: # Rate limited - exponential backoff wait_time = base_delay * (2 ** attempt) print(f"Rate limited (429), waiting {wait_time}s... ", end="", flush=True) time.sleep(wait_time) if attempt < max_retries - 1: print(f"Retrying ({attempt + 2}/{max_retries})...", end=" ", flush=True) else: print(f"Failed after {max_retries} attempts") else: # Other HTTP error print(f"Failed: {e}") break except Exception as e: print(f"Failed: {e}") break # Combine chunks if all_data: combined = pl.concat(all_data, how='vertical') print(f"\n Combined October {year}: {len(combined):,} records") return combined else: print(f"\n [WARNING] No data collected for October {year}") return None def main(): """Recover October 2023 and 2024 LTA data.""" print("\n" + "=" * 80) print("OCTOBER LTA RECOVERY - DST-SAFE COLLECTION") print("=" * 80) print("Target: October 2023 & October 2024") print("Strategy: Split around DST transition dates") print("=" * 80) # Initialize collector collector = JAOCollector() start_time = datetime.now() # Collect October 2023 oct_2023 = collect_october_split(collector, 2023) # Collect October 2024 oct_2024 = collect_october_split(collector, 2024) # ========================================================================= # MERGE WITH EXISTING DATA # ========================================================================= print("\n" + "=" * 80) print("MERGING WITH EXISTING LTA DATA") print("=" * 80) existing_path = Path('data/raw/phase1_24month/jao_lta.parquet') if not existing_path.exists(): print(f"[ERROR] Existing LTA file not found: {existing_path}") print("Cannot merge. Exiting.") return # Read existing data existing_df = pl.read_parquet(existing_path) print(f"\nExisting data: {len(existing_df):,} records") # Backup existing file backup_path = existing_path.with_suffix('.parquet.backup') existing_df.write_parquet(backup_path) print(f"Backup created: {backup_path}") # Combine all data all_dfs = [existing_df] recovered_count = 0 if oct_2023 is not None: all_dfs.append(oct_2023) recovered_count += len(oct_2023) print(f"+ October 2023: {len(oct_2023):,} records") if oct_2024 is not None: all_dfs.append(oct_2024) recovered_count += len(oct_2024) print(f"+ October 2024: {len(oct_2024):,} records") if recovered_count == 0: print("\n[WARNING] No October data recovered") return # Merge and deduplicate merged_df = pl.concat(all_dfs, how='vertical') # Remove duplicates if any (unlikely but safe) if 'datetime' in merged_df.columns or 'timestamp' in merged_df.columns: time_col = 'datetime' if 'datetime' in merged_df.columns else 'timestamp' initial_count = len(merged_df) merged_df = merged_df.unique() deduped_count = initial_count - len(merged_df) if deduped_count > 0: print(f"\nRemoved {deduped_count} duplicate records") # Save merged data merged_df.write_parquet(existing_path) print("\n" + "=" * 80) print("RECOVERY COMPLETE") print("=" * 80) print(f"Original records: {len(existing_df):,}") print(f"Recovered records: {recovered_count:,}") print(f"Total records: {len(merged_df):,}") print(f"File: {existing_path}") print(f"Size: {existing_path.stat().st_size / (1024**2):.2f} MB") print(f"Backup: {backup_path}") elapsed = datetime.now() - start_time print(f"\nTotal time: {elapsed}") print("=" * 80) if __name__ == '__main__': main()