{body}")
def pretty_json(value):
try:
if isinstance(value, str):
parsed = json.loads(value)
else:
parsed = value
return json.dumps(parsed, indent=2, ensure_ascii=False)
except Exception:
return str(value)
def html_value(value):
if value is None:
return ""
if isinstance(value, (dict, list)):
return pretty_json(value)
return str(value)
def parse_json_field(value, default=None):
if value is None:
return default
try:
return json.loads(value)
except Exception:
return default
def event_summary(payload):
data = payload.get("data", {}) if isinstance(payload, dict) else {}
related = payload.get("related", {}) if isinstance(payload, dict) else {}
company = related.get("company", {}) if isinstance(related, dict) else {}
job_number = data.get("generated_job_id", "")
purchase_order_number = data.get("purchase_order_number", "")
return {
"event_type": payload.get("type", "") if isinstance(payload, dict) else "",
"uuid": data.get("uuid", ""),
"generated_job_id": job_number,
"job_number": job_number,
"purchase_order_number": purchase_order_number,
"status": data.get("status", ""),
"edit_date": data.get("edit_date", ""),
"company_name": company.get("name", ""),
}
def format_pills(values):
if not values:
return ""
return " ".join(f"{escape(str(v))}" for v in values)
def link_with_params(path, **params):
filtered = {k: v for k, v in params.items() if v not in (None, "")}
return f"{path}?{urlencode(filtered)}" if filtered else path
def resolve_generated_job_id(job_uuid: str) -> str:
job_uuid = str(job_uuid or "").strip()
if not job_uuid:
return ""
try:
with closing(get_poll_conn()) as conn:
row = conn.execute(
"select generated_job_id from job_metadata where job_uuid = ?",
(job_uuid,),
).fetchone()
if row and row["generated_job_id"]:
return str(row["generated_job_id"])
except sqlite3.Error:
pass
try:
with closing(get_conn()) as conn:
row = conn.execute(
"""
with jobs as (
select
json_extract(payload_json, '$.data.uuid') as job_uuid,
json_extract(payload_json, '$.data.generated_job_id') as generated_job_id,
received_at
from webhook_events
where json_extract(payload_json, '$.data.generated_job_id') is not null
union all
select
json_extract(payload_json, '$.related.job.uuid') as job_uuid,
json_extract(payload_json, '$.related.job.generated_job_id') as generated_job_id,
received_at
from webhook_form_responses
where json_extract(payload_json, '$.related.job.generated_job_id') is not null
)
select generated_job_id
from jobs
where job_uuid = ?
and generated_job_id is not null
order by received_at desc
limit 1
""",
(job_uuid,),
).fetchone()
if row and row["generated_job_id"]:
return str(row["generated_job_id"])
except sqlite3.Error:
pass
return ""
def job_id_html(job_uuid: str) -> str:
generated_job_id = resolve_generated_job_id(job_uuid)
return f"{escape(generated_job_id)}" if generated_job_id else ""
@app.get("/health")
def health():
return {"ok": True, "db_path": DB_PATH, "state_db_path": STATE_DB_PATH, "poll_db_path": POLL_DB_PATH}
@app.get("/", response_class=HTMLResponse)
def dashboard():
with closing(get_conn()) as conn:
counts = {
"webhook_events": conn.execute("select count(*) from webhook_events").fetchone()[0],
"webhook_objects": conn.execute("select count(*) from webhook_objects").fetchone()[0],
"webhook_form_responses": conn.execute("select count(*) from webhook_form_responses").fetchone()[0],
}
latest_event = conn.execute("select received_at from webhook_events order by id desc limit 1").fetchone()
latest_object = conn.execute("select received_at from webhook_objects order by id desc limit 1").fetchone()
latest_form = conn.execute("select received_at from webhook_form_responses order by id desc limit 1").fetchone()
state_count = 0
latest_generated = None
try:
with closing(get_state_conn()) as conn:
state_count = conn.execute("select count(*) from generated_job_materials").fetchone()[0]
latest_generated = conn.execute("select updated_at from generated_job_materials order by id desc limit 1").fetchone()
except sqlite3.Error:
pass
poll_counts = {
"poll_runs": 0,
"polled_form_responses": 0,
"polled_quote_templates": 0,
"remote_existing_incidents": 0,
}
latest_poll_run = None
latest_polled_form = None
latest_polled_quote = None
latest_remote_existing_incident = None
try:
with closing(get_poll_conn()) as conn:
poll_counts["poll_runs"] = conn.execute("select count(*) from poll_runs").fetchone()[0]
poll_counts["polled_form_responses"] = conn.execute("select count(*) from form_responses_raw").fetchone()[0]
poll_counts["polled_quote_templates"] = conn.execute("select count(*) from quote_template_form_responses").fetchone()[0]
try:
poll_counts["remote_existing_incidents"] = conn.execute("select count(*) from quote_template_remote_existing_incidents").fetchone()[0]
latest_remote_existing_incident = conn.execute("select detected_at from quote_template_remote_existing_incidents order by id desc limit 1").fetchone()
except sqlite3.Error:
pass
latest_poll_run = conn.execute("select finished_at from poll_runs order by id desc limit 1").fetchone()
latest_polled_form = conn.execute("select timestamp from form_responses_raw order by timestamp desc limit 1").fetchone()
latest_polled_quote = conn.execute("select discovered_at from quote_template_form_responses order by discovered_at desc limit 1").fetchone()
except sqlite3.Error:
pass
counts["generated_job_materials"] = state_count
counts.update(poll_counts)
cards = "".join(
f"
{escape(name)}
{count}
"
for name, count in counts.items()
)
body = f"""
{cards}
DB path
{escape(DB_PATH)}
State DB path
{escape(STATE_DB_PATH)}
Poll DB path
{escape(POLL_DB_PATH)}
Latest event
{escape(latest_event[0] if latest_event else '—')}
Latest object
{escape(latest_object[0] if latest_object else '—')}
Latest form response
{escape(latest_form[0] if latest_form else '—')}
Latest generated material row
{escape(latest_generated[0] if latest_generated else '—')}
Latest poll run
{escape(latest_poll_run[0] if latest_poll_run else '—')}
Latest polled form timestamp
{escape(latest_polled_form[0] if latest_polled_form else '—')}
Latest polled quote discovered
{escape(latest_polled_quote[0] if latest_polled_quote else '—')}
Latest remote-existing incident
{escape(latest_remote_existing_incident[0] if latest_remote_existing_incident else '—')}
"""
return html_page("ServiceM8 Inspector", body)
@app.get("/events", response_class=HTMLResponse)
def list_events(q: str = Query(""), page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
with closing(get_conn()) as conn:
rows = conn.execute(
"select id, received_at, payload_json, path from webhook_events order by id desc limit ? offset ?",
(PAGE_SIZE, offset),
).fetchall()
filtered = []
q_lower = q.lower().strip()
q_digits = "".join(ch for ch in q if ch.isdigit())
for row in rows:
payload = parse_json_field(row["payload_json"], {}) or {}
summary = event_summary(payload)
job_number = str(summary.get("job_number", "") or "")
purchase_order_number = str(summary.get("purchase_order_number", "") or "")
haystack = " ".join(str(v) for v in summary.values()).lower() + " " + str(payload).lower()
job_match = bool(q_digits) and (q_digits in job_number or q_digits in purchase_order_number)
if q_lower and not job_match and q_lower not in haystack:
continue
filtered.append((row, payload, summary))
table_rows = []
for row, payload, summary in filtered:
table_rows.append(
f"
"""
return html_page("Object webhooks", body)
@app.get("/objects/{object_id}", response_class=HTMLResponse)
def object_detail(object_id: int):
with closing(get_conn()) as conn:
row = conn.execute("select * from webhook_objects where id = ?", (object_id,)).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Object row not found")
payload = parse_json_field(row["payload_json"], {}) or {}
headers = parse_json_field(row["headers_json"], {}) or {}
changed = parse_json_field(row["changed_fields_json"], []) or []
body = f"""
"""
return html_page("Generated materials", body)
@app.get("/generated-materials/{row_id}", response_class=HTMLResponse)
def generated_material_detail(row_id: int):
try:
with closing(get_state_conn()) as conn:
row = conn.execute("select * from generated_job_materials where id = ?", (row_id,)).fetchone()
except sqlite3.Error as e:
return html_page("Generated material detail", f"
State DB unavailable: {escape(str(e))}
")
if not row:
raise HTTPException(status_code=404, detail="Generated material row not found")
body = f"""
ID
{row['id']}
Job ID
{job_id_html(row['job_uuid'])}
Job UUID
{escape(row['job_uuid'] or '')}
Form response UUID
{escape(row['form_response_uuid'] or '')}
Job material UUID
{escape(row['job_material_uuid'] or '')}
Kind
{escape(row['kind'] or '')}
Source field UUID
{escape(row['source_field_uuid'] or '')}
Source question
{escape(row['source_question'] or '')}
Source text
{escape(row['source_text'] or '')}
Created
{escape(row['created_at'] or '')}
Updated
{escape(row['updated_at'] or '')}
"""
return html_page(f"Generated material {row_id}", body)
@app.get("/form-responses/{row_id}", response_class=HTMLResponse)
def form_response_detail(row_id: int):
with closing(get_conn()) as conn:
row = conn.execute("select * from webhook_form_responses where id = ?", (row_id,)).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Form response not found")
payload = parse_json_field(row["payload_json"], {}) or {}
headers = parse_json_field(row["headers_json"], {}) or {}
data = payload.get("data", {}) if isinstance(payload, dict) else {}
related = payload.get("related", {}) if isinstance(payload, dict) else {}
job = related.get("job", {}) if isinstance(related, dict) else {}
field_data_raw = data.get("field_data", "[]")
field_data = parse_json_field(field_data_raw, []) or []
field_rows = []
for item in sorted(field_data, key=lambda x: x.get("SortOrder", 0)):
field_rows.append(
f"
"
f"
{escape(html_value(item.get('SortOrder')))}
"
f"
{escape(html_value(item.get('FieldType')))}
"
f"
{escape(html_value(item.get('Question')))}
"
f"
{escape(html_value(item.get('Response')))}
"
f"
"
)
body = f"""
ID
{row['id']}
Received
{escape(row['received_at'])}
Form UUID
{escape(data.get('form_uuid', ''))}
Regarding object
{escape(data.get('regarding_object', ''))}
Regarding UUID
{escape(data.get('regarding_object_uuid', ''))}
Timestamp
{escape(data.get('timestamp', ''))}
Job ID
{escape(job.get('generated_job_id', ''))}
Job status
{escape(job.get('status', ''))}
Decoded field data
Order
Type
Question
Response
{''.join(field_rows) or "
No decoded field data.
"}
Related job
{escape(pretty_json(job))}
Headers
{escape(pretty_json(headers))}
Payload
{escape(pretty_json(payload))}
"""
return html_page(f"Form response {row_id}", body)
@app.get("/poll/remote-existing-incidents", response_class=HTMLResponse)
def list_remote_existing_incidents(page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
try:
with closing(get_poll_conn()) as conn:
rows = conn.execute(
"""
select id, detected_at, form_response_uuid, job_uuid, apply_run_id,
desired_count, remote_count, remote_active_count, action, reason
from quote_template_remote_existing_incidents
order by id desc
limit ? offset ?
""",
(PAGE_SIZE, offset),
).fetchall()
except sqlite3.Error as e:
return html_page("Remote existing incidents", f"
Incident table unavailable: {escape(str(e))}
")
table_rows = []
for row in rows:
run_link = f"{row['apply_run_id']}" if row['apply_run_id'] else ""
table_rows.append(
f"
"""
return html_page("Remote existing incidents", body)
@app.get("/poll/remote-existing-incidents/{incident_id}", response_class=HTMLResponse)
def remote_existing_incident_detail(incident_id: int):
try:
with closing(get_poll_conn()) as conn:
row = conn.execute("select * from quote_template_remote_existing_incidents where id = ?", (incident_id,)).fetchone()
except sqlite3.Error as e:
return html_page("Remote existing incident", f"
Incident table unavailable: {escape(str(e))}
")
if not row:
raise HTTPException(status_code=404, detail="Remote existing incident not found")
remote_rows = parse_json_field(row["remote_rows_json"], []) or []
material_rows = []
if isinstance(remote_rows, list):
for item in remote_rows:
if isinstance(item, dict):
material_rows.append(
f"
"""
return html_page("Apply runs", body)
@app.get("/poll/apply-runs/{run_id}", response_class=HTMLResponse)
def apply_run_detail(run_id: int):
try:
with closing(get_poll_conn()) as conn:
run = conn.execute("select * from quote_template_apply_runs where id = ?", (run_id,)).fetchone()
rows = conn.execute(
"select * from quote_template_apply_run_rows where run_id = ? order by row_index asc",
(run_id,),
).fetchall()
incidents = conn.execute(
"select id, detected_at, action, remote_count, remote_active_count, reason from quote_template_remote_existing_incidents where apply_run_id = ? order by id desc",
(run_id,),
).fetchall()
except sqlite3.Error as e:
return html_page("Apply run", f"
Apply-run table unavailable: {escape(str(e))}
")
if not run:
raise HTTPException(status_code=404, detail="Apply run not found")
incident_rows = []
for incident in incidents:
incident_rows.append(
f"
"""
return html_page("Poll runs", body)
@app.get("/poll/form-responses", response_class=HTMLResponse)
def list_polled_form_responses(q: str = Query(""), quote_only: int = Query(0), page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
clauses = []
params = []
if quote_only:
clauses.append("is_quote_template = 1")
if q.strip():
like = f"%{q.strip()}%"
clauses.append("(uuid like ? or form_uuid like ? or regarding_object_uuid like ? or timestamp like ? or edit_date like ? or raw_json like ?)")
params.extend([like, like, like, like, like, like])
where = " where " + " and ".join(clauses) if clauses else ""
try:
with closing(get_poll_conn()) as conn:
rows = conn.execute(
f"""
select uuid, first_seen_at, last_seen_at, seen_count, timestamp, edit_date,
form_uuid, regarding_object, regarding_object_uuid,
is_quote_template, parse_status, parse_error
from form_responses_raw {where}
order by timestamp desc, edit_date desc limit ? offset ?
""",
(*params, PAGE_SIZE, offset),
).fetchall()
except sqlite3.Error as e:
return html_page("Polled form responses", f"
Poll DB unavailable: {escape(str(e))}
")
table_rows = []
for row in rows:
quote_pill = "Quote Template" if row["is_quote_template"] else ""
table_rows.append(
f"
"""
return html_page("Polled form responses", body)
@app.get("/poll/form-responses/{form_response_uuid}", response_class=HTMLResponse)
def polled_form_response_detail(form_response_uuid: str):
try:
with closing(get_poll_conn()) as conn:
row = conn.execute("select * from form_responses_raw where uuid = ?", (form_response_uuid,)).fetchone()
quote = conn.execute("select * from quote_template_form_responses where form_response_uuid = ?", (form_response_uuid,)).fetchone()
except sqlite3.Error as e:
return html_page("Polled form response", f"
Poll DB unavailable: {escape(str(e))}
")
if not row:
raise HTTPException(status_code=404, detail="Polled form response not found")
raw = parse_json_field(row["raw_json"], {}) or {}
field_data = parse_json_field(raw.get("field_data"), []) or []
field_rows = []
if isinstance(field_data, list):
for item in sorted(field_data, key=lambda x: x.get("SortOrder", 0) if isinstance(x, dict) else 0):
if isinstance(item, dict):
field_rows.append(
f"
"""
return html_page("Polled Quote Template responses", body)
@app.get("/poll/quote-template/{form_response_uuid}", response_class=HTMLResponse)
def polled_quote_template_detail(form_response_uuid: str):
try:
with closing(get_poll_conn()) as conn:
row = conn.execute("select * from quote_template_form_responses where form_response_uuid = ?", (form_response_uuid,)).fetchone()
except sqlite3.Error as e:
return html_page("Polled Quote Template response", f"
Poll DB unavailable: {escape(str(e))}
")
if not row:
raise HTTPException(status_code=404, detail="Polled quote template response not found")
parsed = parse_json_field(row["parsed_json"], {}) or {}
materials = parse_json_field(row["desired_job_materials_json"], []) or []
material_rows = []
for item in materials:
material_rows.append(
f"
{escape(str(item.get('sort_order', '')))}
{escape(item.get('kind', ''))}
"
f"
{escape(item.get('name', ''))}
{escape(item.get('material_uuid', ''))}
"
f"
{escape(item.get('source_question', ''))}
"
)
dry_run_cmd = f"/opt/webhooks/apply_polled_quote_template_jobmaterials.py --uuid {row['form_response_uuid']} --pretty"
apply_cmd = f"/opt/webhooks/apply_polled_quote_template_jobmaterials.py --uuid {row['form_response_uuid']} --apply --pretty"
try:
with closing(get_poll_conn()) as conn:
recent_runs = conn.execute(
"select id, mode, status, started_at, finished_at, desired_count, created_count, error from quote_template_apply_runs where form_response_uuid = ? order by id desc limit 8",
(row['form_response_uuid'],),
).fetchall()
recent_incidents = conn.execute(
"select id, detected_at, action, remote_count, remote_active_count, reason from quote_template_remote_existing_incidents where form_response_uuid = ? order by id desc limit 8",
(row['form_response_uuid'],),
).fetchall()
except sqlite3.Error:
recent_runs = []
recent_incidents = []
recent_run_rows = []
for run in recent_runs:
recent_run_rows.append(
f"