#!/usr/bin/env python3 """Poll ServiceM8 /formresponse.json for responses newer than a timestamp. This is the safety-net companion to the ServiceM8 form.response_created webhook. It stores every API hit in a local SQLite DB, then identifies previously unseen Quote Template form responses and parses them into desired jobMaterial rows using servicem8_quote_template_parser.py. It does not write anything back to ServiceM8. """ from __future__ import annotations import argparse import importlib.util import json import os import sqlite3 import sys from contextlib import closing from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple import requests from servicem8_quote_template_parser import ( QUOTE_TEMPLATE_FORM_UUID as PARSER_QUOTE_TEMPLATE_FORM_UUID, parse_quote_template_form_response, ) SCRIPT_DIR = Path(__file__).resolve().parent DEFAULT_DB_PATH = SCRIPT_DIR / "servicem8_formresponse_poll.db" DEFAULT_QUEUE_PATH = SCRIPT_DIR / "quote-template-jobmaterials-poll-queue.jsonl" DEFAULT_BASE_URL = "https://api.servicem8.com/api_1.0" DEFAULT_ENDPOINT = "/formresponse.json" DEFAULT_TIMEOUT_SECONDS = 30 # Current Quote Template form UUID. Kept as a CLI default so we can adjust without # editing code if ServiceM8 form/template IDs change again. DEFAULT_QUOTE_TEMPLATE_FORM_UUID = os.getenv( "SERVICEM8_QUOTE_TEMPLATE_FORM_UUID", PARSER_QUOTE_TEMPLATE_FORM_UUID, ) def utc_now() -> str: return datetime.now(timezone.utc).isoformat() def load_api_key() -> str: """Load API key from env, falling back to the existing local helper script. Prefer env vars for any new deployment. The fallback keeps this project script runnable in the current /opt/webhooks setup without copying credentials. """ for name in ("SERVICEM8_ACCESS_TOKEN", "SERVICEM8_API_KEY"): value = os.getenv(name) if value: return value fallback = SCRIPT_DIR / "servicem8-list-webhook-subscriptions-table.py" if fallback.exists(): spec = importlib.util.spec_from_file_location("servicem8_subs", fallback) if spec and spec.loader: module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # type: ignore[union-attr] value = getattr(module, "ACCESS_TOKEN", None) if value: return str(value) raise RuntimeError("Missing SERVICEM8_ACCESS_TOKEN or SERVICEM8_API_KEY") def get_conn(db_path: Path) -> sqlite3.Connection: conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row return conn def init_db(db_path: Path) -> None: db_path.parent.mkdir(parents=True, exist_ok=True) with closing(get_conn(db_path)) as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS poll_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, started_at TEXT NOT NULL, finished_at TEXT, since_value TEXT NOT NULL, filter_field TEXT NOT NULL, filter_expression TEXT NOT NULL, http_status INTEGER, fetched_count INTEGER DEFAULT 0, inserted_count INTEGER DEFAULT 0, updated_count INTEGER DEFAULT 0, quote_match_count INTEGER DEFAULT 0, newly_queued_count INTEGER DEFAULT 0, error TEXT ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS form_responses_raw ( uuid TEXT PRIMARY KEY, first_seen_at TEXT NOT NULL, last_seen_at TEXT NOT NULL, seen_count INTEGER NOT NULL DEFAULT 1, active INTEGER, edit_date TEXT, form_uuid TEXT, staff_uuid TEXT, regarding_object TEXT, regarding_object_uuid TEXT, timestamp TEXT, form_by_staff_uuid TEXT, document_attachment_uuid TEXT, asset_uuid TEXT, is_quote_template INTEGER NOT NULL DEFAULT 0, parse_status TEXT, parse_error TEXT, raw_json TEXT NOT NULL ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_timestamp ON form_responses_raw(timestamp)") conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_edit_date ON form_responses_raw(edit_date)") conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_form_uuid ON form_responses_raw(form_uuid)") conn.execute("CREATE INDEX IF NOT EXISTS idx_form_responses_raw_quote ON form_responses_raw(is_quote_template, parse_status)") conn.execute( """ CREATE TABLE IF NOT EXISTS quote_template_form_responses ( form_response_uuid TEXT PRIMARY KEY, discovered_at TEXT NOT NULL, job_uuid TEXT, form_uuid TEXT NOT NULL, author_name TEXT, description TEXT, desired_job_materials_json TEXT NOT NULL, parsed_json TEXT NOT NULL, queued_at TEXT, processed_at TEXT, process_status TEXT NOT NULL DEFAULT 'parsed', process_error TEXT ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS job_metadata ( job_uuid TEXT PRIMARY KEY, generated_job_id TEXT, job_address TEXT, company_name TEXT, raw_json TEXT NOT NULL, first_seen_at TEXT NOT NULL, last_seen_at TEXT NOT NULL, source TEXT NOT NULL ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_job_metadata_generated_job_id ON job_metadata(generated_job_id)") conn.commit() def create_poll_run(conn: sqlite3.Connection, since: str, filter_field: str, filter_expr: str) -> int: cur = conn.execute( """ INSERT INTO poll_runs (started_at, since_value, filter_field, filter_expression) VALUES (?, ?, ?, ?) """, (utc_now(), since, filter_field, filter_expr), ) conn.commit() return int(cur.lastrowid) def finish_poll_run(conn: sqlite3.Connection, run_id: int, **values: Any) -> None: allowed = { "finished_at", "http_status", "fetched_count", "inserted_count", "updated_count", "quote_match_count", "newly_queued_count", "error", } values.setdefault("finished_at", utc_now()) pairs = [(k, v) for k, v in values.items() if k in allowed] if not pairs: return set_sql = ", ".join(f"{k} = ?" for k, _ in pairs) params = [v for _, v in pairs] + [run_id] conn.execute(f"UPDATE poll_runs SET {set_sql} WHERE id = ?", params) conn.commit() def fetch_form_responses( *, api_key: str, base_url: str, endpoint: str, filter_field: str, since: str, timeout: int, ) -> Tuple[int, List[Dict[str, Any]], str]: # ServiceM8 filter syntax confirmed against the live API: # ?$filter=timestamp gt '2026-05-04 00:00:00' filter_expr = f"{filter_field} gt '{since}'" url = f"{base_url.rstrip('/')}{endpoint}" headers = {"X-Api-Key": api_key, "Accept": "application/json"} response = requests.get(url, headers=headers, params={"$filter": filter_expr}, timeout=timeout) if not response.ok: raise RuntimeError(f"HTTP {response.status_code}: {response.text[:1000]}") data = response.json() if not isinstance(data, list): raise RuntimeError(f"Expected list response, got {type(data).__name__}") return response.status_code, data, filter_expr def retrieve_job( *, api_key: str, base_url: str, job_uuid: str, timeout: int, ) -> Dict[str, Any]: response = requests.get( f"{base_url.rstrip('/')}/job/{job_uuid}.json", headers={"X-Api-Key": api_key, "Accept": "application/json"}, timeout=timeout, ) if not response.ok: raise RuntimeError(f"Job retrieve failed for {job_uuid}: HTTP {response.status_code}: {response.text[:1000]}") data = response.json() if not isinstance(data, dict): raise RuntimeError(f"Job retrieve expected object response, got {type(data).__name__}") return data def clean_text(value: Any) -> str: if value is None: return "" return str(value).replace("\r\n", "\n").replace("\r", "\n").strip() def first_text(*values: Any) -> str: for value in values: text = clean_text(value) if text: return text return "" def format_job_address(job: Dict[str, Any]) -> str: direct = first_text( job.get("job_address"), job.get("site_address"), job.get("address"), job.get("location_address"), job.get("billing_address"), ) if direct: return direct parts = [ first_text(job.get("street"), job.get("street_address"), job.get("address_1"), job.get("address1")), first_text(job.get("suburb"), job.get("city")), first_text(job.get("state")), first_text(job.get("postcode"), job.get("postal_code"), job.get("zip")), ] return " ".join(part for part in parts if part) def extract_company_name(job: Dict[str, Any]) -> str: related = job.get("related") if isinstance(related, dict): company = related.get("company") if isinstance(company, dict): company_name = clean_text(company.get("name")) if company_name: return company_name company = job.get("company") if isinstance(company, dict): company_name = clean_text(company.get("name")) if company_name: return company_name return first_text(job.get("company_name"), job.get("customer_name")) def upsert_job_metadata(conn: sqlite3.Connection, *, job_uuid: str, job: Dict[str, Any], now: str, source: str) -> None: job_uuid = clean_text(job_uuid or job.get("uuid")) if not job_uuid: return values = ( job_uuid, clean_text(job.get("generated_job_id")), format_job_address(job), extract_company_name(job), json.dumps(job, ensure_ascii=False, sort_keys=True), now, now, source, ) conn.execute( """ INSERT INTO job_metadata ( job_uuid, generated_job_id, job_address, company_name, raw_json, first_seen_at, last_seen_at, source ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(job_uuid) DO UPDATE SET generated_job_id = excluded.generated_job_id, job_address = excluded.job_address, company_name = excluded.company_name, raw_json = excluded.raw_json, last_seen_at = excluded.last_seen_at, source = excluded.source """, values, ) def insert_or_update_raw( conn: sqlite3.Connection, row: Dict[str, Any], *, quote_template_form_uuid: str, now: str, ) -> Tuple[bool, bool]: uuid = str(row.get("uuid") or "").strip() if not uuid: raise ValueError("Form response row missing uuid") is_quote = 1 if str(row.get("form_uuid") or "").strip() == quote_template_form_uuid else 0 existing = conn.execute("SELECT uuid FROM form_responses_raw WHERE uuid = ?", (uuid,)).fetchone() raw_json = json.dumps(row, ensure_ascii=False, sort_keys=True) values = ( uuid, now, now, row.get("active"), row.get("edit_date"), row.get("form_uuid"), row.get("staff_uuid"), row.get("regarding_object"), row.get("regarding_object_uuid"), row.get("timestamp"), row.get("form_by_staff_uuid"), row.get("document_attachment_uuid"), row.get("asset_uuid"), is_quote, raw_json, ) if existing: conn.execute( """ UPDATE form_responses_raw SET last_seen_at = ?, seen_count = seen_count + 1, active = ?, edit_date = ?, form_uuid = ?, staff_uuid = ?, regarding_object = ?, regarding_object_uuid = ?, timestamp = ?, form_by_staff_uuid = ?, document_attachment_uuid = ?, asset_uuid = ?, is_quote_template = ?, raw_json = ? WHERE uuid = ? """, ( now, row.get("active"), row.get("edit_date"), row.get("form_uuid"), row.get("staff_uuid"), row.get("regarding_object"), row.get("regarding_object_uuid"), row.get("timestamp"), row.get("form_by_staff_uuid"), row.get("document_attachment_uuid"), row.get("asset_uuid"), is_quote, raw_json, uuid, ), ) return False, bool(is_quote) conn.execute( """ INSERT INTO form_responses_raw ( uuid, first_seen_at, last_seen_at, active, edit_date, form_uuid, staff_uuid, regarding_object, regarding_object_uuid, timestamp, form_by_staff_uuid, document_attachment_uuid, asset_uuid, is_quote_template, raw_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, values, ) return True, bool(is_quote) def ensure_queue_writable(queue_path: Path) -> None: queue_path.parent.mkdir(parents=True, exist_ok=True) if queue_path.exists(): with queue_path.open("a", encoding="utf-8"): pass else: with queue_path.open("a", encoding="utf-8"): pass def queue_record(queue_path: Path, parsed: Dict[str, Any], discovered_at: str) -> None: record = { "queued_at": discovered_at, "source": "formresponse_poll", "form_uuid": parsed.get("form_uuid", ""), "form_response_uuid": parsed.get("form_response_uuid", ""), "job_uuid": parsed.get("job_uuid", ""), "author_name": parsed.get("author_name", ""), "description": parsed.get("description", ""), "desired_job_materials": parsed.get("desired_job_materials", []), } with queue_path.open("a", encoding="utf-8") as fh: fh.write(json.dumps(record, ensure_ascii=False) + "\n") def parse_and_store_quote_response( conn: sqlite3.Connection, row: Dict[str, Any], *, queue_path: Path, write_queue: bool, discovered_at: str, ) -> bool: uuid = str(row.get("uuid") or "").strip() existing = conn.execute( "SELECT form_response_uuid, parsed_json, queued_at FROM quote_template_form_responses WHERE form_response_uuid = ?", (uuid,), ).fetchone() if existing: if write_queue and not existing["queued_at"]: parsed = json.loads(existing["parsed_json"]) queue_record(queue_path, parsed, discovered_at) conn.execute( "UPDATE quote_template_form_responses SET queued_at = ? WHERE form_response_uuid = ?", (discovered_at, uuid), ) return True return False try: parsed = parse_quote_template_form_response({"data": row}) except Exception as exc: conn.execute( "UPDATE form_responses_raw SET parse_status = ?, parse_error = ? WHERE uuid = ?", ("error", str(exc), uuid), ) return False parsed_json = json.dumps(parsed, ensure_ascii=False, sort_keys=True) desired_json = json.dumps(parsed.get("desired_job_materials", []), ensure_ascii=False, sort_keys=True) queued_at = discovered_at if write_queue else None conn.execute( """ INSERT INTO quote_template_form_responses ( form_response_uuid, discovered_at, job_uuid, form_uuid, author_name, description, desired_job_materials_json, parsed_json, queued_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( uuid, discovered_at, parsed.get("job_uuid", ""), parsed.get("form_uuid", ""), parsed.get("author_name", ""), parsed.get("description", ""), desired_json, parsed_json, queued_at, ), ) conn.execute( "UPDATE form_responses_raw SET parse_status = ?, parse_error = NULL WHERE uuid = ?", ("parsed", uuid), ) if write_queue: queue_record(queue_path, parsed, discovered_at) return True def summarize_rows(rows: Iterable[Dict[str, Any]], limit: int) -> List[Dict[str, Any]]: summary = [] for row in list(rows)[:limit]: field_data = row.get("field_data") field_count: Optional[int] = None if isinstance(field_data, str): try: parsed = json.loads(field_data) if isinstance(parsed, list): field_count = len(parsed) except Exception: field_count = None summary.append( { "uuid": row.get("uuid"), "timestamp": row.get("timestamp"), "edit_date": row.get("edit_date"), "form_uuid": row.get("form_uuid"), "regarding_object": row.get("regarding_object"), "regarding_object_uuid": row.get("regarding_object_uuid"), "field_data_type": type(field_data).__name__, "field_count": field_count, } ) return summary def main() -> int: parser = argparse.ArgumentParser(description="Poll ServiceM8 form responses since a timestamp and store unseen Quote Template responses") parser.add_argument("--since", required=True, help="Timestamp for ServiceM8 filter, e.g. '2026-05-04 10:00:00'") parser.add_argument("--filter-field", default="timestamp", choices=("timestamp", "edit_date"), help="Field to apply ServiceM8 gt filter to") parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Local SQLite DB for poll results") parser.add_argument("--queue", default=str(DEFAULT_QUEUE_PATH), help="JSONL queue path for parsed Quote Template jobMaterials") parser.add_argument("--quote-template-form-uuid", default=DEFAULT_QUOTE_TEMPLATE_FORM_UUID, help="Quote Template form UUID to process") parser.add_argument("--base-url", default=DEFAULT_BASE_URL) parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT) parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_SECONDS) parser.add_argument("--no-store", action="store_true", help="Fetch and print summary only; do not write DB/queue") parser.add_argument("--no-queue", action="store_true", help="Store/parse matches but do not append to JSONL queue") parser.add_argument("--dump-json", action="store_true", help="Print full fetched ServiceM8 JSON payload") parser.add_argument("--summary-limit", type=int, default=10, help="Number of rows to include in summary output") args = parser.parse_args() db_path = Path(args.db) queue_path = Path(args.queue) api_key = load_api_key() filter_expr = f"{args.filter_field} gt '{args.since}'" conn: Optional[sqlite3.Connection] = None run_id: Optional[int] = None try: if not args.no_store and not args.no_queue: # Fail before DB work if the queue cannot be appended to. This avoids # marking a response as queued when the JSONL append did not happen. ensure_queue_writable(queue_path) if not args.no_store: init_db(db_path) conn = get_conn(db_path) run_id = create_poll_run(conn, args.since, args.filter_field, filter_expr) http_status, rows, filter_expr = fetch_form_responses( api_key=api_key, base_url=args.base_url, endpoint=args.endpoint, filter_field=args.filter_field, since=args.since, timeout=args.timeout, ) if args.dump_json: print(json.dumps(rows, indent=2, ensure_ascii=False)) inserted = updated = quote_matches = newly_queued = 0 now = utc_now() fetched_job_uuids = set() if conn is not None: for row in rows: was_inserted, is_quote = insert_or_update_raw( conn, row, quote_template_form_uuid=args.quote_template_form_uuid, now=now, ) inserted += 1 if was_inserted else 0 updated += 0 if was_inserted else 1 if is_quote: quote_matches += 1 job_uuid = clean_text(row.get("regarding_object_uuid")) if job_uuid and job_uuid not in fetched_job_uuids: try: job = retrieve_job( api_key=api_key, base_url=args.base_url, job_uuid=job_uuid, timeout=args.timeout, ) upsert_job_metadata(conn, job_uuid=job_uuid, job=job, now=now, source="formresponse_poll") fetched_job_uuids.add(job_uuid) except Exception as exc: # Polling/parsing should still proceed if job metadata enrichment fails. print( json.dumps( {"warning": "job_metadata_fetch_failed", "job_uuid": job_uuid, "error": str(exc)}, ensure_ascii=False, ), file=sys.stderr, ) if parse_and_store_quote_response( conn, row, queue_path=queue_path, write_queue=not args.no_queue, discovered_at=now, ): newly_queued += 1 conn.commit() if run_id is not None: finish_poll_run( conn, run_id, http_status=http_status, fetched_count=len(rows), inserted_count=inserted, updated_count=updated, quote_match_count=quote_matches, newly_queued_count=newly_queued, ) result = { "ok": True, "http_status": http_status, "filter": filter_expr, "fetched_count": len(rows), "inserted_count": inserted, "updated_count": updated, "quote_template_form_uuid": args.quote_template_form_uuid, "quote_match_count": quote_matches if conn is not None else sum(1 for r in rows if r.get("form_uuid") == args.quote_template_form_uuid), "newly_queued_count": newly_queued, "db_path": None if args.no_store else str(db_path), "queue_path": None if args.no_queue or args.no_store else str(queue_path), "sample": summarize_rows(rows, args.summary_limit), } print(json.dumps(result, indent=2, ensure_ascii=False)) return 0 except Exception as exc: if conn is not None and run_id is not None: finish_poll_run(conn, run_id, error=str(exc)) print(json.dumps({"ok": False, "error": str(exc), "filter": filter_expr}, indent=2), file=sys.stderr) return 1 finally: if conn is not None: conn.close() if __name__ == "__main__": raise SystemExit(main())