Refactored the quote_push process to check for any existing jobMaterials and fail with a log entry and some intelligence so that we don't overwrite anything that has been manually created. Also added another list in the inspector.py code to allow us to view this when it occurs.

This commit is contained in:
2026-05-11 14:41:26 +10:00
parent f03840c574
commit d7dc2ade06
5 changed files with 284 additions and 11 deletions
+119 -2
View File
@@ -39,6 +39,10 @@ def utc_now() -> str:
return datetime.now(timezone.utc).isoformat()
def escape_filter_value(value: str) -> str:
return value.replace("'", "''")
def material_uuid_for_row(row: dict) -> str:
kind = row.get("kind", "")
if kind == "include_header":
@@ -72,6 +76,23 @@ def create_job_material(session: requests.Session, payload: dict) -> str:
return record_uuid
def list_remote_job_materials(session: requests.Session, job_uuid: str) -> List[Dict[str, Any]]:
"""Return existing remote ServiceM8 jobMaterial rows for a job."""
filter_expr = f"job_uuid eq '{escape_filter_value(job_uuid)}'"
response = session.get(f"{BASE_URL}/jobmaterial.json", params={"$filter": filter_expr}, timeout=REQUEST_TIMEOUT)
if not response.ok:
raise RuntimeError(f"Remote jobMaterial preflight failed: HTTP {response.status_code} :: {response.text[:1000]}")
data = response.json()
if not isinstance(data, list):
raise RuntimeError(f"Remote jobMaterial preflight expected list response, got {type(data).__name__}")
return data
def is_active_remote_job_material(row: Dict[str, Any]) -> bool:
value = row.get("active", 1)
return value not in (0, "0", False, "false", "False")
def load_api_key() -> str:
for name in ("SERVICEM8_ACCESS_TOKEN", "SERVICEM8_API_KEY"):
value = os.getenv(name)
@@ -92,7 +113,7 @@ def load_api_key() -> str:
def get_conn(db_path: Path = POLL_DB_PATH) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn = sqlite3.connect(db_path, timeout=30)
conn.row_factory = sqlite3.Row
return conn
@@ -134,8 +155,28 @@ def init_apply_tables(conn: sqlite3.Connection) -> None:
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS quote_template_remote_existing_incidents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
detected_at TEXT NOT NULL,
form_response_uuid TEXT NOT NULL,
job_uuid TEXT NOT NULL,
apply_run_id INTEGER,
desired_count INTEGER NOT NULL DEFAULT 0,
remote_count INTEGER NOT NULL DEFAULT 0,
remote_active_count INTEGER NOT NULL DEFAULT 0,
action TEXT NOT NULL,
reason TEXT,
remote_rows_json TEXT NOT NULL,
FOREIGN KEY(apply_run_id) REFERENCES quote_template_apply_runs(id)
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_apply_runs_form_response ON quote_template_apply_runs(form_response_uuid)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_apply_rows_run ON quote_template_apply_run_rows(run_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_remote_existing_form_response ON quote_template_remote_existing_incidents(form_response_uuid)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_remote_existing_job_uuid ON quote_template_remote_existing_incidents(job_uuid)")
conn.commit()
@@ -153,7 +194,7 @@ def existing_created_for_form(form_response_uuid: str) -> int:
if not STATE_DB_PATH.exists():
return 0
try:
with closing(sqlite3.connect(STATE_DB_PATH)) as conn:
with closing(sqlite3.connect(STATE_DB_PATH, timeout=30)) as conn:
row = conn.execute(
"select count(*) from generated_job_materials where form_response_uuid = ?",
(form_response_uuid,),
@@ -226,6 +267,42 @@ def record_apply_row(
conn.commit()
def record_remote_existing_incident(
conn: sqlite3.Connection,
*,
form_response_uuid: str,
job_uuid: str,
apply_run_id: int,
desired_count: int,
remote_rows: List[Dict[str, Any]],
action: str,
reason: str,
) -> int:
remote_active_count = sum(1 for row in remote_rows if is_active_remote_job_material(row))
cur = conn.execute(
"""
INSERT INTO quote_template_remote_existing_incidents (
detected_at, form_response_uuid, job_uuid, apply_run_id, desired_count,
remote_count, remote_active_count, action, reason, remote_rows_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
utc_now(),
form_response_uuid,
job_uuid,
apply_run_id,
desired_count,
len(remote_rows),
remote_active_count,
action,
reason,
json.dumps(remote_rows, ensure_ascii=False, sort_keys=True),
),
)
conn.commit()
return int(cur.lastrowid)
def list_pending(conn: sqlite3.Connection) -> List[Dict[str, Any]]:
rows = conn.execute(
"""
@@ -263,6 +340,7 @@ def main() -> int:
parser.add_argument("--db", default=str(POLL_DB_PATH), help="Poll DB path")
parser.add_argument("--apply", action="store_true", help="Actually create ServiceM8 jobMaterial records. Default is dry-run.")
parser.add_argument("--force", action="store_true", help="Allow apply even if generated_job_materials already contains rows for this form response")
parser.add_argument("--force-remote-existing", action="store_true", help="Allow apply even when ServiceM8 already has jobMaterial rows for the target job; records a forced incident before creating")
parser.add_argument("--list", action="store_true", help="List parsed polled Quote Template responses and exit")
parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON output")
args = parser.parse_args()
@@ -343,6 +421,45 @@ def main() -> int:
session = requests.Session()
session.headers.update({"X-Api-Key": api_key, "Accept": "application/json", "Content-Type": "application/json"})
remote_existing_rows = list_remote_job_materials(session, job_uuid)
if remote_existing_rows:
remote_active_count = sum(1 for remote_row in remote_existing_rows if is_active_remote_job_material(remote_row))
action = "forced" if args.force_remote_existing else "blocked"
reason = (
f"Remote ServiceM8 job already has {len(remote_existing_rows)} jobMaterial row(s) "
f"({remote_active_count} active); no creates attempted"
if not args.force_remote_existing
else f"Remote ServiceM8 job already has {len(remote_existing_rows)} jobMaterial row(s) "
f"({remote_active_count} active); create forced by --force-remote-existing"
)
incident_id = record_remote_existing_incident(
conn,
form_response_uuid=form_response_uuid,
job_uuid=job_uuid,
apply_run_id=run_id,
desired_count=len(desired_rows),
remote_rows=remote_existing_rows,
action=action,
reason=reason,
)
result["remote_existing"] = {
"incident_id": incident_id,
"action": action,
"remote_count": len(remote_existing_rows),
"remote_active_count": remote_active_count,
"reason": reason,
}
if not args.force_remote_existing:
finish_apply_run(conn, run_id, status="blocked_remote_existing", created_count=0)
conn.execute(
"UPDATE quote_template_form_responses SET processed_at = ?, process_status = ?, process_error = NULL WHERE form_response_uuid = ?",
(utc_now(), "blocked_remote_existing", form_response_uuid),
)
conn.commit()
result["status"] = "blocked_remote_existing"
print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
return 0
created_count = 0
for idx, row in enumerate(desired_rows, start=1):
api_payload = build_payload(job_uuid, row)