"""CLI entry point for EduMatch Data Management.""" from __future__ import annotations import sys from pathlib import Path # Add project root to Python path project_root = Path(__file__).parent.parent if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) import typer from loguru import logger from rich.console import Console from rich.table import Table from tools import api_client, data_manager from tools.config import get_settings from tools.pinecone_processor import process_and_save settings = get_settings() # Configure logger logger.remove() # Remove default handler logger.add( sys.stderr, format="{time:HH:mm:ss} | {level: <8} | {message}", ) logger.add( settings.log_file, rotation=settings.log_rotation, retention=settings.log_retention, format="{time} | {level} | {message}", ) app = typer.Typer(help="Common Core MCP CLI - Manage educational standards data") console = Console() @app.command() def jurisdictions( search: str = typer.Option( None, "--search", "-s", help="Filter by jurisdiction name (case-insensitive partial match)", ), type: str = typer.Option( None, "--type", "-t", help="Filter by jurisdiction type: school, organization, state, or nation", ), force: bool = typer.Option( False, "--force", "-f", help="Force refresh from API, ignoring local cache" ), ): """ List all available jurisdictions (states/organizations). By default, jurisdictions are loaded from local cache (data/raw/jurisdictions.json) to avoid repeated API calls. Use --force to fetch fresh data from the API and update the cache. The cache is automatically created on first use. Filters can be combined: use --search to filter by name and --type to filter by type. """ try: if force: console.print("[yellow]Forcing refresh from API...[/yellow]") # Validate type filter if provided if type: valid_types = {"school", "organization", "state", "nation"} if type.lower() not in valid_types: console.print( f"[red]Error: Invalid type '{type}'. Must be one of: {', '.join(sorted(valid_types))}[/red]" ) raise typer.Exit(code=1) results = api_client.get_jurisdictions( search_term=search, type_filter=type, force_refresh=force ) table = Table("ID", "Title", "Type", title="Jurisdictions") for j in results: table.add_row(j.id, j.title, j.type) console.print(table) console.print(f"\n[green]Found {len(results)} jurisdictions[/green]") if not force: console.print("[dim]Tip: Use --force to refresh from API[/dim]") except Exception as e: console.print(f"[red]Error: {e}[/red]") logger.exception("Failed to fetch jurisdictions") raise typer.Exit(code=1) @app.command() def jurisdiction_details( jurisdiction_id: str = typer.Argument(..., help="Jurisdiction ID"), force: bool = typer.Option( False, "--force", "-f", help="Force refresh from API, ignoring local cache" ), ): """ Download and display jurisdiction metadata including standard set references. By default, jurisdiction metadata is loaded from local cache (data/raw/jurisdictions/{id}/data.json) to avoid repeated API calls. Use --force to fetch fresh data from the API and update the cache. The cache is automatically created on first use. Note: This command downloads metadata about standard sets (IDs, titles, subjects) but NOT the full standard set content. Use the 'download' command to get full standard set data. """ try: if force: console.print("[yellow]Forcing refresh from API...[/yellow]") jurisdiction_data = api_client.get_jurisdiction_details( jurisdiction_id, force_refresh=force ) # Display jurisdiction info console.print(f"\n[bold]Jurisdiction:[/bold] {jurisdiction_data.title}") console.print(f"[bold]Type:[/bold] {jurisdiction_data.type}") console.print(f"[bold]ID:[/bold] {jurisdiction_data.id}") # Display standard sets standard_sets = jurisdiction_data.standardSets if standard_sets: table = Table( "Set ID", "Subject", "Title", "Grade Levels", title="Standard Sets" ) for s in standard_sets: grade_levels = ", ".join(s.educationLevels) table.add_row( s.id, s.subject, s.title, grade_levels or "N/A", ) console.print("\n") console.print(table) console.print(f"\n[green]Found {len(standard_sets)} standard sets[/green]") else: console.print("\n[yellow]No standard sets found[/yellow]") if not force: console.print("[dim]Tip: Use --force to refresh from API[/dim]") except Exception as e: console.print(f"[red]Error: {e}[/red]") logger.exception("Failed to fetch jurisdiction details") raise typer.Exit(code=1) @app.command("download-sets") def download_sets( set_id: str = typer.Argument(None, help="Standard set ID (if downloading by ID)"), jurisdiction: str = typer.Option( None, "--jurisdiction", "-j", help="Jurisdiction ID (if downloading by jurisdiction)", ), force: bool = typer.Option( False, "--force", "-f", help="Force refresh from API, ignoring local cache" ), yes: bool = typer.Option( False, "--yes", "-y", help="Skip confirmation prompt when downloading by jurisdiction", ), dry_run: bool = typer.Option( False, "--dry-run", help="Show what would be downloaded without actually downloading", ), education_levels: str = typer.Option( None, "--education-levels", help="Comma-separated grade levels (e.g., '03,04,05')", ), publication_status: str = typer.Option( None, "--publication-status", help="Publication status filter (e.g., 'Published', 'Deprecated')", ), valid_year: str = typer.Option( None, "--valid-year", help="Valid year filter (e.g., '2012')" ), title: str = typer.Option( None, "--title", help="Partial title match (case-insensitive)" ), subject: str = typer.Option( None, "--subject", help="Partial subject match (case-insensitive)" ), ): """ Download standard sets either by ID or by jurisdiction with filtering. When downloading by jurisdiction, filters can be applied and all filters combine with AND logic. A confirmation prompt will be shown listing all standard sets that will be downloaded. Use --dry-run to preview what would be downloaded without actually downloading anything. """ try: # Validate arguments if not set_id and not jurisdiction: console.print( "[red]Error: Must provide either set_id or --jurisdiction[/red]" ) raise typer.Exit(code=1) if set_id and jurisdiction: console.print( "[red]Error: Cannot specify both set_id and --jurisdiction[/red]" ) raise typer.Exit(code=1) # Download by ID if set_id: if dry_run: console.print( f"[yellow][DRY RUN] Would download standard set: {set_id}[/yellow]" ) cache_path = Path("data/raw/standardSets") / set_id / "data.json" console.print(f" Would cache to: {cache_path}") return with console.status(f"[bold blue]Downloading standard set {set_id}..."): api_client.download_standard_set(set_id, force_refresh=force) cache_path = Path("data/raw/standardSets") / set_id / "data.json" console.print("[green]✓ Successfully downloaded standard set[/green]") console.print(f" Cached to: {cache_path}") # Process the downloaded set try: with console.status(f"[bold blue]Processing standard set {set_id}..."): processed_path = process_and_save(set_id) console.print("[green]✓ Successfully processed standard set[/green]") console.print(f" Processed to: {processed_path}") except FileNotFoundError: console.print( "[yellow]Warning: data.json not found, skipping processing[/yellow]" ) except Exception as e: console.print( f"[yellow]Warning: Failed to process standard set: {e}[/yellow]" ) logger.exception(f"Failed to process standard set {set_id}") return # Download by jurisdiction if jurisdiction: # Parse education levels education_levels_list = None if education_levels: education_levels_list = [ level.strip() for level in education_levels.split(",") ] # Get jurisdiction metadata jurisdiction_data = api_client.get_jurisdiction_details( jurisdiction, force_refresh=False ) all_sets = jurisdiction_data.standardSets # Apply filters using the API client's filter function from tools.api_client import _filter_standard_set filtered_sets = [ s for s in all_sets if _filter_standard_set( s, education_levels=education_levels_list, publication_status=publication_status, valid_year=valid_year, title_search=title, subject_search=subject, ) ] if not filtered_sets: console.print( "[yellow]No standard sets match the provided filters.[/yellow]" ) return # Display filtered sets if dry_run: console.print( f"\n[yellow][DRY RUN] Standard sets that would be downloaded ({len(filtered_sets)}):[/yellow]" ) else: console.print( f"\n[bold]Standard sets to download ({len(filtered_sets)}):[/bold]" ) table = Table( "Set ID", "Subject", "Title", "Grade Levels", "Status", "Year", "Downloaded", title="Standard Sets", ) for s in filtered_sets: display_id = s.id[:20] + "..." if len(s.id) > 20 else s.id # Check if already downloaded set_data_path = settings.standard_sets_dir / s.id / "data.json" is_downloaded = set_data_path.exists() downloaded_status = ( "[green]✓[/green]" if is_downloaded else "[yellow]✗[/yellow]" ) table.add_row( display_id, s.subject, s.title[:40], ", ".join(s.educationLevels), s.document.publicationStatus or "N/A", s.document.valid, downloaded_status, ) console.print(table) # If dry run, show summary and exit if dry_run: console.print( f"\n[yellow][DRY RUN] Would download {len(filtered_sets)} standard set(s)[/yellow]" ) console.print( "[dim]Run without --dry-run to actually download these standard sets.[/dim]" ) return # Confirmation prompt if not yes: if not typer.confirm( f"\nDownload {len(filtered_sets)} standard set(s)?" ): console.print("[yellow]Download cancelled.[/yellow]") return # Download each standard set console.print( f"\n[bold blue]Downloading {len(filtered_sets)} standard set(s)...[/bold blue]" ) downloaded = 0 failed = 0 for i, standard_set in enumerate(filtered_sets, 1): set_id = standard_set.id try: with console.status( f"[bold blue][{i}/{len(filtered_sets)}] Downloading {set_id[:20]}..." ): api_client.download_standard_set(set_id, force_refresh=force) downloaded += 1 # Process the downloaded set try: with console.status( f"[bold blue][{i}/{len(filtered_sets)}] Processing {set_id[:20]}..." ): process_and_save(set_id) except FileNotFoundError: console.print( f"[yellow]Warning: Skipping processing for {set_id[:20]}... (data.json not found)[/yellow]" ) except Exception as e: console.print( f"[yellow]Warning: Failed to process {set_id[:20]}...: {e}[/yellow]" ) logger.exception(f"Failed to process standard set {set_id}") except Exception as e: console.print(f"[red]✗ Failed to download {set_id}: {e}[/red]") logger.exception(f"Failed to download standard set {set_id}") failed += 1 # Summary console.print( f"\n[green]✓ Successfully downloaded {downloaded} standard set(s)[/green]" ) if failed > 0: console.print( f"[red]✗ Failed to download {failed} standard set(s)[/red]" ) except Exception as e: console.print(f"[red]Error: {e}[/red]") logger.exception("Failed to download standard sets") raise typer.Exit(code=1) @app.command("list") def list_datasets(): """List all downloaded standard sets and their processing status.""" try: datasets = data_manager.list_downloaded_standard_sets() if not datasets: console.print("[yellow]No standard sets downloaded yet.[/yellow]") console.print("[dim]Use 'download-sets' to download standard sets.[/dim]") return # Check for processed.json files for d in datasets: set_dir = settings.standard_sets_dir / d.set_id processed_file = set_dir / "processed.json" d.processed = processed_file.exists() # Count processed vs unprocessed processed_count = sum(1 for d in datasets if d.processed) unprocessed_count = len(datasets) - processed_count table = Table( "Set ID", "Jurisdiction", "Subject", "Title", "Grades", "Status", "Processed", title="Downloaded Standard Sets", ) for d in datasets: # Truncate long set IDs display_id = d.set_id[:25] + "..." if len(d.set_id) > 25 else d.set_id table.add_row( display_id, d.jurisdiction, d.subject[:30], d.title[:30], ", ".join(d.education_levels), d.publication_status, "[green]✓[/green]" if d.processed else "[yellow]✗[/yellow]", ) console.print(table) console.print("\n[bold]Summary:[/bold]") console.print(f" Total: {len(datasets)} standard sets") console.print(f" Processed: [green]{processed_count}[/green]") console.print(f" Unprocessed: [yellow]{unprocessed_count}[/yellow]") except Exception as e: console.print(f"[red]Error: {e}[/red]") logger.exception("Failed to list datasets") raise typer.Exit(code=1) @app.command("pinecone-init") def pinecone_init(): """ Initialize Pinecone index. Checks if the configured index exists and creates it if not. Uses integrated embeddings with llama-text-embed-v2 model. """ try: from src.pinecone_client import PineconeClient console.print("[bold]Initializing Pinecone...[/bold]") # Initialize Pinecone client (validates API key) try: client = PineconeClient() except ValueError as e: console.print(f"[red]Error: {e}[/red]") raise typer.Exit(code=1) console.print(f" Index name: [cyan]{client.index_name}[/cyan]") console.print(f" Namespace: [cyan]{client.namespace}[/cyan]") # Check and create index if needed with console.status("[bold blue]Checking index status..."): created = client.ensure_index_exists() if created: console.print( f"\n[green]Successfully created index '{client.index_name}'[/green]" ) console.print("[dim]Index configuration:[/dim]") console.print(" Cloud: aws") console.print(" Region: us-east-1") console.print(" Embedding model: llama-text-embed-v2") console.print(" Field map: text -> content") else: console.print( f"\n[green]Index '{client.index_name}' already exists[/green]" ) # Show index stats with console.status("[bold blue]Fetching index stats..."): stats = client.get_index_stats() console.print("\n[bold]Index Statistics:[/bold]") console.print(f" Total vectors: [cyan]{stats['total_vector_count']}[/cyan]") namespaces = stats.get("namespaces", {}) if namespaces: console.print(f" Namespaces: [cyan]{len(namespaces)}[/cyan]") table = Table("Namespace", "Vector Count", title="Namespace Details") for ns_name, ns_info in namespaces.items(): vector_count = getattr(ns_info, "vector_count", 0) table.add_row(ns_name or "(default)", str(vector_count)) console.print(table) else: console.print(" Namespaces: [yellow]None (empty index)[/yellow]") except Exception as e: console.print(f"[red]Error: {e}[/red]") logger.exception("Failed to initialize Pinecone") raise typer.Exit(code=1) @app.command("pinecone-upload") def pinecone_upload( set_id: str = typer.Option( None, "--set-id", help="Upload a specific standard set by ID" ), all: bool = typer.Option( False, "--all", help="Upload all downloaded standard sets with processed.json" ), force: bool = typer.Option( False, "--force", help="Re-upload even if .pinecone_uploaded marker exists", ), dry_run: bool = typer.Option( False, "--dry-run", help="Show what would be uploaded without actually uploading", ), batch_size: int = typer.Option( 96, "--batch-size", help="Number of records per batch (default: 96)" ), ): """ Upload processed standard sets to Pinecone. Use --set-id to upload a specific set, or --all to upload all sets with processed.json. If neither is provided, you'll be prompted to confirm uploading all sets. """ try: from src.pinecone_client import PineconeClient from tools.pinecone_models import ProcessedStandardSet import json # Initialize Pinecone client try: client = PineconeClient() except ValueError as e: console.print(f"[red]Error: {e}[/red]") raise typer.Exit(code=1) # Validate index exists try: client.validate_index() except ValueError as e: console.print(f"[red]Error: {e}[/red]") raise typer.Exit(code=1) # Discover standard sets with processed.json standard_sets_dir = settings.standard_sets_dir if not standard_sets_dir.exists(): console.print("[yellow]No standard sets directory found.[/yellow]") console.print( "[dim]Use 'download-sets' to download standard sets first.[/dim]" ) return # Find all sets with processed.json sets_to_upload = [] for set_dir in standard_sets_dir.iterdir(): if not set_dir.is_dir(): continue processed_file = set_dir / "processed.json" if not processed_file.exists(): continue set_id_from_dir = set_dir.name # Check if already uploaded (unless --force) # Mark all sets during discovery; filtering by --set-id happens later if not force and PineconeClient.is_uploaded(set_dir): sets_to_upload.append( (set_id_from_dir, set_dir, True) ) # True = already uploaded else: sets_to_upload.append( (set_id_from_dir, set_dir, False) ) # False = needs upload if not sets_to_upload: console.print( "[yellow]No standard sets with processed.json found.[/yellow]" ) console.print( "[dim]Use 'download-sets' to download and process standard sets first.[/dim]" ) return # Filter by --set-id if provided if set_id: sets_to_upload = [ (sid, sdir, skipped) for sid, sdir, skipped in sets_to_upload if sid == set_id ] if not sets_to_upload: console.print( f"[yellow]Standard set '{set_id}' not found or has no processed.json.[/yellow]" ) return # If neither --set-id nor --all provided, prompt for confirmation if not set_id and not all: console.print( f"\n[bold]Found {len(sets_to_upload)} standard set(s) with processed.json:[/bold]" ) table = Table("Set ID", "Status", title="Standard Sets") for sid, sdir, skipped in sets_to_upload: status = ( "[yellow]Already uploaded[/yellow]" if skipped else "[green]Ready[/green]" ) table.add_row(sid, status) console.print(table) if not typer.confirm( f"\nUpload {len(sets_to_upload)} standard set(s) to Pinecone?" ): console.print("[yellow]Upload cancelled.[/yellow]") return # Show what would be uploaded (dry-run or preview) if dry_run or not all: console.print( f"\n[bold]Standard sets to upload ({len(sets_to_upload)}):[/bold]" ) table = Table("Set ID", "Records", "Status", title="Upload Preview") for sid, sdir, skipped in sets_to_upload: if skipped and not force: table.add_row( sid, "N/A", "[yellow]Skipped (already uploaded)[/yellow]" ) continue # Load processed.json to count records try: with open(sdir / "processed.json", encoding="utf-8") as f: processed_data = json.load(f) record_count = len(processed_data.get("records", [])) status = ( "[green]Ready[/green]" if not dry_run else "[yellow]Would upload[/yellow]" ) table.add_row(sid, str(record_count), status) except Exception as e: table.add_row(sid, "Error", f"[red]Failed to read: {e}[/red]") console.print(table) if dry_run: console.print( f"\n[yellow][DRY RUN] Would upload {len([s for s in sets_to_upload if not s[2] or force])} standard set(s)[/yellow]" ) console.print("[dim]Run without --dry-run to actually upload.[/dim]") return # Perform uploads uploaded_count = 0 failed_count = 0 skipped_count = 0 for i, (sid, sdir, already_uploaded) in enumerate(sets_to_upload, 1): if already_uploaded and not force: skipped_count += 1 continue try: # Load processed.json with open(sdir / "processed.json", encoding="utf-8") as f: processed_data = json.load(f) processed_set = ProcessedStandardSet(**processed_data) records = processed_set.records if not records: console.print( f"[yellow]Skipping {sid} (no records)[/yellow]" ) skipped_count += 1 continue # Upload records with console.status( f"[bold blue][{i}/{len(sets_to_upload)}] Uploading {sid} ({len(records)} records)" ): client.batch_upsert(records, batch_size=batch_size) # Mark as uploaded PineconeClient.mark_uploaded(sdir) uploaded_count += 1 console.print( f"[green]✓ [{i}/{len(sets_to_upload)}] Uploaded {sid} ({len(records)} records)[/green]" ) except FileNotFoundError: console.print( f"[red]✗ [{i}/{len(sets_to_upload)}] Failed: {sid} (processed.json not found)[/red]" ) logger.exception(f"Failed to upload standard set {sid}") failed_count += 1 except Exception as e: console.print( f"[red]✗ [{i}/{len(sets_to_upload)}] Failed: {sid} ({e})[/red]" ) logger.exception(f"Failed to upload standard set {sid}") failed_count += 1 # Summary console.print("\n[bold]Upload Summary:[/bold]") console.print(f" Uploaded: [green]{uploaded_count}[/green]") if skipped_count > 0: console.print(f" Skipped: [yellow]{skipped_count}[/yellow]") if failed_count > 0: console.print(f" Failed: [red]{failed_count}[/red]") except Exception as e: console.print(f"[red]Error: {e}[/red]") logger.exception("Failed to upload to Pinecone") raise typer.Exit(code=1) if __name__ == "__main__": app()