#!/usr/bin/env python3 """Create ServiceM8 jobMaterials for one selected polled Quote Template response. Safe-by-default: - Requires an explicit form response UUID. - Dry-run unless --apply is provided. - Refuses to apply twice for the same form response unless --force is used. - Records every dry-run/apply attempt in the poll DB for inspector visibility. """ from __future__ import annotations import argparse import importlib.util import json import os import sqlite3 import sys from contextlib import closing from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional import requests from servicem8_quote_template_parser import STATE_DB_PATH, init_state_db, record_generated_job_material SCRIPT_DIR = Path(__file__).resolve().parent POLL_DB_PATH = Path(os.getenv("WEBHOOK_POLL_DB_PATH", SCRIPT_DIR / "servicem8_formresponse_poll.db")) BASE_URL = os.getenv("SERVICEM8_BASE_URL", "https://api.servicem8.com/api_1.0") REQUEST_TIMEOUT = int(os.getenv("SERVICEM8_TIMEOUT", "30")) DEV_QUOTE_MATERIAL_UUID = "f78b1d23-b9fa-40fe-a806-2425fe09cc0b" QUOTE_INCLUDE_HEADER_MATERIAL_UUID = "1924893b-917f-474a-adaa-2093bd622d4b" QUOTE_EXCLUDE_HEADER_MATERIAL_UUID = "4947bfd7-4875-48f7-9caf-2093b9751b9b" DEV_QUOTE_TAX_RATE_UUID = "84e4dd28-06b3-452b-a796-1f58a20ac49b" def utc_now() -> str: return datetime.now(timezone.utc).isoformat() def material_uuid_for_row(row: dict) -> str: kind = row.get("kind", "") if kind == "include_header": return QUOTE_INCLUDE_HEADER_MATERIAL_UUID if kind == "exclude_header": return QUOTE_EXCLUDE_HEADER_MATERIAL_UUID return DEV_QUOTE_MATERIAL_UUID def build_payload(job_uuid: str, row: dict) -> dict: return { "job_uuid": job_uuid, "material_uuid": material_uuid_for_row(row), "tax_rate_uuid": DEV_QUOTE_TAX_RATE_UUID, "name": row["name"], "quantity": "0", "price": "", "displayed_amount": "", "displayed_amount_is_tax_inclusive": "", "sort_order": row["sort_order"], } def create_job_material(session: requests.Session, payload: dict) -> str: response = session.post(f"{BASE_URL}/jobmaterial.json", json=payload, timeout=REQUEST_TIMEOUT) if not response.ok: raise RuntimeError(f"Create failed: HTTP {response.status_code} :: {response.text}") record_uuid = response.headers.get("x-record-uuid", "") if not record_uuid: raise RuntimeError("Create succeeded but x-record-uuid header was missing") return record_uuid def load_api_key() -> str: for name in ("SERVICEM8_ACCESS_TOKEN", "SERVICEM8_API_KEY"): value = os.getenv(name) if value: return value fallback = SCRIPT_DIR / "servicem8-list-webhook-subscriptions-table.py" if fallback.exists(): spec = importlib.util.spec_from_file_location("servicem8_subs", fallback) if spec and spec.loader: module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # type: ignore[union-attr] value = getattr(module, "ACCESS_TOKEN", None) if value: return str(value) raise RuntimeError("Missing SERVICEM8_ACCESS_TOKEN or SERVICEM8_API_KEY") def get_conn(db_path: Path = POLL_DB_PATH) -> sqlite3.Connection: conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row return conn def init_apply_tables(conn: sqlite3.Connection) -> None: conn.execute( """ CREATE TABLE IF NOT EXISTS quote_template_apply_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, form_response_uuid TEXT NOT NULL, job_uuid TEXT NOT NULL, mode TEXT NOT NULL, started_at TEXT NOT NULL, finished_at TEXT, desired_count INTEGER NOT NULL DEFAULT 0, created_count INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL, error TEXT ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS quote_template_apply_run_rows ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_id INTEGER NOT NULL, form_response_uuid TEXT NOT NULL, job_uuid TEXT NOT NULL, row_index INTEGER NOT NULL, kind TEXT, source_question TEXT, name TEXT, api_payload_json TEXT NOT NULL, action TEXT NOT NULL, job_material_uuid TEXT, error TEXT, created_at TEXT NOT NULL, FOREIGN KEY(run_id) REFERENCES quote_template_apply_runs(id) ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_apply_runs_form_response ON quote_template_apply_runs(form_response_uuid)") conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_apply_rows_run ON quote_template_apply_run_rows(run_id)") conn.commit() def load_quote(conn: sqlite3.Connection, form_response_uuid: str) -> sqlite3.Row: row = conn.execute( "select * from quote_template_form_responses where form_response_uuid = ?", (form_response_uuid,), ).fetchone() if not row: raise SystemExit(f"No parsed polled Quote Template response found for UUID: {form_response_uuid}") return row def existing_created_for_form(form_response_uuid: str) -> int: if not STATE_DB_PATH.exists(): return 0 try: with closing(sqlite3.connect(STATE_DB_PATH)) as conn: row = conn.execute( "select count(*) from generated_job_materials where form_response_uuid = ?", (form_response_uuid,), ).fetchone() return int(row[0] if row else 0) except sqlite3.Error: return 0 def create_apply_run(conn: sqlite3.Connection, *, form_response_uuid: str, job_uuid: str, mode: str, desired_count: int) -> int: cur = conn.execute( """ INSERT INTO quote_template_apply_runs ( form_response_uuid, job_uuid, mode, started_at, desired_count, status ) VALUES (?, ?, ?, ?, ?, ?) """, (form_response_uuid, job_uuid, mode, utc_now(), desired_count, "running"), ) conn.commit() return int(cur.lastrowid) def finish_apply_run(conn: sqlite3.Connection, run_id: int, *, status: str, created_count: int, error: Optional[str] = None) -> None: conn.execute( """ UPDATE quote_template_apply_runs SET finished_at = ?, status = ?, created_count = ?, error = ? WHERE id = ? """, (utc_now(), status, created_count, error, run_id), ) conn.commit() def record_apply_row( conn: sqlite3.Connection, *, run_id: int, form_response_uuid: str, job_uuid: str, row_index: int, row: Dict[str, Any], api_payload: Dict[str, Any], action: str, job_material_uuid: str = "", error: Optional[str] = None, ) -> None: conn.execute( """ INSERT INTO quote_template_apply_run_rows ( run_id, form_response_uuid, job_uuid, row_index, kind, source_question, name, api_payload_json, action, job_material_uuid, error, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( run_id, form_response_uuid, job_uuid, row_index, row.get("kind", ""), row.get("source_question", ""), row.get("name", ""), json.dumps(api_payload, ensure_ascii=False, sort_keys=True), action, job_material_uuid, error, utc_now(), ), ) conn.commit() def list_pending(conn: sqlite3.Connection) -> List[Dict[str, Any]]: rows = conn.execute( """ select form_response_uuid, discovered_at, job_uuid, description, queued_at, processed_at, process_status, desired_job_materials_json from quote_template_form_responses order by discovered_at desc """ ).fetchall() out = [] for row in rows: existing = existing_created_for_form(row["form_response_uuid"]) try: desired_count = len(json.loads(row["desired_job_materials_json"] or "[]")) except Exception: desired_count = None out.append( { "form_response_uuid": row["form_response_uuid"], "discovered_at": row["discovered_at"], "job_uuid": row["job_uuid"], "description": row["description"], "desired_count": desired_count, "created_in_state_db": existing, "processed_at": row["processed_at"], "process_status": row["process_status"], } ) return out def main() -> int: parser = argparse.ArgumentParser(description="Dry-run or apply one selected polled Quote Template response to ServiceM8 jobMaterials") parser.add_argument("--uuid", help="Polled Quote Template form_response_uuid to process") parser.add_argument("--db", default=str(POLL_DB_PATH), help="Poll DB path") parser.add_argument("--apply", action="store_true", help="Actually create ServiceM8 jobMaterial records. Default is dry-run.") parser.add_argument("--force", action="store_true", help="Allow apply even if generated_job_materials already contains rows for this form response") parser.add_argument("--list", action="store_true", help="List parsed polled Quote Template responses and exit") parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON output") args = parser.parse_args() db_path = Path(args.db) if not db_path.exists(): raise SystemExit(f"Poll DB not found: {db_path}") with closing(get_conn(db_path)) as conn: init_apply_tables(conn) if args.list: print(json.dumps({"ok": True, "rows": list_pending(conn)}, indent=2 if args.pretty else None, ensure_ascii=False)) return 0 if not args.uuid: raise SystemExit("--uuid is required unless using --list") quote = load_quote(conn, args.uuid) form_response_uuid = quote["form_response_uuid"] job_uuid = quote["job_uuid"] desired_rows = json.loads(quote["desired_job_materials_json"] or "[]") mode = "apply" if args.apply else "dry-run" existing_created = existing_created_for_form(form_response_uuid) if args.apply and existing_created and not args.force: raise SystemExit( f"Refusing duplicate apply: {existing_created} generated_job_materials already recorded for {form_response_uuid}. Use --force only if intentional." ) run_id = create_apply_run( conn, form_response_uuid=form_response_uuid, job_uuid=job_uuid, mode=mode, desired_count=len(desired_rows), ) result = { "ok": True, "mode": mode, "run_id": run_id, "form_response_uuid": form_response_uuid, "job_uuid": job_uuid, "description": quote["description"], "desired_count": len(desired_rows), "created_count": 0, "state_db_path": str(STATE_DB_PATH), "rows": [], } try: if not args.apply: for idx, row in enumerate(desired_rows, start=1): api_payload = build_payload(job_uuid, row) record_apply_row( conn, run_id=run_id, form_response_uuid=form_response_uuid, job_uuid=job_uuid, row_index=idx, row=row, api_payload=api_payload, action="would_create", ) result["rows"].append({"action": "would_create", "kind": row.get("kind"), "payload": api_payload}) finish_apply_run(conn, run_id, status="dry-run", created_count=0) conn.execute( "UPDATE quote_template_form_responses SET process_status = ? WHERE form_response_uuid = ?", ("dry-run", form_response_uuid), ) conn.commit() print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False)) return 0 api_key = load_api_key() init_state_db(STATE_DB_PATH) session = requests.Session() session.headers.update({"X-Api-Key": api_key, "Accept": "application/json", "Content-Type": "application/json"}) created_count = 0 for idx, row in enumerate(desired_rows, start=1): api_payload = build_payload(job_uuid, row) created_uuid = create_job_material(session, api_payload) created_count += 1 record_generated_job_material( job_uuid=job_uuid, form_response_uuid=form_response_uuid, job_material_uuid=created_uuid, kind=row.get("kind", ""), source_field_uuid=row.get("source_field_uuid", ""), source_question=row.get("source_question", ""), source_text=row.get("name", ""), db_path=STATE_DB_PATH, ) record_apply_row( conn, run_id=run_id, form_response_uuid=form_response_uuid, job_uuid=job_uuid, row_index=idx, row=row, api_payload=api_payload, action="created", job_material_uuid=created_uuid, ) result["rows"].append({"action": "created", "kind": row.get("kind"), "job_material_uuid": created_uuid, "payload": api_payload}) result["created_count"] = created_count finish_apply_run(conn, run_id, status="applied", created_count=created_count) conn.execute( "UPDATE quote_template_form_responses SET processed_at = ?, process_status = ?, process_error = NULL WHERE form_response_uuid = ?", (utc_now(), "applied", form_response_uuid), ) conn.commit() print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False)) return 0 except Exception as exc: finish_apply_run(conn, run_id, status="error", created_count=result.get("created_count", 0), error=str(exc)) conn.execute( "UPDATE quote_template_form_responses SET process_status = ?, process_error = ? WHERE form_response_uuid = ?", ("error", str(exc), form_response_uuid), ) conn.commit() raise if __name__ == "__main__": try: raise SystemExit(main()) except Exception as exc: print(str(exc), file=sys.stderr) raise SystemExit(1)