From 22c1040fe8d19000a0e881bed695280813c6e22b Mon Sep 17 00:00:00 2001
From: "Soren (Molty)"
Date: Mon, 4 May 2026 11:18:17 +1000
Subject: [PATCH] Initial new scripts for pivot logic approach

---
 check_new_quote_form_responses.py | 180 ++++++++++
 poll_form_responses_since.py      | 555 ++++++++++++++++++++++++++++++
 2 files changed, 735 insertions(+)
 create mode 100755 check_new_quote_form_responses.py
 create mode 100755 poll_form_responses_since.py

diff --git a/check_new_quote_form_responses.py b/check_new_quote_form_responses.py
new file mode 100755
index 0000000..7c1fb52
--- /dev/null
+++ b/check_new_quote_form_responses.py
@@ -0,0 +1,180 @@
+#!/usr/bin/env python3
+"""Report new Quote Template form responses found in the ServiceM8 webhook DB.
+
+The newest matching row of ``webhook_form_responses`` is compared with a marker
+kept in a local JSON state file. The first run (or ``--prime``) only records
+the marker; later runs alert when the newest row differs from it.
+"""
+
+import argparse
+import json
+import sqlite3
+import sys
+from contextlib import closing
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+DB_PATH = Path(__file__).with_name("servicem8_webhooks.db")
+STATE_PATH = Path(__file__).with_name(".quote_form_response_watch_state.json")
+QUOTE_TEMPLATE_FORM_UUID = "3621b6be-1d19-4756-9ab4-9d5e4120f6d9"
+
+
+def load_state(path: Path) -> Dict[str, Any]:
+    """Return the saved marker, or ``{}`` if it is missing or unusable.
+
+    A state file that cannot be read, is not valid JSON, or does not hold a
+    JSON object is treated like a missing one, so the caller re-primes instead
+    of crashing.
+    """
+    try:
+        state = json.loads(path.read_text(encoding="utf-8"))
+    except (OSError, ValueError):
+        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
+        return {}
+    return state if isinstance(state, dict) else {}
+
+
+def save_state(path: Path, state: Dict[str, Any]) -> None:
+    """Write ``state`` to ``path`` as UTF-8 JSON via an atomic rename."""
+    payload = json.dumps(state, indent=2, ensure_ascii=False) + "\n"
+    # Write a sibling temp file first so an interrupted run cannot leave a
+    # truncated state file behind (which would silently re-prime the watcher).
+    tmp_path = path.with_name(path.name + ".tmp")
+    tmp_path.write_text(payload, encoding="utf-8")
+    tmp_path.replace(path)
+
+
+def get_conn(db_path: Path) -> sqlite3.Connection:
+    """Open ``db_path`` with rows addressable by column name."""
+    conn = sqlite3.connect(db_path)
+    conn.row_factory = sqlite3.Row
+    return conn
+
+
+def query_latest_quote_response(db_path: Path) -> Optional[Dict[str, Any]]:
+    """Return the newest Quote Template webhook row as a dict, or ``None``."""
+    sql = """
+        SELECT
+            id,
+            received_at,
+            json_extract(payload_json, '$.id') AS webhook_event_id,
+            json_extract(payload_json, '$.type') AS event_type,
+            json_extract(payload_json, '$.data.uuid') AS form_response_uuid,
+            json_extract(payload_json, '$.data.form_uuid') AS form_uuid,
+            json_extract(payload_json, '$.data.regarding_object_uuid') AS job_uuid,
+            json_extract(payload_json, '$.data.edit_date') AS edit_date,
+            json_extract(payload_json, '$.data.created_by_staff_uuid') AS created_by_staff_uuid,
+            json_extract(payload_json, '$.data.status') AS status,
+            payload_json
+        FROM webhook_form_responses
+        WHERE json_extract(payload_json, '$.data.form_uuid') = ?
+        ORDER BY datetime(received_at) DESC, id DESC
+        LIMIT 1
+    """
+    # sqlite3's own context manager only commits/rolls back; closing() is what
+    # actually releases the connection.
+    with closing(get_conn(db_path)) as conn:
+        row = conn.execute(sql, (QUOTE_TEMPLATE_FORM_UUID,)).fetchone()
+    return dict(row) if row else None
+
+
+def build_summary(row: Dict[str, Any]) -> str:
+    """Format the human-readable alert text for a new quote response row."""
+    received_at = row.get("received_at") or "unknown"
+    edit_date = row.get("edit_date") or "unknown"
+    job_uuid = row.get("job_uuid") or "unknown"
+    form_response_uuid = row.get("form_response_uuid") or "unknown"
+    return (
+        "New quote template form response received\n"
+        f"- Received: {received_at}\n"
+        f"- Edit date: {edit_date}\n"
+        f"- Job UUID: {job_uuid}\n"
+        f"- Form response UUID: {form_response_uuid}"
+    )
+
+
+def main() -> int:
+    """Run the check; return 0 on success or 2 if the database is missing."""
+    parser = argparse.ArgumentParser(description="Check for new quote template form responses in the ServiceM8 webhook DB")
+    parser.add_argument("--db", default=str(DB_PATH), help="Path to servicem8_webhooks.db")
+    parser.add_argument("--state", default=str(STATE_PATH), help="Path to local state file")
+    parser.add_argument("--prime", action="store_true", help="Record the current latest event as seen without alerting")
+    parser.add_argument("--json", action="store_true", help="Print machine-readable JSON")
+    args = parser.parse_args()
+
+    db_path = Path(args.db)
+    state_path = Path(args.state)
+
+    if not db_path.exists():
+        print(f"Database not found: {db_path}", file=sys.stderr)
+        return 2
+
+    latest = query_latest_quote_response(db_path)
+    state = load_state(state_path)
+
+    if not latest:
+        result = {
+            "status": "no_quote_responses_found",
+            "alert": False,
+            "db_path": str(db_path),
+            "state_path": str(state_path),
+        }
+        print(json.dumps(result, indent=2) if args.json else "No quote template form responses found in DB")
+        return 0
+
+    current_marker = {
+        "id": latest.get("id"),
+        "received_at": latest.get("received_at"),
+        "form_response_uuid": latest.get("form_response_uuid"),
+        "job_uuid": latest.get("job_uuid"),
+        "edit_date": latest.get("edit_date"),
+        "checked_at": datetime.now().astimezone().isoformat(),
+    }
+
+    if args.prime or not state:
+        save_state(state_path, current_marker)
+        result = {
+            "status": "primed",
+            "alert": False,
+            "latest": current_marker,
+            "db_path": str(db_path),
+            "state_path": str(state_path),
+        }
+        print(json.dumps(result, indent=2) if args.json else f"Primed latest quote template event at {current_marker['received_at']}")
+        return 0
+
+    last_seen_id = state.get("id")
+    last_seen_form_response_uuid = state.get("form_response_uuid")
+    is_new = (latest.get("id") != last_seen_id) or (
+        latest.get("form_response_uuid") and latest.get("form_response_uuid") != last_seen_form_response_uuid
+    )
+
+    if is_new:
+        save_state(state_path, current_marker)
+        result = {
+            "status": "new_quote_response",
+            "alert": True,
+            "summary": build_summary(latest),
+            "latest": current_marker,
+            "previous": state,
+            "db_path": str(db_path),
+            "state_path": str(state_path),
+        }
+        print(json.dumps(result, indent=2) if args.json else result["summary"])
+        return 0
+
+    result = {
+        "status": "no_new_quote_response",
+        "alert": False,
+        "latest": current_marker,
+        "previous": state,
+        "db_path": str(db_path),
+        "state_path": str(state_path),
+    }
+    print(json.dumps(result, indent=2) if args.json else f"No new quote template form response since {state.get('received_at')}")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/poll_form_responses_since.py b/poll_form_responses_since.py
new file mode 100755
index 0000000..955adb8
--- /dev/null
+++ b/poll_form_responses_since.py
@@ -0,0 +1,555 @@
+#!/usr/bin/env python3
+"""Poll ServiceM8 /formresponse.json for responses newer than a timestamp.
+
+This is the safety-net companion to the ServiceM8 form.response_created webhook.
+It stores every API hit in a local SQLite DB, then identifies previously unseen
+Quote Template form responses and parses them into desired jobMaterial rows using
+servicem8_quote_template_parser.py. It does not write anything back to ServiceM8.
+"""
+
+from __future__ import annotations
+
+import argparse
+import importlib.util
+import json
+import os
+import sqlite3
+import sys
+from contextlib import closing
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Tuple
+
+import requests
+
+from servicem8_quote_template_parser import (
+    QUOTE_TEMPLATE_FORM_UUID as PARSER_QUOTE_TEMPLATE_FORM_UUID,
+    parse_quote_template_form_response,
+)
+
+SCRIPT_DIR = Path(__file__).resolve().parent
+DEFAULT_DB_PATH = SCRIPT_DIR / "servicem8_formresponse_poll.db"
+DEFAULT_QUEUE_PATH = SCRIPT_DIR / "quote-template-jobmaterials-poll-queue.jsonl"
+DEFAULT_BASE_URL = "https://api.servicem8.com/api_1.0"
+DEFAULT_ENDPOINT = "/formresponse.json"
+DEFAULT_TIMEOUT_SECONDS = 30
+
+# Current Quote Template form UUID. Kept as a CLI default so we can adjust without
+# editing code if ServiceM8 form/template IDs change again.
+DEFAULT_QUOTE_TEMPLATE_FORM_UUID = os.getenv(
+    "SERVICEM8_QUOTE_TEMPLATE_FORM_UUID",
+    PARSER_QUOTE_TEMPLATE_FORM_UUID,
+)
+
+
+def utc_now() -> str:
+    """Return the current UTC time as an ISO-8601 string."""
+    return datetime.now(timezone.utc).isoformat()
+
+
+def load_api_key() -> str:
+    """Load API key from env, falling back to the existing local helper script.
+
+    Prefer env vars for any new deployment. The fallback keeps this project script
+    runnable in the current /opt/webhooks setup without copying credentials.
+    Note that the fallback executes the helper module in order to read its
+    ``ACCESS_TOKEN`` attribute.
+
+    Raises:
+        RuntimeError: if no key is found in the environment or the helper.
+    """
+    for name in ("SERVICEM8_ACCESS_TOKEN", "SERVICEM8_API_KEY"):
+        value = os.getenv(name)
+        if value:
+            return value
+
+    fallback = SCRIPT_DIR / "servicem8-list-webhook-subscriptions-table.py"
+    if fallback.exists():
+        spec = importlib.util.spec_from_file_location("servicem8_subs", fallback)
+        if spec and spec.loader:
+            module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(module)  # type: ignore[union-attr]
+            value = getattr(module, "ACCESS_TOKEN", None)
+            if value:
+                return str(value)
+
+    raise RuntimeError("Missing SERVICEM8_ACCESS_TOKEN or SERVICEM8_API_KEY")
+
+
+def get_conn(db_path: Path) -> sqlite3.Connection:
+    """Open ``db_path`` with rows addressable by column name."""
+    conn = sqlite3.connect(db_path)
+    conn.row_factory = sqlite3.Row
+    return conn
+
+
+def init_db(db_path: Path) -> None:
+    """Create the poll database, its tables and indexes if they are missing."""
+    db_path.parent.mkdir(parents=True, exist_ok=True)
+    with closing(get_conn(db_path)) as conn:
+        conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS poll_runs (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                started_at TEXT NOT NULL,
+                finished_at TEXT,
+                since_value TEXT NOT NULL,
+                filter_field TEXT NOT NULL,
+                filter_expression TEXT NOT NULL,
+                http_status INTEGER,
+                fetched_count INTEGER DEFAULT 0,
+                inserted_count INTEGER DEFAULT 0,
+                updated_count INTEGER DEFAULT 0,
+                quote_match_count INTEGER DEFAULT 0,
+                newly_queued_count INTEGER DEFAULT 0,
+                error TEXT
+            )
+            """
+        )
+        conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS form_responses_raw (
+                uuid TEXT PRIMARY KEY,
+                first_seen_at TEXT NOT NULL,
+                last_seen_at TEXT NOT NULL,
+                seen_count INTEGER NOT NULL DEFAULT 1,
+                active INTEGER,
+                edit_date TEXT,
+                form_uuid TEXT,
+                staff_uuid TEXT,
+                regarding_object TEXT,
+                regarding_object_uuid TEXT,
+                timestamp TEXT,
+                form_by_staff_uuid TEXT,
+                document_attachment_uuid TEXT,
+                asset_uuid TEXT,
+                is_quote_template INTEGER NOT NULL DEFAULT 0,
+                parse_status TEXT,
+                parse_error TEXT,
+                raw_json TEXT NOT NULL
+            )
+            """
+        )
+        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_timestamp ON form_responses_raw(timestamp)")
+        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_edit_date ON form_responses_raw(edit_date)")
+        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_form_uuid ON form_responses_raw(form_uuid)")
+        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_quote ON form_responses_raw(is_quote_template, parse_status)")
+        conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS quote_template_form_responses (
+                form_response_uuid TEXT PRIMARY KEY,
+                discovered_at TEXT NOT NULL,
+                job_uuid TEXT,
+                form_uuid TEXT NOT NULL,
+                author_name TEXT,
+                description TEXT,
+                desired_job_materials_json TEXT NOT NULL,
+                parsed_json TEXT NOT NULL,
+                queued_at TEXT,
+                processed_at TEXT,
+                process_status TEXT NOT NULL DEFAULT 'parsed',
+                process_error TEXT
+            )
+            """
+        )
+        conn.commit()
+
+
+def create_poll_run(conn: sqlite3.Connection, since: str, filter_field: str, filter_expr: str) -> int:
+    """Insert and commit a ``poll_runs`` row for this run; return its id."""
+    cur = conn.execute(
+        """
+        INSERT INTO poll_runs (started_at, since_value, filter_field, filter_expression)
+        VALUES (?, ?, ?, ?)
+        """,
+        (utc_now(), since, filter_field, filter_expr),
+    )
+    conn.commit()
+    return int(cur.lastrowid)
+
+
+def finish_poll_run(conn: sqlite3.Connection, run_id: int, **values: Any) -> None:
+    """Update the ``poll_runs`` row ``run_id`` with ``values`` and commit.
+
+    Keys outside the known column set are ignored; ``finished_at`` defaults to
+    the current UTC time. The commit also persists any other statements still
+    pending on ``conn``.
+    """
+    allowed = {
+        "finished_at",
+        "http_status",
+        "fetched_count",
+        "inserted_count",
+        "updated_count",
+        "quote_match_count",
+        "newly_queued_count",
+        "error",
+    }
+    values.setdefault("finished_at", utc_now())
+    pairs = [(k, v) for k, v in values.items() if k in allowed]
+    if not pairs:
+        return
+    # Column names come only from the allow-list above, so interpolating them
+    # into the statement is safe; the values themselves stay parameterized.
+    set_sql = ", ".join(f"{k} = ?" for k, _ in pairs)
+    params = [v for _, v in pairs] + [run_id]
+    conn.execute(f"UPDATE poll_runs SET {set_sql} WHERE id = ?", params)
+    conn.commit()
+
+
+def fetch_form_responses(
+    *,
+    api_key: str,
+    base_url: str,
+    endpoint: str,
+    filter_field: str,
+    since: str,
+    timeout: int,
+) -> Tuple[int, List[Dict[str, Any]], str]:
+    """GET form responses whose ``filter_field`` is greater than ``since``.
+
+    Returns:
+        ``(http_status, rows, filter_expression)``.
+
+    Raises:
+        RuntimeError: if the response is not OK or its JSON body is not a list.
+    """
+    # ServiceM8 filter syntax confirmed against the live API:
+    # ?$filter=timestamp gt '2026-05-04 00:00:00'
+    filter_expr = f"{filter_field} gt '{since}'"
+    url = f"{base_url.rstrip('/')}{endpoint}"
+    headers = {"X-Api-Key": api_key, "Accept": "application/json"}
+    response = requests.get(url, headers=headers, params={"$filter": filter_expr}, timeout=timeout)
+
+    if not response.ok:
+        raise RuntimeError(f"HTTP {response.status_code}: {response.text[:1000]}")
+
+    data = response.json()
+    if not isinstance(data, list):
+        raise RuntimeError(f"Expected list response, got {type(data).__name__}")
+    return response.status_code, data, filter_expr
+
+
+def insert_or_update_raw(
+    conn: sqlite3.Connection,
+    row: Dict[str, Any],
+    *,
+    quote_template_form_uuid: str,
+    now: str,
+) -> Tuple[bool, bool]:
+    """Upsert one API row into ``form_responses_raw`` (without committing).
+
+    Returns:
+        ``(was_inserted, is_quote_template)``.
+
+    Raises:
+        ValueError: if the row has no ``uuid``.
+    """
+    uuid = str(row.get("uuid") or "").strip()
+    if not uuid:
+        raise ValueError("Form response row missing uuid")
+
+    is_quote = 1 if str(row.get("form_uuid") or "").strip() == quote_template_form_uuid else 0
+    existing = conn.execute("SELECT uuid FROM form_responses_raw WHERE uuid = ?", (uuid,)).fetchone()
+    raw_json = json.dumps(row, ensure_ascii=False, sort_keys=True)
+
+    # Column values shared by the UPDATE and INSERT statements, in SQL order.
+    shared = (
+        row.get("active"),
+        row.get("edit_date"),
+        row.get("form_uuid"),
+        row.get("staff_uuid"),
+        row.get("regarding_object"),
+        row.get("regarding_object_uuid"),
+        row.get("timestamp"),
+        row.get("form_by_staff_uuid"),
+        row.get("document_attachment_uuid"),
+        row.get("asset_uuid"),
+        is_quote,
+        raw_json,
+    )
+
+    if existing:
+        conn.execute(
+            """
+            UPDATE form_responses_raw
+            SET last_seen_at = ?,
+                seen_count = seen_count + 1,
+                active = ?,
+                edit_date = ?,
+                form_uuid = ?,
+                staff_uuid = ?,
+                regarding_object = ?,
+                regarding_object_uuid = ?,
+                timestamp = ?,
+                form_by_staff_uuid = ?,
+                document_attachment_uuid = ?,
+                asset_uuid = ?,
+                is_quote_template = ?,
+                raw_json = ?
+            WHERE uuid = ?
+            """,
+            (now, *shared, uuid),
+        )
+        return False, bool(is_quote)
+
+    conn.execute(
+        """
+        INSERT INTO form_responses_raw (
+            uuid, first_seen_at, last_seen_at, active, edit_date, form_uuid,
+            staff_uuid, regarding_object, regarding_object_uuid, timestamp,
+            form_by_staff_uuid, document_attachment_uuid, asset_uuid,
+            is_quote_template, raw_json
+        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+        """,
+        (uuid, now, now, *shared),
+    )
+    return True, bool(is_quote)
+
+
+def ensure_queue_writable(queue_path: Path) -> None:
+    """Create the queue file's directory and verify the file can be appended to.
+
+    Opening in append mode creates the file when absent and leaves existing
+    content untouched; an ``OSError`` propagates if it cannot be opened.
+    """
+    queue_path.parent.mkdir(parents=True, exist_ok=True)
+    with queue_path.open("a", encoding="utf-8"):
+        pass
+
+
+def queue_record(queue_path: Path, parsed: Dict[str, Any], discovered_at: str) -> None:
+    """Append one parsed quote response to the JSONL queue file."""
+    record = {
+        "queued_at": discovered_at,
+        "source": "formresponse_poll",
+        "form_uuid": parsed.get("form_uuid", ""),
+        "form_response_uuid": parsed.get("form_response_uuid", ""),
+        "job_uuid": parsed.get("job_uuid", ""),
+        "author_name": parsed.get("author_name", ""),
+        "description": parsed.get("description", ""),
+        "desired_job_materials": parsed.get("desired_job_materials", []),
+    }
+    with queue_path.open("a", encoding="utf-8") as fh:
+        fh.write(json.dumps(record, ensure_ascii=False) + "\n")
+
+
+def parse_and_store_quote_response(
+    conn: sqlite3.Connection,
+    row: Dict[str, Any],
+    *,
+    queue_path: Path,
+    write_queue: bool,
+    discovered_at: str,
+) -> bool:
+    """Parse a Quote Template response, store it and optionally queue it.
+
+    Statements are executed on ``conn`` without committing. A parser failure
+    is recorded on the ``form_responses_raw`` row rather than raised.
+
+    Returns:
+        ``True`` if the response was newly stored or newly queued, ``False``
+        if it was already handled or could not be parsed.
+    """
+    uuid = str(row.get("uuid") or "").strip()
+    existing = conn.execute(
+        "SELECT form_response_uuid, parsed_json, queued_at FROM quote_template_form_responses WHERE form_response_uuid = ?",
+        (uuid,),
+    ).fetchone()
+    if existing:
+        if write_queue and not existing["queued_at"]:
+            parsed = json.loads(existing["parsed_json"])
+            queue_record(queue_path, parsed, discovered_at)
+            conn.execute(
+                "UPDATE quote_template_form_responses SET queued_at = ? WHERE form_response_uuid = ?",
+                (discovered_at, uuid),
+            )
+            return True
+        return False
+
+    try:
+        parsed = parse_quote_template_form_response({"data": row})
+    except Exception as exc:
+        conn.execute(
+            "UPDATE form_responses_raw SET parse_status = ?, parse_error = ? WHERE uuid = ?",
+            ("error", str(exc), uuid),
+        )
+        return False
+
+    parsed_json = json.dumps(parsed, ensure_ascii=False, sort_keys=True)
+    desired_json = json.dumps(parsed.get("desired_job_materials", []), ensure_ascii=False, sort_keys=True)
+    queued_at = discovered_at if write_queue else None
+
+    # Append to the queue before recording queued_at. If the append fails the
+    # exception propagates with no row stored, so main()'s error path cannot
+    # commit a response that is marked as queued but missing from the queue.
+    if write_queue:
+        queue_record(queue_path, parsed, discovered_at)
+
+    conn.execute(
+        """
+        INSERT INTO quote_template_form_responses (
+            form_response_uuid, discovered_at, job_uuid, form_uuid, author_name,
+            description, desired_job_materials_json, parsed_json, queued_at
+        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+        """,
+        (
+            uuid,
+            discovered_at,
+            parsed.get("job_uuid", ""),
+            parsed.get("form_uuid", ""),
+            parsed.get("author_name", ""),
+            parsed.get("description", ""),
+            desired_json,
+            parsed_json,
+            queued_at,
+        ),
+    )
+    conn.execute(
+        "UPDATE form_responses_raw SET parse_status = ?, parse_error = NULL WHERE uuid = ?",
+        ("parsed", uuid),
+    )
+    return True
+
+
+def summarize_rows(rows: Iterable[Dict[str, Any]], limit: int) -> List[Dict[str, Any]]:
+    """Return a compact summary dict for each of the first ``limit`` rows."""
+    summary = []
+    for row in list(rows)[:limit]:
+        field_data = row.get("field_data")
+        fields: Any = field_data
+        if isinstance(field_data, str):
+            try:
+                fields = json.loads(field_data)
+            except ValueError:  # includes json.JSONDecodeError
+                fields = None
+        # Count fields whether the API delivered a JSON string or a decoded list.
+        field_count: Optional[int] = len(fields) if isinstance(fields, list) else None
+        summary.append(
+            {
+                "uuid": row.get("uuid"),
+                "timestamp": row.get("timestamp"),
+                "edit_date": row.get("edit_date"),
+                "form_uuid": row.get("form_uuid"),
+                "regarding_object": row.get("regarding_object"),
+                "regarding_object_uuid": row.get("regarding_object_uuid"),
+                "field_data_type": type(field_data).__name__,
+                "field_count": field_count,
+            }
+        )
+    return summary
+
+
+def main() -> int:
+    """Poll once and print a JSON summary; return 0 on success, 1 on error."""
+    parser = argparse.ArgumentParser(description="Poll ServiceM8 form responses since a timestamp and store unseen Quote Template responses")
+    parser.add_argument("--since", required=True, help="Timestamp for ServiceM8 filter, e.g. '2026-05-04 10:00:00'")
+    parser.add_argument("--filter-field", default="timestamp", choices=("timestamp", "edit_date"), help="Field to apply ServiceM8 gt filter to")
+    parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Local SQLite DB for poll results")
+    parser.add_argument("--queue", default=str(DEFAULT_QUEUE_PATH), help="JSONL queue path for parsed Quote Template jobMaterials")
+    parser.add_argument("--quote-template-form-uuid", default=DEFAULT_QUOTE_TEMPLATE_FORM_UUID, help="Quote Template form UUID to process")
+    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
+    parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT)
+    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_SECONDS)
+    parser.add_argument("--no-store", action="store_true", help="Fetch and print summary only; do not write DB/queue")
+    parser.add_argument("--no-queue", action="store_true", help="Store/parse matches but do not append to JSONL queue")
+    parser.add_argument("--dump-json", action="store_true", help="Print full fetched ServiceM8 JSON payload")
+    parser.add_argument("--summary-limit", type=int, default=10, help="Number of rows to include in summary output")
+    args = parser.parse_args()
+
+    db_path = Path(args.db)
+    queue_path = Path(args.queue)
+    filter_expr = f"{args.filter_field} gt '{args.since}'"
+
+    conn: Optional[sqlite3.Connection] = None
+    run_id: Optional[int] = None
+    try:
+        # Inside the try so a missing key is reported as JSON like other errors.
+        api_key = load_api_key()
+
+        if not args.no_store and not args.no_queue:
+            # Fail before DB work if the queue cannot be appended to. This avoids
+            # marking a response as queued when the JSONL append did not happen.
+            ensure_queue_writable(queue_path)
+
+        if not args.no_store:
+            init_db(db_path)
+            conn = get_conn(db_path)
+            run_id = create_poll_run(conn, args.since, args.filter_field, filter_expr)
+
+        http_status, rows, filter_expr = fetch_form_responses(
+            api_key=api_key,
+            base_url=args.base_url,
+            endpoint=args.endpoint,
+            filter_field=args.filter_field,
+            since=args.since,
+            timeout=args.timeout,
+        )
+
+        if args.dump_json:
+            print(json.dumps(rows, indent=2, ensure_ascii=False))
+
+        inserted = updated = quote_matches = newly_queued = 0
+        now = utc_now()
+        if conn is not None:
+            for row in rows:
+                was_inserted, is_quote = insert_or_update_raw(
+                    conn,
+                    row,
+                    quote_template_form_uuid=args.quote_template_form_uuid,
+                    now=now,
+                )
+                inserted += 1 if was_inserted else 0
+                updated += 0 if was_inserted else 1
+                if is_quote:
+                    quote_matches += 1
+                    if parse_and_store_quote_response(
+                        conn,
+                        row,
+                        queue_path=queue_path,
+                        write_queue=not args.no_queue,
+                        discovered_at=now,
+                    ):
+                        newly_queued += 1
+            conn.commit()
+            if run_id is not None:
+                finish_poll_run(
+                    conn,
+                    run_id,
+                    http_status=http_status,
+                    fetched_count=len(rows),
+                    inserted_count=inserted,
+                    updated_count=updated,
+                    quote_match_count=quote_matches,
+                    newly_queued_count=newly_queued,
+                )
+
+        result = {
+            "ok": True,
+            "http_status": http_status,
+            "filter": filter_expr,
+            "fetched_count": len(rows),
+            "inserted_count": inserted,
+            "updated_count": updated,
+            "quote_template_form_uuid": args.quote_template_form_uuid,
+            "quote_match_count": quote_matches if conn is not None else sum(1 for r in rows if r.get("form_uuid") == args.quote_template_form_uuid),
+            "newly_queued_count": newly_queued,
+            "db_path": None if args.no_store else str(db_path),
+            "queue_path": None if args.no_queue or args.no_store else str(queue_path),
+            "sample": summarize_rows(rows, args.summary_limit),
+        }
+        print(json.dumps(result, indent=2, ensure_ascii=False))
+        return 0
+
+    except Exception as exc:
+        if conn is not None and run_id is not None:
+            finish_poll_run(conn, run_id, error=str(exc))
+        print(json.dumps({"ok": False, "error": str(exc), "filter": filter_expr}, indent=2), file=sys.stderr)
+        return 1
+    finally:
+        if conn is not None:
+            conn.close()
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())