"""Read-only FastAPI inspector for stored ServiceM8 webhook rows.

Serves listing and detail pages over two SQLite databases: the webhook store
(``DB_PATH``) and the generated-materials state store (``STATE_DB_PATH``).
Every query issued here is a ``select``.
"""

import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime
from html import escape
from urllib.parse import urlencode

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse

DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db")
STATE_DB_PATH = os.getenv("WEBHOOK_STATE_DB_PATH", "./servicem8_quote_materials_state.db")
APP_HOST = os.getenv("WEBHOOK_INSPECTOR_HOST", "127.0.0.1")
APP_PORT = int(os.getenv("WEBHOOK_INSPECTOR_PORT", "18355"))
PAGE_SIZE = 50

app = FastAPI(title="ServiceM8 Inspector", version="0.1.0")

# NOTE(review): the page templates in this file contain text only -- no HTML
# tags are present in this copy. Presumably the markup was lost before review;
# confirm against the deployed version before relying on the rendered layout.


def get_conn() -> sqlite3.Connection:
    """Open the webhook database with name-addressable rows.

    The caller owns the connection and must close it (see ``closing``).
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def get_state_conn() -> sqlite3.Connection:
    """Open the generated-materials state database with name-addressable rows.

    The caller owns the connection and must close it (see ``closing``).
    """
    conn = sqlite3.connect(STATE_DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def _as_dict(value) -> dict:
    """Return *value* if it is a dict, else an empty dict.

    JSON ``null`` (or any other non-object) stored under a key would otherwise
    make the following ``.get`` call raise ``AttributeError``.
    """
    return value if isinstance(value, dict) else {}


def _as_list(value) -> list:
    """Return *value* if it is a list, else an empty list."""
    return value if isinstance(value, list) else []


def _text(value) -> str:
    """HTML-escape *value* for display; ``None`` becomes an empty string.

    ``html.escape`` only accepts ``str``, while payload and row values may be
    ``None`` or numbers, so the value is stringified first.
    """
    if value is None:
        return ""
    return escape(str(value))


def html_page(title: str, body: str) -> HTMLResponse:
    """Wrap *body* in the shared page shell and return it as an HTML response.

    *title* is escaped here; *body* is inserted verbatim, so callers must
    escape any untrusted values they interpolate into it.
    """
    nav = """ """
    css = """ """
    return HTMLResponse(f"{escape(title)}{css}{nav}\n\n{escape(title)}\n\n{body}")


def pretty_json(value):
    """Return *value* as indented JSON text.

    A ``str`` is parsed as JSON first. Anything that cannot be parsed or
    serialised falls back to ``str(value)`` so detail pages always render.
    """
    try:
        if isinstance(value, str):
            parsed = json.loads(value)
        else:
            parsed = value
        return json.dumps(parsed, indent=2, ensure_ascii=False)
    except Exception:
        # Deliberate best effort: this is display-only formatting.
        return str(value)


def parse_json_field(value, default=None):
    """Decode a JSON column value, returning *default* for ``None`` or bad JSON."""
    if value is None:
        return default
    try:
        return json.loads(value)
    except Exception:
        # Deliberate best effort: a malformed stored row must not break the page.
        return default


def event_summary(payload):
    """Extract the fields shown in event listings from a decoded payload.

    Missing or non-object ``data`` / ``related`` / ``company`` entries are
    treated as empty. ``job_number`` is an alias of ``generated_job_id``.
    Values are returned as found in the payload (not stringified or escaped).
    """
    payload = _as_dict(payload)
    data = _as_dict(payload.get("data"))
    related = _as_dict(payload.get("related"))
    company = _as_dict(related.get("company"))
    job_number = data.get("generated_job_id", "")
    purchase_order_number = data.get("purchase_order_number", "")
    return {
        "event_type": payload.get("type", ""),
        "uuid": data.get("uuid", ""),
        "generated_job_id": job_number,
        "job_number": job_number,
        "purchase_order_number": purchase_order_number,
        "status": data.get("status", ""),
        "edit_date": data.get("edit_date", ""),
        "company_name": company.get("name", ""),
    }


def format_pills(values):
    """Return the escaped string forms of *values* joined by single spaces."""
    if not values:
        return ""
    return " ".join(f"{escape(str(v))}" for v in values)


def link_with_params(path, **params):
    """Return *path* with a query string built from the non-empty *params*.

    Parameters whose value is ``None`` or ``""`` are dropped; if none remain,
    *path* is returned unchanged.
    """
    filtered = {k: v for k, v in params.items() if v not in (None, "")}
    return f"{path}?{urlencode(filtered)}" if filtered else path


@app.get("/health")
def health():
    """Report liveness together with the configured database paths."""
    return {"ok": True, "db_path": DB_PATH, "state_db_path": STATE_DB_PATH}


@app.get("/", response_class=HTMLResponse)
def dashboard():
    """Render row counts and latest timestamps for every inspected table."""
    with closing(get_conn()) as conn:
        counts = {
            "webhook_events": conn.execute("select count(*) from webhook_events").fetchone()[0],
            "webhook_objects": conn.execute("select count(*) from webhook_objects").fetchone()[0],
            "webhook_form_responses": conn.execute("select count(*) from webhook_form_responses").fetchone()[0],
        }
        latest_event = conn.execute("select received_at from webhook_events order by id desc limit 1").fetchone()
        latest_object = conn.execute("select received_at from webhook_objects order by id desc limit 1").fetchone()
        latest_form = conn.execute("select received_at from webhook_form_responses order by id desc limit 1").fetchone()

    state_count = 0
    latest_generated = None
    try:
        with closing(get_state_conn()) as state_conn:
            state_count = state_conn.execute("select count(*) from generated_job_materials").fetchone()[0]
            latest_generated = state_conn.execute(
                "select updated_at from generated_job_materials order by id desc limit 1"
            ).fetchone()
    except sqlite3.Error:
        # Best effort: the state DB is optional for the dashboard, so an
        # unavailable or uninitialised state DB just shows as zero rows.
        pass
    counts["generated_job_materials"] = state_count

    cards = "".join(f"\n{escape(name)}\n{count}\n" for name, count in counts.items())
    body = f"""
{cards}
DB path
{escape(DB_PATH)}
State DB path
{escape(STATE_DB_PATH)}
Latest event
{_text(latest_event[0] if latest_event else '—')}
Latest object
{_text(latest_object[0] if latest_object else '—')}
Latest form response
{_text(latest_form[0] if latest_form else '—')}
Latest generated material row
{_text(latest_generated[0] if latest_generated else '—')}

Quick links

"""
    return html_page("ServiceM8 Inspector", body)


@app.get("/events", response_class=HTMLResponse)
def list_events(q: str = Query(""), page: int = Query(1, ge=1)):
    """List one page of webhook events, optionally filtered by *q*.

    The filter is applied in-app to the page already fetched, not in SQL, so
    it only narrows the ``PAGE_SIZE`` rows of the requested page.
    """
    offset = (page - 1) * PAGE_SIZE
    with closing(get_conn()) as conn:
        rows = conn.execute(
            "select id, received_at, payload_json, path from webhook_events order by id desc limit ? offset ?",
            (PAGE_SIZE, offset),
        ).fetchall()

    q_lower = q.lower().strip()
    # Digits-only form of the query lets "Job #1234" match job/PO number 1234.
    q_digits = "".join(ch for ch in q if ch.isdigit())

    filtered = []
    for row in rows:
        payload = parse_json_field(row["payload_json"], {}) or {}
        summary = event_summary(payload)
        job_number = str(summary.get("job_number", "") or "")
        purchase_order_number = str(summary.get("purchase_order_number", "") or "")
        haystack = " ".join(str(v) for v in summary.values()).lower() + " " + str(payload).lower()
        job_match = bool(q_digits) and (q_digits in job_number or q_digits in purchase_order_number)
        if q_lower and not job_match and q_lower not in haystack:
            continue
        filtered.append((row, summary))

    table_rows = [
        f"{row['id']}"
        f"{_text(row['received_at'])}"
        f"{_text(summary['event_type'])}"
        f"{_text(summary['generated_job_id'])}"
        f"{_text(summary['uuid'])}"
        f"{_text(summary['status'])}"
        f"{_text(summary['company_name'])}"
        f"{_text(row['path'])}"
        for row, summary in filtered
    ]
    # The empty-state message replaces the rows; it is not shown alongside them.
    rows_html = "".join(table_rows) or "No rows matched."
    body = f"""
Search supports job number / generated job ID, UUID, status, company, or event type. Showing up to {PAGE_SIZE} latest rows, then filtering in-app.
IDReceivedTypeJob IDUUIDStatusCompanyPath
{rows_html}
"""
    return html_page("Webhook events", body)


@app.get("/events/{event_id}", response_class=HTMLResponse)
def event_detail(event_id: int):
    """Render one webhook event with its headers and payload.

    Raises:
        HTTPException: 404 when no row has id *event_id*.
    """
    with closing(get_conn()) as conn:
        row = conn.execute("select * from webhook_events where id = ?", (event_id,)).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Event not found")

    payload = parse_json_field(row["payload_json"], {}) or {}
    headers = parse_json_field(row["headers_json"], {}) or {}
    summary = event_summary(payload)
    related = _as_dict(_as_dict(payload).get("related"))
    company = _as_dict(related.get("company"))
    contacts = _as_list(related.get("jobContacts"))
    materials = _as_list(related.get("jobMaterials"))
    body = f"""
ID
{row['id']}
Received
{_text(row['received_at'])}
Event type
{_text(summary['event_type'])}
UUID
{_text(summary['uuid'])}
Generated job ID
{_text(summary['generated_job_id'])}
Status
{_text(summary['status'])}
Company
{_text(summary['company_name'])}
Path
{_text(row['path'])}

Related summary

Company name
{_text(company.get('name', ''))}
Contacts
{len(contacts)}
Materials
{len(materials)}

Headers

{escape(pretty_json(headers))}

Payload

{escape(pretty_json(payload))}
"""
    return html_page(f"Event {event_id}", body)


@app.get("/objects", response_class=HTMLResponse)
def list_objects(page: int = Query(1, ge=1)):
    """List one page of object webhooks, newest first."""
    offset = (page - 1) * PAGE_SIZE
    with closing(get_conn()) as conn:
        rows = conn.execute(
            "select id, received_at, object_type, object_uuid, changed_fields_json, object_time_utc, resource_url from webhook_objects order by id desc limit ? offset ?",
            (PAGE_SIZE, offset),
        ).fetchall()

    table_rows = []
    for row in rows:
        changed = _as_list(parse_json_field(row["changed_fields_json"], []))
        table_rows.append(
            f"{row['id']}"
            f"{_text(row['received_at'])}"
            f"{escape(row['object_type'] or '')}"
            f"{escape(row['object_uuid'] or '')}"
            # Only the first eight changed fields are shown in the listing.
            f"{format_pills(changed[:8])}"
            f"{escape(row['object_time_utc'] or '')}"
            f"{escape(row['resource_url'] or '')}"
        )
    rows_html = "".join(table_rows) or "No rows found."
    body = f"""
IDReceivedObjectUUIDChanged fieldsObject timeResource URL
{rows_html}
"""
    return html_page("Object webhooks", body)


@app.get("/objects/{object_id}", response_class=HTMLResponse)
def object_detail(object_id: int):
    """Render one object webhook with its headers and payload.

    Raises:
        HTTPException: 404 when no row has id *object_id*.
    """
    with closing(get_conn()) as conn:
        row = conn.execute("select * from webhook_objects where id = ?", (object_id,)).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Object row not found")

    payload = parse_json_field(row["payload_json"], {}) or {}
    headers = parse_json_field(row["headers_json"], {}) or {}
    changed = _as_list(parse_json_field(row["changed_fields_json"], []))
    body = f"""
ID
{row['id']}
Received
{_text(row['received_at'])}
Object type
{escape(row['object_type'] or '')}
Object UUID
{escape(row['object_uuid'] or '')}
Object time UTC
{escape(row['object_time_utc'] or '')}
Resource URL
{escape(row['resource_url'] or '')}
Changed fields
{format_pills(changed)}

Headers

{escape(pretty_json(headers))}

Payload

{escape(pretty_json(payload))}
"""
    return html_page(f"Object webhook {object_id}", body)


@app.get("/form-responses", response_class=HTMLResponse)
def list_form_responses(page: int = Query(1, ge=1)):
    """List one page of form-response webhooks, newest first."""
    offset = (page - 1) * PAGE_SIZE
    with closing(get_conn()) as conn:
        rows = conn.execute(
            "select id, received_at, payload_json from webhook_form_responses order by id desc limit ? offset ?",
            (PAGE_SIZE, offset),
        ).fetchall()

    table_rows = []
    for row in rows:
        payload = _as_dict(parse_json_field(row["payload_json"], {}))
        data = _as_dict(payload.get("data"))
        job = _as_dict(_as_dict(payload.get("related")).get("job"))
        table_rows.append(
            f"{row['id']}"
            f"{_text(row['received_at'])}"
            f"{_text(data.get('form_uuid', ''))}"
            f"{_text(data.get('regarding_object', ''))}"
            f"{_text(data.get('regarding_object_uuid', ''))}"
            f"{_text(job.get('generated_job_id', ''))}"
            f"{_text(job.get('status', ''))}"
        )
    rows_html = "".join(table_rows) or "No rows found."
    body = f"""
IDReceivedForm UUIDRegardingObject UUIDJob IDJob status
{rows_html}
"""
    return html_page("Form responses", body)


@app.get("/generated-materials", response_class=HTMLResponse)
def list_generated_materials(page: int = Query(1, ge=1)):
    """List one page of generated job-material rows from the state DB.

    A state-DB error is rendered as a page message instead of a 500.
    """
    offset = (page - 1) * PAGE_SIZE
    try:
        with closing(get_state_conn()) as conn:
            rows = conn.execute(
                "select id, job_uuid, form_response_uuid, job_material_uuid, kind, source_question, source_text, updated_at from generated_job_materials order by id desc limit ? offset ?",
                (PAGE_SIZE, offset),
            ).fetchall()
    except sqlite3.Error as e:
        return html_page("Generated materials", f"\nState DB unavailable: {escape(str(e))}\n")

    table_rows = [
        f"{row['id']}"
        f"{escape(row['job_uuid'] or '')}"
        f"{escape(row['form_response_uuid'] or '')}"
        f"{escape(row['job_material_uuid'] or '')}"
        f"{escape(row['kind'] or '')}"
        f"{escape(row['source_question'] or '')}"
        f"{escape(row['updated_at'] or '')}"
        for row in rows
    ]
    rows_html = "".join(table_rows) or "No rows found."
    body = f"""
IDJob UUIDForm response UUIDJob material UUIDKindSource questionUpdated
{rows_html}
"""
    return html_page("Generated materials", body)


@app.get("/generated-materials/{row_id}", response_class=HTMLResponse)
def generated_material_detail(row_id: int):
    """Render one generated job-material row from the state DB.

    A state-DB error is rendered as a page message instead of a 500.

    Raises:
        HTTPException: 404 when no row has id *row_id*.
    """
    try:
        with closing(get_state_conn()) as conn:
            row = conn.execute("select * from generated_job_materials where id = ?", (row_id,)).fetchone()
    except sqlite3.Error as e:
        return html_page("Generated material detail", f"\nState DB unavailable: {escape(str(e))}\n")
    if not row:
        raise HTTPException(status_code=404, detail="Generated material row not found")

    body = f"""
ID
{row['id']}
Job UUID
{escape(row['job_uuid'] or '')}
Form response UUID
{escape(row['form_response_uuid'] or '')}
Job material UUID
{escape(row['job_material_uuid'] or '')}
Kind
{escape(row['kind'] or '')}
Source field UUID
{escape(row['source_field_uuid'] or '')}
Source question
{escape(row['source_question'] or '')}
Source text
{escape(row['source_text'] or '')}
Created
{escape(row['created_at'] or '')}
Updated
{escape(row['updated_at'] or '')}
"""
    return html_page(f"Generated material {row_id}", body)


def _sort_order(item: dict) -> float:
    """Return the numeric ``SortOrder`` of a field item, or 0 if unusable."""
    try:
        return float(item.get("SortOrder", 0))
    except (TypeError, ValueError):
        return 0.0


@app.get("/form-responses/{row_id}", response_class=HTMLResponse)
def form_response_detail(row_id: int):
    """Render one form response, including its decoded ``field_data``.

    ``field_data`` is accepted either as a JSON-encoded string or as an
    already-decoded list; entries that are not objects are skipped.

    Raises:
        HTTPException: 404 when no row has id *row_id*.
    """
    with closing(get_conn()) as conn:
        row = conn.execute("select * from webhook_form_responses where id = ?", (row_id,)).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Form response not found")

    payload = parse_json_field(row["payload_json"], {}) or {}
    headers = parse_json_field(row["headers_json"], {}) or {}
    data = _as_dict(_as_dict(payload).get("data"))
    job = _as_dict(_as_dict(_as_dict(payload).get("related")).get("job"))

    field_data_raw = data.get("field_data", "[]")
    if isinstance(field_data_raw, list):
        field_data = field_data_raw
    else:
        field_data = _as_list(parse_json_field(field_data_raw, []))
    field_items = sorted(
        (item for item in field_data if isinstance(item, dict)),
        key=_sort_order,
    )
    field_rows = [
        f"{escape(str(item.get('SortOrder', '')))}"
        f"{_text(item.get('FieldType', ''))}"
        f"{_text(item.get('Question', ''))}"
        f"{_text(item.get('Response', ''))}"
        for item in field_items
    ]
    field_rows_html = "".join(field_rows) or "No decoded field data."
    body = f"""
ID
{row['id']}
Received
{_text(row['received_at'])}
Form UUID
{_text(data.get('form_uuid', ''))}
Regarding object
{_text(data.get('regarding_object', ''))}
Regarding UUID
{_text(data.get('regarding_object_uuid', ''))}
Timestamp
{_text(data.get('timestamp', ''))}
Job ID
{_text(job.get('generated_job_id', ''))}
Job status
{_text(job.get('status', ''))}

Decoded field data

OrderTypeQuestionResponse
{field_rows_html}

Related job

{escape(pretty_json(job))}

Headers

{escape(pretty_json(headers))}

Payload

{escape(pretty_json(payload))}
"""
    return html_page(f"Form response {row_id}", body)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("servicem8_inspector:app", host=APP_HOST, port=APP_PORT, reload=False)