Files
plumbing/poll_form_responses_since.py

661 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""Poll ServiceM8 /formresponse.json for responses newer than a timestamp.
This is the safety-net companion to the ServiceM8 form.response_created webhook.
It stores every API hit in a local SQLite DB, then identifies previously unseen
Quote Template form responses and parses them into desired jobMaterial rows using
servicem8_quote_template_parser.py. It does not write anything back to ServiceM8.
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import os
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
import requests
from servicem8_quote_template_parser import (
QUOTE_TEMPLATE_FORM_UUID as PARSER_QUOTE_TEMPLATE_FORM_UUID,
parse_quote_template_form_response,
)
SCRIPT_DIR = Path(__file__).resolve().parent
# Local state (SQLite poll history and the JSONL queue) lives next to this script by default.
DEFAULT_DB_PATH = SCRIPT_DIR.joinpath("servicem8_formresponse_poll.db")
DEFAULT_QUEUE_PATH = SCRIPT_DIR.joinpath("quote-template-jobmaterials-poll-queue.jsonl")
# ServiceM8 REST API location and request defaults.
DEFAULT_BASE_URL = "https://api.servicem8.com/api_1.0"
DEFAULT_ENDPOINT = "/formresponse.json"
DEFAULT_TIMEOUT_SECONDS = 30
# Current Quote Template form UUID. The SERVICEM8_QUOTE_TEMPLATE_FORM_UUID env var
# overrides the parser module's built-in value, and it is only the CLI default, so the
# ID can be adjusted without editing code if ServiceM8 form/template IDs change again.
DEFAULT_QUOTE_TEMPLATE_FORM_UUID = os.environ.get(
    "SERVICEM8_QUOTE_TEMPLATE_FORM_UUID",
    PARSER_QUOTE_TEMPLATE_FORM_UUID,
)
def utc_now() -> str:
    """Return the current time in UTC as an ISO-8601 string (with a +00:00 offset)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def load_api_key() -> str:
    """Return the ServiceM8 API credential.

    Lookup order:
      1. The SERVICEM8_ACCESS_TOKEN env var, then SERVICEM8_API_KEY (first non-empty wins).
      2. The ``ACCESS_TOKEN`` attribute of the sibling helper script
         ``servicem8-list-webhook-subscriptions-table.py``, if that file exists.

    Prefer env vars for any new deployment. The fallback keeps this project script
    runnable in the current /opt/webhooks setup without copying credentials. Note that
    the fallback executes the helper script's module-level code.

    Raises:
        RuntimeError: if no credential can be found.
    """
    for env_name in ("SERVICEM8_ACCESS_TOKEN", "SERVICEM8_API_KEY"):
        token = os.environ.get(env_name)
        if token:
            return token

    helper_path = SCRIPT_DIR / "servicem8-list-webhook-subscriptions-table.py"
    fallback_token = None
    if helper_path.exists():
        spec = importlib.util.spec_from_file_location("servicem8_subs", helper_path)
        if spec and spec.loader:
            helper_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(helper_module)  # type: ignore[union-attr]
            fallback_token = getattr(helper_module, "ACCESS_TOKEN", None)

    if not fallback_token:
        raise RuntimeError("Missing SERVICEM8_ACCESS_TOKEN or SERVICEM8_API_KEY")
    return str(fallback_token)
def get_conn(db_path: Path) -> sqlite3.Connection:
    """Open a SQLite connection to *db_path* whose result rows are addressable by column name."""
    connection = sqlite3.connect(db_path)
    # sqlite3.Row lets callers write row["queued_at"] instead of positional indexing.
    connection.row_factory = sqlite3.Row
    return connection
def init_db(db_path: Path) -> None:
    """Create the poll database (and its parent directory) if it does not exist yet.

    Every table and index uses IF NOT EXISTS, so this is safe to call at the start of
    every run. Tables:
      - poll_runs: one row per invocation, with counts and any error message.
      - form_responses_raw: every fetched form response, keyed by uuid.
      - quote_template_form_responses: parsed Quote Template responses plus queue state.
      - job_metadata: job details stored for jobs referenced by Quote Template responses.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS poll_runs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        started_at TEXT NOT NULL,
        finished_at TEXT,
        since_value TEXT NOT NULL,
        filter_field TEXT NOT NULL,
        filter_expression TEXT NOT NULL,
        http_status INTEGER,
        fetched_count INTEGER DEFAULT 0,
        inserted_count INTEGER DEFAULT 0,
        updated_count INTEGER DEFAULT 0,
        quote_match_count INTEGER DEFAULT 0,
        newly_queued_count INTEGER DEFAULT 0,
        error TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS form_responses_raw (
        uuid TEXT PRIMARY KEY,
        first_seen_at TEXT NOT NULL,
        last_seen_at TEXT NOT NULL,
        seen_count INTEGER NOT NULL DEFAULT 1,
        active INTEGER,
        edit_date TEXT,
        form_uuid TEXT,
        staff_uuid TEXT,
        regarding_object TEXT,
        regarding_object_uuid TEXT,
        timestamp TEXT,
        form_by_staff_uuid TEXT,
        document_attachment_uuid TEXT,
        asset_uuid TEXT,
        is_quote_template INTEGER NOT NULL DEFAULT 0,
        parse_status TEXT,
        parse_error TEXT,
        raw_json TEXT NOT NULL
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_form_responses_raw_timestamp ON form_responses_raw(timestamp)",
        "CREATE INDEX IF NOT EXISTS idx_form_responses_raw_edit_date ON form_responses_raw(edit_date)",
        "CREATE INDEX IF NOT EXISTS idx_form_responses_raw_form_uuid ON form_responses_raw(form_uuid)",
        "CREATE INDEX IF NOT EXISTS idx_form_responses_raw_quote ON form_responses_raw(is_quote_template, parse_status)",
        """
        CREATE TABLE IF NOT EXISTS quote_template_form_responses (
        form_response_uuid TEXT PRIMARY KEY,
        discovered_at TEXT NOT NULL,
        job_uuid TEXT,
        form_uuid TEXT NOT NULL,
        author_name TEXT,
        description TEXT,
        desired_job_materials_json TEXT NOT NULL,
        parsed_json TEXT NOT NULL,
        queued_at TEXT,
        processed_at TEXT,
        process_status TEXT NOT NULL DEFAULT 'parsed',
        process_error TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS job_metadata (
        job_uuid TEXT PRIMARY KEY,
        generated_job_id TEXT,
        job_address TEXT,
        company_name TEXT,
        raw_json TEXT NOT NULL,
        first_seen_at TEXT NOT NULL,
        last_seen_at TEXT NOT NULL,
        source TEXT NOT NULL
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_job_metadata_generated_job_id ON job_metadata(generated_job_id)",
    )
    db_path.parent.mkdir(parents=True, exist_ok=True)
    with closing(get_conn(db_path)) as conn:
        for statement in schema_statements:
            conn.execute(statement)
        conn.commit()
def create_poll_run(conn: sqlite3.Connection, since: str, filter_field: str, filter_expr: str) -> int:
    """Insert a poll_runs row stamped with the current UTC start time, commit, and return its id."""
    run_params = (utc_now(), since, filter_field, filter_expr)
    cursor = conn.execute(
        "INSERT INTO poll_runs (started_at, since_value, filter_field, filter_expression) "
        "VALUES (?, ?, ?, ?)",
        run_params,
    )
    conn.commit()
    return int(cursor.lastrowid)
def finish_poll_run(conn: sqlite3.Connection, run_id: int, **values: Any) -> None:
    """Record the outcome of poll run *run_id* and commit.

    Only whitelisted poll_runs columns are written. Unknown keyword names are silently
    ignored, which also keeps the column names interpolated into the SQL safe.
    ``finished_at`` defaults to the current UTC time. The commit also commits any
    other pending changes on *conn*.
    """
    updatable_columns = frozenset(
        (
            "finished_at",
            "http_status",
            "fetched_count",
            "inserted_count",
            "updated_count",
            "quote_match_count",
            "newly_queued_count",
            "error",
        )
    )
    values.setdefault("finished_at", utc_now())
    assignments = [(column, value) for column, value in values.items() if column in updatable_columns]
    if not assignments:
        return
    columns, column_values = zip(*assignments)
    set_clause = ", ".join(f"{column} = ?" for column in columns)
    conn.execute(f"UPDATE poll_runs SET {set_clause} WHERE id = ?", (*column_values, run_id))
    conn.commit()
def fetch_form_responses(
    *,
    api_key: str,
    base_url: str,
    endpoint: str,
    filter_field: str,
    since: str,
    timeout: int,
) -> Tuple[int, List[Dict[str, Any]], str]:
    """GET *endpoint* for rows whose *filter_field* is greater than *since*.

    Returns:
        (HTTP status code, list of form-response dicts, the $filter expression sent).

    Raises:
        RuntimeError: on an HTTP error status (``response.ok`` false) or when the
            body is not a JSON list.
    """
    # ServiceM8 filter syntax confirmed against the live API:
    #   ?$filter=timestamp gt '2026-05-04 00:00:00'
    filter_expr = f"{filter_field} gt '{since}'"
    response = requests.get(
        base_url.rstrip("/") + endpoint,
        headers={"X-Api-Key": api_key, "Accept": "application/json"},
        params={"$filter": filter_expr},
        timeout=timeout,
    )
    if not response.ok:
        raise RuntimeError(f"HTTP {response.status_code}: {response.text[:1000]}")
    payload = response.json()
    if isinstance(payload, list):
        return response.status_code, payload, filter_expr
    raise RuntimeError(f"Expected list response, got {type(payload).__name__}")
def retrieve_job(
    *,
    api_key: str,
    base_url: str,
    job_uuid: str,
    timeout: int,
) -> Dict[str, Any]:
    """Fetch a single job object from ServiceM8 (``GET /job/{job_uuid}.json``).

    Raises:
        RuntimeError: on an HTTP error status or when the body is not a JSON object.
    """
    job_url = base_url.rstrip("/") + f"/job/{job_uuid}.json"
    response = requests.get(
        job_url,
        headers={"X-Api-Key": api_key, "Accept": "application/json"},
        timeout=timeout,
    )
    if not response.ok:
        raise RuntimeError(f"Job retrieve failed for {job_uuid}: HTTP {response.status_code}: {response.text[:1000]}")
    job = response.json()
    if isinstance(job, dict):
        return job
    raise RuntimeError(f"Job retrieve expected object response, got {type(job).__name__}")
def clean_text(value: Any) -> str:
    """Return *value* as a stripped string, with CRLF and lone CR normalised to LF.

    ``None`` maps to the empty string; any other value goes through ``str()``.
    """
    if value is None:
        return ""
    text = str(value)
    # CRLF must be handled before lone CR, or "\r\n" would become "\n\n".
    text = text.replace("\r\n", "\n")
    text = text.replace("\r", "\n")
    return text.strip()
def first_text(*values: Any) -> str:
    """Return the first argument that is non-empty after clean_text(), or "" if none is."""
    # map() is lazy, so clean_text stops being applied once a match is found.
    return next((text for text in map(clean_text, values) if text), "")
def format_job_address(job: Dict[str, Any]) -> str:
    """Build a single-line address string for *job*.

    Returns the first non-empty whole-address field (job_address, site_address, address,
    location_address, billing_address). Otherwise it joins the street, suburb/city, state
    and postcode components with single spaces, skipping empty ones. Returns "" when
    nothing is set.
    """
    whole_address_keys = ("job_address", "site_address", "address", "location_address", "billing_address")
    direct = first_text(*(job.get(key) for key in whole_address_keys))
    if direct:
        return direct

    # Each inner tuple lists alternative keys for one address component, in priority order.
    component_keys = (
        ("street", "street_address", "address_1", "address1"),
        ("suburb", "city"),
        ("state",),
        ("postcode", "postal_code", "zip"),
    )
    components = (first_text(*(job.get(key) for key in keys)) for keys in component_keys)
    return " ".join(filter(None, components))
def extract_company_name(job: Dict[str, Any]) -> str:
    """Return the company name for *job*, or "" if none can be found.

    Sources are checked in order: job["related"]["company"]["name"],
    job["company"]["name"], then the flat job["company_name"] / job["customer_name"].
    """
    nested_companies = []
    related = job.get("related")
    if isinstance(related, dict):
        nested_companies.append(related.get("company"))
    nested_companies.append(job.get("company"))

    for company in nested_companies:
        if not isinstance(company, dict):
            continue
        name = clean_text(company.get("name"))
        if name:
            return name
    return first_text(job.get("company_name"), job.get("customer_name"))
def upsert_job_metadata(conn: sqlite3.Connection, *, job_uuid: str, job: Dict[str, Any], now: str, source: str) -> None:
    """Insert or refresh the job_metadata row for a job. Does not commit.

    The row key is *job_uuid* or, when that is falsy, job["uuid"], after clean_text().
    Nothing is written if the key ends up empty. On conflict every column except
    first_seen_at is overwritten with the new values.
    """
    key = clean_text(job_uuid or job.get("uuid"))
    if not key:
        return
    upsert_sql = """
        INSERT INTO job_metadata (
        job_uuid, generated_job_id, job_address, company_name, raw_json,
        first_seen_at, last_seen_at, source
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(job_uuid) DO UPDATE SET
        generated_job_id = excluded.generated_job_id,
        job_address = excluded.job_address,
        company_name = excluded.company_name,
        raw_json = excluded.raw_json,
        last_seen_at = excluded.last_seen_at,
        source = excluded.source
        """
    conn.execute(
        upsert_sql,
        (
            key,
            clean_text(job.get("generated_job_id")),
            format_job_address(job),
            extract_company_name(job),
            json.dumps(job, ensure_ascii=False, sort_keys=True),
            now,  # first_seen_at: only used on first insert
            now,  # last_seen_at
            source,
        ),
    )
def insert_or_update_raw(
    conn: sqlite3.Connection,
    row: Dict[str, Any],
    *,
    quote_template_form_uuid: str,
    now: str,
) -> Tuple[bool, bool]:
    """Store one fetched form response in form_responses_raw. Does not commit.

    A new uuid is inserted with first_seen_at = last_seen_at = *now*. For a known uuid,
    last_seen_at is bumped, seen_count is incremented, and every copied field plus
    raw_json is overwritten with the latest values. parse_status/parse_error are left
    untouched in both cases.

    Returns:
        (was_inserted, is_quote_template). is_quote_template is True when the row's
        whitespace-stripped form_uuid equals *quote_template_form_uuid*.

    Raises:
        ValueError: if the row has no non-empty uuid.
    """
    # Payload keys copied verbatim into same-named columns, in column order.
    copied_fields = (
        "active",
        "edit_date",
        "form_uuid",
        "staff_uuid",
        "regarding_object",
        "regarding_object_uuid",
        "timestamp",
        "form_by_staff_uuid",
        "document_attachment_uuid",
        "asset_uuid",
    )
    response_uuid = str(row.get("uuid") or "").strip()
    if not response_uuid:
        raise ValueError("Form response row missing uuid")
    is_quote = int(str(row.get("form_uuid") or "").strip() == quote_template_form_uuid)
    already_stored = (
        conn.execute("SELECT uuid FROM form_responses_raw WHERE uuid = ?", (response_uuid,)).fetchone()
        is not None
    )
    raw_json = json.dumps(row, ensure_ascii=False, sort_keys=True)
    copied_values = tuple(row.get(field) for field in copied_fields)

    if already_stored:
        conn.execute(
            """
            UPDATE form_responses_raw
            SET last_seen_at = ?,
            seen_count = seen_count + 1,
            active = ?,
            edit_date = ?,
            form_uuid = ?,
            staff_uuid = ?,
            regarding_object = ?,
            regarding_object_uuid = ?,
            timestamp = ?,
            form_by_staff_uuid = ?,
            document_attachment_uuid = ?,
            asset_uuid = ?,
            is_quote_template = ?,
            raw_json = ?
            WHERE uuid = ?
            """,
            (now, *copied_values, is_quote, raw_json, response_uuid),
        )
        return False, bool(is_quote)

    conn.execute(
        """
        INSERT INTO form_responses_raw (
        uuid, first_seen_at, last_seen_at, active, edit_date, form_uuid,
        staff_uuid, regarding_object, regarding_object_uuid, timestamp,
        form_by_staff_uuid, document_attachment_uuid, asset_uuid,
        is_quote_template, raw_json
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (response_uuid, now, now, *copied_values, is_quote, raw_json),
    )
    return True, bool(is_quote)
def ensure_queue_writable(queue_path: Path) -> None:
    """Fail fast if the JSONL queue file cannot be opened for appending.

    Creates the parent directory and, if missing, an empty queue file, then opens the
    file in append mode and closes it immediately. Existing content is never modified.
    Any OSError (e.g. permission denied) propagates to the caller.
    """
    queue_path.parent.mkdir(parents=True, exist_ok=True)
    # Append mode creates the file when absent and never truncates, so a single open
    # covers both the "already exists" and "missing" cases.
    with queue_path.open("a", encoding="utf-8"):
        pass
def queue_record(queue_path: Path, parsed: Dict[str, Any], discovered_at: str) -> None:
    """Append one parsed Quote Template response to the JSONL queue as a single line.

    Missing string fields default to "" and missing desired_job_materials to [].
    The file is opened in append mode, so it is created if absent.
    """
    record = {"queued_at": discovered_at, "source": "formresponse_poll"}
    for key in ("form_uuid", "form_response_uuid", "job_uuid", "author_name", "description"):
        record[key] = parsed.get(key, "")
    record["desired_job_materials"] = parsed.get("desired_job_materials", [])
    with queue_path.open("a", encoding="utf-8") as handle:
        line = json.dumps(record, ensure_ascii=False)
        handle.write(f"{line}\n")
def parse_and_store_quote_response(
    conn: sqlite3.Connection,
    row: Dict[str, Any],
    *,
    queue_path: Path,
    write_queue: bool,
    discovered_at: str,
) -> bool:
    """Parse a Quote Template form response, record it, and optionally queue it.

    Does not commit. Behaviour by case:
      - Already in quote_template_form_responses, not yet queued, and *write_queue*:
        append the stored parse to the JSONL queue, set queued_at, return True.
      - Already recorded in any other situation: do nothing, return False.
      - New, and the parser raises: set parse_status='error' (with the message) on the
        form_responses_raw row and return False. No quote_template_form_responses row
        is written, so parsing is re-attempted whenever the response is fetched again.
      - New and parsed: append to the queue when *write_queue*, insert the parse
        (queued_at set only when queued), set parse_status='parsed' on the raw row,
        and return True. With write_queue=False, True therefore means "newly parsed",
        not "queued".

    The JSONL append always happens before the DB write that records queued_at. If the
    append fails, no row claims the response was queued, even when the caller later
    commits the open transaction (main's error path does this via finish_poll_run).
    If the DB write fails after a successful append, the response may be queued again
    on a later run, so queue consumers should de-duplicate on form_response_uuid.
    """
    response_uuid = str(row.get("uuid") or "").strip()
    existing = conn.execute(
        "SELECT form_response_uuid, parsed_json, queued_at FROM quote_template_form_responses WHERE form_response_uuid = ?",
        (response_uuid,),
    ).fetchone()
    if existing:
        if write_queue and not existing["queued_at"]:
            # Parsed on an earlier run (e.g. with --no-queue) but never queued:
            # queue the stored parse now, then mark it queued.
            parsed = json.loads(existing["parsed_json"])
            queue_record(queue_path, parsed, discovered_at)
            conn.execute(
                "UPDATE quote_template_form_responses SET queued_at = ? WHERE form_response_uuid = ?",
                (discovered_at, response_uuid),
            )
            return True
        return False

    try:
        parsed = parse_quote_template_form_response({"data": row})
    except Exception as exc:
        # Best effort: one malformed response must not abort the whole poll.
        # The failure is recorded on the raw row for later inspection.
        conn.execute(
            "UPDATE form_responses_raw SET parse_status = ?, parse_error = ? WHERE uuid = ?",
            ("error", str(exc), response_uuid),
        )
        return False

    parsed_json = json.dumps(parsed, ensure_ascii=False, sort_keys=True)
    desired_json = json.dumps(parsed.get("desired_job_materials", []), ensure_ascii=False, sort_keys=True)

    if write_queue:
        # Append BEFORE inserting a row that claims queued_at. If this raises, nothing
        # below runs and the response is picked up again on a later poll.
        queue_record(queue_path, parsed, discovered_at)
    queued_at = discovered_at if write_queue else None

    conn.execute(
        """
        INSERT INTO quote_template_form_responses (
        form_response_uuid, discovered_at, job_uuid, form_uuid, author_name,
        description, desired_job_materials_json, parsed_json, queued_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            response_uuid,
            discovered_at,
            parsed.get("job_uuid", ""),
            parsed.get("form_uuid", ""),
            parsed.get("author_name", ""),
            parsed.get("description", ""),
            desired_json,
            parsed_json,
            queued_at,
        ),
    )
    conn.execute(
        "UPDATE form_responses_raw SET parse_status = ?, parse_error = NULL WHERE uuid = ?",
        ("parsed", response_uuid),
    )
    return True
def _field_count(field_data: Any) -> Optional[int]:
    """Return len(field_data) for a list, or for a string holding a JSON list; else None."""
    if isinstance(field_data, str):
        try:
            field_data = json.loads(field_data)
        except (ValueError, RecursionError):
            # json.JSONDecodeError subclasses ValueError; the summary is best effort.
            return None
    return len(field_data) if isinstance(field_data, list) else None


def summarize_rows(rows: Iterable[Dict[str, Any]], limit: int) -> List[Dict[str, Any]]:
    """Return a compact summary of the first *limit* rows for CLI output.

    Each entry holds the identifying fields of the row plus:
      - field_data_type: the Python type name of row["field_data"] (NoneType if absent);
      - field_count: the number of entries in field_data when it is a list, or a
        string containing a JSON list; otherwise None.

    *limit* follows list-slice semantics (``rows[:limit]``).
    """
    summary: List[Dict[str, Any]] = []
    for row in list(rows)[:limit]:
        field_data = row.get("field_data")
        summary.append(
            {
                "uuid": row.get("uuid"),
                "timestamp": row.get("timestamp"),
                "edit_date": row.get("edit_date"),
                "form_uuid": row.get("form_uuid"),
                "regarding_object": row.get("regarding_object"),
                "regarding_object_uuid": row.get("regarding_object_uuid"),
                "field_data_type": type(field_data).__name__,
                "field_count": _field_count(field_data),
            }
        )
    return summary
def main() -> int:
    """CLI entry point: poll ServiceM8 form responses newer than --since.

    Unless --no-store is given, the run is logged in poll_runs and every fetched row is
    upserted into form_responses_raw. For Quote Template matches, job metadata is
    fetched (best effort), and new responses are parsed and, unless --no-queue is given,
    appended to the JSONL queue.

    Prints a JSON summary to stdout and returns 0. On any failure (including missing
    credentials), it prints a JSON error to stderr, records the error on the poll run
    when one exists, and returns 1.
    """
    parser = argparse.ArgumentParser(description="Poll ServiceM8 form responses since a timestamp and store unseen Quote Template responses")
    parser.add_argument("--since", required=True, help="Timestamp for ServiceM8 filter, e.g. '2026-05-04 10:00:00'")
    parser.add_argument("--filter-field", default="timestamp", choices=("timestamp", "edit_date"), help="Field to apply ServiceM8 gt filter to")
    parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Local SQLite DB for poll results")
    parser.add_argument("--queue", default=str(DEFAULT_QUEUE_PATH), help="JSONL queue path for parsed Quote Template jobMaterials")
    parser.add_argument("--quote-template-form-uuid", default=DEFAULT_QUOTE_TEMPLATE_FORM_UUID, help="Quote Template form UUID to process")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT)
    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_SECONDS)
    parser.add_argument("--no-store", action="store_true", help="Fetch and print summary only; do not write DB/queue")
    parser.add_argument("--no-queue", action="store_true", help="Store/parse matches but do not append to JSONL queue")
    parser.add_argument("--dump-json", action="store_true", help="Print full fetched ServiceM8 JSON payload")
    parser.add_argument("--summary-limit", type=int, default=10, help="Number of rows to include in summary output")
    args = parser.parse_args()
    db_path = Path(args.db)
    queue_path = Path(args.queue)
    filter_expr = f"{args.filter_field} gt '{args.since}'"
    conn: Optional[sqlite3.Connection] = None
    run_id: Optional[int] = None
    try:
        # Loaded inside the try so missing credentials are reported with the same JSON
        # error payload and exit code as every other failure, not as a traceback.
        api_key = load_api_key()
        if not args.no_store and not args.no_queue:
            # Fail before DB work if the queue cannot be appended to. This avoids
            # marking a response as queued when the JSONL append did not happen.
            ensure_queue_writable(queue_path)
        if not args.no_store:
            init_db(db_path)
            conn = get_conn(db_path)
            run_id = create_poll_run(conn, args.since, args.filter_field, filter_expr)
        http_status, rows, filter_expr = fetch_form_responses(
            api_key=api_key,
            base_url=args.base_url,
            endpoint=args.endpoint,
            filter_field=args.filter_field,
            since=args.since,
            timeout=args.timeout,
        )
        if args.dump_json:
            print(json.dumps(rows, indent=2, ensure_ascii=False))
        inserted = updated = quote_matches = newly_queued = 0
        now = utc_now()
        # Jobs already enriched during this run; avoids re-fetching the same job.
        fetched_job_uuids = set()
        if conn is not None:
            for row in rows:
                was_inserted, is_quote = insert_or_update_raw(
                    conn,
                    row,
                    quote_template_form_uuid=args.quote_template_form_uuid,
                    now=now,
                )
                inserted += 1 if was_inserted else 0
                updated += 0 if was_inserted else 1
                if not is_quote:
                    continue
                quote_matches += 1
                job_uuid = clean_text(row.get("regarding_object_uuid"))
                if job_uuid and job_uuid not in fetched_job_uuids:
                    try:
                        job = retrieve_job(
                            api_key=api_key,
                            base_url=args.base_url,
                            job_uuid=job_uuid,
                            timeout=args.timeout,
                        )
                        upsert_job_metadata(conn, job_uuid=job_uuid, job=job, now=now, source="formresponse_poll")
                        fetched_job_uuids.add(job_uuid)
                    except Exception as exc:
                        # Polling/parsing should still proceed if job metadata enrichment fails.
                        print(
                            json.dumps(
                                {"warning": "job_metadata_fetch_failed", "job_uuid": job_uuid, "error": str(exc)},
                                ensure_ascii=False,
                            ),
                            file=sys.stderr,
                        )
                if parse_and_store_quote_response(
                    conn,
                    row,
                    queue_path=queue_path,
                    write_queue=not args.no_queue,
                    discovered_at=now,
                ):
                    newly_queued += 1
            conn.commit()
            if run_id is not None:
                finish_poll_run(
                    conn,
                    run_id,
                    http_status=http_status,
                    fetched_count=len(rows),
                    inserted_count=inserted,
                    updated_count=updated,
                    quote_match_count=quote_matches,
                    newly_queued_count=newly_queued,
                )
        if conn is not None:
            quote_match_count = quote_matches
        else:
            # Same comparison insert_or_update_raw uses (whitespace-stripped form_uuid).
            quote_match_count = sum(
                1 for r in rows if str(r.get("form_uuid") or "").strip() == args.quote_template_form_uuid
            )
        result = {
            "ok": True,
            "http_status": http_status,
            "filter": filter_expr,
            "fetched_count": len(rows),
            "inserted_count": inserted,
            "updated_count": updated,
            "quote_template_form_uuid": args.quote_template_form_uuid,
            "quote_match_count": quote_match_count,
            "newly_queued_count": newly_queued,
            "db_path": None if args.no_store else str(db_path),
            "queue_path": None if args.no_queue or args.no_store else str(queue_path),
            "sample": summarize_rows(rows, args.summary_limit),
        }
        print(json.dumps(result, indent=2, ensure_ascii=False))
        return 0
    except Exception as exc:
        if conn is not None and run_id is not None:
            try:
                finish_poll_run(conn, run_id, error=str(exc))
            except sqlite3.Error as record_exc:
                # Never let run bookkeeping mask the original failure reported below.
                print(
                    json.dumps(
                        {"warning": "poll_run_error_record_failed", "run_id": run_id, "error": str(record_exc)},
                        ensure_ascii=False,
                    ),
                    file=sys.stderr,
                )
        print(json.dumps({"ok": False, "error": str(exc), "filter": filter_expr}, indent=2), file=sys.stderr)
        return 1
    finally:
        if conn is not None:
            conn.close()
if __name__ == "__main__":
    # Use main()'s return value (0 on success, 1 on failure) as the process exit status.
    sys.exit(main())