Files
plumbing/apply_polled_quote_template_jobmaterials.py

729 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
"""Create ServiceM8 jobMaterials for one selected polled Quote Template response.
Safe-by-default:
- Requires an explicit form response UUID.
- Dry-run unless --apply is provided.
- Refuses to apply twice for the same form response unless --force is used.
- Records each dry-run/apply run in the poll DB for inspector visibility (a refused duplicate apply exits before a run is recorded).
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import os
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
from servicem8_quote_template_parser import STATE_DB_PATH, init_state_db, record_generated_job_material
# Directory that contains this script; sibling files are resolved against it.
SCRIPT_DIR = Path(__file__).resolve().parent
# Poll DB location: WEBHOOK_POLL_DB_PATH if set, otherwise a file next to this script.
POLL_DB_PATH = Path(os.getenv("WEBHOOK_POLL_DB_PATH", SCRIPT_DIR / "servicem8_formresponse_poll.db"))
# Root of the ServiceM8 REST API; overridable through SERVICEM8_BASE_URL.
BASE_URL = os.getenv("SERVICEM8_BASE_URL", "https://api.servicem8.com/api_1.0")
# Value passed as ``timeout=`` on every HTTP call (SERVICEM8_TIMEOUT, default 30).
REQUEST_TIMEOUT = int(os.getenv("SERVICEM8_TIMEOUT", "30"))
# Material UUID used for any row that is not an include/exclude header.
DEV_QUOTE_MATERIAL_UUID = "f78b1d23-b9fa-40fe-a806-2425fe09cc0b"
# Material UUIDs used for rows whose kind is "include_header" / "exclude_header".
QUOTE_INCLUDE_HEADER_MATERIAL_UUID = "1924893b-917f-474a-adaa-2093bd622d4b"
QUOTE_EXCLUDE_HEADER_MATERIAL_UUID = "4947bfd7-4875-48f7-9caf-2093b9751b9b"
# Tax rate UUID placed on every jobMaterial payload built by build_payload().
DEV_QUOTE_TAX_RATE_UUID = "84e4dd28-06b3-452b-a796-1f58a20ac49b"
# Fixed text placed before and after the quote description when the job's
# work_done_description is generated.
QUOTE_DESCRIPTION_PREFIX = "Thank you for the opportunity to quote to "
QUOTE_DESCRIPTION_SUFFIX = (
    "Please find below a detailed breakdown of the proposed costs included in the quotation. "
    "If you have any questions or concerns, please do not hesitate to contact us."
)
def utc_now() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def escape_filter_value(value: str) -> str:
    """Double every single quote in *value* for use inside a quoted ``$filter`` literal."""
    pieces = value.split("'")
    return "''".join(pieces)
def material_uuid_for_row(row: dict) -> str:
    """Pick the ServiceM8 material UUID for a desired row based on its ``kind``.

    Header kinds map to their dedicated header materials; every other kind
    (including a missing one) falls back to the generic quote material.
    """
    row_kind = row.get("kind", "")
    header_materials = (
        ("include_header", QUOTE_INCLUDE_HEADER_MATERIAL_UUID),
        ("exclude_header", QUOTE_EXCLUDE_HEADER_MATERIAL_UUID),
    )
    for header_kind, header_uuid in header_materials:
        if row_kind == header_kind:
            return header_uuid
    return DEV_QUOTE_MATERIAL_UUID
def build_payload(job_uuid: str, row: dict) -> dict:
    """Build the jobMaterial create payload for one desired row.

    Args:
        job_uuid: UUID of the job the material is attached to.
        row: Desired row; must contain ``name`` and ``sort_order``.

    Returns:
        The JSON-serialisable payload posted to the jobMaterial endpoint.

    Raises:
        KeyError: ``row`` lacks ``name`` or ``sort_order``.
    """
    payload = {
        "job_uuid": job_uuid,
        "material_uuid": material_uuid_for_row(row),
        "tax_rate_uuid": DEV_QUOTE_TAX_RATE_UUID,
        "name": row["name"],
    }
    # Quantity and pricing fields are always sent with these fixed placeholder values.
    payload.update(
        quantity="0",
        price="",
        displayed_amount="",
        displayed_amount_is_tax_inclusive="",
    )
    payload["sort_order"] = row["sort_order"]
    return payload
def clean_text(value: Any) -> str:
    """Convert *value* to a stripped string with ``\\n`` line endings.

    ``None`` becomes the empty string; anything else goes through ``str()``.
    """
    if value is None:
        return ""
    text = str(value)
    # Order matters: collapse CRLF first so a lone CR pass does not double newlines.
    for line_ending in ("\r\n", "\r"):
        text = text.replace(line_ending, "\n")
    return text.strip()
def first_text(*values: Any) -> str:
    """Return the first value that is non-empty after ``clean_text``, else ``""``."""
    cleaned = map(clean_text, values)
    return next((text for text in cleaned if text), "")
def format_job_address(job: Dict[str, Any]) -> str:
    """Return a single-line address for a job record.

    A populated whole-address field wins (checked in the order listed below).
    Otherwise the address is assembled from street / suburb / state / postcode
    style fields, joined with single spaces and skipping empty components.
    """
    whole_address_keys = (
        "job_address",
        "site_address",
        "address",
        "location_address",
        "billing_address",
    )
    whole_address = first_text(*(job.get(key) for key in whole_address_keys))
    if whole_address:
        return whole_address

    # Each tuple lists alternative keys for one address component.
    component_keys = (
        ("street", "street_address", "address_1", "address1"),
        ("suburb", "city"),
        ("state",),
        ("postcode", "postal_code", "zip"),
    )
    components = (
        first_text(*(job.get(key) for key in alternatives))
        for alternatives in component_keys
    )
    return " ".join(filter(None, components))
def build_quote_description_text(description: str, job: Dict[str, Any]) -> str:
    """Compose the customer-facing quote description for a job.

    Args:
        description: Free-text description of the works taken from the form.
        job: Job record used to look up the address.

    Returns:
        ``"<prefix> <description> at <address>. <suffix>"``, or ``""`` when
        the description is empty after cleaning. The literal
        ``"the job address"`` is used when no address can be derived.
    """
    description = clean_text(description)
    if not description:
        return ""
    address = format_job_address(job) or "the job address"
    # The prefix constant already ends with a space; strip it so the explicit
    # separator below yields exactly one space before the description.
    prefix = QUOTE_DESCRIPTION_PREFIX.rstrip()
    return f"{prefix} {description} at {address}. {QUOTE_DESCRIPTION_SUFFIX}"
def build_job_update_payload(description: str, job: Dict[str, Any]) -> dict:
    """Return the job update payload, or ``{}`` when there is no quote text to write."""
    quote_text = build_quote_description_text(description, job)
    if not quote_text:
        return {}
    return {"work_done_description": quote_text}
def retrieve_job(session: requests.Session, job_uuid: str) -> Dict[str, Any]:
    """Fetch one job record from ServiceM8.

    Raises:
        RuntimeError: the HTTP response is not OK, or the JSON body is not an object.
    """
    job_url = f"{BASE_URL}/job/{job_uuid}.json"
    response = session.get(job_url, timeout=REQUEST_TIMEOUT)
    if not response.ok:
        status = response.status_code
        body_excerpt = response.text[:1000]
        raise RuntimeError(f"Job retrieve failed: HTTP {status} :: {body_excerpt}")
    job = response.json()
    if isinstance(job, dict):
        return job
    raise RuntimeError(f"Job retrieve expected object response, got {type(job).__name__}")
def update_job_description(session: requests.Session, job_uuid: str, payload: dict) -> None:
    """POST *payload* to the job record.

    Raises:
        RuntimeError: the HTTP response is not OK.
    """
    job_url = f"{BASE_URL}/job/{job_uuid}.json"
    response = session.post(job_url, json=payload, timeout=REQUEST_TIMEOUT)
    if response.ok:
        return
    status = response.status_code
    body_excerpt = response.text[:1000]
    raise RuntimeError(f"Job quote description update failed: HTTP {status} :: {body_excerpt}")
def extract_company_name(job: Dict[str, Any]) -> str:
    """Return the company name for a job, or ``""`` when none is present.

    Lookup order: ``job["related"]["company"]["name"]``, then
    ``job["company"]["name"]``, then the flat ``company_name`` /
    ``customer_name`` fields.
    """

    def name_of(company: Any) -> str:
        # Only dict-shaped company objects can carry a name.
        return clean_text(company.get("name")) if isinstance(company, dict) else ""

    related = job.get("related")
    nested_name = name_of(related.get("company")) if isinstance(related, dict) else ""
    return (
        nested_name
        or name_of(job.get("company"))
        or first_text(job.get("company_name"), job.get("customer_name"))
    )
def upsert_job_metadata(conn: sqlite3.Connection, *, job_uuid: str, job: Dict[str, Any], source: str) -> None:
    """Insert or refresh the ``job_metadata`` row for a job and commit.

    The UUID comes from *job_uuid*, falling back to ``job["uuid"]``; nothing is
    written when neither yields a value. On conflict every column except
    ``first_seen_at`` is overwritten.
    """
    resolved_uuid = clean_text(job_uuid or job.get("uuid"))
    if not resolved_uuid:
        return
    seen_at = utc_now()
    upsert_sql = """
        INSERT INTO job_metadata (
            job_uuid, generated_job_id, job_address, company_name, raw_json,
            first_seen_at, last_seen_at, source
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(job_uuid) DO UPDATE SET
            generated_job_id = excluded.generated_job_id,
            job_address = excluded.job_address,
            company_name = excluded.company_name,
            raw_json = excluded.raw_json,
            last_seen_at = excluded.last_seen_at,
            source = excluded.source
        """
    generated_job_id = clean_text(job.get("generated_job_id"))
    address = format_job_address(job)
    company_name = extract_company_name(job)
    raw_json = json.dumps(job, ensure_ascii=False, sort_keys=True)
    # seen_at fills both first_seen_at and last_seen_at on first insert.
    conn.execute(
        upsert_sql,
        (resolved_uuid, generated_job_id, address, company_name, raw_json, seen_at, seen_at, source),
    )
    conn.commit()
def create_job_material(session: requests.Session, payload: dict) -> str:
    """Create one ServiceM8 jobMaterial and return its UUID.

    Args:
        session: Authenticated HTTP session.
        payload: jobMaterial fields to post as JSON.

    Returns:
        The UUID reported in the ``x-record-uuid`` response header.

    Raises:
        RuntimeError: the HTTP response is not OK, or the header is missing.
    """
    response = session.post(f"{BASE_URL}/jobmaterial.json", json=payload, timeout=REQUEST_TIMEOUT)
    if not response.ok:
        # Cap the echoed body at 1000 characters like the other HTTP helpers;
        # this message is persisted into the poll DB by the caller.
        raise RuntimeError(f"Create failed: HTTP {response.status_code} :: {response.text[:1000]}")
    record_uuid = response.headers.get("x-record-uuid", "")
    if not record_uuid:
        raise RuntimeError("Create succeeded but x-record-uuid header was missing")
    return record_uuid
def list_remote_job_materials(session: requests.Session, job_uuid: str) -> List[Dict[str, Any]]:
    """Fetch the jobMaterial rows ServiceM8 already holds for a job.

    Raises:
        RuntimeError: the HTTP response is not OK, or the JSON body is not a list.
    """
    quoted_uuid = escape_filter_value(job_uuid)
    query = {"$filter": f"job_uuid eq '{quoted_uuid}'"}
    response = session.get(f"{BASE_URL}/jobmaterial.json", params=query, timeout=REQUEST_TIMEOUT)
    if not response.ok:
        status = response.status_code
        body_excerpt = response.text[:1000]
        raise RuntimeError(f"Remote jobMaterial preflight failed: HTTP {status} :: {body_excerpt}")
    materials = response.json()
    if isinstance(materials, list):
        return materials
    raise RuntimeError(f"Remote jobMaterial preflight expected list response, got {type(materials).__name__}")
def is_active_remote_job_material(row: Dict[str, Any]) -> bool:
    """Tell whether a remote jobMaterial row counts as active.

    A row is inactive only when ``active`` is one of the explicit "off"
    markers below; a missing ``active`` key is treated as active.
    """
    inactive_markers = (0, "0", False, "false", "False")
    flag = row.get("active", 1)
    if flag in inactive_markers:
        return False
    return True
def load_api_key() -> str:
    """Resolve the ServiceM8 API credential.

    The environment is checked first (``SERVICEM8_ACCESS_TOKEN``, then
    ``SERVICEM8_API_KEY``). Failing that, a sibling script is loaded and its
    ``ACCESS_TOKEN`` attribute is used.

    Raises:
        RuntimeError: no credential could be found.
    """
    env_names = ("SERVICEM8_ACCESS_TOKEN", "SERVICEM8_API_KEY")
    token = next(filter(None, map(os.getenv, env_names)), None)
    if token:
        return token

    legacy_script = SCRIPT_DIR / "servicem8-list-webhook-subscriptions-table.py"
    if legacy_script.exists():
        spec = importlib.util.spec_from_file_location("servicem8_subs", legacy_script)
        if spec and spec.loader:
            legacy_module = importlib.util.module_from_spec(spec)
            # NOTE: this executes the sibling script's module-level code.
            spec.loader.exec_module(legacy_module)  # type: ignore[union-attr]
            token = getattr(legacy_module, "ACCESS_TOKEN", None)
            if token:
                return str(token)
    raise RuntimeError("Missing SERVICEM8_ACCESS_TOKEN or SERVICEM8_API_KEY")
def get_conn(db_path: Path = POLL_DB_PATH) -> sqlite3.Connection:
    """Open the SQLite DB at *db_path* with a 30 s busy timeout and name-addressable rows."""
    connection = sqlite3.connect(db_path, timeout=30)
    # sqlite3.Row lets callers read columns by name, e.g. row["job_uuid"].
    connection.row_factory = sqlite3.Row
    return connection
def init_apply_tables(conn: sqlite3.Connection) -> None:
    """Create the job-metadata and apply-audit tables and indexes if absent, then commit.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is safe.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS job_metadata (
            job_uuid TEXT PRIMARY KEY,
            generated_job_id TEXT,
            job_address TEXT,
            company_name TEXT,
            raw_json TEXT NOT NULL,
            first_seen_at TEXT NOT NULL,
            last_seen_at TEXT NOT NULL,
            source TEXT NOT NULL
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_job_metadata_generated_job_id ON job_metadata(generated_job_id)",
        """
        CREATE TABLE IF NOT EXISTS quote_template_apply_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            form_response_uuid TEXT NOT NULL,
            job_uuid TEXT NOT NULL,
            mode TEXT NOT NULL,
            started_at TEXT NOT NULL,
            finished_at TEXT,
            desired_count INTEGER NOT NULL DEFAULT 0,
            created_count INTEGER NOT NULL DEFAULT 0,
            status TEXT NOT NULL,
            error TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS quote_template_apply_run_rows (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id INTEGER NOT NULL,
            form_response_uuid TEXT NOT NULL,
            job_uuid TEXT NOT NULL,
            row_index INTEGER NOT NULL,
            kind TEXT,
            source_question TEXT,
            name TEXT,
            api_payload_json TEXT NOT NULL,
            action TEXT NOT NULL,
            job_material_uuid TEXT,
            error TEXT,
            created_at TEXT NOT NULL,
            FOREIGN KEY(run_id) REFERENCES quote_template_apply_runs(id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS quote_template_remote_existing_incidents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            detected_at TEXT NOT NULL,
            form_response_uuid TEXT NOT NULL,
            job_uuid TEXT NOT NULL,
            apply_run_id INTEGER,
            desired_count INTEGER NOT NULL DEFAULT 0,
            remote_count INTEGER NOT NULL DEFAULT 0,
            remote_active_count INTEGER NOT NULL DEFAULT 0,
            action TEXT NOT NULL,
            reason TEXT,
            remote_rows_json TEXT NOT NULL,
            FOREIGN KEY(apply_run_id) REFERENCES quote_template_apply_runs(id)
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_quote_apply_runs_form_response ON quote_template_apply_runs(form_response_uuid)",
        "CREATE INDEX IF NOT EXISTS idx_quote_apply_rows_run ON quote_template_apply_run_rows(run_id)",
        "CREATE INDEX IF NOT EXISTS idx_quote_remote_existing_form_response ON quote_template_remote_existing_incidents(form_response_uuid)",
        "CREATE INDEX IF NOT EXISTS idx_quote_remote_existing_job_uuid ON quote_template_remote_existing_incidents(job_uuid)",
    )
    # Tables are listed before the indexes that reference them.
    for statement in schema_statements:
        conn.execute(statement)
    conn.commit()
def load_quote(conn: sqlite3.Connection, form_response_uuid: str) -> sqlite3.Row:
    """Load the parsed Quote Template response with the given UUID.

    Raises:
        SystemExit: no row exists for *form_response_uuid*.
    """
    cursor = conn.execute(
        "select * from quote_template_form_responses where form_response_uuid = ?",
        (form_response_uuid,),
    )
    quote = cursor.fetchone()
    if quote:
        return quote
    raise SystemExit(f"No parsed polled Quote Template response found for UUID: {form_response_uuid}")
def existing_created_for_form(form_response_uuid: str) -> int:
    """Count jobMaterials already recorded in the state DB for a form response.

    Best-effort: returns 0 when the state DB file does not exist or when any
    ``sqlite3.Error`` occurs while reading it.
    """
    if not STATE_DB_PATH.exists():
        return 0
    count_sql = "select count(*) from generated_job_materials where form_response_uuid = ?"
    try:
        with closing(sqlite3.connect(STATE_DB_PATH, timeout=30)) as state_conn:
            counted = state_conn.execute(count_sql, (form_response_uuid,)).fetchone()
            if not counted:
                return 0
            return int(counted[0])
    except sqlite3.Error:
        # NOTE(review): a read failure is indistinguishable from "nothing
        # created yet" for callers that use this as a duplicate-apply guard.
        return 0
def create_apply_run(conn: sqlite3.Connection, *, form_response_uuid: str, job_uuid: str, mode: str, desired_count: int) -> int:
    """Insert a new apply-run row with status ``running``, commit, and return its id."""
    insert_sql = """
        INSERT INTO quote_template_apply_runs (
            form_response_uuid, job_uuid, mode, started_at, desired_count, status
        ) VALUES (?, ?, ?, ?, ?, ?)
        """
    run_values = (form_response_uuid, job_uuid, mode, utc_now(), desired_count, "running")
    cursor = conn.execute(insert_sql, run_values)
    conn.commit()
    return int(cursor.lastrowid)
def finish_apply_run(conn: sqlite3.Connection, run_id: int, *, status: str, created_count: int, error: Optional[str] = None) -> None:
    """Stamp an apply run with its finish time, final status, created count and error, then commit."""
    update_sql = """
        UPDATE quote_template_apply_runs
        SET finished_at = ?, status = ?, created_count = ?, error = ?
        WHERE id = ?
        """
    finished_at = utc_now()
    conn.execute(update_sql, (finished_at, status, created_count, error, run_id))
    conn.commit()
def record_apply_row(
    conn: sqlite3.Connection,
    *,
    run_id: int,
    form_response_uuid: str,
    job_uuid: str,
    row_index: int,
    row: Dict[str, Any],
    api_payload: Dict[str, Any],
    action: str,
    job_material_uuid: str = "",
    error: Optional[str] = None,
) -> None:
    """Append one audit row describing a planned or performed action, then commit.

    ``kind``, ``source_question`` and ``name`` are read from *row* (defaulting
    to ``""``); *api_payload* is stored as key-sorted JSON.
    """
    insert_sql = """
        INSERT INTO quote_template_apply_run_rows (
            run_id, form_response_uuid, job_uuid, row_index, kind, source_question,
            name, api_payload_json, action, job_material_uuid, error, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    kind, source_question, name = (row.get(key, "") for key in ("kind", "source_question", "name"))
    payload_json = json.dumps(api_payload, ensure_ascii=False, sort_keys=True)
    audit_values = (
        run_id,
        form_response_uuid,
        job_uuid,
        row_index,
        kind,
        source_question,
        name,
        payload_json,
        action,
        job_material_uuid,
        error,
        utc_now(),
    )
    conn.execute(insert_sql, audit_values)
    conn.commit()
def record_remote_existing_incident(
    conn: sqlite3.Connection,
    *,
    form_response_uuid: str,
    job_uuid: str,
    apply_run_id: int,
    desired_count: int,
    remote_rows: List[Dict[str, Any]],
    action: str,
    reason: str,
) -> int:
    """Record that the target job already had remote jobMaterials; return the incident id.

    Stores the total and active remote row counts plus the remote rows
    themselves as key-sorted JSON, then commits.
    """
    active_rows = [remote for remote in remote_rows if is_active_remote_job_material(remote)]
    insert_sql = """
        INSERT INTO quote_template_remote_existing_incidents (
            detected_at, form_response_uuid, job_uuid, apply_run_id, desired_count,
            remote_count, remote_active_count, action, reason, remote_rows_json
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    incident_values = (
        utc_now(),
        form_response_uuid,
        job_uuid,
        apply_run_id,
        desired_count,
        len(remote_rows),
        len(active_rows),
        action,
        reason,
        json.dumps(remote_rows, ensure_ascii=False, sort_keys=True),
    )
    cursor = conn.execute(insert_sql, incident_values)
    conn.commit()
    return int(cursor.lastrowid)
def list_pending(conn: sqlite3.Connection) -> List[Dict[str, Any]]:
    """Summarise every parsed Quote Template response, newest discovery first.

    Each summary carries the number of desired rows (``None`` when the stored
    JSON cannot be read) and the number already recorded in the state DB.
    """

    def summarize(response: sqlite3.Row) -> Dict[str, Any]:
        already_created = existing_created_for_form(response["form_response_uuid"])
        try:
            wanted = len(json.loads(response["desired_job_materials_json"] or "[]"))
        except Exception:
            # Unreadable desired-rows JSON is reported as an unknown count.
            wanted = None
        return {
            "form_response_uuid": response["form_response_uuid"],
            "discovered_at": response["discovered_at"],
            "job_uuid": response["job_uuid"],
            "description": response["description"],
            "desired_count": wanted,
            "created_in_state_db": already_created,
            "processed_at": response["processed_at"],
            "process_status": response["process_status"],
        }

    responses = conn.execute(
        """
        select form_response_uuid, discovered_at, job_uuid, description, queued_at,
               processed_at, process_status, desired_job_materials_json
        from quote_template_form_responses
        order by discovered_at desc
        """
    ).fetchall()
    return [summarize(response) for response in responses]
def main() -> int:
    """Dry-run or apply one polled Quote Template response from the command line.

    Flow:

    1. Parse arguments, open the poll DB (``--db``) and ensure the audit
       tables exist.
    2. ``--list`` prints every parsed response as JSON and exits.
    3. Otherwise load the response named by ``--uuid``, refuse a duplicate
       apply unless ``--force`` is given, and open an apply-run record.
    4. Dry-run (default): fetch the job and its remote jobMaterials, record
       what would be sent (flagging existing remote rows as a
       ``dry_run_would_block`` incident) and create nothing remotely.
    5. ``--apply``: stop with ``blocked_remote_existing`` when the job already
       has remote jobMaterials (unless ``--force-remote-existing``), then
       update the job's ``work_done_description`` and create one jobMaterial
       per desired row.

    Returns:
        0 when the run completes, including a blocked apply.

    Raises:
        SystemExit: the poll DB is missing, ``--uuid`` is absent or unknown,
            or a duplicate apply is refused.
        Exception: any failure after the apply run was opened is recorded on
            the run and on the form response row, then re-raised.
    """
    parser = argparse.ArgumentParser(description="Dry-run or apply one selected polled Quote Template response to ServiceM8 jobMaterials")
    parser.add_argument("--uuid", help="Polled Quote Template form_response_uuid to process")
    parser.add_argument("--db", default=str(POLL_DB_PATH), help="Poll DB path")
    parser.add_argument("--apply", action="store_true", help="Actually create ServiceM8 jobMaterial records. Default is dry-run.")
    parser.add_argument("--force", action="store_true", help="Allow apply even if generated_job_materials already contains rows for this form response")
    parser.add_argument("--force-remote-existing", action="store_true", help="Allow apply even when ServiceM8 already has jobMaterial rows for the target job; records a forced incident before creating")
    parser.add_argument("--list", action="store_true", help="List parsed polled Quote Template responses and exit")
    parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON output")
    args = parser.parse_args()
    db_path = Path(args.db)
    if not db_path.exists():
        raise SystemExit(f"Poll DB not found: {db_path}")
    with closing(get_conn(db_path)) as conn:
        init_apply_tables(conn)
        if args.list:
            print(json.dumps({"ok": True, "rows": list_pending(conn)}, indent=2 if args.pretty else None, ensure_ascii=False))
            return 0
        if not args.uuid:
            raise SystemExit("--uuid is required unless using --list")
        quote = load_quote(conn, args.uuid)
        form_response_uuid = quote["form_response_uuid"]
        job_uuid = quote["job_uuid"]
        desired_rows = json.loads(quote["desired_job_materials_json"] or "[]")
        mode = "apply" if args.apply else "dry-run"
        # Local duplicate guard: the state DB already lists created materials
        # for this form response.
        existing_created = existing_created_for_form(form_response_uuid)
        if args.apply and existing_created and not args.force:
            raise SystemExit(
                f"Refusing duplicate apply: {existing_created} generated_job_materials already recorded for {form_response_uuid}. Use --force only if intentional."
            )
        run_id = create_apply_run(
            conn,
            form_response_uuid=form_response_uuid,
            job_uuid=job_uuid,
            mode=mode,
            desired_count=len(desired_rows),
        )
        result = {
            "ok": True,
            "mode": mode,
            "run_id": run_id,
            "form_response_uuid": form_response_uuid,
            "job_uuid": job_uuid,
            "description": quote["description"],
            "desired_count": len(desired_rows),
            "created_count": 0,
            "state_db_path": str(STATE_DB_PATH),
            "rows": [],
        }
        try:
            api_key = load_api_key()
            session = requests.Session()
            session.headers.update({"X-Api-Key": api_key, "Accept": "application/json", "Content-Type": "application/json"})
            quote_description_source = clean_text(quote["description"])
            # The job is only fetched when there is a description to write,
            # because the address is needed solely for that text.
            job_details = retrieve_job(session, job_uuid) if quote_description_source else {}
            if job_details:
                upsert_job_metadata(conn, job_uuid=job_uuid, job=job_details, source=mode)
            job_update_payload = build_job_update_payload(quote_description_source, job_details)
            job_update_record_payload = {
                "endpoint": f"/job/{job_uuid}.json",
                "payload": job_update_payload,
                "source_description": quote_description_source,
                "job_address": format_job_address(job_details) if job_details else "",
            }
            job_update_row = {
                "kind": "work_done_description",
                "source_question": "Description of Works to be Quoted",
                "name": job_update_payload.get("work_done_description", ""),
            }
            if not args.apply:
                # ---- Dry-run: record intentions only, nothing is created remotely.
                remote_existing_rows = list_remote_job_materials(session, job_uuid)
                remote_existing_blocks_apply = bool(remote_existing_rows)
                if remote_existing_blocks_apply:
                    remote_active_count = sum(1 for remote_row in remote_existing_rows if is_active_remote_job_material(remote_row))
                    reason = (
                        f"Remote ServiceM8 job already has {len(remote_existing_rows)} jobMaterial row(s) "
                        f"({remote_active_count} active); apply would be blocked before updates or creates"
                    )
                    incident_id = record_remote_existing_incident(
                        conn,
                        form_response_uuid=form_response_uuid,
                        job_uuid=job_uuid,
                        apply_run_id=run_id,
                        desired_count=len(desired_rows),
                        remote_rows=remote_existing_rows,
                        action="dry_run_would_block",
                        reason=reason,
                    )
                    result["remote_existing"] = {
                        "incident_id": incident_id,
                        "action": "would_block_remote_existing",
                        "remote_count": len(remote_existing_rows),
                        "remote_active_count": remote_active_count,
                        "reason": reason,
                    }
                if job_update_payload:
                    job_update_action = (
                        "would_update_work_done_description_if_remote_empty"
                        if remote_existing_blocks_apply
                        else "would_update_work_done_description"
                    )
                    # row_index 0 is reserved for the job description update.
                    record_apply_row(
                        conn,
                        run_id=run_id,
                        form_response_uuid=form_response_uuid,
                        job_uuid=job_uuid,
                        row_index=0,
                        row=job_update_row,
                        api_payload=job_update_record_payload,
                        action=job_update_action,
                    )
                    result["job_update"] = {"action": job_update_action, **job_update_record_payload}
                else:
                    result["job_update"] = {"action": "skipped", "reason": "Quote description is empty"}
                for idx, row in enumerate(desired_rows, start=1):
                    api_payload = build_payload(job_uuid, row)
                    row_action = "would_create_if_remote_empty" if remote_existing_blocks_apply else "would_create"
                    record_apply_row(
                        conn,
                        run_id=run_id,
                        form_response_uuid=form_response_uuid,
                        job_uuid=job_uuid,
                        row_index=idx,
                        row=row,
                        api_payload=api_payload,
                        action=row_action,
                    )
                    result["rows"].append({"action": row_action, "kind": row.get("kind"), "payload": api_payload})
                finish_apply_run(conn, run_id, status="dry-run", created_count=0)
                conn.execute(
                    "UPDATE quote_template_form_responses SET process_status = ? WHERE form_response_uuid = ?",
                    ("dry-run", form_response_uuid),
                )
                conn.commit()
                print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
                return 0
            # ---- Apply: remote preflight, job description update, then creates.
            init_state_db(STATE_DB_PATH)
            remote_existing_rows = list_remote_job_materials(session, job_uuid)
            if remote_existing_rows:
                remote_active_count = sum(1 for remote_row in remote_existing_rows if is_active_remote_job_material(remote_row))
                action = "forced" if args.force_remote_existing else "blocked"
                reason = (
                    f"Remote ServiceM8 job already has {len(remote_existing_rows)} jobMaterial row(s) "
                    f"({remote_active_count} active); no creates attempted"
                    if not args.force_remote_existing
                    else f"Remote ServiceM8 job already has {len(remote_existing_rows)} jobMaterial row(s) "
                    f"({remote_active_count} active); create forced by --force-remote-existing"
                )
                incident_id = record_remote_existing_incident(
                    conn,
                    form_response_uuid=form_response_uuid,
                    job_uuid=job_uuid,
                    apply_run_id=run_id,
                    desired_count=len(desired_rows),
                    remote_rows=remote_existing_rows,
                    action=action,
                    reason=reason,
                )
                result["remote_existing"] = {
                    "incident_id": incident_id,
                    "action": action,
                    "remote_count": len(remote_existing_rows),
                    "remote_active_count": remote_active_count,
                    "reason": reason,
                }
                if not args.force_remote_existing:
                    # Blocked: finish the run without touching the remote job.
                    finish_apply_run(conn, run_id, status="blocked_remote_existing", created_count=0)
                    conn.execute(
                        "UPDATE quote_template_form_responses SET processed_at = ?, process_status = ?, process_error = NULL WHERE form_response_uuid = ?",
                        (utc_now(), "blocked_remote_existing", form_response_uuid),
                    )
                    conn.commit()
                    result["status"] = "blocked_remote_existing"
                    print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
                    return 0
            if job_update_payload:
                update_job_description(session, job_uuid, job_update_payload)
                record_apply_row(
                    conn,
                    run_id=run_id,
                    form_response_uuid=form_response_uuid,
                    job_uuid=job_uuid,
                    row_index=0,
                    row=job_update_row,
                    api_payload=job_update_record_payload,
                    action="updated_work_done_description",
                )
                result["job_update"] = {"action": "updated_work_done_description", **job_update_record_payload}
            else:
                result["job_update"] = {"action": "skipped", "reason": "Quote description is empty"}
            created_count = 0
            for idx, row in enumerate(desired_rows, start=1):
                api_payload = build_payload(job_uuid, row)
                created_uuid = create_job_material(session, api_payload)
                created_count += 1
                # Keep the running total on ``result`` so the error handler
                # below records the rows already created if a later step fails.
                result["created_count"] = created_count
                record_generated_job_material(
                    job_uuid=job_uuid,
                    form_response_uuid=form_response_uuid,
                    job_material_uuid=created_uuid,
                    kind=row.get("kind", ""),
                    source_field_uuid=row.get("source_field_uuid", ""),
                    source_question=row.get("source_question", ""),
                    source_text=row.get("name", ""),
                    db_path=STATE_DB_PATH,
                )
                record_apply_row(
                    conn,
                    run_id=run_id,
                    form_response_uuid=form_response_uuid,
                    job_uuid=job_uuid,
                    row_index=idx,
                    row=row,
                    api_payload=api_payload,
                    action="created",
                    job_material_uuid=created_uuid,
                )
                result["rows"].append({"action": "created", "kind": row.get("kind"), "job_material_uuid": created_uuid, "payload": api_payload})
            result["created_count"] = created_count
            finish_apply_run(conn, run_id, status="applied", created_count=created_count)
            conn.execute(
                "UPDATE quote_template_form_responses SET processed_at = ?, process_status = ?, process_error = NULL WHERE form_response_uuid = ?",
                (utc_now(), "applied", form_response_uuid),
            )
            conn.commit()
            print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
            return 0
        except Exception as exc:
            # Persist the failure on both the run and the form response, then
            # let the caller see the original exception.
            finish_apply_run(conn, run_id, status="error", created_count=result.get("created_count", 0), error=str(exc))
            conn.execute(
                "UPDATE quote_template_form_responses SET process_status = ?, process_error = ? WHERE form_response_uuid = ?",
                ("error", str(exc), form_response_uuid),
            )
            conn.commit()
            raise
if __name__ == "__main__":
    # Ordinary exceptions are reported on stderr as a single line with exit
    # status 1; a SystemExit raised inside main() propagates unchanged.
    try:
        exit_code = main()
    except Exception as exc:
        print(str(exc), file=sys.stderr)
        raise SystemExit(1)
    raise SystemExit(exit_code)