# Page header residue from the scraped listing: Spaces status "Sleeping", file size 7,111 bytes, revision 27cb60a.
"""Collect LTA and Net Positions data for 24 months (Oct 2023 - Sept 2025)."""
import sys
from pathlib import Path
from datetime import datetime, timedelta
import polars as pl
import time
from requests.exceptions import HTTPError
# Add src to path
sys.path.insert(0, str(Path.cwd() / 'src'))
from data_collection.collect_jao import JAOCollector
def _month_windows(start_ts, end_ts):
    """Split the inclusive range [start_ts, end_ts] into calendar-month windows.

    Returns a list of ``(window_start, window_end)`` pairs. The first window
    starts at ``start_ts``; every later window starts on the 1st of its month.
    ``window_end`` is the last day of that month (carrying the same
    time-of-day as ``window_start``), clipped to ``end_ts``. The list is empty
    when ``start_ts > end_ts``.
    """
    windows = []
    cursor = start_ts
    while cursor <= end_ts:
        if cursor.month == 12:
            next_start = cursor.replace(year=cursor.year + 1, month=1, day=1)
        else:
            next_start = cursor.replace(month=cursor.month + 1, day=1)
        windows.append((cursor, min(next_start - timedelta(days=1), end_ts)))
        cursor = next_start
    return windows


def collect_lta_monthly(collector, start_date, end_date):
    """Collect LTA data month by month (API doesn't support long ranges).

    Implements JAO API rate limiting:
    - 100 requests/minute limit
    - 1 second between requests (60 req/min with safety margin)
    - Exponential backoff on 429 errors (no wait after the final attempt)

    Args:
        collector: Object exposing ``client.query_lta(start, end)`` -- presumably
            a ``JAOCollector``; confirm for other callers.
        start_date: Start of the range, anything ``pd.Timestamp`` accepts
            (e.g. ``'2023-10-01'``).
        end_date: Inclusive end of the range, same formats as ``start_date``.

    Returns:
        A polars DataFrame of all monthly results concatenated vertically, or
        ``None`` when no month returned data. Months that fail are skipped
        (not raised) and listed in a printed warning.
    """
    import pandas as pd

    max_retries = 5
    base_delay = 60  # Start with 60s on a 429 error, doubling on each retry

    windows = _month_windows(pd.Timestamp(start_date), pd.Timestamp(end_date))
    total_months = len(windows)
    all_lta_data = []
    failed_months = []

    for month_no, (month_start, month_end) in enumerate(windows, start=1):
        print(f" Month {month_no}/{total_months}: {month_start.date()} to {month_end.date()}...", end=" ", flush=True)
        month_label = str(month_start.date())

        for attempt in range(max_retries):
            try:
                # Rate limiting: 1 second between all requests
                time.sleep(1)
                # NOTE(review): month_end is midnight at the start of the month's
                # last day; confirm query_lta treats it as covering that whole
                # day, otherwise the last day of each month may be missing.
                df = collector.client.query_lta(
                    pd.Timestamp(month_start, tz='UTC'),
                    pd.Timestamp(month_end, tz='UTC'),
                )
                if df is not None and not df.empty:
                    # CRITICAL: Reset index to preserve datetime (mtu) as column
                    all_lta_data.append(pl.from_pandas(df.reset_index()))
                    print(f"{len(df):,} records")
                else:
                    print("No data")
                break  # Success - stop retrying
            except HTTPError as e:
                # HTTPError may be raised without a response object; reading
                # .status_code directly would raise inside this handler and
                # abort the entire collection.
                status = getattr(e.response, 'status_code', None)
                if status == 429 and attempt < max_retries - 1:
                    # Rate limited - exponential backoff, then retry
                    wait_time = base_delay * (2 ** attempt)
                    print(f"Rate limited (429), waiting {wait_time}s... ", end="", flush=True)
                    time.sleep(wait_time)
                    print(f"Retrying ({attempt + 2}/{max_retries})...", end=" ", flush=True)
                    continue
                if status == 429:
                    # Last attempt: no point sleeping before giving up
                    print(f"Rate limited (429), failed after {max_retries} attempts")
                else:
                    # Other HTTP error - don't retry
                    print(f"Failed: {e}")
                failed_months.append(month_label)
                break
            except Exception as e:
                # Non-HTTP error: best-effort, skip this month and carry on
                print(f"Failed: {e}")
                failed_months.append(month_label)
                break

    if failed_months:
        print(f"\n [WARNING] {len(failed_months)}/{total_months} month(s) failed: {', '.join(failed_months)}")

    if not all_lta_data:
        return None

    # NOTE(review): how='vertical' requires identical schemas across months; if
    # columns can differ between months, 'diagonal_relaxed' may be needed.
    combined = pl.concat(all_lta_data, how='vertical')
    print(f"\n Combined: {len(combined):,} total records")
    return combined
def _report_output(path, run_started):
    """Print the summary status and size of one output file.

    A file that exists but was last modified before ``run_started`` was not
    produced by this run (left over from an earlier one), so it is reported as
    STALE instead of OK.
    """
    if not path.exists():
        print(f" [MISSING] {path}")
        return
    info = path.stat()
    status = "OK" if info.st_mtime >= run_started.timestamp() else "STALE"
    print(f" [{status}] {path}")
    print(f" Size: {info.st_size / (1024**2):.2f} MB")


def main():
    """Collect LTA and Net Positions for complete 24-month period.

    Writes into ``data/raw/phase1_24month``:
    - ``jao_lta.parquet`` from ``collect_lta_monthly``;
    - ``jao_net_positions.parquet`` via
      ``collector.collect_net_positions_sample(..., output_path=...)``.

    Each dataset is collected independently: an exception in one is printed
    with its traceback and does not stop the other. The closing summary marks
    each file OK (written during this run), STALE (exists but predates this
    run) or MISSING.
    """
    import traceback

    print("\n" + "=" * 80)
    print("JAO LTA + NET POSITIONS COLLECTION - 24 MONTHS")
    print("=" * 80)
    print("Period: October 2023 - September 2025")
    print("=" * 80)
    print()

    # Initialize collector
    collector = JAOCollector()

    # Date range (matches Phase 1 SPARSE collection)
    start_date = '2023-10-01'
    end_date = '2025-09-30'

    # Output directory
    output_dir = Path('data/raw/phase1_24month')
    output_dir.mkdir(parents=True, exist_ok=True)

    # Also used by the summary to tell fresh output files from stale ones
    start_time = datetime.now()

    # =========================================================================
    # DATASET 1: LTA (Long Term Allocations)
    # =========================================================================
    print("\n" + "=" * 80)
    print("DATASET 1/2: LTA (Long Term Allocations)")
    print("=" * 80)
    print("Collecting monthly (API limitation)...")
    print()

    lta_output = output_dir / 'jao_lta.parquet'
    try:
        lta_df = collect_lta_monthly(collector, start_date, end_date)
        if lta_df is not None:
            # Save to parquet
            lta_df.write_parquet(lta_output)
            print(f"\n[OK] LTA collection successful: {len(lta_df):,} records")
            print(f"[OK] Saved to: {lta_output}")
            print(f"[OK] File size: {lta_output.stat().st_size / (1024**2):.2f} MB")
        else:
            print("\n[WARNING] LTA collection returned no data")
    except Exception as e:
        print(f"\n[ERROR] LTA collection failed: {e}")
        traceback.print_exc()

    # =========================================================================
    # DATASET 2: NET POSITIONS (Domain Boundaries)
    # =========================================================================
    print("\n" + "=" * 80)
    print("DATASET 2/2: NET POSITIONS (Domain Boundaries)")
    print("=" * 80)
    print()

    netpos_output = output_dir / 'jao_net_positions.parquet'
    try:
        netpos_df = collector.collect_net_positions_sample(
            start_date=start_date,
            end_date=end_date,
            output_path=netpos_output
        )
        if netpos_df is not None:
            print(f"\n[OK] Net Positions collection successful: {len(netpos_df):,} records")
        else:
            print("\n[WARNING] Net Positions collection returned no data")
    except Exception as e:
        print(f"\n[ERROR] Net Positions collection failed: {e}")
        traceback.print_exc()

    # =========================================================================
    # SUMMARY
    # =========================================================================
    elapsed = datetime.now() - start_time

    print("\n" + "=" * 80)
    print("COLLECTION COMPLETE")
    print("=" * 80)
    print(f"Total time: {elapsed}")
    print()
    print("Files created:")
    for output_path in (lta_output, netpos_output):
        _report_output(output_path, start_time)
    print("=" * 80)
if __name__ == "__main__":
    # Run the collection only when executed as a script, never on import.
    main()
# (end of scraped listing)