from typing import Dict, Optional
from datetime import datetime
import uuid
import json
import logging
import os

from app.models.schemas import JobInfo, JobStatus

logger = logging.getLogger(__name__)


class JobStore:
    """Simple in-memory job store with file persistence.

    Jobs are held in a dict keyed by job ID. Every mutating operation
    (create, update, delete) rewrites the whole persistence file as JSON,
    and the file is read back once when the store is constructed.
    """

    def __init__(self, persistence_file: str = "data/jobs.json"):
        """Create the store and load any previously persisted jobs.

        Args:
            persistence_file: Path of the JSON file used for persistence.
        """
        self._jobs: Dict[str, JobInfo] = {}
        self._persistence_file = persistence_file
        self._load_jobs()

    def _load_jobs(self) -> None:
        """Load jobs from file on startup.

        Loading is best-effort: an unreadable or malformed file leaves the
        store empty, and a malformed record is skipped. Failures are logged
        rather than raised so that constructing the store never fails
        because of bad persisted data.
        """
        if not os.path.exists(self._persistence_file):
            return

        try:
            with open(self._persistence_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(
                "Could not read job store file %s; starting with no jobs",
                self._persistence_file,
                exc_info=True,
            )
            return

        if not isinstance(data, dict):
            logger.warning(
                "Job store file %s does not contain a JSON object; ignoring it",
                self._persistence_file,
            )
            return

        for job_id, job_data in data.items():
            # Handle each record separately so one bad entry does not
            # discard the records that follow it.
            try:
                job_data["created_at"] = datetime.fromisoformat(job_data["created_at"])
                job_data["updated_at"] = datetime.fromisoformat(job_data["updated_at"])
                self._jobs[job_id] = JobInfo(**job_data)
            except (KeyError, TypeError, ValueError):
                logger.warning(
                    "Skipping malformed job record %r in %s",
                    job_id,
                    self._persistence_file,
                    exc_info=True,
                )

    def _save_jobs(self) -> None:
        """Persist all jobs to the persistence file as JSON.

        The data is written to a sibling ``.tmp`` file first and then moved
        over the real file, so a failure part-way through a write does not
        leave a truncated persistence file behind.

        Raises:
            OSError: If the directory or file cannot be written.
        """
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        directory = os.path.dirname(self._persistence_file)
        if directory:
            os.makedirs(directory, exist_ok=True)

        data = {}
        for job_id, job in self._jobs.items():
            job_dict = job.model_dump()
            job_dict["created_at"] = job_dict["created_at"].isoformat()
            job_dict["updated_at"] = job_dict["updated_at"].isoformat()
            data[job_id] = job_dict

        tmp_path = f"{self._persistence_file}.tmp"
        try:
            # ensure_ascii=False emits raw non-ASCII characters, so the file
            # encoding must be pinned instead of left to the platform default.
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self._persistence_file)
        finally:
            # After a successful replace the temp file is already gone; this
            # only cleans up after a failed write.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def create_job(self, original_url: str) -> JobInfo:
        """Create a new pending job, store it, and persist the store.

        Args:
            original_url: URL recorded on the job as ``original_url``.

        Returns:
            The newly created job.
        """
        job_id = str(uuid.uuid4())[:8]
        # IDs are truncated to 8 characters, so regenerate on the rare
        # collision instead of silently overwriting an existing job.
        while job_id in self._jobs:
            job_id = str(uuid.uuid4())[:8]

        now = datetime.now()
        job = JobInfo(
            job_id=job_id,
            status=JobStatus.PENDING,
            created_at=now,
            updated_at=now,
            original_url=original_url,
        )
        self._jobs[job_id] = job
        self._save_jobs()
        return job

    def get_job(self, job_id: str) -> Optional[JobInfo]:
        """Return the job with the given ID, or ``None`` if it is unknown."""
        return self._jobs.get(job_id)

    def update_job(self, job_id: str, **kwargs) -> Optional[JobInfo]:
        """Update attributes of an existing job and persist the store.

        Args:
            job_id: ID of the job to update.
            **kwargs: Attribute names and new values. Names that are not
                attributes of the job are ignored.

        Returns:
            The updated job, or ``None`` if no job has this ID.
        """
        job = self._jobs.get(job_id)
        if job:
            for key, value in kwargs.items():
                if hasattr(job, key):
                    setattr(job, key, value)
            # Set last so the modification time always reflects this call.
            job.updated_at = datetime.now()
            self._save_jobs()
        return job

    def list_jobs(self, limit: int = 50) -> list[JobInfo]:
        """Return up to ``limit`` jobs, most recently created first."""
        jobs = sorted(
            self._jobs.values(),
            key=lambda j: j.created_at,
            reverse=True,
        )
        return jobs[:limit]

    def delete_job(self, job_id: str) -> bool:
        """Delete a job and persist the store.

        Returns:
            ``True`` if the job existed and was removed, ``False`` otherwise.
        """
        if job_id in self._jobs:
            del self._jobs[job_id]
            self._save_jobs()
            return True
        return False


# Global job store instance
job_store = JobStore()