Spaces:
Sleeping
Sleeping
| """Mask missing October 27-31, 2023 LTA data using forward fill from October 26. | |
| Missing data: October 27-31, 2023 (~145 records, 0.5% of dataset) | |
| Strategy: Forward fill LTA values from October 26, 2023 | |
| Rationale: LTA (Long Term Allocations) change infrequently, forward fill is conservative | |
| """ | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| import polars as pl | |
def _with_mask_flags(df):
    """Return ``df`` with ``is_masked`` / ``masking_method`` columns guaranteed present.

    Flag columns that already exist (for instance from an earlier masking pass
    over the same file, or a previous run of this script) are preserved rather
    than reset, so previously masked rows are not silently un-flagged. Null
    ``is_masked`` values are treated as ``False``; ``masking_method`` is cast
    to Utf8 so it concatenates with freshly generated masked rows.
    """
    if 'is_masked' in df.columns:
        is_masked = pl.col('is_masked').fill_null(False)
    else:
        is_masked = pl.lit(False).alias('is_masked')
    if 'masking_method' in df.columns:
        method = pl.col('masking_method').cast(pl.Utf8)
    else:
        method = pl.lit(None).cast(pl.Utf8).alias('masking_method')
    return df.with_columns([is_masked, method])


def _forward_fill_days(source_df, time_col, source_day, target_days, method):
    """Replicate ``source_df`` once per day in ``target_days`` with shifted timestamps.

    Each copy has ``time_col`` shifted by ``day - source_day`` days (cast back
    to the column's original dtype so it concatenates with the source frame)
    and is flagged ``is_masked=True`` / ``masking_method=method``.

    Returns:
        All generated rows as a single DataFrame.
    """
    time_dtype = source_df.schema[time_col]
    frames = []
    for day in target_days:
        # NOTE(review): pl.duration adds a fixed 24h per day. If `mtu` is
        # timezone-aware in a DST zone, copies landing after the 2023-10-29
        # fall-back end up one wall-clock hour off - confirm the column dtype.
        shifted = pl.col(time_col) + pl.duration(days=day - source_day)
        frame = source_df.with_columns([
            shifted.cast(time_dtype).alias(time_col),
            pl.lit(True).alias('is_masked'),
            pl.lit(method).alias('masking_method'),
        ])
        frames.append(frame)
        print(f" Day {day}: {len(frame)} records (forward filled from Oct {source_day})")
    return pl.concat(frames, how='vertical')


def _print_validation(df, time_col, filled_days):
    """Print post-merge sanity checks to stdout.

    Reports which October 2023 days (1-31) are still absent, how many rows
    flagged ``is_masked`` fall on ``filled_days`` of October 2023, and the
    overall min/max of ``time_col``.
    """
    print("=" * 80)
    print("VALIDATION")
    print("=" * 80)
    oct_2023 = df.filter(
        (pl.col(time_col).dt.year() == 2023)
        & (pl.col(time_col).dt.month() == 10)
    )
    present_days = set(oct_2023.get_column(time_col).dt.day().to_list())
    missing_days_final = set(range(1, 32)) - present_days  # October has 31 days
    if missing_days_final:
        print(f"[WARNING] October 2023 still missing days: {sorted(missing_days_final)}")
    else:
        print("[OK] October 2023 date continuity: Complete (days 1-31)")
    masked_oct = oct_2023.filter(
        pl.col(time_col).dt.day().is_in(filled_days) & pl.col('is_masked')
    )
    print(f"[OK] Masked October 27-31, 2023: {len(masked_oct):,} records")
    min_date = df.get_column(time_col).min()
    max_date = df.get_column(time_col).max()
    print(f"[OK] Data range: {min_date} to {max_date}")
    print("=" * 80)


def main():
    """Forward fill missing October 27-31, 2023 LTA data.

    Reads ``data/raw/phase1_24month/jao_lta.parquet`` (relative to the current
    working directory) and backs it up to ``jao_lta.parquet.backup3``, never
    overwriting an existing backup. It then appends copies of the October 26,
    2023 rows re-stamped to October 27-31 and deduplicates on the ``mtu``
    timestamp, preferring observed rows over forward-filled copies. Finally it
    writes the sorted result back in place via a temporary file.

    Returns:
        Process exit status: 0 on success, 1 if the input file, the ``mtu``
        column, or the October 26, 2023 source rows are missing.
    """
    lta_path = Path('data/raw/phase1_24month/jao_lta.parquet')
    time_col = 'mtu'  # Market Time Unit timestamp column
    source_day = 26
    missing_days = [27, 28, 29, 30, 31]

    print("\n" + "=" * 80)
    print("OCTOBER 27-31, 2023 LTA MASKING")
    print("=" * 80)
    print("Strategy: Forward fill from October 26, 2023")
    print("Missing data: ~145 records (0.5% of dataset)")
    print("=" * 80)
    print()

    # 1. Load existing LTA data
    if not lta_path.exists():
        print(f"[ERROR] LTA file not found: {lta_path}")
        return 1
    print("Loading existing LTA data...")
    lta_df = pl.read_parquet(lta_path)
    print(f" Current records: {len(lta_df):,}")
    print(f" Columns: {lta_df.columns}")
    print()

    # Back up the untouched input. Never overwrite an existing backup: on a
    # re-run the current file is already masked, and overwriting would
    # destroy the only copy of the pre-masking data.
    backup_path = lta_path.with_name('jao_lta.parquet.backup3')
    if backup_path.exists():
        print(f"Backup already exists, not overwriting: {backup_path}")
    else:
        lta_df.write_parquet(backup_path)
        print(f"Backup created: {backup_path}")
    print()

    # 2. Identify October 26, 2023 data (source for forward fill)
    print("Extracting October 26, 2023 data...")
    if time_col not in lta_df.columns:
        print(f"[ERROR] No 'mtu' timestamp column found. Available columns: {lta_df.columns}")
        return 1
    print(f" Using timestamp column: '{time_col}'")
    if lta_df[time_col].dtype == pl.Utf8:
        lta_df = lta_df.with_columns([
            pl.col(time_col).str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S").alias(time_col)
        ])
    oct_26_data = lta_df.filter(
        (pl.col(time_col).dt.year() == 2023)
        & (pl.col(time_col).dt.month() == 10)
        & (pl.col(time_col).dt.day() == source_day)
    )
    print(f" October 26, 2023 records: {len(oct_26_data)}")
    if oct_26_data.is_empty():
        print("[ERROR] No October 26, 2023 data found to use for masking")
        return 1
    print()

    # 3. Generate masked records for October 27-31, 2023
    print("Generating masked records for October 27-31, 2023...")
    masked_df = _forward_fill_days(
        oct_26_data, time_col, source_day, missing_days, 'forward_fill_oct26'
    )
    print(f"\n Total masked records: {len(masked_df):,}")
    print()

    # 4. Add masking flags to existing data (existing flags are preserved)
    print("Adding masking flags to existing data...")
    lta_df = _with_mask_flags(lta_df)

    # 5. Merge and validate
    print("Merging masked records with existing data...")
    complete_df = pl.concat([lta_df, masked_df], how='vertical')
    initial_count = len(complete_df)
    # Real rows precede masked copies in the concat, so keep='first' with
    # maintain_order=True always keeps observed data over a forward-filled
    # copy of the same timestamp. Polars' default (keep='any',
    # maintain_order=False) guarantees neither the winner nor the row order,
    # so the sort must come after deduplication.
    complete_df = complete_df.unique(
        subset=[time_col], keep='first', maintain_order=True
    ).sort(time_col)
    deduped = initial_count - len(complete_df)
    if deduped > 0:
        print(f" Removed {deduped} duplicate timestamps from October recovery merge")
    print()
    print("=" * 80)
    print("MASKING COMPLETE")
    print("=" * 80)
    print(f"Original records: {len(lta_df):,}")
    print(f"Masked records: {len(masked_df):,}")
    print(f"Total records: {len(complete_df):,}")
    print()
    masked_count = complete_df.filter(pl.col('is_masked')).height
    print(f"Masked data: {masked_count:,} records ({masked_count/len(complete_df)*100:.2f}%)")
    print()

    # 6. Save complete dataset: write to a sibling temp file, then atomically
    # replace, so a failed write cannot leave a truncated parquet behind.
    print("Saving complete dataset...")
    tmp_path = lta_path.with_name(lta_path.name + '.tmp')
    complete_df.write_parquet(tmp_path)
    tmp_path.replace(lta_path)
    print(f" File: {lta_path}")
    print(f" Size: {lta_path.stat().st_size / (1024**2):.2f} MB")
    print(f" Backup: {backup_path}")
    print()

    # 7. Validation
    _print_validation(complete_df, time_col, missing_days)
    print()
    print("SUCCESS: October 2023 LTA data masked with forward fill")
    print()
    return 0
if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status so callers and
    # pipelines can detect failures (sys.exit(None) still exits 0).
    sys.exit(main())