Files
plumbing-dashy/backend/main.py
T

260 lines
10 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from typing import List
import uuid
from . import models, schemas, auth, database
from .database import engine, get_db
# Create the tables for every model registered on models.Base.
models.Base.metadata.create_all(bind=engine)

app = FastAPI(title="Dashy API")

# CORS is currently wide open: any origin, method and header is accepted and
# credentials are allowed.
# In production, specify your frontend URL
_cors_settings = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_cors_settings)
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: schemas.UserLogin, db: Session = Depends(get_db)):
    """Authenticate a user and issue a bearer access token.

    The submitted ``id`` field is matched against either a user's id or a
    user's name; the first match is used.

    Returns:
        A dict with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 401 when no user matches, when the matched user has no
            password set, or when the password does not verify.
    """
    login_key = form_data.id
    # Search by ID or Name
    user = (
        db.query(models.User)
        .filter((models.User.id == login_key) | (models.User.name == login_key))
        .first()
    )

    # Users can be created without a password (password_hash stored as None).
    # Such accounts must be rejected here rather than handing None to the
    # password verifier.
    authenticated = bool(
        user
        and user.password_hash
        and auth.verify_password(form_data.password, user.password_hash)
    )
    if not authenticated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = auth.create_access_token(data={"sub": user.id})
    return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users", response_model=List[schemas.User])
def read_users(db: Session = Depends(get_db)):
    """Return every user row, serialized through ``schemas.User``.

    NOTE(review): this route does not depend on ``auth.get_current_user``, so
    it is reachable without a token — confirm that is intentional.
    """
    all_users = db.query(models.User).all()
    return all_users
@app.post("/users", response_model=schemas.User)
def create_user(
    user: schemas.UserCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Create a new user account (admin only).

    The password, when supplied, is stored as a hash; when omitted the
    account is stored with ``password_hash`` set to None.

    Raises:
        HTTPException: 403 when the caller is not an admin; 409 when a user
            with the requested id already exists.
    """
    # Account creation can assign any account_type (including "admin"), so it
    # is restricted to admins, matching the check used for deleting users.
    if current_user.account_type != "admin":
        raise HTTPException(status_code=403, detail="Not enough permissions")

    # Report a clean conflict instead of letting the insert fail at commit.
    if user.id is not None:
        existing = db.query(models.User).filter(models.User.id == user.id).first()
        if existing:
            raise HTTPException(status_code=409, detail="User already exists")

    password_hash = auth.get_password_hash(user.password) if user.password else None
    db_user = models.User(
        id=user.id,
        name=user.name,
        role=user.role,
        hue=user.hue,
        initials=user.initials,
        email=user.email,
        phone=user.phone,
        password_hash=password_hash,
        account_type=user.account_type,
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
@app.patch("/users/{user_id}", response_model=schemas.User)
def update_user(
    user_id: str,
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Apply a partial update to a user.

    Admins may update any user; other users may only update themselves and
    may not change their own ``account_type``.

    Only fields explicitly present in the request are written.

    Raises:
        HTTPException: 403 when the caller lacks permission; 404 when the
            user does not exist.
    """
    is_admin = current_user.account_type == "admin"
    if not is_admin and current_user.id != user_id:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    update_data = user_update.dict(exclude_unset=True)

    # Every submitted field is written verbatim below, so a self-service
    # update must not be able to change the account's privilege level.
    # Re-sending the current value is tolerated so full-form submissions work.
    # (Only relevant if the update schema exposes account_type; the schema is
    # not visible from this module.)
    if (
        not is_admin
        and "account_type" in update_data
        and update_data["account_type"] != db_user.account_type
    ):
        raise HTTPException(status_code=403, detail="Not enough permissions")

    for key, value in update_data.items():
        setattr(db_user, key, value)
    db.commit()
    db.refresh(db_user)
    return db_user
@app.post("/users/{user_id}/password")
def change_password(
    user_id: str,
    pwd_data: schemas.PasswordChange,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Change the caller's own password after verifying the current one.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 403 when ``user_id`` is not the caller's id; 400 when
            the current password is wrong or the account has no password set.
    """
    if current_user.id != user_id:
        raise HTTPException(status_code=403, detail="Cannot change another user's password")

    # Accounts can exist without a password (password_hash is None); treat
    # that as a failed verification instead of passing None to the verifier.
    stored_hash = current_user.password_hash
    if not stored_hash or not auth.verify_password(pwd_data.old_password, stored_hash):
        raise HTTPException(status_code=400, detail="Incorrect current password")

    current_user.password_hash = auth.get_password_hash(pwd_data.new_password)
    db.commit()
    return {"message": "Password updated successfully"}
@app.delete("/users/{user_id}")
def delete_user(
    user_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Delete a user (admin only), detaching them from tasks and notes first.

    Deletion is refused while the user is still the assignee of a task whose
    status is "open" and which has not been moved to the trash.

    Raises:
        HTTPException: 403 for non-admin callers; 404 when the user does not
            exist; 400 when open, non-trashed tasks are still assigned.
    """
    if current_user.account_type != "admin":
        raise HTTPException(status_code=403, detail="Not enough permissions")

    target = db.query(models.User).filter(models.User.id == user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # One matching row is enough to block the deletion.
    blocking_task = (
        db.query(models.Task)
        .filter(
            models.Task.assignee_id == user_id,
            models.Task.status == "open",
            # SQLAlchemy builds the NULL test from the overloaded operator.
            models.Task.deleted_at == None,  # noqa: E711
        )
        .first()
    )
    if blocking_task:
        raise HTTPException(status_code=400, detail="Cannot delete user: They still have OPEN tasks assigned to them. Reassign them first.")

    # Keep the remaining tasks and notes as history, but drop the reference
    # to the user that is about to disappear.
    assigned_tasks = db.query(models.Task).filter(models.Task.assignee_id == user_id)
    assigned_tasks.update({"assignee_id": None})
    authored_notes = db.query(models.TaskNote).filter(models.TaskNote.author_id == user_id)
    authored_notes.update({"author_id": None})

    db.delete(target)
    db.commit()
    return {"message": "User deleted"}
@app.get("/tasks", response_model=List[schemas.Task])
def read_tasks(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return all tasks that are not in the trash, ordered by ascending position."""
    live_tasks = db.query(models.Task).filter(
        # SQLAlchemy builds the NULL test from the overloaded operator.
        models.Task.deleted_at == None  # noqa: E711
    )
    return live_tasks.order_by(models.Task.position.asc()).all()
@app.get("/tasks/deleted", response_model=List[schemas.Task])
def read_deleted_tasks(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return the trashed tasks (admin only), ordered by ascending position.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if current_user.account_type != "admin":
        raise HTTPException(status_code=403, detail="Not enough permissions")

    trashed_tasks = db.query(models.Task).filter(
        # SQLAlchemy builds the NOT NULL test from the overloaded operator.
        models.Task.deleted_at != None  # noqa: E711
    )
    return trashed_tasks.order_by(models.Task.position.asc()).all()
@app.post("/tasks", response_model=schemas.Task)
def create_task(
    task: schemas.TaskCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Create a task, place it after the assignee's existing tasks, and tag it.

    A task id is generated when the request does not provide one. Tags are
    looked up by name and created on demand; repeated names in the request
    are collapsed so each tag is attached once, and a missing tag list is
    treated as empty.

    Returns:
        The newly created task.
    """
    task_id = task.id or f"t_{uuid.uuid4().hex[:8]}"

    # New tasks go 1000 past the highest position among tasks with the same
    # assignee (0.0 is used as the base when there are none).
    max_pos = (
        db.query(func.max(models.Task.position))
        .filter(models.Task.assignee_id == task.assignee_id)
        .scalar()
        or 0.0
    )

    db_task = models.Task(
        id=task_id,
        title=task.title,
        description=task.description,
        assignee_id=task.assignee_id,
        added_by=task.added_by,
        priority=task.priority,
        source=task.source,
        status=task.status,
        due_at=task.due_at,
        reminder_at=task.reminder_at,
        position=max_pos + 1000.0,
    )
    db.add(db_task)

    # dict.fromkeys de-duplicates while keeping the order the client sent, so
    # the same tag is never created or attached twice for one task.
    for tag_name in dict.fromkeys(task.tags or []):
        tag = db.query(models.Tag).filter(models.Tag.tag == tag_name).first()
        if not tag:
            tag = models.Tag(tag=tag_name)
            db.add(tag)
        db_task.tags.append(tag)

    db.commit()
    db.refresh(db_task)
    return db_task
@app.patch("/tasks/{task_id}", response_model=schemas.Task)
def update_task(
    task_id: str,
    task_update: schemas.TaskUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Apply a partial update to a task and return the refreshed row.

    Only fields explicitly present in the request are written.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    target = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Task not found")

    # NOTE(review): each submitted field is assigned verbatim. If TaskUpdate
    # carries relationship fields (e.g. tags as plain names), they would need
    # converting to model objects first — confirm against the schema.
    changes = task_update.dict(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    db.commit()
    db.refresh(target)
    return target
@app.delete("/tasks/{task_id}")
def delete_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Soft-delete a task by stamping ``deleted_at``; the row itself is kept.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    target = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Task not found")

    # func.now() is a SQL expression, so the timestamp is produced by the
    # database when the change is flushed.
    target.deleted_at = func.now()
    db.commit()
    return {"message": "Task moved to trash"}
@app.get("/tasks/{task_id}/notes", response_model=List[schemas.TaskNote])
def read_task_notes(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return the notes attached to a task, newest first.

    The task itself is not looked up; an unknown ``task_id`` yields an empty
    list.
    """
    notes_for_task = db.query(models.TaskNote).filter(models.TaskNote.task_id == task_id)
    return notes_for_task.order_by(models.TaskNote.created_at.desc()).all()
@app.post("/tasks/{task_id}/notes", response_model=schemas.TaskNote)
def create_task_note(
    task_id: str,
    note: schemas.TaskNoteBase,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Attach a note, authored by the caller, to an existing task.

    Returns:
        The newly created note.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    # Reject notes for unknown tasks up front, consistent with the other
    # /tasks/{task_id} routes, rather than storing a note with a dangling
    # task_id or failing at commit.
    parent_task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not parent_task:
        raise HTTPException(status_code=404, detail="Task not found")

    db_note = models.TaskNote(
        task_id=task_id,
        author_id=current_user.id,
        body=note.body,
    )
    db.add(db_note)
    db.commit()
    db.refresh(db_note)
    return db_note
@app.post("/tasks/{task_id}/restore", response_model=schemas.Task)
def restore_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Take a task out of the trash by clearing ``deleted_at`` (admin only).

    Raises:
        HTTPException: 403 for non-admin callers; 404 when the task does not
            exist.
    """
    if current_user.account_type != "admin":
        raise HTTPException(status_code=403, detail="Not enough permissions")

    target = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Task not found")

    target.deleted_at = None
    db.commit()
    db.refresh(target)
    return target
@app.get("/workspace", response_model=schemas.Workspace)
def read_workspace(db: Session = Depends(get_db)):
    """Return the first workspace row, creating a default one when none exists."""
    workspace = db.query(models.Workspace).first()
    if workspace:
        return workspace

    # First access: persist the default workspace so later reads find it.
    workspace = models.Workspace(id="default", name="murchison-auto", timezone="Pacific/Auckland")
    db.add(workspace)
    db.commit()
    db.refresh(workspace)
    return workspace
@app.patch("/workspace", response_model=schemas.Workspace)
def update_workspace(
    ws_update: schemas.WorkspaceUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Apply a partial update to the workspace (admin only).

    When no workspace row exists yet, a default one is created and the update
    is applied to it. Only fields explicitly present in the request are
    written.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if current_user.account_type != "admin":
        raise HTTPException(status_code=403, detail="Not enough permissions")

    workspace = db.query(models.Workspace).first()
    if not workspace:
        workspace = models.Workspace(id="default", name="murchison-auto", timezone="Pacific/Auckland")
        db.add(workspace)

    changes = ws_update.dict(exclude_unset=True)
    for field_name in changes:
        setattr(workspace, field_name, changes[field_name])

    db.commit()
    db.refresh(workspace)
    return workspace
@app.get("/audit", response_model=List[schemas.AuditLog])
def read_audit(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return every audit log entry, most recent first (by the ``at`` column)."""
    newest_first = models.AuditLog.at.desc()
    return db.query(models.AuditLog).order_by(newest_first).all()
@app.post("/audit", response_model=schemas.AuditLog)
def create_audit(
    audit: schemas.AuditLogBase,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Store an audit log entry under a freshly generated ``a_<8 hex chars>`` id.

    NOTE(review): ``actor`` is copied from the request body, not derived from
    ``current_user`` — confirm that callers are trusted to report it.
    """
    entry = models.AuditLog(
        id=f"a_{uuid.uuid4().hex[:8]}",
        actor=audit.actor,
        action=audit.action,
        summary=audit.summary,
        target=audit.target,
    )
    db.add(entry)
    db.commit()
    db.refresh(entry)
    return entry