import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime
from html import escape
from urllib.parse import urlencode
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse
DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db")
APP_HOST = os.getenv("WEBHOOK_INSPECTOR_HOST", "127.0.0.1")
APP_PORT = int(os.getenv("WEBHOOK_INSPECTOR_PORT", "18355"))
PAGE_SIZE = 50
app = FastAPI(title="ServiceM8 Inspector", version="0.1.0")
def get_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def html_page(title: str, body: str) -> HTMLResponse:
nav = """
Dashboard
Events
Objects
Form responses
"""
css = """
"""
return HTMLResponse(f"
{escape(title)} {css}{nav}{escape(title)} {body}")
def pretty_json(value):
try:
if isinstance(value, str):
parsed = json.loads(value)
else:
parsed = value
return json.dumps(parsed, indent=2, ensure_ascii=False)
except Exception:
return str(value)
def parse_json_field(value, default=None):
if value is None:
return default
try:
return json.loads(value)
except Exception:
return default
def event_summary(payload):
data = payload.get("data", {}) if isinstance(payload, dict) else {}
related = payload.get("related", {}) if isinstance(payload, dict) else {}
company = related.get("company", {}) if isinstance(related, dict) else {}
job_number = data.get("generated_job_id", "")
purchase_order_number = data.get("purchase_order_number", "")
return {
"event_type": payload.get("type", "") if isinstance(payload, dict) else "",
"uuid": data.get("uuid", ""),
"generated_job_id": job_number,
"job_number": job_number,
"purchase_order_number": purchase_order_number,
"status": data.get("status", ""),
"edit_date": data.get("edit_date", ""),
"company_name": company.get("name", ""),
}
def format_pills(values):
if not values:
return ""
return " ".join(f"{escape(str(v))} " for v in values)
def link_with_params(path, **params):
filtered = {k: v for k, v in params.items() if v not in (None, "")}
return f"{path}?{urlencode(filtered)}" if filtered else path
@app.get("/health")
def health():
return {"ok": True, "db_path": DB_PATH}
@app.get("/", response_class=HTMLResponse)
def dashboard():
with closing(get_conn()) as conn:
counts = {
"webhook_events": conn.execute("select count(*) from webhook_events").fetchone()[0],
"webhook_objects": conn.execute("select count(*) from webhook_objects").fetchone()[0],
"webhook_form_responses": conn.execute("select count(*) from webhook_form_responses").fetchone()[0],
}
latest_event = conn.execute("select received_at from webhook_events order by id desc limit 1").fetchone()
latest_object = conn.execute("select received_at from webhook_objects order by id desc limit 1").fetchone()
latest_form = conn.execute("select received_at from webhook_form_responses order by id desc limit 1").fetchone()
cards = "".join(
f""
for name, count in counts.items()
)
body = f"""
{cards}
DB path
{escape(DB_PATH)}
Latest event
{escape(latest_event[0] if latest_event else '—')}
Latest object
{escape(latest_object[0] if latest_object else '—')}
Latest form response
{escape(latest_form[0] if latest_form else '—')}
"""
return html_page("ServiceM8 Inspector", body)
@app.get("/events", response_class=HTMLResponse)
def list_events(q: str = Query(""), page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
with closing(get_conn()) as conn:
rows = conn.execute(
"select id, received_at, payload_json, path from webhook_events order by id desc limit ? offset ?",
(PAGE_SIZE, offset),
).fetchall()
filtered = []
q_lower = q.lower().strip()
q_digits = "".join(ch for ch in q if ch.isdigit())
for row in rows:
payload = parse_json_field(row["payload_json"], {}) or {}
summary = event_summary(payload)
job_number = str(summary.get("job_number", "") or "")
purchase_order_number = str(summary.get("purchase_order_number", "") or "")
haystack = " ".join(str(v) for v in summary.values()).lower() + " " + str(payload).lower()
job_match = bool(q_digits) and (q_digits in job_number or q_digits in purchase_order_number)
if q_lower and not job_match and q_lower not in haystack:
continue
filtered.append((row, payload, summary))
table_rows = []
for row, payload, summary in filtered:
table_rows.append(
f""
f"{row['id']} "
f"{escape(row['received_at'])} "
f"{escape(summary['event_type'])} "
f"{escape(summary['generated_job_id'])} "
f"{escape(summary['uuid'])} "
f"{escape(summary['status'])} "
f"{escape(summary['company_name'])} "
f"{escape(row['path'])} "
f" "
)
body = f"""
ID Received Type Job ID UUID Status Company Path
{''.join(table_rows) or "No rows matched. "}
"""
return html_page("Webhook events", body)
@app.get("/events/{event_id}", response_class=HTMLResponse)
def event_detail(event_id: int):
with closing(get_conn()) as conn:
row = conn.execute("select * from webhook_events where id = ?", (event_id,)).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Event not found")
payload = parse_json_field(row["payload_json"], {}) or {}
headers = parse_json_field(row["headers_json"], {}) or {}
summary = event_summary(payload)
related = payload.get("related", {}) if isinstance(payload, dict) else {}
company = related.get("company", {}) if isinstance(related, dict) else {}
contacts = related.get("jobContacts", []) if isinstance(related, dict) else []
materials = related.get("jobMaterials", []) if isinstance(related, dict) else []
body = f"""
ID
{row['id']}
Received
{escape(row['received_at'])}
Event type
{escape(summary['event_type'])}
UUID
{escape(summary['uuid'])}
Generated job ID
{escape(summary['generated_job_id'])}
Status
{escape(summary['status'])}
Company
{escape(summary['company_name'])}
Path
{escape(row['path'])}
Related summary
Company name
{escape(company.get('name', ''))}
Contacts
{len(contacts)}
Materials
{len(materials)}
Headers
{escape(pretty_json(headers))}
Payload
{escape(pretty_json(payload))}
"""
return html_page(f"Event {event_id}", body)
@app.get("/objects", response_class=HTMLResponse)
def list_objects(page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
with closing(get_conn()) as conn:
rows = conn.execute(
"select id, received_at, object_type, object_uuid, changed_fields_json, object_time_utc, resource_url from webhook_objects order by id desc limit ? offset ?",
(PAGE_SIZE, offset),
).fetchall()
table_rows = []
for row in rows:
changed = parse_json_field(row["changed_fields_json"], []) or []
table_rows.append(
f""
f"{row['id']} "
f"{escape(row['received_at'])} "
f"{escape(row['object_type'] or '')} "
f"{escape(row['object_uuid'] or '')} "
f"{format_pills(changed[:8])} "
f"{escape(row['object_time_utc'] or '')} "
f"{escape(row['resource_url'] or '')} "
f" "
)
body = f"""
ID Received Object UUID Changed fields Object time Resource URL
{''.join(table_rows) or "No rows found. "}
"""
return html_page("Object webhooks", body)
@app.get("/objects/{object_id}", response_class=HTMLResponse)
def object_detail(object_id: int):
with closing(get_conn()) as conn:
row = conn.execute("select * from webhook_objects where id = ?", (object_id,)).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Object row not found")
payload = parse_json_field(row["payload_json"], {}) or {}
headers = parse_json_field(row["headers_json"], {}) or {}
changed = parse_json_field(row["changed_fields_json"], []) or []
body = f"""
ID
{row['id']}
Received
{escape(row['received_at'])}
Object type
{escape(row['object_type'] or '')}
Object UUID
{escape(row['object_uuid'] or '')}
Object time UTC
{escape(row['object_time_utc'] or '')}
Resource URL
Changed fields
{format_pills(changed)}
Headers {escape(pretty_json(headers))}
Payload {escape(pretty_json(payload))}
"""
return html_page(f"Object webhook {object_id}", body)
@app.get("/form-responses", response_class=HTMLResponse)
def list_form_responses(page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
with closing(get_conn()) as conn:
rows = conn.execute(
"select id, received_at, payload_json from webhook_form_responses order by id desc limit ? offset ?",
(PAGE_SIZE, offset),
).fetchall()
table_rows = []
for row in rows:
payload = parse_json_field(row["payload_json"], {}) or {}
data = payload.get("data", {}) if isinstance(payload, dict) else {}
related = payload.get("related", {}) if isinstance(payload, dict) else {}
job = related.get("job", {}) if isinstance(related, dict) else {}
table_rows.append(
f""
f"{row['id']} "
f"{escape(row['received_at'])} "
f"{escape(data.get('form_uuid', ''))} "
f"{escape(data.get('regarding_object', ''))} "
f"{escape(data.get('regarding_object_uuid', ''))} "
f"{escape(job.get('generated_job_id', ''))} "
f"{escape(job.get('status', ''))} "
f" "
)
body = f"""
ID Received Form UUID Regarding Object UUID Job ID Job status
{''.join(table_rows) or "No rows found. "}
"""
return html_page("Form responses", body)
@app.get("/form-responses/{row_id}", response_class=HTMLResponse)
def form_response_detail(row_id: int):
with closing(get_conn()) as conn:
row = conn.execute("select * from webhook_form_responses where id = ?", (row_id,)).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Form response not found")
payload = parse_json_field(row["payload_json"], {}) or {}
headers = parse_json_field(row["headers_json"], {}) or {}
data = payload.get("data", {}) if isinstance(payload, dict) else {}
related = payload.get("related", {}) if isinstance(payload, dict) else {}
job = related.get("job", {}) if isinstance(related, dict) else {}
field_data_raw = data.get("field_data", "[]")
field_data = parse_json_field(field_data_raw, []) or []
field_rows = []
for item in sorted(field_data, key=lambda x: x.get("SortOrder", 0)):
field_rows.append(
f""
f"{escape(str(item.get('SortOrder', '')))} "
f"{escape(item.get('FieldType', ''))} "
f"{escape(item.get('Question', ''))} "
f"{escape(item.get('Response', ''))} "
f" "
)
body = f"""
ID
{row['id']}
Received
{escape(row['received_at'])}
Form UUID
{escape(data.get('form_uuid', ''))}
Regarding object
{escape(data.get('regarding_object', ''))}
Regarding UUID
{escape(data.get('regarding_object_uuid', ''))}
Timestamp
{escape(data.get('timestamp', ''))}
Job ID
{escape(job.get('generated_job_id', ''))}
Job status
{escape(job.get('status', ''))}
Decoded field data
Order Type Question Response
{''.join(field_rows) or "No decoded field data. "}
Related job {escape(pretty_json(job))}
Headers {escape(pretty_json(headers))}
Payload {escape(pretty_json(payload))}
"""
return html_page(f"Form response {row_id}", body)
if __name__ == "__main__":
import uvicorn
uvicorn.run("servicem8_inspector:app", host=APP_HOST, port=APP_PORT, reload=False)