Spaces:
Sleeping
Sleeping
File size: 7,527 Bytes
27cb60a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
"""Mask missing October 27-31, 2023 LTA data using forward fill from October 26.
Missing data: October 27-31, 2023 (~145 records, 0.5% of dataset)
Strategy: Forward fill LTA values from October 26, 2023
Rationale: LTA (Long Term Allocations) values change infrequently, so forward fill is a conservative choice.
"""
import sys
from pathlib import Path
from datetime import datetime, timedelta
import polars as pl
def main():
    """Forward fill the missing October 27-31, 2023 LTA records from October 26.

    Steps performed:
      1. Load ``data/raw/phase1_24month/jao_lta.parquet`` and write a backup
         copy (``jao_lta.parquet.backup3``) next to it.
      2. Extract every row whose ``mtu`` timestamp falls on October 26, 2023.
      3. Copy those rows once per missing day (27-31), shifting ``mtu`` by a
         whole number of days and tagging them with ``is_masked=True`` and
         ``masking_method='forward_fill_oct26'``.
      4. Make sure the existing rows carry the two flag columns.
      5. Merge, drop duplicate timestamps (existing rows win over forward
         filled ones) and sort by timestamp.
      6. Overwrite the original parquet file with the merged frame.
      7. Print a validation summary for October 2023.

    Returns:
        None. If the input file, the ``mtu`` column or the October 26 rows
        are missing, an ``[ERROR]`` line is printed and the function returns
        early without rewriting the LTA file.
    """
    print("\n" + "=" * 80)
    print("OCTOBER 27-31, 2023 LTA MASKING")
    print("=" * 80)
    print("Strategy: Forward fill from October 26, 2023")
    print("Missing data: ~145 records (0.5% of dataset)")
    print("=" * 80)
    print()

    # =========================================================================
    # 1. Load existing LTA data
    # =========================================================================
    lta_path = Path('data/raw/phase1_24month/jao_lta.parquet')

    if not lta_path.exists():
        print(f"[ERROR] LTA file not found: {lta_path}")
        return

    print("Loading existing LTA data...")
    lta_df = pl.read_parquet(lta_path)
    print(f" Current records: {len(lta_df):,}")
    print(f" Columns: {lta_df.columns}")
    print()

    # Backup existing file before anything is modified
    backup_path = lta_path.with_name('jao_lta.parquet.backup3')
    lta_df.write_parquet(backup_path)
    print(f"Backup created: {backup_path}")
    print()

    # =========================================================================
    # 2. Identify October 26, 2023 data (source for forward fill)
    # =========================================================================
    print("Extracting October 26, 2023 data...")

    # Use 'mtu' (Market Time Unit) timestamp column
    time_col = 'mtu'
    if time_col not in lta_df.columns:
        print(f"[ERROR] No 'mtu' timestamp column found. Available columns: {lta_df.columns}")
        return
    print(f" Using timestamp column: '{time_col}'")

    # Convert to datetime if string
    if lta_df[time_col].dtype == pl.Utf8:
        lta_df = lta_df.with_columns([
            pl.col(time_col).str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S").alias(time_col)
        ])

    # Filter October 26, 2023 data
    source_day = 26
    oct_26_data = lta_df.filter(
        (pl.col(time_col).dt.year() == 2023) &
        (pl.col(time_col).dt.month() == 10) &
        (pl.col(time_col).dt.day() == source_day)
    )
    print(f" October 26, 2023 records: {len(oct_26_data)}")

    if oct_26_data.height == 0:
        print("[ERROR] No October 26, 2023 data found to use for masking")
        return
    print()

    # =========================================================================
    # 3. Add masking flags to existing data
    # =========================================================================
    # Only add the flag columns when they are absent: on a re-run the file
    # already carries them, and overwriting would erase the provenance of
    # rows that were masked earlier.
    flag_columns = []
    if 'is_masked' not in lta_df.columns:
        flag_columns.append(pl.lit(False).alias('is_masked'))
    if 'masking_method' not in lta_df.columns:
        flag_columns.append(pl.lit(None).cast(pl.Utf8).alias('masking_method'))

    # =========================================================================
    # 4. Generate masked records for October 27-31, 2023
    # =========================================================================
    print("Generating masked records for October 27-31, 2023...")
    missing_days = [27, 28, 29, 30, 31]
    time_dtype = lta_df[time_col].dtype  # loop-invariant, used to preserve dtype
    all_masked_records = []

    for day in missing_days:
        # Copy the Oct 26 rows, shift the timestamp by (day - 26) days and
        # mark every row as forward filled.
        days_delta = day - source_day
        masked_day = oct_26_data.with_columns([
            (pl.col(time_col) + pl.duration(days=days_delta)).cast(time_dtype).alias(time_col)
        ]).with_columns([
            pl.lit(True).alias('is_masked'),
            pl.lit('forward_fill_oct26').alias('masking_method')
        ])
        all_masked_records.append(masked_day)
        print(f" Day {day}: {len(masked_day)} records (forward filled from Oct 26)")

    # Combine all masked records
    masked_df = pl.concat(all_masked_records, how='vertical')
    print(f"\n Total masked records: {len(masked_df):,}")
    print()

    print("Adding masking flags to existing data...")
    if flag_columns:
        lta_df = lta_df.with_columns(flag_columns)

    # =========================================================================
    # 5. Merge and validate
    # =========================================================================
    print("Merging masked records with existing data...")

    # Vertical concat needs identical column order; align the masked frame
    # to the existing frame explicitly.
    complete_df = pl.concat([lta_df, masked_df.select(lta_df.columns)], how='vertical')

    # Deduplicate based on timestamp (October recovery created duplicates).
    # Existing rows come first in the concat, so keep='first' together with
    # maintain_order=True guarantees a real record is never replaced by a
    # forward-filled one. Sorting afterwards keeps the saved file ordered
    # (unique() does not preserve order by default).
    initial_count = len(complete_df)
    complete_df = complete_df.unique(
        subset=[time_col], keep='first', maintain_order=True
    ).sort(time_col)
    deduped = initial_count - len(complete_df)
    if deduped > 0:
        print(f" Removed {deduped} duplicate timestamps from October recovery merge")
    print()

    print("=" * 80)
    print("MASKING COMPLETE")
    print("=" * 80)
    print(f"Original records: {len(lta_df):,}")
    print(f"Masked records: {len(masked_df):,}")
    print(f"Total records: {len(complete_df):,}")
    print()

    # Count masked records that survived deduplication
    masked_count = complete_df.filter(pl.col('is_masked')).height
    print(f"Masked data: {masked_count:,} records ({masked_count/len(complete_df)*100:.2f}%)")
    print()

    # =========================================================================
    # 6. Save complete dataset
    # =========================================================================
    print("Saving complete dataset...")
    complete_df.write_parquet(lta_path)
    print(f" File: {lta_path}")
    print(f" Size: {lta_path.stat().st_size / (1024**2):.2f} MB")
    print(f" Backup: {backup_path}")
    print()

    # =========================================================================
    # 7. Validation
    # =========================================================================
    print("=" * 80)
    print("VALIDATION")
    print("=" * 80)

    # Check date continuity for October 2023
    oct_2023 = complete_df.filter(
        (pl.col(time_col).dt.year() == 2023) &
        (pl.col(time_col).dt.month() == 10)
    )
    unique_days = oct_2023.select(pl.col(time_col).dt.day().unique().sort()).to_series().to_list()
    expected_days = list(range(1, 32))  # 1-31
    missing_days_final = set(expected_days) - set(unique_days)

    if missing_days_final:
        print(f"[WARNING] October 2023 still missing days: {sorted(missing_days_final)}")
    else:
        print("[OK] October 2023 date continuity: Complete (days 1-31)")

    # Check masked records
    masked_oct = oct_2023.filter(
        pl.col(time_col).dt.day().is_in(missing_days) &
        pl.col('is_masked')
    )
    print(f"[OK] Masked October 27-31, 2023: {len(masked_oct):,} records")

    # Overall data range
    min_date = complete_df.select(pl.col(time_col).min()).item()
    max_date = complete_df.select(pl.col(time_col).max()).item()
    print(f"[OK] Data range: {min_date} to {max_date}")

    print("=" * 80)
    print()
    print("SUCCESS: October 2023 LTA data masked with forward fill")
    print()
# Script entry point: run the masking job only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
|