Compare commits

..

4 Commits

5 changed files with 1065 additions and 4 deletions
+154
View File
@@ -0,0 +1,154 @@
#!/usr/bin/env python3
import argparse
import json
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
DB_PATH = Path(__file__).with_name("servicem8_webhooks.db")
STATE_PATH = Path(__file__).with_name(".quote_form_response_watch_state.json")
QUOTE_TEMPLATE_FORM_UUID = "3621b6be-1d19-4756-9ab4-9d5e4120f6d9"
def load_state(path: Path) -> Dict[str, Any]:
if not path.exists():
return {}
try:
return json.loads(path.read_text())
except Exception:
return {}
def save_state(path: Path, state: Dict[str, Any]) -> None:
path.write_text(json.dumps(state, indent=2, ensure_ascii=False) + "\n")
def get_conn(db_path: Path) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
def query_latest_quote_response(db_path: Path) -> Optional[Dict[str, Any]]:
sql = """
SELECT
id,
received_at,
json_extract(payload_json, '$.id') AS webhook_event_id,
json_extract(payload_json, '$.type') AS event_type,
json_extract(payload_json, '$.data.uuid') AS form_response_uuid,
json_extract(payload_json, '$.data.form_uuid') AS form_uuid,
json_extract(payload_json, '$.data.regarding_object_uuid') AS job_uuid,
json_extract(payload_json, '$.data.edit_date') AS edit_date,
json_extract(payload_json, '$.data.created_by_staff_uuid') AS created_by_staff_uuid,
json_extract(payload_json, '$.data.status') AS status,
payload_json
FROM webhook_form_responses
WHERE json_extract(payload_json, '$.data.form_uuid') = ?
ORDER BY datetime(received_at) DESC, id DESC
LIMIT 1
"""
with get_conn(db_path) as conn:
row = conn.execute(sql, (QUOTE_TEMPLATE_FORM_UUID,)).fetchone()
return dict(row) if row else None
def build_summary(row: Dict[str, Any]) -> str:
received_at = row.get("received_at") or "unknown"
edit_date = row.get("edit_date") or "unknown"
job_uuid = row.get("job_uuid") or "unknown"
form_response_uuid = row.get("form_response_uuid") or "unknown"
return (
"New quote template form response received\n"
f"- Received: {received_at}\n"
f"- Edit date: {edit_date}\n"
f"- Job UUID: {job_uuid}\n"
f"- Form response UUID: {form_response_uuid}"
)
def main() -> int:
parser = argparse.ArgumentParser(description="Check for new quote template form responses in the ServiceM8 webhook DB")
parser.add_argument("--db", default=str(DB_PATH), help="Path to servicem8_webhooks.db")
parser.add_argument("--state", default=str(STATE_PATH), help="Path to local state file")
parser.add_argument("--prime", action="store_true", help="Record the current latest event as seen without alerting")
parser.add_argument("--json", action="store_true", help="Print machine-readable JSON")
args = parser.parse_args()
db_path = Path(args.db)
state_path = Path(args.state)
if not db_path.exists():
print(f"Database not found: {db_path}", file=sys.stderr)
return 2
latest = query_latest_quote_response(db_path)
state = load_state(state_path)
if not latest:
result = {
"status": "no_quote_responses_found",
"alert": False,
"db_path": str(db_path),
"state_path": str(state_path),
}
print(json.dumps(result, indent=2) if args.json else "No quote template form responses found in DB")
return 0
current_marker = {
"id": latest.get("id"),
"received_at": latest.get("received_at"),
"form_response_uuid": latest.get("form_response_uuid"),
"job_uuid": latest.get("job_uuid"),
"edit_date": latest.get("edit_date"),
"checked_at": datetime.now().astimezone().isoformat(),
}
if args.prime or not state:
save_state(state_path, current_marker)
result = {
"status": "primed",
"alert": False,
"latest": current_marker,
"db_path": str(db_path),
"state_path": str(state_path),
}
print(json.dumps(result, indent=2) if args.json else f"Primed latest quote template event at {current_marker['received_at']}")
return 0
last_seen_id = state.get("id")
last_seen_form_response_uuid = state.get("form_response_uuid")
is_new = (latest.get("id") != last_seen_id) or (
latest.get("form_response_uuid") and latest.get("form_response_uuid") != last_seen_form_response_uuid
)
if is_new:
save_state(state_path, current_marker)
result = {
"status": "new_quote_response",
"alert": True,
"summary": build_summary(latest),
"latest": current_marker,
"previous": state,
"db_path": str(db_path),
"state_path": str(state_path),
}
print(json.dumps(result, indent=2) if args.json else result["summary"])
return 0
result = {
"status": "no_new_quote_response",
"alert": False,
"latest": current_marker,
"previous": state,
"db_path": str(db_path),
"state_path": str(state_path),
}
print(json.dumps(result, indent=2) if args.json else f"No new quote template form response since {state.get('received_at')}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+521
View File
@@ -0,0 +1,521 @@
#!/usr/bin/env python3
"""Poll ServiceM8 /formresponse.json for responses newer than a timestamp.
This is the safety-net companion to the ServiceM8 form.response_created webhook.
It stores every API hit in a local SQLite DB, then identifies previously unseen
Quote Template form responses and parses them into desired jobMaterial rows using
servicem8_quote_template_parser.py. It does not write anything back to ServiceM8.
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import os
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
import requests
from servicem8_quote_template_parser import (
QUOTE_TEMPLATE_FORM_UUID as PARSER_QUOTE_TEMPLATE_FORM_UUID,
parse_quote_template_form_response,
)
SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_DB_PATH = SCRIPT_DIR / "servicem8_formresponse_poll.db"
DEFAULT_QUEUE_PATH = SCRIPT_DIR / "quote-template-jobmaterials-poll-queue.jsonl"
DEFAULT_BASE_URL = "https://api.servicem8.com/api_1.0"
DEFAULT_ENDPOINT = "/formresponse.json"
DEFAULT_TIMEOUT_SECONDS = 30
# Current Quote Template form UUID. Kept as a CLI default so we can adjust without
# editing code if ServiceM8 form/template IDs change again.
DEFAULT_QUOTE_TEMPLATE_FORM_UUID = os.getenv(
"SERVICEM8_QUOTE_TEMPLATE_FORM_UUID",
PARSER_QUOTE_TEMPLATE_FORM_UUID,
)
def utc_now() -> str:
return datetime.now(timezone.utc).isoformat()
def load_api_key() -> str:
"""Load API key from env, falling back to the existing local helper script.
Prefer env vars for any new deployment. The fallback keeps this project script
runnable in the current /opt/webhooks setup without copying credentials.
"""
for name in ("SERVICEM8_ACCESS_TOKEN", "SERVICEM8_API_KEY"):
value = os.getenv(name)
if value:
return value
fallback = SCRIPT_DIR / "servicem8-list-webhook-subscriptions-table.py"
if fallback.exists():
spec = importlib.util.spec_from_file_location("servicem8_subs", fallback)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) # type: ignore[union-attr]
value = getattr(module, "ACCESS_TOKEN", None)
if value:
return str(value)
raise RuntimeError("Missing SERVICEM8_ACCESS_TOKEN or SERVICEM8_API_KEY")
def get_conn(db_path: Path) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
def init_db(db_path: Path) -> None:
db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(get_conn(db_path)) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS poll_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at TEXT NOT NULL,
finished_at TEXT,
since_value TEXT NOT NULL,
filter_field TEXT NOT NULL,
filter_expression TEXT NOT NULL,
http_status INTEGER,
fetched_count INTEGER DEFAULT 0,
inserted_count INTEGER DEFAULT 0,
updated_count INTEGER DEFAULT 0,
quote_match_count INTEGER DEFAULT 0,
newly_queued_count INTEGER DEFAULT 0,
error TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS form_responses_raw (
uuid TEXT PRIMARY KEY,
first_seen_at TEXT NOT NULL,
last_seen_at TEXT NOT NULL,
seen_count INTEGER NOT NULL DEFAULT 1,
active INTEGER,
edit_date TEXT,
form_uuid TEXT,
staff_uuid TEXT,
regarding_object TEXT,
regarding_object_uuid TEXT,
timestamp TEXT,
form_by_staff_uuid TEXT,
document_attachment_uuid TEXT,
asset_uuid TEXT,
is_quote_template INTEGER NOT NULL DEFAULT 0,
parse_status TEXT,
parse_error TEXT,
raw_json TEXT NOT NULL
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_timestamp ON form_responses_raw(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_edit_date ON form_responses_raw(edit_date)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_form_uuid ON form_responses_raw(form_uuid)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_quote ON form_responses_raw(is_quote_template, parse_status)")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS quote_template_form_responses (
form_response_uuid TEXT PRIMARY KEY,
discovered_at TEXT NOT NULL,
job_uuid TEXT,
form_uuid TEXT NOT NULL,
author_name TEXT,
description TEXT,
desired_job_materials_json TEXT NOT NULL,
parsed_json TEXT NOT NULL,
queued_at TEXT,
processed_at TEXT,
process_status TEXT NOT NULL DEFAULT 'parsed',
process_error TEXT
)
"""
)
conn.commit()
def create_poll_run(conn: sqlite3.Connection, since: str, filter_field: str, filter_expr: str) -> int:
cur = conn.execute(
"""
INSERT INTO poll_runs (started_at, since_value, filter_field, filter_expression)
VALUES (?, ?, ?, ?)
""",
(utc_now(), since, filter_field, filter_expr),
)
conn.commit()
return int(cur.lastrowid)
def finish_poll_run(conn: sqlite3.Connection, run_id: int, **values: Any) -> None:
allowed = {
"finished_at",
"http_status",
"fetched_count",
"inserted_count",
"updated_count",
"quote_match_count",
"newly_queued_count",
"error",
}
values.setdefault("finished_at", utc_now())
pairs = [(k, v) for k, v in values.items() if k in allowed]
if not pairs:
return
set_sql = ", ".join(f"{k} = ?" for k, _ in pairs)
params = [v for _, v in pairs] + [run_id]
conn.execute(f"UPDATE poll_runs SET {set_sql} WHERE id = ?", params)
conn.commit()
def fetch_form_responses(
*,
api_key: str,
base_url: str,
endpoint: str,
filter_field: str,
since: str,
timeout: int,
) -> Tuple[int, List[Dict[str, Any]], str]:
# ServiceM8 filter syntax confirmed against the live API:
# ?$filter=timestamp gt '2026-05-04 00:00:00'
filter_expr = f"{filter_field} gt '{since}'"
url = f"{base_url.rstrip('/')}{endpoint}"
headers = {"X-Api-Key": api_key, "Accept": "application/json"}
response = requests.get(url, headers=headers, params={"$filter": filter_expr}, timeout=timeout)
if not response.ok:
raise RuntimeError(f"HTTP {response.status_code}: {response.text[:1000]}")
data = response.json()
if not isinstance(data, list):
raise RuntimeError(f"Expected list response, got {type(data).__name__}")
return response.status_code, data, filter_expr
def insert_or_update_raw(
conn: sqlite3.Connection,
row: Dict[str, Any],
*,
quote_template_form_uuid: str,
now: str,
) -> Tuple[bool, bool]:
uuid = str(row.get("uuid") or "").strip()
if not uuid:
raise ValueError("Form response row missing uuid")
is_quote = 1 if str(row.get("form_uuid") or "").strip() == quote_template_form_uuid else 0
existing = conn.execute("SELECT uuid FROM form_responses_raw WHERE uuid = ?", (uuid,)).fetchone()
raw_json = json.dumps(row, ensure_ascii=False, sort_keys=True)
values = (
uuid,
now,
now,
row.get("active"),
row.get("edit_date"),
row.get("form_uuid"),
row.get("staff_uuid"),
row.get("regarding_object"),
row.get("regarding_object_uuid"),
row.get("timestamp"),
row.get("form_by_staff_uuid"),
row.get("document_attachment_uuid"),
row.get("asset_uuid"),
is_quote,
raw_json,
)
if existing:
conn.execute(
"""
UPDATE form_responses_raw
SET last_seen_at = ?,
seen_count = seen_count + 1,
active = ?,
edit_date = ?,
form_uuid = ?,
staff_uuid = ?,
regarding_object = ?,
regarding_object_uuid = ?,
timestamp = ?,
form_by_staff_uuid = ?,
document_attachment_uuid = ?,
asset_uuid = ?,
is_quote_template = ?,
raw_json = ?
WHERE uuid = ?
""",
(
now,
row.get("active"),
row.get("edit_date"),
row.get("form_uuid"),
row.get("staff_uuid"),
row.get("regarding_object"),
row.get("regarding_object_uuid"),
row.get("timestamp"),
row.get("form_by_staff_uuid"),
row.get("document_attachment_uuid"),
row.get("asset_uuid"),
is_quote,
raw_json,
uuid,
),
)
return False, bool(is_quote)
conn.execute(
"""
INSERT INTO form_responses_raw (
uuid, first_seen_at, last_seen_at, active, edit_date, form_uuid,
staff_uuid, regarding_object, regarding_object_uuid, timestamp,
form_by_staff_uuid, document_attachment_uuid, asset_uuid,
is_quote_template, raw_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
values,
)
return True, bool(is_quote)
def ensure_queue_writable(queue_path: Path) -> None:
queue_path.parent.mkdir(parents=True, exist_ok=True)
if queue_path.exists():
with queue_path.open("a", encoding="utf-8"):
pass
else:
with queue_path.open("a", encoding="utf-8"):
pass
def queue_record(queue_path: Path, parsed: Dict[str, Any], discovered_at: str) -> None:
record = {
"queued_at": discovered_at,
"source": "formresponse_poll",
"form_uuid": parsed.get("form_uuid", ""),
"form_response_uuid": parsed.get("form_response_uuid", ""),
"job_uuid": parsed.get("job_uuid", ""),
"author_name": parsed.get("author_name", ""),
"description": parsed.get("description", ""),
"desired_job_materials": parsed.get("desired_job_materials", []),
}
with queue_path.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(record, ensure_ascii=False) + "\n")
def parse_and_store_quote_response(
conn: sqlite3.Connection,
row: Dict[str, Any],
*,
queue_path: Path,
write_queue: bool,
discovered_at: str,
) -> bool:
uuid = str(row.get("uuid") or "").strip()
existing = conn.execute(
"SELECT form_response_uuid, parsed_json, queued_at FROM quote_template_form_responses WHERE form_response_uuid = ?",
(uuid,),
).fetchone()
if existing:
if write_queue and not existing["queued_at"]:
parsed = json.loads(existing["parsed_json"])
queue_record(queue_path, parsed, discovered_at)
conn.execute(
"UPDATE quote_template_form_responses SET queued_at = ? WHERE form_response_uuid = ?",
(discovered_at, uuid),
)
return True
return False
try:
parsed = parse_quote_template_form_response({"data": row})
except Exception as exc:
conn.execute(
"UPDATE form_responses_raw SET parse_status = ?, parse_error = ? WHERE uuid = ?",
("error", str(exc), uuid),
)
return False
parsed_json = json.dumps(parsed, ensure_ascii=False, sort_keys=True)
desired_json = json.dumps(parsed.get("desired_job_materials", []), ensure_ascii=False, sort_keys=True)
queued_at = discovered_at if write_queue else None
conn.execute(
"""
INSERT INTO quote_template_form_responses (
form_response_uuid, discovered_at, job_uuid, form_uuid, author_name,
description, desired_job_materials_json, parsed_json, queued_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
uuid,
discovered_at,
parsed.get("job_uuid", ""),
parsed.get("form_uuid", ""),
parsed.get("author_name", ""),
parsed.get("description", ""),
desired_json,
parsed_json,
queued_at,
),
)
conn.execute(
"UPDATE form_responses_raw SET parse_status = ?, parse_error = NULL WHERE uuid = ?",
("parsed", uuid),
)
if write_queue:
queue_record(queue_path, parsed, discovered_at)
return True
def summarize_rows(rows: Iterable[Dict[str, Any]], limit: int) -> List[Dict[str, Any]]:
summary = []
for row in list(rows)[:limit]:
field_data = row.get("field_data")
field_count: Optional[int] = None
if isinstance(field_data, str):
try:
parsed = json.loads(field_data)
if isinstance(parsed, list):
field_count = len(parsed)
except Exception:
field_count = None
summary.append(
{
"uuid": row.get("uuid"),
"timestamp": row.get("timestamp"),
"edit_date": row.get("edit_date"),
"form_uuid": row.get("form_uuid"),
"regarding_object": row.get("regarding_object"),
"regarding_object_uuid": row.get("regarding_object_uuid"),
"field_data_type": type(field_data).__name__,
"field_count": field_count,
}
)
return summary
def main() -> int:
parser = argparse.ArgumentParser(description="Poll ServiceM8 form responses since a timestamp and store unseen Quote Template responses")
parser.add_argument("--since", required=True, help="Timestamp for ServiceM8 filter, e.g. '2026-05-04 10:00:00'")
parser.add_argument("--filter-field", default="timestamp", choices=("timestamp", "edit_date"), help="Field to apply ServiceM8 gt filter to")
parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Local SQLite DB for poll results")
parser.add_argument("--queue", default=str(DEFAULT_QUEUE_PATH), help="JSONL queue path for parsed Quote Template jobMaterials")
parser.add_argument("--quote-template-form-uuid", default=DEFAULT_QUOTE_TEMPLATE_FORM_UUID, help="Quote Template form UUID to process")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT)
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_SECONDS)
parser.add_argument("--no-store", action="store_true", help="Fetch and print summary only; do not write DB/queue")
parser.add_argument("--no-queue", action="store_true", help="Store/parse matches but do not append to JSONL queue")
parser.add_argument("--dump-json", action="store_true", help="Print full fetched ServiceM8 JSON payload")
parser.add_argument("--summary-limit", type=int, default=10, help="Number of rows to include in summary output")
args = parser.parse_args()
db_path = Path(args.db)
queue_path = Path(args.queue)
api_key = load_api_key()
filter_expr = f"{args.filter_field} gt '{args.since}'"
conn: Optional[sqlite3.Connection] = None
run_id: Optional[int] = None
try:
if not args.no_store and not args.no_queue:
# Fail before DB work if the queue cannot be appended to. This avoids
# marking a response as queued when the JSONL append did not happen.
ensure_queue_writable(queue_path)
if not args.no_store:
init_db(db_path)
conn = get_conn(db_path)
run_id = create_poll_run(conn, args.since, args.filter_field, filter_expr)
http_status, rows, filter_expr = fetch_form_responses(
api_key=api_key,
base_url=args.base_url,
endpoint=args.endpoint,
filter_field=args.filter_field,
since=args.since,
timeout=args.timeout,
)
if args.dump_json:
print(json.dumps(rows, indent=2, ensure_ascii=False))
inserted = updated = quote_matches = newly_queued = 0
now = utc_now()
if conn is not None:
for row in rows:
was_inserted, is_quote = insert_or_update_raw(
conn,
row,
quote_template_form_uuid=args.quote_template_form_uuid,
now=now,
)
inserted += 1 if was_inserted else 0
updated += 0 if was_inserted else 1
if is_quote:
quote_matches += 1
if parse_and_store_quote_response(
conn,
row,
queue_path=queue_path,
write_queue=not args.no_queue,
discovered_at=now,
):
newly_queued += 1
conn.commit()
if run_id is not None:
finish_poll_run(
conn,
run_id,
http_status=http_status,
fetched_count=len(rows),
inserted_count=inserted,
updated_count=updated,
quote_match_count=quote_matches,
newly_queued_count=newly_queued,
)
result = {
"ok": True,
"http_status": http_status,
"filter": filter_expr,
"fetched_count": len(rows),
"inserted_count": inserted,
"updated_count": updated,
"quote_template_form_uuid": args.quote_template_form_uuid,
"quote_match_count": quote_matches if conn is not None else sum(1 for r in rows if r.get("form_uuid") == args.quote_template_form_uuid),
"newly_queued_count": newly_queued,
"db_path": None if args.no_store else str(db_path),
"queue_path": None if args.no_queue or args.no_store else str(queue_path),
"sample": summarize_rows(rows, args.summary_limit),
}
print(json.dumps(result, indent=2, ensure_ascii=False))
return 0
except Exception as exc:
if conn is not None and run_id is not None:
finish_poll_run(conn, run_id, error=str(exc))
print(json.dumps({"ok": False, "error": str(exc), "filter": filter_expr}, indent=2), file=sys.stderr)
return 1
finally:
if conn is not None:
conn.close()
if __name__ == "__main__":
raise SystemExit(main())
+383 -4
View File
@@ -11,7 +11,8 @@ from fastapi.responses import HTMLResponse
DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db")
STATE_DB_PATH = os.getenv("WEBHOOK_STATE_DB_PATH", "./servicem8_quote_materials_state.db")
APP_HOST = os.getenv("WEBHOOK_INSPECTOR_HOST", "127.0.0.1")
POLL_DB_PATH = os.getenv("WEBHOOK_POLL_DB_PATH", "./servicem8_formresponse_poll.db")
APP_HOST = os.getenv("WEBHOOK_INSPECTOR_HOST", "0.0.0.0")
APP_PORT = int(os.getenv("WEBHOOK_INSPECTOR_PORT", "18355"))
PAGE_SIZE = 50
@@ -30,13 +31,22 @@ def get_state_conn():
return conn
def get_poll_conn():
conn = sqlite3.connect(POLL_DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def html_page(title: str, body: str) -> HTMLResponse:
nav = """
<nav>
<a href='/'>Dashboard</a>
<a href='/events'>Events</a>
<a href='/objects'>Objects</a>
<a href='/form-responses'>Form responses</a>
<a href='/form-responses'>Webhook form responses</a>
<a href='/poll/form-responses'>Polled form responses</a>
<a href='/poll/quote-template'>Polled quote templates</a>
<a href='/poll/apply-runs'>Apply runs</a>
<a href='/generated-materials'>Generated materials</a>
</nav>
"""
@@ -122,7 +132,7 @@ def link_with_params(path, **params):
@app.get("/health")
def health():
return {"ok": True, "db_path": DB_PATH, "state_db_path": STATE_DB_PATH}
return {"ok": True, "db_path": DB_PATH, "state_db_path": STATE_DB_PATH, "poll_db_path": POLL_DB_PATH}
@app.get("/", response_class=HTMLResponse)
@@ -146,7 +156,27 @@ def dashboard():
except sqlite3.Error:
pass
poll_counts = {
"poll_runs": 0,
"polled_form_responses": 0,
"polled_quote_templates": 0,
}
latest_poll_run = None
latest_polled_form = None
latest_polled_quote = None
try:
with closing(get_poll_conn()) as conn:
poll_counts["poll_runs"] = conn.execute("select count(*) from poll_runs").fetchone()[0]
poll_counts["polled_form_responses"] = conn.execute("select count(*) from form_responses_raw").fetchone()[0]
poll_counts["polled_quote_templates"] = conn.execute("select count(*) from quote_template_form_responses").fetchone()[0]
latest_poll_run = conn.execute("select finished_at from poll_runs order by id desc limit 1").fetchone()
latest_polled_form = conn.execute("select timestamp from form_responses_raw order by timestamp desc limit 1").fetchone()
latest_polled_quote = conn.execute("select discovered_at from quote_template_form_responses order by discovered_at desc limit 1").fetchone()
except sqlite3.Error:
pass
counts["generated_job_materials"] = state_count
counts.update(poll_counts)
cards = "".join(
f"<div class='card'><div class='muted'>{escape(name)}</div><div class='big'>{count}</div></div>"
@@ -158,10 +188,14 @@ def dashboard():
<div class='summary-grid'>
<div><strong>DB path</strong></div><div><code>{escape(DB_PATH)}</code></div>
<div><strong>State DB path</strong></div><div><code>{escape(STATE_DB_PATH)}</code></div>
<div><strong>Poll DB path</strong></div><div><code>{escape(POLL_DB_PATH)}</code></div>
<div><strong>Latest event</strong></div><div>{escape(latest_event[0] if latest_event else '')}</div>
<div><strong>Latest object</strong></div><div>{escape(latest_object[0] if latest_object else '')}</div>
<div><strong>Latest form response</strong></div><div>{escape(latest_form[0] if latest_form else '')}</div>
<div><strong>Latest generated material row</strong></div><div>{escape(latest_generated[0] if latest_generated else '')}</div>
<div><strong>Latest poll run</strong></div><div>{escape(latest_poll_run[0] if latest_poll_run else '')}</div>
<div><strong>Latest polled form timestamp</strong></div><div>{escape(latest_polled_form[0] if latest_polled_form else '')}</div>
<div><strong>Latest polled quote discovered</strong></div><div>{escape(latest_polled_quote[0] if latest_polled_quote else '')}</div>
</div>
</div>
<div class='section'>
@@ -169,7 +203,11 @@ def dashboard():
<ul>
<li><a href='/events'>Browse webhook events</a></li>
<li><a href='/objects'>Browse object webhooks</a></li>
<li><a href='/form-responses'>Browse form responses</a></li>
<li><a href='/form-responses'>Browse webhook form responses</a></li>
<li><a href='/poll/form-responses'>Browse polled form responses</a></li>
<li><a href='/poll/quote-template'>Browse parsed polled Quote Template responses</a></li>
<li><a href='/poll/runs'>Browse poll runs</a></li>
<li><a href='/poll/apply-runs'>Browse dry-run/apply runs</a></li>
<li><a href='/generated-materials'>Browse generated job-material state</a></li>
</ul>
</div>
@@ -504,6 +542,347 @@ def form_response_detail(row_id: int):
return html_page(f"Form response {row_id}", body)
@app.get("/poll/apply-runs", response_class=HTMLResponse)
def list_apply_runs(page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
try:
with closing(get_poll_conn()) as conn:
rows = conn.execute(
"""
select id, form_response_uuid, job_uuid, mode, started_at, finished_at,
desired_count, created_count, status, error
from quote_template_apply_runs
order by id desc
limit ? offset ?
""",
(PAGE_SIZE, offset),
).fetchall()
except sqlite3.Error as e:
return html_page("Apply runs", f"<div class='card'>Apply-run table unavailable: {escape(str(e))}</div>")
table_rows = []
for row in rows:
table_rows.append(
f"<tr><td><a href='/poll/apply-runs/{row['id']}'>{row['id']}</a></td>"
f"<td>{escape(row['mode'] or '')}</td><td>{escape(row['status'] or '')}</td>"
f"<td><a href='/poll/quote-template/{escape(row['form_response_uuid'])}'>{escape(row['form_response_uuid'])}</a></td>"
f"<td>{escape(row['job_uuid'] or '')}</td><td>{escape(row['started_at'] or '')}</td><td>{escape(row['finished_at'] or '')}</td>"
f"<td>{row['desired_count']}</td><td>{row['created_count']}</td><td>{escape((row['error'] or '')[:160])}</td></tr>"
)
body = f"""
<table>
<thead><tr><th>ID</th><th>Mode</th><th>Status</th><th>Form response UUID</th><th>Job UUID</th><th>Started</th><th>Finished</th><th>Desired</th><th>Created</th><th>Error</th></tr></thead>
<tbody>{''.join(table_rows) or "<tr><td colspan='10'>No apply runs found yet.</td></tr>"}</tbody>
</table>
<div class='pagination'>
{f"<a href='{link_with_params('/poll/apply-runs', page=page-1)}'>← Prev</a>" if page > 1 else ''}
<a href='{link_with_params('/poll/apply-runs', page=page+1)}'>Next →</a>
</div>
"""
return html_page("Apply runs", body)
@app.get("/poll/apply-runs/{run_id}", response_class=HTMLResponse)
def apply_run_detail(run_id: int):
try:
with closing(get_poll_conn()) as conn:
run = conn.execute("select * from quote_template_apply_runs where id = ?", (run_id,)).fetchone()
rows = conn.execute(
"select * from quote_template_apply_run_rows where run_id = ? order by row_index asc",
(run_id,),
).fetchall()
except sqlite3.Error as e:
return html_page("Apply run", f"<div class='card'>Apply-run table unavailable: {escape(str(e))}</div>")
if not run:
raise HTTPException(status_code=404, detail="Apply run not found")
table_rows = []
for row in rows:
payload = parse_json_field(row['api_payload_json'], {}) or {}
table_rows.append(
f"<tr><td>{row['row_index']}</td><td>{escape(row['action'] or '')}</td><td>{escape(row['kind'] or '')}</td>"
f"<td>{escape(row['name'] or '')}</td><td>{escape(row['job_material_uuid'] or '')}</td>"
f"<td>{escape(row['source_question'] or '')}</td><td>{escape(row['error'] or '')}</td></tr>"
f"<tr><td></td><td colspan='6'><details><summary>API payload</summary><pre>{escape(pretty_json(payload))}</pre></details></td></tr>"
)
body = f"""
<div class='card summary-grid'>
<div><strong>Run ID</strong></div><div>{run['id']}</div>
<div><strong>Mode</strong></div><div>{escape(run['mode'] or '')}</div>
<div><strong>Status</strong></div><div>{escape(run['status'] or '')}</div>
<div><strong>Form response UUID</strong></div><div><a href='/poll/quote-template/{escape(run['form_response_uuid'])}'>{escape(run['form_response_uuid'])}</a></div>
<div><strong>Job UUID</strong></div><div>{escape(run['job_uuid'] or '')}</div>
<div><strong>Started</strong></div><div>{escape(run['started_at'] or '')}</div>
<div><strong>Finished</strong></div><div>{escape(run['finished_at'] or '')}</div>
<div><strong>Desired</strong></div><div>{run['desired_count']}</div>
<div><strong>Created</strong></div><div>{run['created_count']}</div>
<div><strong>Error</strong></div><div>{escape(run['error'] or '')}</div>
</div>
<div class='section'><h2>Rows</h2><table><thead><tr><th>#</th><th>Action</th><th>Kind</th><th>Name</th><th>Created UUID</th><th>Source question</th><th>Error</th></tr></thead><tbody>{''.join(table_rows) or "<tr><td colspan='7'>No rows recorded.</td></tr>"}</tbody></table></div>
"""
return html_page(f"Apply run {run_id}", body)
@app.get("/poll/runs", response_class=HTMLResponse)
def list_poll_runs(page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
try:
with closing(get_poll_conn()) as conn:
rows = conn.execute(
"""
select id, started_at, finished_at, since_value, filter_field,
fetched_count, inserted_count, updated_count,
quote_match_count, newly_queued_count, error
from poll_runs order by id desc limit ? offset ?
""",
(PAGE_SIZE, offset),
).fetchall()
except sqlite3.Error as e:
return html_page("Poll runs", f"<div class='card'>Poll DB unavailable: {escape(str(e))}</div>")
table_rows = []
for row in rows:
table_rows.append(
f"<tr><td>{row['id']}</td><td>{escape(row['started_at'] or '')}</td>"
f"<td>{escape(row['finished_at'] or '')}</td>"
f"<td>{escape(row['filter_field'] or '')} gt {escape(row['since_value'] or '')}</td>"
f"<td>{row['fetched_count']}</td><td>{row['inserted_count']}</td><td>{row['updated_count']}</td>"
f"<td>{row['quote_match_count']}</td><td>{row['newly_queued_count']}</td>"
f"<td>{escape((row['error'] or '')[:180])}</td></tr>"
)
body = f"""
<table>
<thead><tr><th>ID</th><th>Started</th><th>Finished</th><th>Filter</th><th>Fetched</th><th>Inserted</th><th>Updated</th><th>Quote matches</th><th>Queued</th><th>Error</th></tr></thead>
<tbody>{''.join(table_rows) or "<tr><td colspan='10'>No poll runs found.</td></tr>"}</tbody>
</table>
<div class='pagination'>
{f"<a href='{link_with_params('/poll/runs', page=page-1)}'>← Prev</a>" if page > 1 else ''}
<a href='{link_with_params('/poll/runs', page=page+1)}'>Next →</a>
</div>
"""
return html_page("Poll runs", body)
@app.get("/poll/form-responses", response_class=HTMLResponse)
def list_polled_form_responses(q: str = Query(""), quote_only: int = Query(0), page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
clauses = []
params = []
if quote_only:
clauses.append("is_quote_template = 1")
if q.strip():
like = f"%{q.strip()}%"
clauses.append("(uuid like ? or form_uuid like ? or regarding_object_uuid like ? or timestamp like ? or edit_date like ? or raw_json like ?)")
params.extend([like, like, like, like, like, like])
where = " where " + " and ".join(clauses) if clauses else ""
try:
with closing(get_poll_conn()) as conn:
rows = conn.execute(
f"""
select uuid, first_seen_at, last_seen_at, seen_count, timestamp, edit_date,
form_uuid, regarding_object, regarding_object_uuid,
is_quote_template, parse_status, parse_error
from form_responses_raw {where}
order by timestamp desc, edit_date desc limit ? offset ?
""",
(*params, PAGE_SIZE, offset),
).fetchall()
except sqlite3.Error as e:
return html_page("Polled form responses", f"<div class='card'>Poll DB unavailable: {escape(str(e))}</div>")
table_rows = []
for row in rows:
quote_pill = "<span class='pill'>Quote Template</span>" if row["is_quote_template"] else ""
table_rows.append(
f"<tr><td><a href='/poll/form-responses/{escape(row['uuid'])}'>{escape(row['uuid'])}</a></td>"
f"<td>{escape(row['timestamp'] or '')}</td><td>{escape(row['edit_date'] or '')}</td>"
f"<td>{escape(row['form_uuid'] or '')}<br>{quote_pill}</td>"
f"<td>{escape(row['regarding_object'] or '')}</td><td>{escape(row['regarding_object_uuid'] or '')}</td>"
f"<td>{escape(row['parse_status'] or '')}</td><td>{row['seen_count']}</td><td>{escape(row['last_seen_at'] or '')}</td></tr>"
)
body = f"""
<div class='toolbar'>
<form method='get'>
<label>Search <input type='text' name='q' value='{escape(q)}' placeholder='uuid, job uuid, form uuid, timestamp'></label>
<label><input type='checkbox' name='quote_only' value='1' {'checked' if quote_only else ''}> Quote Template only</label>
<input type='hidden' name='page' value='1'>
<button type='submit'>Filter</button>
</form>
</div>
<table>
<thead><tr><th>UUID</th><th>Timestamp</th><th>Edit date</th><th>Form UUID</th><th>Regarding</th><th>Object UUID</th><th>Parse</th><th>Seen</th><th>Last seen</th></tr></thead>
<tbody>{''.join(table_rows) or "<tr><td colspan='9'>No rows found.</td></tr>"}</tbody>
</table>
<div class='pagination'>
{f"<a href='{link_with_params('/poll/form-responses', q=q, quote_only=quote_only, page=page-1)}'>← Prev</a>" if page > 1 else ''}
<a href='{link_with_params('/poll/form-responses', q=q, quote_only=quote_only, page=page+1)}'>Next →</a>
</div>
"""
return html_page("Polled form responses", body)
@app.get("/poll/form-responses/{form_response_uuid}", response_class=HTMLResponse)
def polled_form_response_detail(form_response_uuid: str):
try:
with closing(get_poll_conn()) as conn:
row = conn.execute("select * from form_responses_raw where uuid = ?", (form_response_uuid,)).fetchone()
quote = conn.execute("select * from quote_template_form_responses where form_response_uuid = ?", (form_response_uuid,)).fetchone()
except sqlite3.Error as e:
return html_page("Polled form response", f"<div class='card'>Poll DB unavailable: {escape(str(e))}</div>")
if not row:
raise HTTPException(status_code=404, detail="Polled form response not found")
raw = parse_json_field(row["raw_json"], {}) or {}
field_data = parse_json_field(raw.get("field_data"), []) or []
field_rows = []
if isinstance(field_data, list):
for item in sorted(field_data, key=lambda x: x.get("SortOrder", 0) if isinstance(x, dict) else 0):
if isinstance(item, dict):
field_rows.append(
f"<tr><td>{escape(str(item.get('SortOrder', '')))}</td><td>{escape(item.get('FieldType', ''))}</td>"
f"<td>{escape(item.get('Question', ''))}</td><td>{escape(item.get('Response', ''))}</td></tr>"
)
quote_link = f"<div><strong>Parsed quote</strong></div><div><a href='/poll/quote-template/{escape(form_response_uuid)}'>Open parsed Quote Template view</a></div>" if quote else ""
body = f"""
<div class='card summary-grid'>
<div><strong>UUID</strong></div><div>{escape(row['uuid'])}</div>
<div><strong>Timestamp</strong></div><div>{escape(row['timestamp'] or '')}</div>
<div><strong>Edit date</strong></div><div>{escape(row['edit_date'] or '')}</div>
<div><strong>Form UUID</strong></div><div>{escape(row['form_uuid'] or '')}</div>
<div><strong>Regarding object</strong></div><div>{escape(row['regarding_object'] or '')}</div>
<div><strong>Regarding UUID</strong></div><div>{escape(row['regarding_object_uuid'] or '')}</div>
<div><strong>First seen</strong></div><div>{escape(row['first_seen_at'] or '')}</div>
<div><strong>Last seen</strong></div><div>{escape(row['last_seen_at'] or '')}</div>
<div><strong>Seen count</strong></div><div>{row['seen_count']}</div>
<div><strong>Parse status</strong></div><div>{escape(row['parse_status'] or '')}</div>
<div><strong>Parse error</strong></div><div>{escape(row['parse_error'] or '')}</div>
{quote_link}
</div>
<div class='section'><h2>Decoded field data</h2><table><thead><tr><th>Order</th><th>Type</th><th>Question</th><th>Response</th></tr></thead><tbody>{''.join(field_rows) or "<tr><td colspan='4'>No decoded field data.</td></tr>"}</tbody></table></div>
<div class='section'><h2>Raw API row</h2><pre>{escape(pretty_json(raw))}</pre></div>
"""
return html_page(f"Polled form response {form_response_uuid}", body)
@app.get("/poll/quote-template", response_class=HTMLResponse)
def list_polled_quote_templates(page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
try:
with closing(get_poll_conn()) as conn:
rows = conn.execute(
"""
select form_response_uuid, discovered_at, job_uuid, form_uuid, author_name,
description, desired_job_materials_json, queued_at, processed_at,
process_status, process_error
from quote_template_form_responses
order by discovered_at desc, form_response_uuid desc limit ? offset ?
""",
(PAGE_SIZE, offset),
).fetchall()
except sqlite3.Error as e:
return html_page("Polled Quote Template responses", f"<div class='card'>Poll DB unavailable: {escape(str(e))}</div>")
table_rows = []
for row in rows:
try:
material_count = len(json.loads(row["desired_job_materials_json"] or "[]"))
except Exception:
material_count = "?"
table_rows.append(
f"<tr><td><a href='/poll/quote-template/{escape(row['form_response_uuid'])}'>{escape(row['form_response_uuid'])}</a></td>"
f"<td>{escape(row['discovered_at'] or '')}</td><td>{escape(row['job_uuid'] or '')}</td>"
f"<td>{escape(row['description'] or '')}</td><td>{material_count}</td>"
f"<td>{escape(row['queued_at'] or '')}</td><td>{escape(row['process_status'] or '')}</td></tr>"
)
body = f"""
<table>
<thead><tr><th>Form response UUID</th><th>Discovered</th><th>Job UUID</th><th>Description</th><th>Rows</th><th>Queued</th><th>Status</th></tr></thead>
<tbody>{''.join(table_rows) or "<tr><td colspan='7'>No quote template rows found.</td></tr>"}</tbody>
</table>
<div class='pagination'>
{f"<a href='{link_with_params('/poll/quote-template', page=page-1)}'>← Prev</a>" if page > 1 else ''}
<a href='{link_with_params('/poll/quote-template', page=page+1)}'>Next →</a>
</div>
"""
return html_page("Polled Quote Template responses", body)
@app.get("/poll/quote-template/{form_response_uuid}", response_class=HTMLResponse)
def polled_quote_template_detail(form_response_uuid: str):
try:
with closing(get_poll_conn()) as conn:
row = conn.execute("select * from quote_template_form_responses where form_response_uuid = ?", (form_response_uuid,)).fetchone()
except sqlite3.Error as e:
return html_page("Polled Quote Template response", f"<div class='card'>Poll DB unavailable: {escape(str(e))}</div>")
if not row:
raise HTTPException(status_code=404, detail="Polled quote template response not found")
parsed = parse_json_field(row["parsed_json"], {}) or {}
materials = parse_json_field(row["desired_job_materials_json"], []) or []
material_rows = []
for item in materials:
material_rows.append(
f"<tr><td>{escape(str(item.get('sort_order', '')))}</td><td>{escape(item.get('kind', ''))}</td>"
f"<td>{escape(item.get('name', ''))}</td><td>{escape(item.get('material_uuid', ''))}</td>"
f"<td>{escape(item.get('source_question', ''))}</td></tr>"
)
dry_run_cmd = f"/opt/webhooks/apply_polled_quote_template_jobmaterials.py --uuid {row['form_response_uuid']} --pretty"
apply_cmd = f"/opt/webhooks/apply_polled_quote_template_jobmaterials.py --uuid {row['form_response_uuid']} --apply --pretty"
try:
with closing(get_poll_conn()) as conn:
recent_runs = conn.execute(
"select id, mode, status, started_at, finished_at, desired_count, created_count, error from quote_template_apply_runs where form_response_uuid = ? order by id desc limit 8",
(row['form_response_uuid'],),
).fetchall()
except sqlite3.Error:
recent_runs = []
recent_run_rows = []
for run in recent_runs:
recent_run_rows.append(
f"<tr><td><a href='/poll/apply-runs/{run['id']}'>{run['id']}</a></td><td>{escape(run['mode'] or '')}</td><td>{escape(run['status'] or '')}</td><td>{escape(run['started_at'] or '')}</td><td>{escape(run['finished_at'] or '')}</td><td>{run['desired_count']}</td><td>{run['created_count']}</td><td>{escape((run['error'] or '')[:120])}</td></tr>"
)
body = f"""
<div class='card summary-grid'>
<div><strong>Form response UUID</strong></div><div>{escape(row['form_response_uuid'])}</div>
<div><strong>Discovered</strong></div><div>{escape(row['discovered_at'] or '')}</div>
<div><strong>Job UUID</strong></div><div>{escape(row['job_uuid'] or '')}</div>
<div><strong>Form UUID</strong></div><div>{escape(row['form_uuid'] or '')}</div>
<div><strong>Author</strong></div><div>{escape(row['author_name'] or '')}</div>
<div><strong>Description</strong></div><div>{escape(row['description'] or '')}</div>
<div><strong>Queued</strong></div><div>{escape(row['queued_at'] or '')}</div>
<div><strong>Processed</strong></div><div>{escape(row['processed_at'] or '')}</div>
<div><strong>Status</strong></div><div>{escape(row['process_status'] or '')}</div>
<div><strong>Error</strong></div><div>{escape(row['process_error'] or '')}</div>
<div><strong>Raw polled response</strong></div><div><a href='/poll/form-responses/{escape(row['form_response_uuid'])}'>Open raw polled form response</a></div>
</div>
<div class='section'>
<h2>Selective apply commands</h2>
<div class='card'>
<p><strong>Dry-run first:</strong></p>
<pre>{escape(dry_run_cmd)}</pre>
<p><strong>Apply to ServiceM8 only after checking the dry-run:</strong></p>
<pre>{escape(apply_cmd)}</pre>
</div>
</div>
<div class='section'>
<h2>Recent dry-run/apply runs for this response</h2>
<table><thead><tr><th>ID</th><th>Mode</th><th>Status</th><th>Started</th><th>Finished</th><th>Desired</th><th>Created</th><th>Error</th></tr></thead><tbody>{''.join(recent_run_rows) or "<tr><td colspan='8'>No dry-run/apply runs yet.</td></tr>"}</tbody></table>
</div>
<div class='section'><h2>Desired jobMaterial rows</h2><table><thead><tr><th>Sort</th><th>Kind</th><th>Name</th><th>Material UUID</th><th>Source question</th></tr></thead><tbody>{''.join(material_rows) or "<tr><td colspan='5'>No desired jobMaterial rows.</td></tr>"}</tbody></table></div>
<div class='section'><h2>Parsed JSON</h2><pre>{escape(pretty_json(parsed))}</pre></div>
"""
return html_page(f"Polled Quote Template {form_response_uuid}", body)
if __name__ == "__main__":
import uvicorn
+6
View File
@@ -145,6 +145,7 @@ def build_job_material_line(
def parse_quote_template_field_rows(field_rows: List[Dict[str, Any]]) -> Dict[str, Any]:
ordered = sorted(field_rows, key=sort_key)
description = ""
author_name = ""
include_items: List[Dict[str, Any]] = []
exclude_items: List[Dict[str, Any]] = []
extra_include_items: List[Dict[str, Any]] = []
@@ -161,6 +162,10 @@ def parse_quote_template_field_rows(field_rows: List[Dict[str, Any]]) -> Dict[st
description = response
continue
if question == "Name":
author_name = response
continue
if question.startswith("Item ") and response:
include_items.append(
{
@@ -253,6 +258,7 @@ def parse_quote_template_field_rows(field_rows: List[Dict[str, Any]]) -> Dict[st
return {
"description": description,
"author_name": author_name,
"include_items": include_items,
"extra_include_items": extra_include_items,
"exclude_items": exclude_items,
+1
View File
@@ -170,6 +170,7 @@ def queue_quote_template_jobmaterials(payload, received_at: str):
"form_uuid": form_uuid,
"form_response_uuid": parsed.get("form_response_uuid", ""),
"job_uuid": parsed.get("job_uuid", ""),
"author_name": parsed.get("author_name", ""),
"description": parsed.get("description", ""),
"desired_job_materials": parsed.get("desired_job_materials", []),
}