"""Read-only web inspector for the ServiceM8 webhook, state and poll SQLite databases.

The app exposes HTML list/detail pages over three SQLite files whose paths are
taken from environment variables (see the constants below).  Nothing in this
module writes to the databases.

NOTE(review): the HTML markup (navigation, CSS, forms, tables, links) in this
file was reconstructed from the visible text labels because the original tags
were not available; confirm the markup and link targets against the deployed
version before relying on exact output.
"""
import json
import os
import shlex
import sqlite3
from contextlib import closing
from datetime import datetime
from html import escape
from urllib.parse import urlencode

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse

DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db")
STATE_DB_PATH = os.getenv("WEBHOOK_STATE_DB_PATH", "./servicem8_quote_materials_state.db")
POLL_DB_PATH = os.getenv("WEBHOOK_POLL_DB_PATH", "./servicem8_formresponse_poll.db")
APP_HOST = os.getenv("WEBHOOK_INSPECTOR_HOST", "0.0.0.0")
APP_PORT = int(os.getenv("WEBHOOK_INSPECTOR_PORT", "18355"))
PAGE_SIZE = 50

app = FastAPI(title="ServiceM8 Inspector", version="0.1.0")

# (href, label) pairs shown in the navigation bar and the dashboard quick links.
_NAV_LINKS = (
    ("/", "Dashboard"),
    ("/events", "Events"),
    ("/objects", "Objects"),
    ("/form-responses", "Form responses"),
    ("/generated-materials", "Generated materials"),
    ("/poll/runs", "Poll runs"),
    ("/poll/form-responses", "Polled form responses"),
    ("/poll/quote-template", "Polled quote templates"),
    ("/poll/apply-runs", "Apply runs"),
    ("/poll/remote-existing-incidents", "Remote-existing incidents"),
)

_CSS = """<style>
body { font-family: system-ui, sans-serif; margin: 0; color: #1f2933; }
nav { background: #1f2933; padding: 10px 16px; }
nav a { color: #fff; margin-right: 14px; text-decoration: none; }
main { padding: 16px; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #d9e2ec; padding: 6px 8px; text-align: left; vertical-align: top; }
pre { background: #f5f7fa; padding: 10px; overflow: auto; }
.cards { display: flex; flex-wrap: wrap; gap: 10px; margin-bottom: 16px; }
.card { border: 1px solid #d9e2ec; border-radius: 6px; padding: 10px 14px; }
.card .n { font-size: 1.5em; font-weight: 600; }
.grid { display: grid; grid-template-columns: max-content 1fr; gap: 4px 14px; }
.k { color: #52606d; }
.pill { background: #e0e8f9; border-radius: 10px; padding: 1px 8px; }
.muted { color: #7b8794; }
.error { color: #ba2525; }
</style>"""


# ---------------------------------------------------------------------------
# Database connections
# ---------------------------------------------------------------------------

def _connect(path: str) -> sqlite3.Connection:
    """Open *path* with rows addressable by column name."""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    return conn


def get_conn() -> sqlite3.Connection:
    """Return a connection to the webhook database (``DB_PATH``)."""
    return _connect(DB_PATH)


def get_state_conn() -> sqlite3.Connection:
    """Return a connection to the generated-materials state database."""
    return _connect(STATE_DB_PATH)


def get_poll_conn() -> sqlite3.Connection:
    """Return a connection to the form-response poll database."""
    return _connect(POLL_DB_PATH)


# ---------------------------------------------------------------------------
# Small value / HTML helpers
# ---------------------------------------------------------------------------

def _esc(value) -> str:
    """HTML-escape any value; ``None`` becomes the empty string.

    ``html.escape`` only accepts ``str``, so NULL columns or numeric JSON
    values would otherwise raise while rendering.
    """
    if value is None:
        return ""
    return escape(str(value))


def _as_dict(value) -> dict:
    """Return *value* if it is a dict, else an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_list(value) -> list:
    """Return *value* if it is a list, else an empty list."""
    return value if isinstance(value, list) else []


def _link(href, text) -> str:
    """Build an anchor; both the target and the text are escaped."""
    return f"<a href='{escape(str(href))}'>{_esc(text)}</a>"


def _row(*cells: str) -> str:
    """Wrap already-escaped cell HTML fragments in a table row."""
    return "<tr>" + "".join(f"<td>{cell}</td>" for cell in cells) + "</tr>"


def _table(headers, rows, empty_message: str) -> str:
    """Render a table; *rows* are ``<tr>`` fragments, *empty_message* fills an empty body."""
    head = "".join(f"<th>{escape(h)}</th>" for h in headers)
    body = "".join(rows) or (
        f"<tr><td colspan='{len(headers)}' class='muted'>{escape(empty_message)}</td></tr>"
    )
    return f"<table><thead><tr>{head}</tr></thead><tbody>{body}</tbody></table>"


def _kv_grid(pairs) -> str:
    """Render ``(label, value_html)`` pairs as a two-column grid.

    Labels are escaped here; values must already be safe HTML.
    """
    cells = "".join(
        f"<div class='k'>{escape(label)}</div><div class='v'>{value}</div>"
        for label, value in pairs
    )
    return f"<div class='grid'>{cells}</div>"


def _pre_json(value) -> str:
    """Render *value* as escaped, pretty-printed JSON inside ``<pre>``."""
    return f"<pre>{escape(pretty_json(value))}</pre>"


def _error_html(message: str, exc: Exception) -> str:
    """Render a '<message>: <exception>' paragraph."""
    return f"<p class='error'>{escape(message)}: {escape(str(exc))}</p>"


def _or_dash(value) -> str:
    """Escape *value*, showing an em dash when it is missing."""
    return "—" if value in (None, "") else _esc(value)


def _scalar(conn: sqlite3.Connection, sql: str, params=()):
    """Return the first column of the first row of *sql*, or ``None`` if no row."""
    row = conn.execute(sql, params).fetchone()
    return row[0] if row else None


def _pager(path: str, page: int, row_count: int, **params) -> str:
    """Render previous/next links.

    A "next" link is offered whenever the current page came back full
    (``row_count >= PAGE_SIZE``); the following page may still be empty.
    """
    links = []
    if page > 1:
        links.append(_link(link_with_params(path, page=page - 1, **params), "« Newer"))
    if row_count >= PAGE_SIZE:
        links.append(_link(link_with_params(path, page=page + 1, **params), "Older »"))
    return f"<p class='pager'>Page {page} {' '.join(links)}</p>"


def _field_table(field_data) -> str:
    """Render decoded form ``field_data`` as an Order/Type/Question/Response table.

    Non-dict entries are ignored and a missing or non-numeric ``SortOrder``
    sorts as 0, so malformed payloads render instead of raising.
    """
    def sort_key(item):
        try:
            return float(item.get("SortOrder", 0))
        except (TypeError, ValueError):
            return 0.0

    items = sorted((i for i in _as_list(field_data) if isinstance(i, dict)), key=sort_key)
    rows = [
        _row(
            escape(html_value(item.get("SortOrder"))),
            escape(html_value(item.get("FieldType"))),
            escape(html_value(item.get("Question"))),
            escape(html_value(item.get("Response"))),
        )
        for item in items
    ]
    return _table(["Order", "Type", "Question", "Response"], rows, "No decoded field data.")


def html_page(title: str, body: str) -> HTMLResponse:
    """Wrap *body* (trusted HTML) in the shared page chrome; *title* is escaped."""
    nav = "<nav>" + " ".join(_link(href, label) for href, label in _NAV_LINKS) + "</nav>"
    css = _CSS
    return HTMLResponse(
        f"<!doctype html><html><head><meta charset='utf-8'>"
        f"<title>{escape(title)}</title>{css}</head>"
        f"<body>{nav}<main><h1>{escape(title)}</h1>{body}</main></body></html>"
    )


def pretty_json(value):
    """Return *value* as indented JSON text.

    A ``str`` is first parsed as JSON; anything that cannot be parsed or
    serialised falls back to ``str(value)``.
    """
    try:
        parsed = json.loads(value) if isinstance(value, str) else value
        return json.dumps(parsed, indent=2, ensure_ascii=False)
    except (TypeError, ValueError, RecursionError):
        return str(value)


def html_value(value):
    """Return a display string: '' for None, pretty JSON for dict/list, else ``str``."""
    if value is None:
        return ""
    if isinstance(value, (dict, list)):
        return pretty_json(value)
    return str(value)


def parse_json_field(value, default=None):
    """Decode a JSON text column, returning *default* when missing or invalid.

    Values that are already a ``dict`` or ``list`` are returned unchanged so
    pre-decoded data is not discarded.
    """
    if value is None:
        return default
    if isinstance(value, (dict, list)):
        return value
    try:
        return json.loads(value)
    except (TypeError, ValueError, RecursionError):
        return default


def event_summary(payload):
    """Extract the listing fields from a webhook event payload.

    Missing or wrongly typed sections (``data``, ``related``, ``company``)
    are treated as empty, so the result always has the same keys.
    """
    payload = _as_dict(payload)
    data = _as_dict(payload.get("data"))
    related = _as_dict(payload.get("related"))
    company = _as_dict(related.get("company"))
    job_number = data.get("generated_job_id", "")
    purchase_order_number = data.get("purchase_order_number", "")
    return {
        "event_type": payload.get("type", ""),
        "uuid": data.get("uuid", ""),
        "generated_job_id": job_number,
        "job_number": job_number,
        "purchase_order_number": purchase_order_number,
        "status": data.get("status", ""),
        "edit_date": data.get("edit_date", ""),
        "company_name": company.get("name", ""),
    }


def format_pills(values):
    """Render each value as an escaped pill ``<span>``; empty input gives ''."""
    if not values:
        return ""
    return " ".join(f"<span class='pill'>{escape(str(v))}</span>" for v in values)


def link_with_params(path, **params):
    """Return *path* with a query string of the non-empty *params*."""
    filtered = {k: v for k, v in params.items() if v not in (None, "")}
    return f"{path}?{urlencode(filtered)}" if filtered else path


def resolve_generated_job_id(job_uuid: str) -> str:
    """Look up the generated job ID for *job_uuid*, or return ''.

    The poll database's ``job_metadata`` table is tried first, then the most
    recently received webhook event / form response mentioning the job.
    Database errors are treated as "not found" because either database or
    table may legitimately be absent.
    """
    job_uuid = str(job_uuid or "").strip()
    if not job_uuid:
        return ""

    try:
        with closing(get_poll_conn()) as conn:
            row = conn.execute(
                "select generated_job_id from job_metadata where job_uuid = ?",
                (job_uuid,),
            ).fetchone()
        if row and row["generated_job_id"]:
            return str(row["generated_job_id"])
    except sqlite3.Error:
        pass

    try:
        with closing(get_conn()) as conn:
            row = conn.execute(
                """
                with jobs as (
                    select json_extract(payload_json, '$.data.uuid') as job_uuid,
                           json_extract(payload_json, '$.data.generated_job_id') as generated_job_id,
                           received_at
                    from webhook_events
                    where json_extract(payload_json, '$.data.generated_job_id') is not null
                    union all
                    select json_extract(payload_json, '$.related.job.uuid') as job_uuid,
                           json_extract(payload_json, '$.related.job.generated_job_id') as generated_job_id,
                           received_at
                    from webhook_form_responses
                    where json_extract(payload_json, '$.related.job.generated_job_id') is not null
                )
                select generated_job_id
                from jobs
                where job_uuid = ? and generated_job_id is not null
                order by received_at desc
                limit 1
                """,
                (job_uuid,),
            ).fetchone()
        if row and row["generated_job_id"]:
            return str(row["generated_job_id"])
    except sqlite3.Error:
        pass
    return ""


def job_id_html(job_uuid: str) -> str:
    """Return the resolved generated job ID as HTML, or '' when unknown.

    Each call performs its own database lookups via
    ``resolve_generated_job_id``.
    """
    generated_job_id = resolve_generated_job_id(job_uuid)
    return f"<strong>{escape(generated_job_id)}</strong>" if generated_job_id else ""


# ---------------------------------------------------------------------------
# Routes: health and dashboard
# ---------------------------------------------------------------------------

@app.get("/health")
def health():
    """Report liveness together with the configured database paths."""
    return {"ok": True, "db_path": DB_PATH, "state_db_path": STATE_DB_PATH, "poll_db_path": POLL_DB_PATH}


@app.get("/", response_class=HTMLResponse)
def dashboard():
    """Show row counts and latest timestamps across the three databases.

    The state and poll databases are optional: errors reading them leave the
    corresponding counters at 0 and timestamps blank.
    """
    with closing(get_conn()) as conn:
        counts = {
            "webhook_events": _scalar(conn, "select count(*) from webhook_events"),
            "webhook_objects": _scalar(conn, "select count(*) from webhook_objects"),
            "webhook_form_responses": _scalar(conn, "select count(*) from webhook_form_responses"),
        }
        latest_event = _scalar(conn, "select received_at from webhook_events order by id desc limit 1")
        latest_object = _scalar(conn, "select received_at from webhook_objects order by id desc limit 1")
        latest_form = _scalar(conn, "select received_at from webhook_form_responses order by id desc limit 1")

    state_count = 0
    latest_generated = None
    try:
        with closing(get_state_conn()) as conn:
            state_count = _scalar(conn, "select count(*) from generated_job_materials")
            latest_generated = _scalar(
                conn, "select updated_at from generated_job_materials order by id desc limit 1"
            )
    except sqlite3.Error:
        pass

    poll_counts = {
        "poll_runs": 0,
        "polled_form_responses": 0,
        "polled_quote_templates": 0,
        "remote_existing_incidents": 0,
    }
    latest_poll_run = None
    latest_polled_form = None
    latest_polled_quote = None
    latest_remote_existing_incident = None
    try:
        with closing(get_poll_conn()) as conn:
            poll_counts["poll_runs"] = _scalar(conn, "select count(*) from poll_runs")
            poll_counts["polled_form_responses"] = _scalar(conn, "select count(*) from form_responses_raw")
            poll_counts["polled_quote_templates"] = _scalar(
                conn, "select count(*) from quote_template_form_responses"
            )
            # The incidents table is handled separately so its absence does
            # not hide the remaining poll statistics.
            try:
                poll_counts["remote_existing_incidents"] = _scalar(
                    conn, "select count(*) from quote_template_remote_existing_incidents"
                )
                latest_remote_existing_incident = _scalar(
                    conn,
                    "select detected_at from quote_template_remote_existing_incidents order by id desc limit 1",
                )
            except sqlite3.Error:
                pass
            latest_poll_run = _scalar(conn, "select finished_at from poll_runs order by id desc limit 1")
            latest_polled_form = _scalar(
                conn, "select timestamp from form_responses_raw order by timestamp desc limit 1"
            )
            latest_polled_quote = _scalar(
                conn,
                "select discovered_at from quote_template_form_responses order by discovered_at desc limit 1",
            )
    except sqlite3.Error:
        pass

    counts["generated_job_materials"] = state_count
    counts.update(poll_counts)

    cards = "".join(
        f"<div class='card'><div class='k'>{escape(name)}</div><div class='n'>{_esc(count)}</div></div>"
        for name, count in counts.items()
    )
    details = _kv_grid([
        ("DB path", escape(DB_PATH)),
        ("State DB path", escape(STATE_DB_PATH)),
        ("Poll DB path", escape(POLL_DB_PATH)),
        ("Latest event", _or_dash(latest_event)),
        ("Latest object", _or_dash(latest_object)),
        ("Latest form response", _or_dash(latest_form)),
        ("Latest generated material row", _or_dash(latest_generated)),
        ("Latest poll run", _or_dash(latest_poll_run)),
        ("Latest polled form timestamp", _or_dash(latest_polled_form)),
        ("Latest polled quote discovered", _or_dash(latest_polled_quote)),
        ("Latest remote-existing incident", _or_dash(latest_remote_existing_incident)),
    ])
    quick_links = "".join(
        f"<li>{_link(href, label)}</li>" for href, label in _NAV_LINKS if href != "/"
    )
    body = f"""
    <div class='cards'>{cards}</div>
    {details}
    <h2>Quick links</h2>
    <ul>{quick_links}</ul>
    """
    return html_page("ServiceM8 Inspector", body)


# ---------------------------------------------------------------------------
# Routes: webhook events
# ---------------------------------------------------------------------------

@app.get("/events", response_class=HTMLResponse)
def list_events(q: str = Query(""), page: int = Query(1, ge=1)):
    """List one page of webhook events, optionally filtered by *q*.

    Filtering happens in Python after the page has been fetched, so a search
    only sees the ``PAGE_SIZE`` rows of the requested page.
    """
    offset = (page - 1) * PAGE_SIZE
    with closing(get_conn()) as conn:
        rows = conn.execute(
            "select id, received_at, payload_json, path from webhook_events order by id desc limit ? offset ?",
            (PAGE_SIZE, offset),
        ).fetchall()

    q_lower = q.lower().strip()
    # Digits-only form of the query lets "#1234" match job / PO numbers.
    q_digits = "".join(ch for ch in q if ch.isdigit())

    table_rows = []
    for row in rows:
        payload = parse_json_field(row["payload_json"], {}) or {}
        summary = event_summary(payload)
        job_number = str(summary.get("job_number", "") or "")
        purchase_order_number = str(summary.get("purchase_order_number", "") or "")
        haystack = " ".join(str(v) for v in summary.values()).lower() + " " + str(payload).lower()
        job_match = bool(q_digits) and (q_digits in job_number or q_digits in purchase_order_number)
        if q_lower and not job_match and q_lower not in haystack:
            continue
        table_rows.append(_row(
            _link(f"/events/{row['id']}", row["id"]),
            _esc(row["received_at"]),
            _esc(summary["event_type"]),
            _esc(summary["generated_job_id"]),
            _esc(summary["uuid"]),
            _esc(summary["status"]),
            _esc(summary["company_name"]),
            _esc(row["path"]),
        ))

    table = _table(
        ["ID", "Received", "Type", "Job ID", "UUID", "Status", "Company", "Path"],
        table_rows,
        "No rows matched.",
    )
    body = f"""
    <form method='get' action='/events'>
      <input type='search' name='q' value='{escape(q)}' placeholder='Search'>
      <button type='submit'>Search</button>
    </form>
    <p class='muted'>Search supports job number / generated job ID, UUID, status, company, or event type. Showing up to {PAGE_SIZE} latest rows, then filtering in-app.</p>
    {table}
    {_pager("/events", page, len(rows), q=q.strip())}
    """
    return html_page("Webhook events", body)


@app.get("/events/{event_id}", response_class=HTMLResponse)
def event_detail(event_id: int):
    """Show one webhook event with its headers and payload; 404 if missing."""
    with closing(get_conn()) as conn:
        row = conn.execute("select * from webhook_events where id = ?", (event_id,)).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Event not found")

    payload = parse_json_field(row["payload_json"], {}) or {}
    headers = parse_json_field(row["headers_json"], {}) or {}
    summary = event_summary(payload)
    related = _as_dict(_as_dict(payload).get("related"))
    company = _as_dict(related.get("company"))
    contacts = _as_list(related.get("jobContacts"))
    materials = _as_list(related.get("jobMaterials"))

    overview = _kv_grid([
        ("ID", _esc(row["id"])),
        ("Received", _esc(row["received_at"])),
        ("Event type", _esc(summary["event_type"])),
        ("UUID", _esc(summary["uuid"])),
        ("Generated job ID", _esc(summary["generated_job_id"])),
        ("Status", _esc(summary["status"])),
        ("Company", _esc(summary["company_name"])),
        ("Path", _esc(row["path"])),
    ])
    related_summary = _kv_grid([
        ("Company name", _esc(company.get("name", ""))),
        ("Contacts", str(len(contacts))),
        ("Materials", str(len(materials))),
    ])
    body = f"""
    {overview}
    <h2>Related summary</h2>
    {related_summary}
    <h2>Headers</h2>
    {_pre_json(headers)}
    <h2>Payload</h2>
    {_pre_json(payload)}
    """
    return html_page(f"Event {event_id}", body)


# ---------------------------------------------------------------------------
# Routes: object webhooks
# ---------------------------------------------------------------------------

@app.get("/objects", response_class=HTMLResponse)
def list_objects(page: int = Query(1, ge=1)):
    """List one page of object webhooks, newest first."""
    offset = (page - 1) * PAGE_SIZE
    with closing(get_conn()) as conn:
        rows = conn.execute(
            "select id, received_at, object_type, object_uuid, changed_fields_json, object_time_utc, resource_url from webhook_objects order by id desc limit ? offset ?",
            (PAGE_SIZE, offset),
        ).fetchall()

    table_rows = []
    for row in rows:
        changed = _as_list(parse_json_field(row["changed_fields_json"], []))
        table_rows.append(_row(
            _link(f"/objects/{row['id']}", row["id"]),
            _esc(row["received_at"]),
            _esc(row["object_type"]),
            _esc(row["object_uuid"]),
            format_pills(changed[:8]),  # cap the pills so wide rows stay readable
            _esc(row["object_time_utc"]),
            _esc(row["resource_url"]),
        ))

    table = _table(
        ["ID", "Received", "Object", "UUID", "Changed fields", "Object time", "Resource URL"],
        table_rows,
        "No rows found.",
    )
    return html_page("Object webhooks", table + _pager("/objects", page, len(rows)))


@app.get("/objects/{object_id}", response_class=HTMLResponse)
def object_detail(object_id: int):
    """Show one object webhook with its headers and payload; 404 if missing."""
    with closing(get_conn()) as conn:
        row = conn.execute("select * from webhook_objects where id = ?", (object_id,)).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Object row not found")

    payload = parse_json_field(row["payload_json"], {}) or {}
    headers = parse_json_field(row["headers_json"], {}) or {}
    changed = _as_list(parse_json_field(row["changed_fields_json"], []))

    overview = _kv_grid([
        ("ID", _esc(row["id"])),
        ("Received", _esc(row["received_at"])),
        ("Object type", _esc(row["object_type"])),
        ("Object UUID", _esc(row["object_uuid"])),
        ("Object time UTC", _esc(row["object_time_utc"])),
        ("Resource URL", _esc(row["resource_url"])),
        ("Changed fields", format_pills(changed)),
    ])
    body = f"""
    {overview}
    <h2>Headers</h2>
    {_pre_json(headers)}
    <h2>Payload</h2>
    {_pre_json(payload)}
    """
    return html_page(f"Object webhook {object_id}", body)


# ---------------------------------------------------------------------------
# Routes: webhook form responses
# ---------------------------------------------------------------------------

@app.get("/form-responses", response_class=HTMLResponse)
def list_form_responses(page: int = Query(1, ge=1)):
    """List one page of webhook form responses, newest first."""
    offset = (page - 1) * PAGE_SIZE
    with closing(get_conn()) as conn:
        rows = conn.execute(
            "select id, received_at, payload_json from webhook_form_responses order by id desc limit ? offset ?",
            (PAGE_SIZE, offset),
        ).fetchall()

    table_rows = []
    for row in rows:
        payload = _as_dict(parse_json_field(row["payload_json"], {}))
        data = _as_dict(payload.get("data"))
        job = _as_dict(_as_dict(payload.get("related")).get("job"))
        table_rows.append(_row(
            _link(f"/form-responses/{row['id']}", row["id"]),
            _esc(row["received_at"]),
            _esc(data.get("form_uuid", "")),
            _esc(data.get("regarding_object", "")),
            _esc(data.get("regarding_object_uuid", "")),
            _esc(job.get("generated_job_id", "")),
            _esc(job.get("status", "")),
        ))

    table = _table(
        ["ID", "Received", "Form UUID", "Regarding", "Object UUID", "Job ID", "Job status"],
        table_rows,
        "No rows found.",
    )
    return html_page("Form responses", table + _pager("/form-responses", page, len(rows)))


@app.get("/generated-materials", response_class=HTMLResponse)
def list_generated_materials(page: int = Query(1, ge=1)):
    """List one page of generated job-material rows from the state database."""
    offset = (page - 1) * PAGE_SIZE
    try:
        with closing(get_state_conn()) as conn:
            rows = conn.execute(
                "select id, job_uuid, form_response_uuid, job_material_uuid, kind, source_question, source_text, updated_at from generated_job_materials order by id desc limit ? offset ?",
                (PAGE_SIZE, offset),
            ).fetchall()
    except sqlite3.Error as e:
        return html_page("Generated materials", _error_html("State DB unavailable", e))

    table_rows = [
        _row(
            _link(f"/generated-materials/{row['id']}", row["id"]),
            job_id_html(row["job_uuid"]),
            _esc(row["job_uuid"]),
            _esc(row["form_response_uuid"]),
            _esc(row["job_material_uuid"]),
            _esc(row["kind"]),
            _esc(row["source_question"]),
            _esc(row["updated_at"]),
        )
        for row in rows
    ]
    table = _table(
        ["ID", "Job ID", "Job UUID", "Form response UUID", "Job material UUID", "Kind", "Source question", "Updated"],
        table_rows,
        "No rows found.",
    )
    return html_page("Generated materials", table + _pager("/generated-materials", page, len(rows)))


@app.get("/generated-materials/{row_id}", response_class=HTMLResponse)
def generated_material_detail(row_id: int):
    """Show one generated job-material row; 404 if missing."""
    try:
        with closing(get_state_conn()) as conn:
            row = conn.execute("select * from generated_job_materials where id = ?", (row_id,)).fetchone()
    except sqlite3.Error as e:
        return html_page("Generated material detail", _error_html("State DB unavailable", e))
    if not row:
        raise HTTPException(status_code=404, detail="Generated material row not found")

    body = _kv_grid([
        ("ID", _esc(row["id"])),
        ("Job ID", job_id_html(row["job_uuid"])),
        ("Job UUID", _esc(row["job_uuid"])),
        ("Form response UUID", _esc(row["form_response_uuid"])),
        ("Job material UUID", _esc(row["job_material_uuid"])),
        ("Kind", _esc(row["kind"])),
        ("Source field UUID", _esc(row["source_field_uuid"])),
        ("Source question", _esc(row["source_question"])),
        ("Source text", _esc(row["source_text"])),
        ("Created", _esc(row["created_at"])),
        ("Updated", _esc(row["updated_at"])),
    ])
    return html_page(f"Generated material {row_id}", body)


@app.get("/form-responses/{row_id}", response_class=HTMLResponse)
def form_response_detail(row_id: int):
    """Show one webhook form response with decoded field data; 404 if missing."""
    with closing(get_conn()) as conn:
        row = conn.execute("select * from webhook_form_responses where id = ?", (row_id,)).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Form response not found")

    payload = parse_json_field(row["payload_json"], {}) or {}
    headers = parse_json_field(row["headers_json"], {}) or {}
    data = _as_dict(_as_dict(payload).get("data"))
    job = _as_dict(_as_dict(_as_dict(payload).get("related")).get("job"))
    # field_data is read as JSON text nested inside the payload.
    field_data = parse_json_field(data.get("field_data", "[]"), []) or []

    overview = _kv_grid([
        ("ID", _esc(row["id"])),
        ("Received", _esc(row["received_at"])),
        ("Form UUID", _esc(data.get("form_uuid", ""))),
        ("Regarding object", _esc(data.get("regarding_object", ""))),
        ("Regarding UUID", _esc(data.get("regarding_object_uuid", ""))),
        ("Timestamp", _esc(data.get("timestamp", ""))),
        ("Job ID", _esc(job.get("generated_job_id", ""))),
        ("Job status", _esc(job.get("status", ""))),
    ])
    body = f"""
    {overview}
    <h2>Decoded field data</h2>
    {_field_table(field_data)}
    <h2>Related job</h2>
    {_pre_json(job)}
    <h2>Headers</h2>
    {_pre_json(headers)}
    <h2>Payload</h2>
    {_pre_json(payload)}
    """
    return html_page(f"Form response {row_id}", body)


# ---------------------------------------------------------------------------
# Routes: remote-existing incidents
# ---------------------------------------------------------------------------

def _apply_run_link(apply_run_id) -> str:
    """Link to an apply run, or '' when the incident has none."""
    return _link(f"/poll/apply-runs/{apply_run_id}", apply_run_id) if apply_run_id else ""


def _incident_summary_rows(incidents, reason_limit: int):
    """Render incident rows for the compact ID/Detected/Action/... tables."""
    return [
        _row(
            _link(f"/poll/remote-existing-incidents/{incident['id']}", incident["id"]),
            _esc(incident["detected_at"]),
            _esc(incident["action"]),
            _esc(incident["remote_count"]),
            _esc(incident["remote_active_count"]),
            escape((incident["reason"] or "")[:reason_limit]),
        )
        for incident in incidents
    ]


_INCIDENT_SUMMARY_HEADERS = ["ID", "Detected", "Action", "Remote rows", "Active", "Reason"]


@app.get("/poll/remote-existing-incidents", response_class=HTMLResponse)
def list_remote_existing_incidents(page: int = Query(1, ge=1)):
    """List one page of remote-existing incidents, newest first."""
    offset = (page - 1) * PAGE_SIZE
    try:
        with closing(get_poll_conn()) as conn:
            rows = conn.execute(
                """
                select id, detected_at, form_response_uuid, job_uuid, apply_run_id,
                       desired_count, remote_count, remote_active_count, action, reason
                from quote_template_remote_existing_incidents
                order by id desc
                limit ? offset ?
                """,
                (PAGE_SIZE, offset),
            ).fetchall()
    except sqlite3.Error as e:
        return html_page("Remote existing incidents", _error_html("Incident table unavailable", e))

    table_rows = [
        _row(
            _link(f"/poll/remote-existing-incidents/{row['id']}", row["id"]),
            _esc(row["detected_at"]),
            _esc(row["action"]),
            _link(f"/poll/quote-template/{row['form_response_uuid']}", row["form_response_uuid"]),
            job_id_html(row["job_uuid"]),
            _esc(row["job_uuid"]),
            _apply_run_link(row["apply_run_id"]),
            _esc(row["desired_count"]),
            _esc(row["remote_count"]),
            _esc(row["remote_active_count"]),
            escape((row["reason"] or "")[:180]),
        )
        for row in rows
    ]
    table = _table(
        ["ID", "Detected", "Action", "Form response UUID", "Job ID", "Job UUID", "Apply run",
         "Desired", "Remote rows", "Active", "Reason"],
        table_rows,
        "No remote-existing incidents found.",
    )
    return html_page(
        "Remote existing incidents",
        table + _pager("/poll/remote-existing-incidents", page, len(rows)),
    )


@app.get("/poll/remote-existing-incidents/{incident_id}", response_class=HTMLResponse)
def remote_existing_incident_detail(incident_id: int):
    """Show one remote-existing incident and its captured remote rows; 404 if missing."""
    try:
        with closing(get_poll_conn()) as conn:
            row = conn.execute(
                "select * from quote_template_remote_existing_incidents where id = ?",
                (incident_id,),
            ).fetchone()
    except sqlite3.Error as e:
        return html_page("Remote existing incident", _error_html("Incident table unavailable", e))
    if not row:
        raise HTTPException(status_code=404, detail="Remote existing incident not found")

    remote_rows = parse_json_field(row["remote_rows_json"], []) or []
    material_rows = [
        _row(
            _esc(item.get("uuid", "")),
            _esc(item.get("active", "")),
            _esc(item.get("name", "")),
            _esc(item.get("material_uuid", "")),
            _esc(item.get("quantity", "")),
            _esc(item.get("price", "")),
            _esc(item.get("sort_order", "")),
        )
        for item in _as_list(remote_rows)
        if isinstance(item, dict)
    ]

    overview = _kv_grid([
        ("ID", _esc(row["id"])),
        ("Detected", _esc(row["detected_at"])),
        ("Action", _esc(row["action"])),
        ("Form response UUID",
         _link(f"/poll/quote-template/{row['form_response_uuid']}", row["form_response_uuid"])),
        ("Job ID", job_id_html(row["job_uuid"])),
        ("Job UUID", _esc(row["job_uuid"])),
        ("Apply run", _apply_run_link(row["apply_run_id"])),
        ("Desired rows", _esc(row["desired_count"])),
        ("Remote rows", _esc(row["remote_count"])),
        ("Remote active rows", _esc(row["remote_active_count"])),
        ("Reason", _esc(row["reason"])),
    ])
    materials_table = _table(
        ["UUID", "Active", "Name", "Material UUID", "Qty", "Price", "Sort"],
        material_rows,
        "No remote rows captured.",
    )
    body = f"""
    {overview}
    <h2>Remote jobMaterial rows</h2>
    {materials_table}
    <h2>Raw remote rows JSON</h2>
    {_pre_json(remote_rows)}
    """
    return html_page(f"Remote existing incident {incident_id}", body)


# ---------------------------------------------------------------------------
# Routes: apply runs
# ---------------------------------------------------------------------------

@app.get("/poll/apply-runs", response_class=HTMLResponse)
def list_apply_runs(page: int = Query(1, ge=1)):
    """List one page of quote-template apply runs, newest first."""
    offset = (page - 1) * PAGE_SIZE
    try:
        with closing(get_poll_conn()) as conn:
            rows = conn.execute(
                """
                select id, form_response_uuid, job_uuid, mode, started_at, finished_at,
                       desired_count, created_count, status, error
                from quote_template_apply_runs
                order by id desc
                limit ? offset ?
                """,
                (PAGE_SIZE, offset),
            ).fetchall()
    except sqlite3.Error as e:
        return html_page("Apply runs", _error_html("Apply-run table unavailable", e))

    table_rows = [
        _row(
            _link(f"/poll/apply-runs/{row['id']}", row["id"]),
            _esc(row["mode"]),
            _esc(row["status"]),
            _link(f"/poll/quote-template/{row['form_response_uuid']}", row["form_response_uuid"]),
            job_id_html(row["job_uuid"]),
            _esc(row["job_uuid"]),
            _esc(row["started_at"]),
            _esc(row["finished_at"]),
            _esc(row["desired_count"]),
            _esc(row["created_count"]),
            escape((row["error"] or "")[:160]),
        )
        for row in rows
    ]
    table = _table(
        ["ID", "Mode", "Status", "Form response UUID", "Job ID", "Job UUID", "Started",
         "Finished", "Desired", "Created", "Error"],
        table_rows,
        "No apply runs found yet.",
    )
    return html_page("Apply runs", table + _pager("/poll/apply-runs", page, len(rows)))


@app.get("/poll/apply-runs/{run_id}", response_class=HTMLResponse)
def apply_run_detail(run_id: int):
    """Show one apply run, its per-row results and linked incidents; 404 if missing."""
    try:
        with closing(get_poll_conn()) as conn:
            run = conn.execute("select * from quote_template_apply_runs where id = ?", (run_id,)).fetchone()
            rows = conn.execute(
                "select * from quote_template_apply_run_rows where run_id = ? order by row_index asc",
                (run_id,),
            ).fetchall()
            # The incidents table is optional (the dashboard treats it the
            # same way); its absence must not hide the run itself.
            try:
                incidents = conn.execute(
                    "select id, detected_at, action, remote_count, remote_active_count, reason from quote_template_remote_existing_incidents where apply_run_id = ? order by id desc",
                    (run_id,),
                ).fetchall()
            except sqlite3.Error:
                incidents = []
    except sqlite3.Error as e:
        return html_page("Apply run", _error_html("Apply-run table unavailable", e))
    if not run:
        raise HTTPException(status_code=404, detail="Apply run not found")

    row_headers = ["#", "Action", "Kind", "Name", "Created UUID", "Source question", "Error"]
    table_rows = []
    for row in rows:
        payload = parse_json_field(row["api_payload_json"], {}) or {}
        table_rows.append(
            _row(
                _esc(row["row_index"]),
                _esc(row["action"]),
                _esc(row["kind"]),
                _esc(row["name"]),
                _esc(row["job_material_uuid"]),
                _esc(row["source_question"]),
                _esc(row["error"]),
            )
            + f"<tr><td colspan='{len(row_headers)}'><details><summary>API payload</summary>"
            + f"{_pre_json(payload)}</details></td></tr>"
        )

    overview = _kv_grid([
        ("Run ID", _esc(run["id"])),
        ("Mode", _esc(run["mode"])),
        ("Status", _esc(run["status"])),
        ("Form response UUID",
         _link(f"/poll/quote-template/{run['form_response_uuid']}", run["form_response_uuid"])),
        ("Job ID", job_id_html(run["job_uuid"])),
        ("Job UUID", _esc(run["job_uuid"])),
        ("Started", _esc(run["started_at"])),
        ("Finished", _esc(run["finished_at"])),
        ("Desired", _esc(run["desired_count"])),
        ("Created", _esc(run["created_count"])),
        ("Error", _esc(run["error"])),
    ])
    incidents_table = _table(
        _INCIDENT_SUMMARY_HEADERS,
        _incident_summary_rows(incidents, 180),
        "No remote-existing incidents for this run.",
    )
    rows_table = _table(row_headers, table_rows, "No rows recorded.")
    body = f"""
    {overview}
    <h2>Remote-existing incidents</h2>
    {incidents_table}
    <h2>Rows</h2>
    {rows_table}
    """
    return html_page(f"Apply run {run_id}", body)


# ---------------------------------------------------------------------------
# Routes: poll runs and polled form responses
# ---------------------------------------------------------------------------

@app.get("/poll/runs", response_class=HTMLResponse)
def list_poll_runs(page: int = Query(1, ge=1)):
    """List one page of poll runs, newest first."""
    offset = (page - 1) * PAGE_SIZE
    try:
        with closing(get_poll_conn()) as conn:
            rows = conn.execute(
                """
                select id, started_at, finished_at, since_value, filter_field, fetched_count,
                       inserted_count, updated_count, quote_match_count, newly_queued_count, error
                from poll_runs
                order by id desc
                limit ? offset ?
                """,
                (PAGE_SIZE, offset),
            ).fetchall()
    except sqlite3.Error as e:
        return html_page("Poll runs", _error_html("Poll DB unavailable", e))

    table_rows = [
        _row(
            _esc(row["id"]),
            _esc(row["started_at"]),
            _esc(row["finished_at"]),
            f"{_esc(row['filter_field'])} gt {_esc(row['since_value'])}",
            _esc(row["fetched_count"]),
            _esc(row["inserted_count"]),
            _esc(row["updated_count"]),
            _esc(row["quote_match_count"]),
            _esc(row["newly_queued_count"]),
            escape((row["error"] or "")[:180]),
        )
        for row in rows
    ]
    table = _table(
        ["ID", "Started", "Finished", "Filter", "Fetched", "Inserted", "Updated",
         "Quote matches", "Queued", "Error"],
        table_rows,
        "No poll runs found.",
    )
    return html_page("Poll runs", table + _pager("/poll/runs", page, len(rows)))


@app.get("/poll/form-responses", response_class=HTMLResponse)
def list_polled_form_responses(q: str = Query(""), quote_only: int = Query(0), page: int = Query(1, ge=1)):
    """List polled form responses, filtered in SQL by *q* and/or *quote_only*.

    *q* is matched with ``LIKE %q%`` against the identifying columns and the
    raw JSON; it is always passed as a bound parameter.
    """
    offset = (page - 1) * PAGE_SIZE
    needle = q.strip()
    clauses = []
    params = []
    if quote_only:
        clauses.append("is_quote_template = 1")
    if needle:
        like = f"%{needle}%"
        clauses.append(
            "(uuid like ? or form_uuid like ? or regarding_object_uuid like ? or timestamp like ? or edit_date like ? or raw_json like ?)"
        )
        params.extend([like] * 6)
    # Only fixed clause text is interpolated into the SQL; user input is bound.
    where = (" where " + " and ".join(clauses)) if clauses else ""

    try:
        with closing(get_poll_conn()) as conn:
            rows = conn.execute(
                f"""
                select uuid, first_seen_at, last_seen_at, seen_count, timestamp, edit_date, form_uuid,
                       regarding_object, regarding_object_uuid, is_quote_template, parse_status, parse_error
                from form_responses_raw
                {where}
                order by timestamp desc, edit_date desc
                limit ? offset ?
                """,
                (*params, PAGE_SIZE, offset),
            ).fetchall()
    except sqlite3.Error as e:
        return html_page("Polled form responses", _error_html("Poll DB unavailable", e))

    table_rows = []
    for row in rows:
        quote_pill = "<span class='pill'>Quote Template</span>" if row["is_quote_template"] else ""
        table_rows.append(_row(
            _link(f"/poll/form-responses/{row['uuid']}", row["uuid"]),
            _esc(row["timestamp"]),
            _esc(row["edit_date"]),
            f"{_esc(row['form_uuid'])}<br>{quote_pill}",
            _esc(row["regarding_object"]),
            job_id_html(row["regarding_object_uuid"]),
            _esc(row["regarding_object_uuid"]),
            _esc(row["parse_status"]),
            _esc(row["seen_count"]),
            _esc(row["last_seen_at"]),
        ))

    checked = " checked" if quote_only else ""
    table = _table(
        ["UUID", "Timestamp", "Edit date", "Form UUID", "Regarding", "Job ID", "Object UUID",
         "Parse", "Seen", "Last seen"],
        table_rows,
        "No rows found.",
    )
    pager = _pager(
        "/poll/form-responses", page, len(rows),
        q=needle, quote_only=1 if quote_only else None,
    )
    body = f"""
    <form method='get' action='/poll/form-responses'>
      <input type='search' name='q' value='{escape(q)}' placeholder='Search'>
      <label><input type='checkbox' name='quote_only' value='1'{checked}> Quote Template only</label>
      <button type='submit'>Search</button>
    </form>
    {table}
    {pager}
    """
    return html_page("Polled form responses", body)


@app.get("/poll/form-responses/{form_response_uuid}", response_class=HTMLResponse)
def polled_form_response_detail(form_response_uuid: str):
    """Show one polled form response with decoded field data; 404 if missing."""
    try:
        with closing(get_poll_conn()) as conn:
            row = conn.execute(
                "select * from form_responses_raw where uuid = ?", (form_response_uuid,)
            ).fetchone()
            quote = conn.execute(
                "select * from quote_template_form_responses where form_response_uuid = ?",
                (form_response_uuid,),
            ).fetchone()
    except sqlite3.Error as e:
        return html_page("Polled form response", _error_html("Poll DB unavailable", e))
    if not row:
        raise HTTPException(status_code=404, detail="Polled form response not found")

    raw = parse_json_field(row["raw_json"], {}) or {}
    field_data = parse_json_field(_as_dict(raw).get("field_data"), []) or []

    pairs = [
        ("UUID", _esc(row["uuid"])),
        ("Timestamp", _esc(row["timestamp"])),
        ("Edit date", _esc(row["edit_date"])),
        ("Form UUID", _esc(row["form_uuid"])),
        ("Regarding object", _esc(row["regarding_object"])),
        ("Job ID", job_id_html(row["regarding_object_uuid"])),
        ("Regarding UUID", _esc(row["regarding_object_uuid"])),
        ("First seen", _esc(row["first_seen_at"])),
        ("Last seen", _esc(row["last_seen_at"])),
        ("Seen count", _esc(row["seen_count"])),
        ("Parse status", _esc(row["parse_status"])),
        ("Parse error", _esc(row["parse_error"])),
    ]
    if quote:
        pairs.append((
            "Parsed quote",
            _link(f"/poll/quote-template/{row['uuid']}", "Open parsed Quote Template view"),
        ))

    body = f"""
    {_kv_grid(pairs)}
    <h2>Decoded field data</h2>
    {_field_table(field_data)}
    <h2>Raw API row</h2>
    {_pre_json(raw)}
    """
    return html_page(f"Polled form response {form_response_uuid}", body)


# ---------------------------------------------------------------------------
# Routes: polled quote templates
# ---------------------------------------------------------------------------

@app.get("/poll/quote-template", response_class=HTMLResponse)
def list_polled_quote_templates(page: int = Query(1, ge=1)):
    """List one page of parsed Quote Template responses, newest first."""
    offset = (page - 1) * PAGE_SIZE
    try:
        with closing(get_poll_conn()) as conn:
            rows = conn.execute(
                """
                select form_response_uuid, discovered_at, job_uuid, form_uuid, author_name, description,
                       desired_job_materials_json, queued_at, processed_at, process_status, process_error
                from quote_template_form_responses
                order by discovered_at desc, form_response_uuid desc
                limit ? offset ?
                """,
                (PAGE_SIZE, offset),
            ).fetchall()
    except sqlite3.Error as e:
        return html_page("Polled Quote Template responses", _error_html("Poll DB unavailable", e))

    table_rows = []
    for row in rows:
        try:
            material_count = len(json.loads(row["desired_job_materials_json"] or "[]"))
        except (TypeError, ValueError):
            material_count = "?"  # undecodable JSON or a value without a length
        table_rows.append(_row(
            _link(f"/poll/quote-template/{row['form_response_uuid']}", row["form_response_uuid"]),
            _esc(row["discovered_at"]),
            job_id_html(row["job_uuid"]),
            _esc(row["job_uuid"]),
            _esc(row["description"]),
            _esc(material_count),
            _esc(row["queued_at"]),
            _esc(row["process_status"]),
        ))

    table = _table(
        ["Form response UUID", "Discovered", "Job ID", "Job UUID", "Description", "Rows", "Queued", "Status"],
        table_rows,
        "No quote template rows found.",
    )
    return html_page(
        "Polled Quote Template responses",
        table + _pager("/poll/quote-template", page, len(rows)),
    )


@app.get("/poll/quote-template/{form_response_uuid}", response_class=HTMLResponse)
def polled_quote_template_detail(form_response_uuid: str):
    """Show one parsed Quote Template response, its desired rows and recent runs.

    Also prints the dry-run / apply command lines for this response so an
    operator can copy them; 404 if the response is unknown.
    """
    try:
        with closing(get_poll_conn()) as conn:
            row = conn.execute(
                "select * from quote_template_form_responses where form_response_uuid = ?",
                (form_response_uuid,),
            ).fetchone()
    except sqlite3.Error as e:
        return html_page("Polled Quote Template response", _error_html("Poll DB unavailable", e))
    if not row:
        raise HTTPException(status_code=404, detail="Polled quote template response not found")

    parsed = parse_json_field(row["parsed_json"], {}) or {}
    materials = parse_json_field(row["desired_job_materials_json"], []) or []
    material_rows = [
        _row(
            _esc(item.get("sort_order", "")),
            _esc(item.get("kind", "")),
            _esc(item.get("name", "")),
            _esc(item.get("material_uuid", "")),
            _esc(item.get("source_question", "")),
        )
        for item in _as_list(materials)
        if isinstance(item, dict)
    ]

    # The commands are meant to be pasted into a shell, so quote the UUID;
    # plain UUIDs are left unchanged by shlex.quote.
    uuid_arg = shlex.quote(str(row["form_response_uuid"]))
    dry_run_cmd = f"/opt/webhooks/apply_polled_quote_template_jobmaterials.py --uuid {uuid_arg} --pretty"
    apply_cmd = f"/opt/webhooks/apply_polled_quote_template_jobmaterials.py --uuid {uuid_arg} --apply --pretty"

    # History tables are optional and independent: a missing one leaves only
    # its own list empty.
    recent_runs = []
    recent_incidents = []
    try:
        with closing(get_poll_conn()) as conn:
            try:
                recent_runs = conn.execute(
                    "select id, mode, status, started_at, finished_at, desired_count, created_count, error from quote_template_apply_runs where form_response_uuid = ? order by id desc limit 8",
                    (row["form_response_uuid"],),
                ).fetchall()
            except sqlite3.Error:
                recent_runs = []
            try:
                recent_incidents = conn.execute(
                    "select id, detected_at, action, remote_count, remote_active_count, reason from quote_template_remote_existing_incidents where form_response_uuid = ? order by id desc limit 8",
                    (row["form_response_uuid"],),
                ).fetchall()
            except sqlite3.Error:
                recent_incidents = []
    except sqlite3.Error:
        pass

    recent_run_rows = [
        _row(
            _link(f"/poll/apply-runs/{run['id']}", run["id"]),
            _esc(run["mode"]),
            _esc(run["status"]),
            _esc(run["started_at"]),
            _esc(run["finished_at"]),
            _esc(run["desired_count"]),
            _esc(run["created_count"]),
            escape((run["error"] or "")[:120]),
        )
        for run in recent_runs
    ]

    overview = _kv_grid([
        ("Form response UUID", _esc(row["form_response_uuid"])),
        ("Discovered", _esc(row["discovered_at"])),
        ("Job ID", job_id_html(row["job_uuid"])),
        ("Job UUID", _esc(row["job_uuid"])),
        ("Form UUID", _esc(row["form_uuid"])),
        ("Author", _esc(row["author_name"])),
        ("Description", _esc(row["description"])),
        ("Queued", _esc(row["queued_at"])),
        ("Processed", _esc(row["processed_at"])),
        ("Status", _esc(row["process_status"])),
        ("Error", _esc(row["process_error"])),
        ("Raw polled response",
         _link(f"/poll/form-responses/{row['form_response_uuid']}", "Open raw polled form response")),
    ])
    runs_table = _table(
        ["ID", "Mode", "Status", "Started", "Finished", "Desired", "Created", "Error"],
        recent_run_rows,
        "No dry-run/apply runs yet.",
    )
    incidents_table = _table(
        _INCIDENT_SUMMARY_HEADERS,
        _incident_summary_rows(recent_incidents, 120),
        "No remote-existing incidents yet.",
    )
    materials_table = _table(
        ["Sort", "Kind", "Name", "Material UUID", "Source question"],
        material_rows,
        "No desired jobMaterial rows.",
    )
    body = f"""
    {overview}
    <h2>Selective apply commands</h2>
    <p>Dry-run first:</p>
    <pre>{escape(dry_run_cmd)}</pre>
    <p>Apply to ServiceM8 only after checking the dry-run:</p>
    <pre>{escape(apply_cmd)}</pre>
    <h2>Recent dry-run/apply runs for this response</h2>
    {runs_table}
    <h2>Remote-existing incidents for this response</h2>
    {incidents_table}
    <h2>Desired jobMaterial rows</h2>
    {materials_table}
    <h2>Parsed JSON</h2>
    {_pre_json(parsed)}
    """
    return html_page(f"Polled Quote Template {form_response_uuid}", body)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("servicem8_inspector:app", host=APP_HOST, port=APP_PORT, reload=False)