"""HTTP API for Dashy: token login, users, tasks and audit-log endpoints."""

import uuid
from typing import List

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session

from . import models, schemas, auth, database
from .database import engine, get_db

# Create any tables declared on the models' metadata when the module is imported.
models.Base.metadata.create_all(bind=engine)

app = FastAPI(title="Dashy API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify your frontend URL
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(
    form_data: schemas.UserCreate, db: Session = Depends(get_db)
):
    """Check the submitted id/password and return a bearer access token.

    Raises:
        HTTPException: 401 when no user has the given id or the password
            does not verify against the stored hash.
    """
    # NOTE(review): this is an ``async def`` that performs a synchronous
    # Session query; consider whether it should be a plain ``def``.
    user = db.query(models.User).filter(models.User.id == form_data.id).first()
    if not user or not auth.verify_password(form_data.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = auth.create_access_token(data={"sub": user.id})
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users", response_model=List[schemas.User])
def read_users(db: Session = Depends(get_db)):
    """Return every user row."""
    return db.query(models.User).all()


@app.get("/tasks", response_model=List[schemas.Task])
def read_tasks(db: Session = Depends(get_db)):
    """Return every task row."""
    return db.query(models.Task).all()


@app.post("/tasks", response_model=schemas.Task)
def create_task(task: schemas.TaskCreate, db: Session = Depends(get_db)):
    """Create a task, attaching existing tags by name and creating missing ones.

    The task id is taken from the payload when provided; otherwise an id of
    the form ``t_`` followed by 8 hex characters is generated.
    """
    # NOTE(review): a caller-supplied id that already exists is not checked
    # here; presumably the commit fails in that case -- confirm desired handling.
    task_id = task.id or f"t_{uuid.uuid4().hex[:8]}"
    db_task = models.Task(
        id=task_id,
        title=task.title,
        description=task.description,
        assignee_id=task.assignee_id,
        added_by=task.added_by,
        priority=task.priority,
        source=task.source,
        status=task.status,
        due_at=task.due_at,
        reminder_at=task.reminder_at,
    )
    db.add(db_task)

    # De-duplicate tag names (keeping first-seen order) so a repeated name in
    # the payload neither creates two Tag rows nor links the same Tag twice.
    for tag_name in dict.fromkeys(task.tags or ()):
        tag = db.query(models.Tag).filter(models.Tag.tag == tag_name).first()
        if not tag:
            tag = models.Tag(tag=tag_name)
            db.add(tag)
        db_task.tags.append(tag)

    db.commit()
    db.refresh(db_task)
    return db_task


@app.patch("/tasks/{task_id}", response_model=schemas.Task)
def update_task(
    task_id: str, task_update: schemas.TaskUpdate, db: Session = Depends(get_db)
):
    """Apply the fields explicitly set in the request to an existing task.

    Raises:
        HTTPException: 404 when no task has the given id.
    """
    db_task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not db_task:
        raise HTTPException(status_code=404, detail="Task not found")

    # exclude_unset=True: only fields the client actually sent are written,
    # so omitted fields keep their stored values.
    update_data = task_update.dict(exclude_unset=True)
    for key, value in update_data.items():
        setattr(db_task, key, value)

    db.commit()
    db.refresh(db_task)
    return db_task


@app.get("/audit", response_model=List[schemas.AuditLog])
def read_audit(db: Session = Depends(get_db)):
    """Return all audit-log entries ordered by ``at`` descending."""
    return db.query(models.AuditLog).order_by(models.AuditLog.at.desc()).all()


@app.post("/audit", response_model=schemas.AuditLog)
def create_audit(audit: schemas.AuditLogBase, db: Session = Depends(get_db)):
    """Store a new audit-log entry with a generated ``a_``-prefixed id."""
    audit_id = f"a_{uuid.uuid4().hex[:8]}"
    db_audit = models.AuditLog(
        id=audit_id,
        actor=audit.actor,
        action=audit.action,
        summary=audit.summary,
        target=audit.target,
    )
    db.add(db_audit)
    db.commit()
    db.refresh(db_audit)
    return db_audit