Files
plumbing/apply_polled_quote_template_jobmaterials.py
T

400 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""Create ServiceM8 jobMaterials for one selected polled Quote Template response.
Safe-by-default:
- Requires an explicit form response UUID.
- Dry-run unless --apply is provided.
- Refuses to apply twice for the same form response unless --force is used.
- Records every dry-run/apply attempt in the poll DB for inspector visibility.
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import os
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
from servicem8_quote_template_parser import STATE_DB_PATH, init_state_db, record_generated_job_material
SCRIPT_DIR = Path(__file__).resolve().parent
POLL_DB_PATH = Path(os.getenv("WEBHOOK_POLL_DB_PATH", SCRIPT_DIR / "servicem8_formresponse_poll.db"))
BASE_URL = os.getenv("SERVICEM8_BASE_URL", "https://api.servicem8.com/api_1.0")
REQUEST_TIMEOUT = int(os.getenv("SERVICEM8_TIMEOUT", "30"))
DEV_QUOTE_MATERIAL_UUID = "f78b1d23-b9fa-40fe-a806-2425fe09cc0b"
QUOTE_INCLUDE_HEADER_MATERIAL_UUID = "1924893b-917f-474a-adaa-2093bd622d4b"
QUOTE_EXCLUDE_HEADER_MATERIAL_UUID = "4947bfd7-4875-48f7-9caf-2093b9751b9b"
DEV_QUOTE_TAX_RATE_UUID = "84e4dd28-06b3-452b-a796-1f58a20ac49b"
def utc_now() -> str:
return datetime.now(timezone.utc).isoformat()
def material_uuid_for_row(row: dict) -> str:
kind = row.get("kind", "")
if kind == "include_header":
return QUOTE_INCLUDE_HEADER_MATERIAL_UUID
if kind == "exclude_header":
return QUOTE_EXCLUDE_HEADER_MATERIAL_UUID
return DEV_QUOTE_MATERIAL_UUID
def build_payload(job_uuid: str, row: dict) -> dict:
return {
"job_uuid": job_uuid,
"material_uuid": material_uuid_for_row(row),
"tax_rate_uuid": DEV_QUOTE_TAX_RATE_UUID,
"name": row["name"],
"quantity": "0",
"price": "",
"displayed_amount": "",
"displayed_amount_is_tax_inclusive": "",
"sort_order": row["sort_order"],
}
def create_job_material(session: requests.Session, payload: dict) -> str:
response = session.post(f"{BASE_URL}/jobmaterial.json", json=payload, timeout=REQUEST_TIMEOUT)
if not response.ok:
raise RuntimeError(f"Create failed: HTTP {response.status_code} :: {response.text}")
record_uuid = response.headers.get("x-record-uuid", "")
if not record_uuid:
raise RuntimeError("Create succeeded but x-record-uuid header was missing")
return record_uuid
def load_api_key() -> str:
for name in ("SERVICEM8_ACCESS_TOKEN", "SERVICEM8_API_KEY"):
value = os.getenv(name)
if value:
return value
fallback = SCRIPT_DIR / "servicem8-list-webhook-subscriptions-table.py"
if fallback.exists():
spec = importlib.util.spec_from_file_location("servicem8_subs", fallback)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) # type: ignore[union-attr]
value = getattr(module, "ACCESS_TOKEN", None)
if value:
return str(value)
raise RuntimeError("Missing SERVICEM8_ACCESS_TOKEN or SERVICEM8_API_KEY")
def get_conn(db_path: Path = POLL_DB_PATH) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
def init_apply_tables(conn: sqlite3.Connection) -> None:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS quote_template_apply_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
form_response_uuid TEXT NOT NULL,
job_uuid TEXT NOT NULL,
mode TEXT NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT,
desired_count INTEGER NOT NULL DEFAULT 0,
created_count INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL,
error TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS quote_template_apply_run_rows (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id INTEGER NOT NULL,
form_response_uuid TEXT NOT NULL,
job_uuid TEXT NOT NULL,
row_index INTEGER NOT NULL,
kind TEXT,
source_question TEXT,
name TEXT,
api_payload_json TEXT NOT NULL,
action TEXT NOT NULL,
job_material_uuid TEXT,
error TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY(run_id) REFERENCES quote_template_apply_runs(id)
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_apply_runs_form_response ON quote_template_apply_runs(form_response_uuid)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_apply_rows_run ON quote_template_apply_run_rows(run_id)")
conn.commit()
def load_quote(conn: sqlite3.Connection, form_response_uuid: str) -> sqlite3.Row:
row = conn.execute(
"select * from quote_template_form_responses where form_response_uuid = ?",
(form_response_uuid,),
).fetchone()
if not row:
raise SystemExit(f"No parsed polled Quote Template response found for UUID: {form_response_uuid}")
return row
def existing_created_for_form(form_response_uuid: str) -> int:
if not STATE_DB_PATH.exists():
return 0
try:
with closing(sqlite3.connect(STATE_DB_PATH)) as conn:
row = conn.execute(
"select count(*) from generated_job_materials where form_response_uuid = ?",
(form_response_uuid,),
).fetchone()
return int(row[0] if row else 0)
except sqlite3.Error:
return 0
def create_apply_run(conn: sqlite3.Connection, *, form_response_uuid: str, job_uuid: str, mode: str, desired_count: int) -> int:
cur = conn.execute(
"""
INSERT INTO quote_template_apply_runs (
form_response_uuid, job_uuid, mode, started_at, desired_count, status
) VALUES (?, ?, ?, ?, ?, ?)
""",
(form_response_uuid, job_uuid, mode, utc_now(), desired_count, "running"),
)
conn.commit()
return int(cur.lastrowid)
def finish_apply_run(conn: sqlite3.Connection, run_id: int, *, status: str, created_count: int, error: Optional[str] = None) -> None:
conn.execute(
"""
UPDATE quote_template_apply_runs
SET finished_at = ?, status = ?, created_count = ?, error = ?
WHERE id = ?
""",
(utc_now(), status, created_count, error, run_id),
)
conn.commit()
def record_apply_row(
conn: sqlite3.Connection,
*,
run_id: int,
form_response_uuid: str,
job_uuid: str,
row_index: int,
row: Dict[str, Any],
api_payload: Dict[str, Any],
action: str,
job_material_uuid: str = "",
error: Optional[str] = None,
) -> None:
conn.execute(
"""
INSERT INTO quote_template_apply_run_rows (
run_id, form_response_uuid, job_uuid, row_index, kind, source_question,
name, api_payload_json, action, job_material_uuid, error, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
run_id,
form_response_uuid,
job_uuid,
row_index,
row.get("kind", ""),
row.get("source_question", ""),
row.get("name", ""),
json.dumps(api_payload, ensure_ascii=False, sort_keys=True),
action,
job_material_uuid,
error,
utc_now(),
),
)
conn.commit()
def list_pending(conn: sqlite3.Connection) -> List[Dict[str, Any]]:
rows = conn.execute(
"""
select form_response_uuid, discovered_at, job_uuid, description, queued_at,
processed_at, process_status, desired_job_materials_json
from quote_template_form_responses
order by discovered_at desc
"""
).fetchall()
out = []
for row in rows:
existing = existing_created_for_form(row["form_response_uuid"])
try:
desired_count = len(json.loads(row["desired_job_materials_json"] or "[]"))
except Exception:
desired_count = None
out.append(
{
"form_response_uuid": row["form_response_uuid"],
"discovered_at": row["discovered_at"],
"job_uuid": row["job_uuid"],
"description": row["description"],
"desired_count": desired_count,
"created_in_state_db": existing,
"processed_at": row["processed_at"],
"process_status": row["process_status"],
}
)
return out
def main() -> int:
parser = argparse.ArgumentParser(description="Dry-run or apply one selected polled Quote Template response to ServiceM8 jobMaterials")
parser.add_argument("--uuid", help="Polled Quote Template form_response_uuid to process")
parser.add_argument("--db", default=str(POLL_DB_PATH), help="Poll DB path")
parser.add_argument("--apply", action="store_true", help="Actually create ServiceM8 jobMaterial records. Default is dry-run.")
parser.add_argument("--force", action="store_true", help="Allow apply even if generated_job_materials already contains rows for this form response")
parser.add_argument("--list", action="store_true", help="List parsed polled Quote Template responses and exit")
parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON output")
args = parser.parse_args()
db_path = Path(args.db)
if not db_path.exists():
raise SystemExit(f"Poll DB not found: {db_path}")
with closing(get_conn(db_path)) as conn:
init_apply_tables(conn)
if args.list:
print(json.dumps({"ok": True, "rows": list_pending(conn)}, indent=2 if args.pretty else None, ensure_ascii=False))
return 0
if not args.uuid:
raise SystemExit("--uuid is required unless using --list")
quote = load_quote(conn, args.uuid)
form_response_uuid = quote["form_response_uuid"]
job_uuid = quote["job_uuid"]
desired_rows = json.loads(quote["desired_job_materials_json"] or "[]")
mode = "apply" if args.apply else "dry-run"
existing_created = existing_created_for_form(form_response_uuid)
if args.apply and existing_created and not args.force:
raise SystemExit(
f"Refusing duplicate apply: {existing_created} generated_job_materials already recorded for {form_response_uuid}. Use --force only if intentional."
)
run_id = create_apply_run(
conn,
form_response_uuid=form_response_uuid,
job_uuid=job_uuid,
mode=mode,
desired_count=len(desired_rows),
)
result = {
"ok": True,
"mode": mode,
"run_id": run_id,
"form_response_uuid": form_response_uuid,
"job_uuid": job_uuid,
"description": quote["description"],
"desired_count": len(desired_rows),
"created_count": 0,
"state_db_path": str(STATE_DB_PATH),
"rows": [],
}
try:
if not args.apply:
for idx, row in enumerate(desired_rows, start=1):
api_payload = build_payload(job_uuid, row)
record_apply_row(
conn,
run_id=run_id,
form_response_uuid=form_response_uuid,
job_uuid=job_uuid,
row_index=idx,
row=row,
api_payload=api_payload,
action="would_create",
)
result["rows"].append({"action": "would_create", "kind": row.get("kind"), "payload": api_payload})
finish_apply_run(conn, run_id, status="dry-run", created_count=0)
conn.execute(
"UPDATE quote_template_form_responses SET process_status = ? WHERE form_response_uuid = ?",
("dry-run", form_response_uuid),
)
conn.commit()
print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
return 0
api_key = load_api_key()
init_state_db(STATE_DB_PATH)
session = requests.Session()
session.headers.update({"X-Api-Key": api_key, "Accept": "application/json", "Content-Type": "application/json"})
created_count = 0
for idx, row in enumerate(desired_rows, start=1):
api_payload = build_payload(job_uuid, row)
created_uuid = create_job_material(session, api_payload)
created_count += 1
record_generated_job_material(
job_uuid=job_uuid,
form_response_uuid=form_response_uuid,
job_material_uuid=created_uuid,
kind=row.get("kind", ""),
source_field_uuid=row.get("source_field_uuid", ""),
source_question=row.get("source_question", ""),
source_text=row.get("name", ""),
db_path=STATE_DB_PATH,
)
record_apply_row(
conn,
run_id=run_id,
form_response_uuid=form_response_uuid,
job_uuid=job_uuid,
row_index=idx,
row=row,
api_payload=api_payload,
action="created",
job_material_uuid=created_uuid,
)
result["rows"].append({"action": "created", "kind": row.get("kind"), "job_material_uuid": created_uuid, "payload": api_payload})
result["created_count"] = created_count
finish_apply_run(conn, run_id, status="applied", created_count=created_count)
conn.execute(
"UPDATE quote_template_form_responses SET processed_at = ?, process_status = ?, process_error = NULL WHERE form_response_uuid = ?",
(utc_now(), "applied", form_response_uuid),
)
conn.commit()
print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
return 0
except Exception as exc:
finish_apply_run(conn, run_id, status="error", created_count=result.get("created_count", 0), error=str(exc))
conn.execute(
"UPDATE quote_template_form_responses SET process_status = ?, process_error = ? WHERE form_response_uuid = ?",
("error", str(exc), form_response_uuid),
)
conn.commit()
raise
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(str(exc), file=sys.stderr)
raise SystemExit(1)