"""FastAPI application for uploading PDFs and browsing the extraction output.

The PDF processing itself is delegated to the project-local ``main`` module,
imported here as ``extractor``. This file only handles HTTP concerns: saving
uploads, invoking the extractor, and serving / listing / deleting the files
found under ``OUTPUT_FOLDER``.
"""

from pathlib import Path
from typing import Any, Dict, List
import json
import os
import shutil

import torch
from fastapi import FastAPI, File, Form, Request, Response, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from loguru import logger
from werkzeug.utils import secure_filename

import main as extractor

app = FastAPI()

# Static files and templates -------------------------------------------------
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


def flask_like_url_for(endpoint: str, **kwargs: Any) -> str:
    """Minimal Flask-like ``url_for`` for templates using ``filename=`` for static.

    The Jinja template calls ``url_for('static', filename='css/styles.css')``,
    which is Flask style. We emulate that here so templates work unchanged.

    Args:
        endpoint: Endpoint name passed by the template.
        **kwargs: Only ``filename`` is consulted, and only for ``"static"``.

    Returns:
        ``/static/<filename>`` for the static endpoint, otherwise
        ``/<endpoint>``.
    """
    if endpoint == "static":
        filename = str(kwargs.get("filename", ""))
        return "/static/" + filename.lstrip("/")
    # Fallback: treat the endpoint name as a root-relative path.
    return "/" + endpoint.lstrip("/")


templates.env.globals["url_for"] = flask_like_url_for

# Configuration -------------------------------------------------------------
UPLOAD_FOLDER = Path("./uploads")
OUTPUT_FOLDER = Path("./output")
# NOTE(review): this limit is not referenced anywhere in the code visible
# here, so uploads are not actually capped by it -- confirm whether it is
# enforced elsewhere (e.g. by a reverse proxy).
MAX_CONTENT_LENGTH = 500 * 1024 * 1024  # 500MB

os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# Global model cache --------------------------------------------------------
_model: Any = None


def get_device_info() -> Dict[str, Any]:
    """Get information about GPU/CPU availability.

    Returns:
        A dict with ``device`` (``"cuda"`` or ``"cpu"``), ``cuda_available``,
        ``device_name`` (name of CUDA device 0, or ``None``) and
        ``device_count`` (0 when CUDA is unavailable).
    """
    cuda_available = torch.cuda.is_available()
    info: Dict[str, Any] = {
        "device": "cuda" if cuda_available else "cpu",
        "cuda_available": cuda_available,
        "device_name": None,
        "device_count": 0,
    }
    if cuda_available:
        info["device_name"] = torch.cuda.get_device_name(0)
        info["device_count"] = torch.cuda.device_count()
    return info


def load_model_once() -> Any:
    """Load the DocLayout-YOLO model once and cache it in this process.

    Returns:
        The object returned by ``extractor.get_model()``; subsequent calls
        return the cached instance.
    """
    global _model
    if _model is None:
        logger.info("Loading DocLayout-YOLO model...")
        _model = extractor.get_model()
        logger.info("Model loaded successfully")
    return _model


# Routes --------------------------------------------------------------------
@app.get("/", response_class=HTMLResponse)
async def index(request: Request) -> HTMLResponse:
    """Main page, equivalent to the Flask index route."""
    device_info = get_device_info()
    return templates.TemplateResponse(
        "index.html", {"request": request, "device_info": device_info}
    )


@app.get("/api/device-info")
async def device_info() -> Dict[str, Any]:
    """API endpoint to get device information."""
    return get_device_info()


@app.post("/api/upload")
async def upload_files(
    request: Request,
    files: List[UploadFile] = File(..., alias="files[]"),
    extraction_mode: str = Form("images"),
) -> JSONResponse:
    """Handle multiple PDF file uploads (FastAPI version of Flask route).

    Args:
        request: Incoming request (unused, kept for interface compatibility).
        files: Uploaded files, sent under the form field ``files[]``.
        extraction_mode: ``"images"`` extracts images only, ``"markdown"``
            extracts markdown only; any other value enables both.

    Returns:
        ``{"results": [...]}`` with one entry per processed PDF. A file whose
        processing raised gets ``{"filename", "error"}`` instead of the
        normal summary. Files without a ``.pdf`` extension are skipped and
        produce no entry. Responds 400 when no file with a name was sent.
    """
    if not files or all((f.filename or "") == "" for f in files):
        return JSONResponse({"error": "No files selected"}, status_code=400)

    include_images = extraction_mode != "markdown"
    include_markdown = extraction_mode != "images"

    results: List[Dict[str, Any]] = []
    for upload in files:
        filename = upload.filename or ""
        # Compare case-insensitively so "REPORT.PDF" is not silently dropped.
        if not filename.lower().endswith(".pdf"):
            continue
        try:
            safe_name = secure_filename(filename)
            stem = Path(safe_name).stem
            upload_path = UPLOAD_FOLDER / safe_name

            # Save uploaded file to disk, 1 MiB at a time.
            with upload_path.open("wb") as out_f:
                while chunk := await upload.read(1024 * 1024):
                    out_f.write(chunk)

            # Prepare output directory
            output_dir = OUTPUT_FOLDER / stem
            output_dir.mkdir(parents=True, exist_ok=True)

            # Move PDF into output directory
            pdf_path = output_dir / safe_name
            upload_path.replace(pdf_path)

            # Process PDF
            extractor.USE_MULTIPROCESSING = False
            logger.info(
                f"Processing {safe_name} (images={include_images}, markdown={include_markdown})"
            )
            if include_images:
                load_model_once()
            extractor.process_pdf_with_pool(
                pdf_path,
                output_dir,
                pool=None,
                extract_images=include_images,
                extract_markdown=include_markdown,
            )

            # Collect results
            json_path = output_dir / f"{stem}_content_list.json"
            elements: List[Dict[str, Any]] = []
            if include_images and json_path.exists():
                elements = json.loads(json_path.read_text(encoding="utf-8"))

            annotated_pdf: str | None = None
            if include_images:
                candidate_pdf = output_dir / f"{stem}_layout.pdf"
                if candidate_pdf.exists():
                    annotated_pdf = str(candidate_pdf.relative_to(OUTPUT_FOLDER))

            markdown_path: str | None = None
            if include_markdown:
                candidate_md = output_dir / f"{stem}.md"
                if candidate_md.exists():
                    markdown_path = str(candidate_md.relative_to(OUTPUT_FOLDER))

            figures = [e for e in elements if e.get("type") == "figure"]
            tables = [e for e in elements if e.get("type") == "table"]

            results.append(
                {
                    "filename": safe_name,
                    "stem": stem,
                    "output_dir": str(output_dir.relative_to(OUTPUT_FOLDER)),
                    "figures_count": len(figures),
                    "tables_count": len(tables),
                    "elements_count": len(elements),
                    "annotated_pdf": annotated_pdf,
                    "markdown_path": markdown_path,
                    "include_images": include_images,
                    "include_markdown": include_markdown,
                }
            )
        except Exception as e:  # pragma: no cover - runtime error path
            # One bad file must not abort the rest of the batch.
            logger.error(f"Error processing {filename}: {e}")
            results.append({"filename": filename, "error": str(e)})

    return JSONResponse({"results": results})


@app.get("/api/pdf-list")
async def pdf_list() -> Dict[str, Any]:
    """Get list of processed PDFs.

    Returns:
        ``{"pdfs": [...]}`` with one ``{"stem", "output_dir"}`` entry for
        each sub-directory of ``OUTPUT_FOLDER`` that contains at least one
        ``*_content_list.json``, ``*.md`` or ``*.pdf`` file.
    """
    pdfs: List[Dict[str, Any]] = []
    output_dir = OUTPUT_FOLDER
    if not output_dir.exists():
        return {"pdfs": pdfs}

    patterns = ("*_content_list.json", "*.md", "*.pdf")
    for item in output_dir.iterdir():
        if not item.is_dir():
            continue
        # A single match of any pattern is enough; no need to build lists.
        if any(any(item.glob(pattern)) for pattern in patterns):
            pdfs.append(
                {
                    "stem": item.name,
                    "output_dir": str(item.relative_to(output_dir)),
                }
            )
    return {"pdfs": pdfs}


@app.get("/api/pdf-details/{pdf_stem:path}")
async def pdf_details(pdf_stem: str) -> JSONResponse:
    """Get detailed information about a processed PDF.

    Args:
        pdf_stem: Directory of the processed PDF, relative to
            ``OUTPUT_FOLDER``.

    Returns:
        Element counts, figure/table entries and the relative paths of the
        annotated PDF, markdown file and PNG crops found in the directory.
        Responds 400 when ``pdf_stem`` resolves outside ``OUTPUT_FOLDER`` and
        404 when the directory does not exist.
    """
    output_dir = OUTPUT_FOLDER / pdf_stem
    # The stem comes straight from the URL; refuse anything (e.g. "..")
    # that would escape the output root before touching the filesystem.
    if not output_dir.resolve().is_relative_to(OUTPUT_FOLDER.resolve()):
        return JSONResponse({"error": "Invalid path"}, status_code=400)
    if not output_dir.exists():
        return JSONResponse({"error": "PDF not found"}, status_code=404)

    json_files = list(output_dir.glob("*_content_list.json"))
    elements: List[Dict[str, Any]] = []
    if json_files:
        elements = json.loads(json_files[0].read_text(encoding="utf-8"))

    figures = [e for e in elements if e.get("type") == "figure"]
    tables = [e for e in elements if e.get("type") == "table"]

    annotated_pdf: str | None = None
    pdf_files = list(output_dir.glob("*_layout.pdf"))
    if pdf_files:
        annotated_pdf = str(pdf_files[0].relative_to(OUTPUT_FOLDER))

    markdown_path: str | None = None
    md_files = list(output_dir.glob("*.md"))
    if md_files:
        markdown_path = str(md_files[0].relative_to(OUTPUT_FOLDER))

    figure_dir = output_dir / "figures"
    table_dir = output_dir / "tables"

    figure_images: List[str] = []
    if figure_dir.exists():
        figure_images = [
            str(f.relative_to(OUTPUT_FOLDER)) for f in sorted(figure_dir.glob("*.png"))
        ]

    table_images: List[str] = []
    if table_dir.exists():
        table_images = [
            str(t.relative_to(OUTPUT_FOLDER)) for t in sorted(table_dir.glob("*.png"))
        ]

    return JSONResponse(
        {
            "stem": pdf_stem,
            "figures": figures,
            "tables": tables,
            "figures_count": len(figures),
            "tables_count": len(tables),
            "elements_count": len(elements),
            "annotated_pdf": annotated_pdf,
            "markdown_path": markdown_path,
            "figure_images": figure_images,
            "table_images": table_images,
        }
    )


@app.get("/output/{filename:path}", response_model=None)
async def output_file(filename: str):
    """Serve output files (PDFs, images, markdown).

    Responds 400 when ``filename`` resolves outside ``OUTPUT_FOLDER`` and 404
    when the resolved path is missing or is not a regular file.
    """
    output_root = OUTPUT_FOLDER.resolve()
    file_path = (output_root / filename).resolve()
    if not file_path.is_relative_to(output_root):
        return JSONResponse({"error": "Invalid path"}, status_code=400)
    if not file_path.exists() or not file_path.is_file():
        return JSONResponse({"error": "Not found"}, status_code=404)
    return FileResponse(file_path)


def _delete_by_stem(stem_raw: str) -> JSONResponse:
    """Delete the processed-output directory named by ``stem_raw``.

    Args:
        stem_raw: Directory name relative to ``OUTPUT_FOLDER``; surrounding
            whitespace is ignored.

    Returns:
        ``{"ok": True, "deleted": stem}`` on success; 400 for an empty stem
        or one that resolves to the output root itself or outside it; 404
        when the target is missing or not a directory.

    Raises:
        OSError: Propagated from ``shutil.rmtree`` when removal fails.
    """
    stem = (stem_raw or "").strip()
    if not stem:
        return JSONResponse({"error": "Missing stem"}, status_code=400)

    output_root = OUTPUT_FOLDER.resolve()
    target_dir = (output_root / stem).resolve()
    # The root itself must be rejected too: a stem such as "." resolves to
    # output_root, and removing it would wipe every processed PDF at once.
    if target_dir == output_root or not target_dir.is_relative_to(output_root):
        return JSONResponse({"error": "Invalid stem path"}, status_code=400)
    if not target_dir.exists() or not target_dir.is_dir():
        return JSONResponse({"error": "Not found"}, status_code=404)

    shutil.rmtree(target_dir, ignore_errors=False)
    logger.info(f"Deleted processed output: {target_dir}")
    return JSONResponse({"ok": True, "deleted": stem})


@app.post("/api/delete")
async def delete_pdf(request: Request, stem_form: str | None = Form(default=None)) -> JSONResponse:
    """Delete a processed PDF directory by stem (JSON or form body).

    The form value is used when it is non-empty; otherwise the request body
    is parsed as JSON and its ``"stem"`` key is used. Any unexpected failure
    is logged and reported as a 500 response.
    """
    try:
        stem = (stem_form or "").strip()
        if not stem:
            data: Any = {}
            try:
                data = await request.json()
            except Exception:
                # Best effort: a missing or malformed JSON body just means
                # no stem was supplied.
                data = {}
            # A JSON array or scalar has no "stem" key; treat it as empty
            # rather than failing on ``.get``.
            if not isinstance(data, dict):
                data = {}
            stem = (str(data.get("stem") or "")).strip()
        return _delete_by_stem(stem)
    except Exception as e:  # pragma: no cover - runtime error path
        logger.error(f"Delete failed: {e}")
        return JSONResponse({"error": str(e)}, status_code=500)


# NOTE(review): GET is accepted for a destructive action here; kept for
# existing clients, but confirm that link prefetchers/crawlers cannot reach it.
@app.api_route("/api/delete/{stem:path}", methods=["POST", "GET"])
async def delete_pdf_by_path(stem: str) -> JSONResponse:
    """Alternate endpoint to delete using URL path, for clients avoiding bodies."""
    try:
        return _delete_by_stem(stem)
    except Exception as e:  # pragma: no cover - runtime error path
        logger.error(f"Delete failed: {e}")
        return JSONResponse({"error": str(e)}, status_code=500)