Files
plumbing-dashy/backend/main.py
T

314 lines
12 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from typing import List, Dict
import uuid
import json
import asyncio
from . import models, schemas, auth, database
from .database import engine, get_db
# Issue CREATE TABLE for the tables registered on models.Base's metadata,
# against the database bound to `engine`. This runs at import time.
# NOTE(review): presumably models.Base is a SQLAlchemy declarative base, in
# which case tables that already exist are left untouched — confirm.
models.Base.metadata.create_all(bind=engine)
class EventNotifier:
    """Fan-out hub that copies each broadcast message to every subscriber.

    Each subscriber owns one ``asyncio.Queue``; ``broadcast`` puts the message
    on every queue currently held in ``self.queues``.
    """

    def __init__(self):
        # Subscriber queues, kept in subscription order.
        self.queues = []

    def subscribe(self):
        """Create a new queue, register it, and return it to the caller."""
        queue = asyncio.Queue()
        self.queues.append(queue)
        return queue

    def unsubscribe(self, q):
        """Remove ``q`` from the registry; a queue that is not registered is ignored."""
        try:
            self.queues.remove(q)
        except ValueError:
            # ``q`` was never registered or has already been removed.
            pass

    async def broadcast(self, message: str):
        """Put ``message`` on every registered queue, in subscription order."""
        for queue in self.queues:
            await queue.put(message)
# Single notifier instance used by the routes in this module to publish events
# and by the /stream route to subscribe to them.
manager = EventNotifier()
app = FastAPI(title="Dashy API")
# NOTE(review): FastAPI's CORS documentation states that allow_origins,
# allow_methods and allow_headers must not be ["*"] when allow_credentials is
# True. Either list the frontend origin(s) explicitly or drop
# allow_credentials — confirm which one the frontend needs before changing.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify your frontend URL
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/stream")
async def message_stream(request: Request):
    """Stream broadcast messages to the client as Server-Sent Events.

    The response body is produced by a generator that subscribes to
    ``manager`` and emits one ``data:`` frame per received message. When no
    message arrives within two seconds it emits a ``: keepalive`` comment
    frame instead. The subscription is released when the generator finishes.
    """
    sse_headers = {"Cache-Control": "no-cache", "Connection": "keep-alive"}

    async def event_generator():
        queue = manager.subscribe()
        try:
            # The bounded wait below lets the loop come back here regularly
            # to notice a client that has gone away.
            while not await request.is_disconnected():
                try:
                    payload = await asyncio.wait_for(queue.get(), timeout=2.0)
                    yield f"data: {payload}\n\n"
                except asyncio.TimeoutError:
                    # Nothing to deliver in this window: send a keep-alive ping.
                    yield ": keepalive\n\n"
        finally:
            manager.unsubscribe(queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: schemas.UserLogin, db: Session = Depends(get_db)):
    """Exchange a user identifier and password for a bearer access token.

    ``form_data.id`` is matched against either the user's ID or the user's
    name; the first matching row is used.

    Returns:
        A dict with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 401 when no user matches, the user has no stored
            password hash, or the password does not verify.
    """
    # Search by ID or Name
    user = (
        db.query(models.User)
        .filter(
            (models.User.id == form_data.id)
            | (models.User.name == form_data.id)
        )
        .first()
    )

    # Users can be created without a password (password_hash is None). Treat
    # that as a failed login instead of handing None to the hash verifier.
    credentials_ok = (
        user is not None
        and bool(user.password_hash)
        and auth.verify_password(form_data.password, user.password_hash)
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = auth.create_access_token(data={"sub": user.id})
    return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users", response_model=List[schemas.User])
def read_users(db: Session = Depends(get_db)):
    """Return every user row.

    This route declares no authentication dependency.
    """
    all_users = db.query(models.User).all()
    return all_users
@app.post("/users", response_model=schemas.User)
async def create_user(
    user: schemas.UserCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Create a user from the request body and notify stream subscribers.

    The password, when supplied, is stored as a hash; otherwise the stored
    hash is ``None``.

    Returns:
        The newly persisted user.

    Raises:
        HTTPException: 409 when a user with the same ID already exists.
    """
    # NOTE(review): any authenticated caller can create a user with any
    # account_type here, while delete_user requires an admin. Confirm whether
    # this route should enforce the same role check.

    # Report an ID collision explicitly instead of letting the commit fail.
    # NOTE(review): assumes User.id is unique (it is used as the lookup key by
    # the other user routes) — confirm against the model.
    id_taken = db.query(models.User).filter(models.User.id == user.id).first() is not None
    if id_taken:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User ID already exists",
        )

    db_user = models.User(
        id=user.id,
        name=user.name,
        role=user.role,
        hue=user.hue,
        initials=user.initials,
        email=user.email,
        phone=user.phone,
        password_hash=auth.get_password_hash(user.password) if user.password else None,
        account_type=user.account_type,
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    await manager.broadcast(json.dumps({"type": "refresh"}))
    return db_user
@app.patch("/users/{user_id}", response_model=schemas.User)
async def update_user(
    user_id: str,
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Apply a partial update to a user and notify stream subscribers.

    Admins may update any user; other callers may update only themselves and
    may not change their own ``account_type``. Only fields explicitly present
    in the request body are written.

    Returns:
        The updated user.

    Raises:
        HTTPException: 403 when the caller is neither an admin nor the target
            user, or when a non-admin tries to change ``account_type``;
            404 when the target user does not exist.
    """
    caller_is_admin = current_user.account_type == "admin"
    if not caller_is_admin and current_user.id != user_id:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    update_data = user_update.dict(exclude_unset=True)

    # Every supplied field is written verbatim below, so a self-service update
    # must not be able to raise the caller's own privilege level. Re-sending
    # the unchanged value stays allowed so full-object updates keep working.
    requested_type = update_data.get("account_type", db_user.account_type)
    if not caller_is_admin and requested_type != db_user.account_type:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    for key, value in update_data.items():
        setattr(db_user, key, value)
    db.commit()
    db.refresh(db_user)
    await manager.broadcast(json.dumps({"type": "refresh"}))
    return db_user
@app.post("/users/{user_id}/password")
async def change_password(
    user_id: str,
    pwd_data: schemas.PasswordChange,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Replace the caller's own password after verifying the current one.

    Returns:
        A dict carrying a confirmation message.

    Raises:
        HTTPException: 403 when ``user_id`` is not the caller's ID;
            400 when the supplied current password does not verify.
    """
    is_own_account = current_user.id == user_id
    if not is_own_account:
        raise HTTPException(status_code=403, detail="Cannot change another user's password")

    old_password_matches = auth.verify_password(pwd_data.old_password, current_user.password_hash)
    if not old_password_matches:
        raise HTTPException(status_code=400, detail="Incorrect current password")

    new_hash = auth.get_password_hash(pwd_data.new_password)
    current_user.password_hash = new_hash
    db.commit()

    refresh_event = json.dumps({"type": "refresh"})
    await manager.broadcast(refresh_event)
    return {"message": "Password updated successfully"}
@app.delete("/users/{user_id}")
async def delete_user(
    user_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Delete a user (admin only) and notify stream subscribers.

    Deletion is refused while the user is still assigned an open task that is
    not in the trash. Otherwise the user's remaining task assignments and
    note authorships are set to NULL before the row is removed.

    Returns:
        A dict carrying a confirmation message.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when the user
            does not exist; 400 when the user still has open tasks.
    """
    if current_user.account_type != "admin":
        raise HTTPException(status_code=403, detail="Not enough permissions")

    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    # One open, non-trashed task assigned to this user is enough to block.
    blocking_task = (
        db.query(models.Task)
        .filter(
            models.Task.assignee_id == user_id,
            models.Task.status == "open",
            models.Task.deleted_at.is_(None),
        )
        .first()
    )
    if blocking_task is not None:
        raise HTTPException(status_code=400, detail="Cannot delete user: They still have OPEN tasks assigned to them. Reassign them first.")

    # Detach the user from the remaining tasks and notes so that history is
    # kept once the user row is gone.
    assigned_tasks = db.query(models.Task).filter(models.Task.assignee_id == user_id)
    assigned_tasks.update({"assignee_id": None})
    authored_notes = db.query(models.TaskNote).filter(models.TaskNote.author_id == user_id)
    authored_notes.update({"author_id": None})

    db.delete(db_user)
    db.commit()

    refresh_event = json.dumps({"type": "refresh"})
    await manager.broadcast(refresh_event)
    return {"message": "User deleted"}
@app.get("/tasks", response_model=List[schemas.Task])
def read_tasks(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return all tasks that are not in the trash, ordered by ascending position."""
    live_tasks = db.query(models.Task).filter(models.Task.deleted_at.is_(None))
    return live_tasks.order_by(models.Task.position.asc()).all()
@app.get("/tasks/deleted", response_model=List[schemas.Task])
def read_deleted_tasks(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return all trashed tasks (admin only), ordered by ascending position.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    caller_is_admin = current_user.account_type == "admin"
    if not caller_is_admin:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    # The comparison is a SQLAlchemy column expression, so `!=` is required
    # here; Python's `is not` would not build a query filter.
    trashed = db.query(models.Task).filter(models.Task.deleted_at != None)  # noqa: E711
    return trashed.order_by(models.Task.position.asc()).all()
@app.post("/tasks", response_model=schemas.Task)
async def create_task(
    task: schemas.TaskCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Create a task, attach its tags, and notify stream subscribers.

    When the body carries no ID, one is generated as ``t_`` plus eight hex
    characters. The task is placed after the assignee's current highest
    position, with a gap of 1000. Tags are looked up by name and created when
    missing; repeated names in the request are attached only once.

    Returns:
        The newly persisted task.
    """
    task_id = task.id or f"t_{uuid.uuid4().hex[:8]}"

    # Calculate position (max in column + 1000)
    max_pos = (
        db.query(func.max(models.Task.position))
        .filter(models.Task.assignee_id == task.assignee_id)
        .scalar()
        or 0.0
    )

    db_task = models.Task(
        id=task_id,
        title=task.title,
        description=task.description,
        assignee_id=task.assignee_id,
        added_by=task.added_by,
        priority=task.priority,
        source=task.source,
        status=task.status,
        due_at=task.due_at,
        reminder_at=task.reminder_at,
        position=max_pos + 1000.0,
    )
    db.add(db_task)

    # De-duplicate while keeping request order: a repeated name would either
    # create a second Tag row for the same name or attach the same Tag twice.
    # `or []` tolerates a body whose tags field is None.
    for tag_name in dict.fromkeys(task.tags or []):
        tag = db.query(models.Tag).filter(models.Tag.tag == tag_name).first()
        if not tag:
            tag = models.Tag(tag=tag_name)
            db.add(tag)
        db_task.tags.append(tag)

    db.commit()
    db.refresh(db_task)
    await manager.broadcast(json.dumps({"type": "refresh"}))
    return db_task
@app.patch("/tasks/{task_id}", response_model=schemas.Task)
async def update_task(
    task_id: str,
    task_update: schemas.TaskUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Apply a partial update to a task and notify stream subscribers.

    Only fields explicitly present in the request body are written. The
    lookup does not filter on ``deleted_at``, so trashed tasks can be updated.

    Returns:
        The updated task.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    db_task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if db_task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    changes = task_update.dict(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(db_task, field_name, new_value)

    db.commit()
    db.refresh(db_task)

    refresh_event = json.dumps({"type": "refresh"})
    await manager.broadcast(refresh_event)
    return db_task
@app.delete("/tasks/{task_id}")
async def delete_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Move a task to the trash and notify stream subscribers.

    The row is kept; only its ``deleted_at`` column is set, using the SQL
    ``now()`` function.

    Returns:
        A dict carrying a confirmation message.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    db_task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if db_task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    db_task.deleted_at = func.now()
    db.commit()

    refresh_event = json.dumps({"type": "refresh"})
    await manager.broadcast(refresh_event)
    return {"message": "Task moved to trash"}
@app.get("/tasks/{task_id}/notes", response_model=List[schemas.TaskNote])
def read_task_notes(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return the notes attached to ``task_id``, newest first."""
    notes_for_task = db.query(models.TaskNote).filter(models.TaskNote.task_id == task_id)
    return notes_for_task.order_by(models.TaskNote.created_at.desc()).all()
@app.post("/tasks/{task_id}/notes", response_model=schemas.TaskNote)
async def create_task_note(
    task_id: str,
    note: schemas.TaskNoteBase,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Attach a note authored by the caller to a task and notify subscribers.

    Returns:
        The newly persisted note.

    Raises:
        HTTPException: 404 when the task does not exist.
    """
    # Reject notes for unknown tasks up front, matching the other task routes,
    # rather than storing a note that points at no task.
    db_task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not db_task:
        raise HTTPException(status_code=404, detail="Task not found")

    db_note = models.TaskNote(
        task_id=task_id,
        author_id=current_user.id,
        body=note.body,
    )
    db.add(db_note)
    db.commit()
    db.refresh(db_note)
    await manager.broadcast(json.dumps({"type": "refresh"}))
    return db_note
@app.post("/tasks/{task_id}/restore", response_model=schemas.Task)
async def restore_task(
    task_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Take a task out of the trash (admin only) and notify stream subscribers.

    Returns:
        The restored task.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when the task
            does not exist.
    """
    caller_is_admin = current_user.account_type == "admin"
    if not caller_is_admin:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    db_task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if db_task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    # Clearing deleted_at is what marks the task as no longer trashed.
    db_task.deleted_at = None
    db.commit()
    db.refresh(db_task)

    refresh_event = json.dumps({"type": "refresh"})
    await manager.broadcast(refresh_event)
    return db_task
@app.get("/workspace", response_model=schemas.Workspace)
async def read_workspace(db: Session = Depends(get_db)):
    """Return the workspace row, creating the default one on first access.

    Stream subscribers are notified only when the default row is created. A
    plain read changes nothing, and announcing "refresh" on every GET would
    make clients that re-fetch the workspace on refresh trigger each other
    indefinitely.

    Returns:
        The existing or newly created workspace.
    """
    ws = db.query(models.Workspace).first()
    if ws is not None:
        return ws

    ws = models.Workspace(id="default", name="murchison-auto", timezone="Pacific/Auckland")
    db.add(ws)
    db.commit()
    db.refresh(ws)
    await manager.broadcast(json.dumps({"type": "refresh"}))
    return ws
@app.patch("/workspace", response_model=schemas.Workspace)
async def update_workspace(
    ws_update: schemas.WorkspaceUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Apply a partial update to the workspace (admin only) and notify subscribers.

    When no workspace row exists yet, the default one is created first and
    the update is applied on top of it. Only fields explicitly present in the
    request body are written.

    Returns:
        The updated workspace.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    caller_is_admin = current_user.account_type == "admin"
    if not caller_is_admin:
        raise HTTPException(status_code=403, detail="Not enough permissions")

    ws = db.query(models.Workspace).first()
    if ws is None:
        ws = models.Workspace(id="default", name="murchison-auto", timezone="Pacific/Auckland")
        db.add(ws)

    changes = ws_update.dict(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(ws, field_name, new_value)

    db.commit()
    db.refresh(ws)

    refresh_event = json.dumps({"type": "refresh"})
    await manager.broadcast(refresh_event)
    return ws
@app.get("/audit", response_model=List[schemas.AuditLog])
def read_audit(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Return every audit log entry, most recent first."""
    newest_first = db.query(models.AuditLog).order_by(models.AuditLog.at.desc())
    return newest_first.all()
@app.post("/audit", response_model=schemas.AuditLog)
async def create_audit(
    audit: schemas.AuditLogBase,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(auth.get_current_user),
):
    """Store an audit log entry from the request body and notify subscribers.

    The entry ID is generated as ``a_`` plus eight hex characters.

    Returns:
        The newly persisted audit entry.
    """
    # NOTE(review): `actor` is taken from the request body, not from
    # current_user — confirm that client-supplied actors are intended.
    entry_fields = {
        "id": f"a_{uuid.uuid4().hex[:8]}",
        "actor": audit.actor,
        "action": audit.action,
        "summary": audit.summary,
        "target": audit.target,
    }
    db_audit = models.AuditLog(**entry_fields)

    db.add(db_audit)
    db.commit()
    db.refresh(db_audit)

    refresh_event = json.dumps({"type": "refresh"})
    await manager.broadcast(refresh_event)
    return db_audit