from fastapi import FastAPI, Depends, HTTPException, status from fastapi.middleware.cors import CORSMiddleware from sqlalchemy.orm import Session from sqlalchemy.sql import func from typing import List import uuid from . import models, schemas, auth, database from .database import engine, get_db models.Base.metadata.create_all(bind=engine) app = FastAPI(title="Dashy API") app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, specify your frontend URL allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.post("/token", response_model=schemas.Token) async def login_for_access_token(form_data: schemas.UserLogin, db: Session = Depends(get_db)): # Search by ID or Name user = db.query(models.User).filter( (models.User.id == form_data.id) | (models.User.name == form_data.id) ).first() if not user or not auth.verify_password(form_data.password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token = auth.create_access_token(data={"sub": user.id}) return {"access_token": access_token, "token_type": "bearer"} @app.get("/users", response_model=List[schemas.User]) def read_users(db: Session = Depends(get_db)): return db.query(models.User).all() @app.post("/users", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): db_user = models.User( id=user.id, name=user.name, role=user.role, hue=user.hue, initials=user.initials, email=user.email, phone=user.phone, password_hash=auth.get_password_hash(user.password) if user.password else None, account_type=user.account_type ) db.add(db_user) db.commit() db.refresh(db_user) return db_user @app.patch("/users/{user_id}", response_model=schemas.User) def update_user(user_id: str, user_update: schemas.UserUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): if current_user.account_type != "admin" and current_user.id != user_id: raise HTTPException(status_code=403, detail="Not enough permissions") db_user = db.query(models.User).filter(models.User.id == user_id).first() if not db_user: raise HTTPException(status_code=404, detail="User not found") update_data = user_update.dict(exclude_unset=True) for key, value in update_data.items(): setattr(db_user, key, value) db.commit() db.refresh(db_user) return db_user @app.post("/users/{user_id}/password") def change_password(user_id: str, pwd_data: schemas.PasswordChange, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): if current_user.id != user_id: raise HTTPException(status_code=403, detail="Cannot change another user's password") if not auth.verify_password(pwd_data.old_password, current_user.password_hash): raise HTTPException(status_code=400, detail="Incorrect current password") current_user.password_hash = auth.get_password_hash(pwd_data.new_password) db.commit() return {"message": "Password updated successfully"} @app.delete("/users/{user_id}") def delete_user(user_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): if current_user.account_type != "admin": raise HTTPException(status_code=403, detail="Not enough permissions") db_user = db.query(models.User).filter(models.User.id == user_id).first() if not db_user: raise HTTPException(status_code=404, detail="User not found") # Check for OPEN assigned tasks (that are not in the trash) open_tasks = db.query(models.Task).filter( models.Task.assignee_id == user_id, models.Task.status == "open", models.Task.deleted_at == None ).first() if open_tasks: raise HTTPException(status_code=400, detail="Cannot delete user: They still have OPEN tasks assigned to them. Reassign them first.") # Nullify references in closed tasks and notes so we don't lose history db.query(models.Task).filter(models.Task.assignee_id == user_id).update({"assignee_id": None}) db.query(models.TaskNote).filter(models.TaskNote.author_id == user_id).update({"author_id": None}) db.delete(db_user) db.commit() return {"message": "User deleted"} @app.get("/tasks", response_model=List[schemas.Task]) def read_tasks(db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): return db.query(models.Task).filter(models.Task.deleted_at == None).order_by(models.Task.position.asc()).all() @app.get("/tasks/deleted", response_model=List[schemas.Task]) def read_deleted_tasks(db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): if current_user.account_type != "admin": raise HTTPException(status_code=403, detail="Not enough permissions") return db.query(models.Task).filter(models.Task.deleted_at != None).order_by(models.Task.position.asc()).all() @app.post("/tasks", response_model=schemas.Task) def create_task(task: schemas.TaskCreate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): task_id = task.id or f"t_{uuid.uuid4().hex[:8]}" # Calculate position (max in column + 1000) max_pos = db.query(func.max(models.Task.position)).filter(models.Task.assignee_id == task.assignee_id).scalar() or 0.0 db_task = models.Task( id=task_id, title=task.title, description=task.description, assignee_id=task.assignee_id, added_by=task.added_by, priority=task.priority, source=task.source, status=task.status, due_at=task.due_at, reminder_at=task.reminder_at, position=max_pos + 1000.0 ) db.add(db_task) for tag_name in task.tags: tag = db.query(models.Tag).filter(models.Tag.tag == tag_name).first() if not tag: tag = models.Tag(tag=tag_name) db.add(tag) db_task.tags.append(tag) db.commit() db.refresh(db_task) return db_task @app.patch("/tasks/{task_id}", response_model=schemas.Task) def update_task(task_id: str, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): db_task = db.query(models.Task).filter(models.Task.id == task_id).first() if not db_task: raise HTTPException(status_code=404, detail="Task not found") update_data = task_update.dict(exclude_unset=True) for key, value in update_data.items(): setattr(db_task, key, value) db.commit() db.refresh(db_task) return db_task @app.delete("/tasks/{task_id}") def delete_task(task_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): db_task = db.query(models.Task).filter(models.Task.id == task_id).first() if not db_task: raise HTTPException(status_code=404, detail="Task not found") db_task.deleted_at = func.now() db.commit() return {"message": "Task moved to trash"} @app.get("/tasks/{task_id}/notes", response_model=List[schemas.TaskNote]) def read_task_notes(task_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): return db.query(models.TaskNote).filter(models.TaskNote.task_id == task_id).order_by(models.TaskNote.created_at.desc()).all() @app.post("/tasks/{task_id}/notes", response_model=schemas.TaskNote) def create_task_note(task_id: str, note: schemas.TaskNoteBase, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): db_note = models.TaskNote( task_id=task_id, author_id=current_user.id, body=note.body ) db.add(db_note) db.commit() db.refresh(db_note) return db_note @app.post("/tasks/{task_id}/restore", response_model=schemas.Task) def restore_task(task_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): if current_user.account_type != "admin": raise HTTPException(status_code=403, detail="Not enough permissions") db_task = db.query(models.Task).filter(models.Task.id == task_id).first() if not db_task: raise HTTPException(status_code=404, detail="Task not found") db_task.deleted_at = None db.commit() db.refresh(db_task) return db_task @app.get("/workspace", response_model=schemas.Workspace) def read_workspace(db: Session = Depends(get_db)): ws = db.query(models.Workspace).first() if not ws: ws = models.Workspace(id="default", name="murchison-auto", timezone="Pacific/Auckland") db.add(ws) db.commit() db.refresh(ws) return ws @app.patch("/workspace", response_model=schemas.Workspace) def update_workspace(ws_update: schemas.WorkspaceUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): if current_user.account_type != "admin": raise HTTPException(status_code=403, detail="Not enough permissions") ws = db.query(models.Workspace).first() if not ws: ws = models.Workspace(id="default", name="murchison-auto", timezone="Pacific/Auckland") db.add(ws) update_data = ws_update.dict(exclude_unset=True) for key, value in update_data.items(): setattr(ws, key, value) db.commit() db.refresh(ws) return ws @app.get("/audit", response_model=List[schemas.AuditLog]) def read_audit(db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): return db.query(models.AuditLog).order_by(models.AuditLog.at.desc()).all() @app.post("/audit", response_model=schemas.AuditLog) def create_audit(audit: schemas.AuditLogBase, db: Session = Depends(get_db), current_user: models.User = Depends(auth.get_current_user)): audit_id = f"a_{uuid.uuid4().hex[:8]}" db_audit = models.AuditLog( id=audit_id, actor=audit.actor, action=audit.action, summary=audit.summary, target=audit.target ) db.add(db_audit) db.commit() db.refresh(db_audit) return db_audit