Spaces:
Sleeping
Sleeping
| from typing import Annotated | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from fastapi.encoders import jsonable_encoder | |
| from fastapi.exceptions import RequestValidationError | |
| from starlette.middleware.cors import CORSMiddleware | |
| from fastapi import FastAPI, Header, UploadFile, Depends, HTTPException, status | |
| import base64 | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from starlette.responses import JSONResponse | |
| from collections import defaultdict | |
| from pydantic import BaseModel | |
| from threading import Lock | |
| from logger import get_now | |
| from run import main as run_main | |
| START_AT = get_now() | |
| app = FastAPI() | |
| lock = Lock() | |
| n_run = 0 | |
| last_run = None | |
| is_running=False | |
| def scheduled_job(): | |
| with lock: | |
| global is_running | |
| if is_running: | |
| return False | |
| is_running = True | |
| print("Job is running!") | |
| run_main() | |
| with lock: | |
| global n_run | |
| n_run = n_run + 1 | |
| global last_run | |
| last_run = get_now() | |
| is_running = False | |
| return True | |
| # Create a scheduler | |
| scheduler = BackgroundScheduler() | |
| # Add the scheduled job to the scheduler | |
| scheduler.add_job(scheduled_job, 'interval', minutes=30) | |
| # Start the scheduler | |
| scheduler.start() | |
| # You can also stop the scheduler when the FastAPI application shuts down | |
| def shutdown_event(): | |
| scheduler.shutdown() | |
| class BaseResponse(BaseModel): | |
| status: int = 1 | |
| message: str = "" | |
| result: object = None | |
| async def http_exception_handler(request, exc: HTTPException): | |
| return JSONResponse( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| content=jsonable_encoder(BaseResponse(status=0, message=exc.detail)) | |
| ) | |
| def validation_exception_handler(request, exc: RequestValidationError) -> JSONResponse: | |
| reformatted_message = defaultdict(list) | |
| for pydantic_error in exc.errors(): | |
| loc, msg = pydantic_error["loc"], pydantic_error["msg"] | |
| filtered_loc = loc[1:] if loc[0] in ("body", "query", "path") else loc | |
| field_string = ".".join(filtered_loc) | |
| reformatted_message[field_string].append(msg) | |
| return JSONResponse( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| content=jsonable_encoder(BaseResponse(status=0, message="Invalid request", result=reformatted_message)) | |
| ) | |
| def status(): | |
| return BaseResponse(result={ | |
| "start_at": START_AT, | |
| "current": get_now(), | |
| "n_runs": n_run, | |
| "last_run": last_run, | |
| }) | |
| def run_once(): | |
| print("Running the job once.") | |
| success = scheduled_job() # Manually trigger the job | |
| if not success: | |
| return BaseResponse(message="Job is running, not start a new job") | |
| return BaseResponse(message="Job executed once.") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) |