"""HTTP API for Dashy: authentication, users, tasks, workspace and audit log."""

import os
import uuid
from typing import List

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

from . import models, schemas, auth, database
from .database import engine, get_db

# Create any tables that do not exist yet when the module is imported.
models.Base.metadata.create_all(bind=engine)

app = FastAPI(title="Dashy API")

# Value of ``User.account_type`` that grants access to admin-only endpoints.
ADMIN_ACCOUNT_TYPE = "admin"

# User id that inherits the tasks of a deleted user.
FALLBACK_ASSIGNEE_ID = "rod"

# Field values used when the workspace row has to be created on first access.
DEFAULT_WORKSPACE = {
    "id": "default",
    "name": "murchison-auto",
    "timezone": "Pacific/Auckland",
}


def _cors_origins() -> List[str]:
    """Return allowed CORS origins from ``DASHY_CORS_ORIGINS`` (comma-separated).

    Falls back to ``["*"]`` when the variable is unset or empty, which matches
    the previous hard-coded behaviour. In production, set it to the frontend
    URL(s).
    """
    raw = os.environ.get("DASHY_CORS_ORIGINS", "*")
    origins = [origin.strip() for origin in raw.split(",") if origin.strip()]
    return origins or ["*"]


app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins(),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _require_admin(current_user: models.User) -> None:
    """Raise 403 unless ``current_user`` has the admin account type."""
    if current_user.account_type != ADMIN_ACCOUNT_TYPE:
        raise HTTPException(status_code=403, detail="Not enough permissions")


def _get_user_or_404(db: Session, user_id: str) -> models.User:
    """Return the user with ``user_id`` or raise 404."""
    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


def _get_task_or_404(db: Session, task_id: str) -> models.Task:
    """Return the task with ``task_id`` (trashed or not) or raise 404."""
    db_task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not db_task:
        raise HTTPException(status_code=404, detail="Task not found")
    return db_task


def _get_or_create_workspace(db: Session):
    """Return ``(workspace, created)``; a new row is added but NOT committed."""
    ws = db.query(models.Workspace).first()
    if ws:
        return ws, False
    ws = models.Workspace(**DEFAULT_WORKSPACE)
    db.add(ws)
    return ws, True


def _apply_updates(target: object, changes: dict) -> None:
    """Copy every ``field -> value`` pair in ``changes`` onto ``target``."""
    for key, value in changes.items():
        setattr(target, key, value)


def _resolve_tags(db: Session, tag_names) -> list:
    """Map tag names to ``Tag`` rows, creating the ones that do not exist.

    Names are de-duplicated (first occurrence wins) so a repeated name neither
    creates two ``Tag`` rows nor links the same tag to a task twice.
    """
    resolved = []
    for tag_name in dict.fromkeys(tag_names or []):
        tag = db.query(models.Tag).filter(models.Tag.tag == tag_name).first()
        if not tag:
            tag = models.Tag(tag=tag_name)
            db.add(tag)
        resolved.append(tag)
    return resolved


def _commit_or_conflict(db: Session, detail: str) -> None:
    """Commit; on a constraint violation roll back and raise 409 with ``detail``."""
    try:
        db.commit()
    except IntegrityError as exc:
        # The session is unusable until rolled back after a failed flush.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT, detail=detail
        ) from exc


# ---------------------------------------------------------------------------
# Authentication
# ---------------------------------------------------------------------------

# Declared with plain ``def`` (not ``async def``): the handler only performs
# blocking database work, so FastAPI should run it in its threadpool instead
# of on the event loop.
@app.post("/token", response_model=schemas.Token)
def login_for_access_token(form_data: schemas.UserLogin, db: Session = Depends(get_db)):
    """Exchange a user id (or name) and password for a bearer access token.

    Raises 401 when no user matches or the password does not verify.
    """
    # Search by ID or Name
    user = db.query(models.User).filter(
        (models.User.id == form_data.id) | (models.User.name == form_data.id)
    ).first()
    if not user or not auth.verify_password(form_data.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = auth.create_access_token(data={"sub": user.id})
    return {"access_token": access_token, "token_type": "bearer"}


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------

# NOTE(review): this endpoint requires no authentication. Presumably it feeds
# a pre-login screen; confirm that exposing the user list is intended.
@app.get("/users", response_model=List[schemas.User])
def read_users(db: Session = Depends(get_db)):
    """Return every user."""
    return db.query(models.User).all()


# NOTE(review): the user-mutating endpoints below only require *some*
# authenticated user; none checks that the caller is an admin or the target
# user. Confirm this matches the intended permission model.
@app.post("/users", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Create a user, storing a hash of the supplied password.

    Raises 409 when the new row violates a database constraint (for example a
    duplicate id).
    """
    db_user = models.User(
        id=user.id,
        name=user.name,
        role=user.role,
        hue=user.hue,
        initials=user.initials,
        email=user.email,
        phone=user.phone,
        photo=user.photo,
        account_type=user.account_type,
        password_hash=auth.get_password_hash(user.password),
    )
    db.add(db_user)
    _commit_or_conflict(db, "User conflicts with an existing record")
    db.refresh(db_user)
    return db_user


@app.patch("/users/{user_id}", response_model=schemas.User)
def update_user(user_id: str, user_update: schemas.UserUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Apply the fields explicitly set in the request to the user.

    Raises 404 when the user does not exist.
    """
    db_user = _get_user_or_404(db, user_id)
    _apply_updates(db_user, user_update.dict(exclude_unset=True))
    db.commit()
    db.refresh(db_user)
    return db_user


@app.post("/users/{user_id}/password")
def change_password(user_id: str, payload: schemas.PasswordChange, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Replace the user's password after verifying the current one.

    Raises 404 when the user does not exist and 401 when ``old_password`` does
    not match the stored hash.
    """
    db_user = _get_user_or_404(db, user_id)
    if not auth.verify_password(payload.old_password, db_user.password_hash):
        raise HTTPException(status_code=401, detail="Current password is incorrect")
    db_user.password_hash = auth.get_password_hash(payload.new_password)
    db.commit()
    return {"message": "Password updated"}


@app.delete("/users/{user_id}")
def delete_user(user_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Delete a user and hand their tasks to the fallback assignee.

    Raises 404 when the user does not exist and 400 when the target is the
    fallback assignee itself.
    """
    db_user = _get_user_or_404(db, user_id)
    # Deleting the fallback assignee would leave its tasks (and every task
    # reassigned in the future) pointing at a user that no longer exists.
    if user_id == FALLBACK_ASSIGNEE_ID:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot delete the fallback task assignee",
        )
    # Reassign tasks to the fallback assignee
    db.query(models.Task).filter(models.Task.assignee_id == user_id).update(
        {"assignee_id": FALLBACK_ASSIGNEE_ID}
    )
    db.delete(db_user)
    db.commit()
    return {"message": "User deleted"}


# ---------------------------------------------------------------------------
# Tasks
# ---------------------------------------------------------------------------

@app.get("/tasks", response_model=List[schemas.Task])
def read_tasks(db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Return all tasks that are not in the trash."""
    return db.query(models.Task).filter(models.Task.deleted_at.is_(None)).all()


@app.get("/tasks/deleted", response_model=List[schemas.Task])
def read_deleted_tasks(db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Return all trashed tasks. Admin only (403 otherwise)."""
    _require_admin(current_user)
    return db.query(models.Task).filter(models.Task.deleted_at.isnot(None)).all()


@app.post("/tasks", response_model=schemas.Task)
def create_task(task: schemas.TaskCreate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Create a task, generating an id when none is supplied, and attach tags.

    Unknown tag names are created on the fly. Raises 409 when the new row
    violates a database constraint (for example a duplicate task id).
    """
    task_id = task.id or f"t_{uuid.uuid4().hex[:8]}"
    db_task = models.Task(
        id=task_id,
        title=task.title,
        description=task.description,
        assignee_id=task.assignee_id,
        added_by=task.added_by,
        priority=task.priority,
        source=task.source,
        status=task.status,
        due_at=task.due_at,
        reminder_at=task.reminder_at,
    )
    # Resolve tags before the task joins the session so the tag look-ups
    # cannot trigger a premature flush of the new task row.
    db_task.tags.extend(_resolve_tags(db, task.tags))
    db.add(db_task)
    _commit_or_conflict(db, "Task conflicts with an existing record")
    db.refresh(db_task)
    return db_task


@app.patch("/tasks/{task_id}", response_model=schemas.Task)
def update_task(task_id: str, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Apply the fields explicitly set in the request to the task.

    Raises 404 when the task does not exist.
    """
    db_task = _get_task_or_404(db, task_id)
    update_data = task_update.dict(exclude_unset=True)
    # ``Task.tags`` holds ``Tag`` rows (see create_task), so tag names cannot
    # be assigned to it directly. NOTE(review): assumes TaskUpdate carries
    # tags as names like TaskCreate does; this is a no-op if it has no
    # ``tags`` field. An explicit null leaves the current tags untouched.
    if "tags" in update_data:
        tag_names = update_data.pop("tags")
        if tag_names is not None:
            db_task.tags = _resolve_tags(db, tag_names)
    _apply_updates(db_task, update_data)
    db.commit()
    db.refresh(db_task)
    return db_task


@app.delete("/tasks/{task_id}")
def delete_task(task_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Soft-delete a task by stamping ``deleted_at`` with the database time.

    Raises 404 when the task does not exist.
    """
    db_task = _get_task_or_404(db, task_id)
    db_task.deleted_at = func.now()
    db.commit()
    return {"message": "Task moved to trash"}


@app.post("/tasks/{task_id}/restore", response_model=schemas.Task)
def restore_task(task_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Take a task out of the trash by clearing ``deleted_at``. Admin only.

    Raises 403 for non-admins and 404 when the task does not exist.
    """
    _require_admin(current_user)
    db_task = _get_task_or_404(db, task_id)
    db_task.deleted_at = None
    db.commit()
    db.refresh(db_task)
    return db_task


# ---------------------------------------------------------------------------
# Workspace
# ---------------------------------------------------------------------------

@app.get("/workspace", response_model=schemas.Workspace)
def read_workspace(db: Session = Depends(get_db)):
    """Return the workspace, creating the default one on first access."""
    ws, created = _get_or_create_workspace(db)
    if created:
        db.commit()
        db.refresh(ws)
    return ws


@app.patch("/workspace", response_model=schemas.Workspace)
def update_workspace(ws_update: schemas.WorkspaceUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Apply the fields explicitly set in the request to the workspace.

    Admin only (403 otherwise). The default workspace is created first when
    none exists.
    """
    _require_admin(current_user)
    ws, _created = _get_or_create_workspace(db)
    _apply_updates(ws, ws_update.dict(exclude_unset=True))
    db.commit()
    db.refresh(ws)
    return ws


# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------

@app.get("/audit", response_model=List[schemas.AuditLog])
def read_audit(db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Return all audit entries ordered by ``at``, newest first."""
    return db.query(models.AuditLog).order_by(models.AuditLog.at.desc()).all()


# NOTE(review): ``actor`` is taken from the request body rather than from
# ``current_user``; confirm that clients are trusted to report it.
@app.post("/audit", response_model=schemas.AuditLog)
def create_audit(audit: schemas.AuditLogBase, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)):
    """Store an audit entry under a freshly generated id."""
    audit_id = f"a_{uuid.uuid4().hex[:8]}"
    db_audit = models.AuditLog(
        id=audit_id,
        actor=audit.actor,
        action=audit.action,
        summary=audit.summary,
        target=audit.target,
    )
    db.add(db_audit)
    db.commit()
    db.refresh(db_audit)
    return db_audit