diff --git a/PROJECT-PROGRESS.md b/PROJECT-PROGRESS.md index b98c1b2..85f0862 100644 --- a/PROJECT-PROGRESS.md +++ b/PROJECT-PROGRESS.md @@ -47,9 +47,12 @@ The poller stores all fetched form responses in `form_responses_raw`, then parse - dry-run by default - `--apply` performs live ServiceM8 `jobmaterial.json` creates - refuses duplicate apply when generated material rows already exist for that form response unless `--force` is used -- Apply tracking tables in `servicem8_formresponse_poll.db`: + - before any live create, checks remote ServiceM8 `/jobmaterial.json` for existing rows on the target `job_uuid` + - if remote rows already exist, records a remote-existing incident and creates nothing unless `--force-remote-existing` is explicitly used +- Apply/incident tracking tables in `servicem8_formresponse_poll.db`: - `quote_template_apply_runs` - `quote_template_apply_run_rows` + - `quote_template_remote_existing_incidents` - Created ServiceM8 job material mappings are recorded in: - `servicem8_quote_materials_state.db` @@ -73,15 +76,20 @@ Current apply payload rules: - stores/parses results - applies any parsed Quote Template responses that are not already marked/applied - logs each run under `logs/poll-and-apply-YYYYMMDD-HHMMSS.log` -- Safety option now available: +- Safety options now available: - `--dry-run` runs the same poll/selection flow, but previews the ServiceM8 `jobmaterial` payloads only - dry-run does **not** write to ServiceM8 - dry-run does **not** mark quote responses as applied + - live apply blocks if remote ServiceM8 already has jobMaterial rows for the job, logging the captured rows/counts to `quote_template_remote_existing_incidents` + - `--recheck-remote-existing` revisits previously blocked rows without overriding the safety gate + - `--force-remote-existing` explicitly overrides the remote-existing safety gate and still records a forced incident before creating - Examples: - `./poll_and_apply_quote_templates.sh` - `./poll_and_apply_quote_templates.sh --hours 48` - `./poll_and_apply_quote_templates.sh --since '2026-05-04 08:00:00'` - `./poll_and_apply_quote_templates.sh --dry-run --hours 48` + - `./poll_and_apply_quote_templates.sh --recheck-remote-existing --hours 48` + - `./poll_and_apply_quote_templates.sh --force-remote-existing --hours 48` This is the proposed scheduled entry point for soft release, e.g. every 10–30 minutes. For manual confidence checks, run it with `--dry-run` first, inspect the generated payloads/log, then rerun without `--dry-run` only when ready to apply. diff --git a/apply_polled_quote_template_jobmaterials.py b/apply_polled_quote_template_jobmaterials.py index 8e89c28..ac01281 100755 --- a/apply_polled_quote_template_jobmaterials.py +++ b/apply_polled_quote_template_jobmaterials.py @@ -39,6 +39,10 @@ def utc_now() -> str: return datetime.now(timezone.utc).isoformat() +def escape_filter_value(value: str) -> str: + return value.replace("'", "''") + + def material_uuid_for_row(row: dict) -> str: kind = row.get("kind", "") if kind == "include_header": @@ -72,6 +76,23 @@ def create_job_material(session: requests.Session, payload: dict) -> str: return record_uuid +def list_remote_job_materials(session: requests.Session, job_uuid: str) -> List[Dict[str, Any]]: + """Return existing remote ServiceM8 jobMaterial rows for a job.""" + filter_expr = f"job_uuid eq '{escape_filter_value(job_uuid)}'" + response = session.get(f"{BASE_URL}/jobmaterial.json", params={"$filter": filter_expr}, timeout=REQUEST_TIMEOUT) + if not response.ok: + raise RuntimeError(f"Remote jobMaterial preflight failed: HTTP {response.status_code} :: {response.text[:1000]}") + data = response.json() + if not isinstance(data, list): + raise RuntimeError(f"Remote jobMaterial preflight expected list response, got {type(data).__name__}") + return data + + +def is_active_remote_job_material(row: Dict[str, Any]) -> bool: + value = row.get("active", 1) + return value not in (0, "0", False, "false", "False") + + def load_api_key() -> str: for name in ("SERVICEM8_ACCESS_TOKEN", "SERVICEM8_API_KEY"): value = os.getenv(name) @@ -92,7 +113,7 @@ def load_api_key() -> str: def get_conn(db_path: Path = POLL_DB_PATH) -> sqlite3.Connection: - conn = sqlite3.connect(db_path) + conn = sqlite3.connect(db_path, timeout=30) conn.row_factory = sqlite3.Row return conn @@ -134,8 +155,28 @@ def init_apply_tables(conn: sqlite3.Connection) -> None: ) """ ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS quote_template_remote_existing_incidents ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + detected_at TEXT NOT NULL, + form_response_uuid TEXT NOT NULL, + job_uuid TEXT NOT NULL, + apply_run_id INTEGER, + desired_count INTEGER NOT NULL DEFAULT 0, + remote_count INTEGER NOT NULL DEFAULT 0, + remote_active_count INTEGER NOT NULL DEFAULT 0, + action TEXT NOT NULL, + reason TEXT, + remote_rows_json TEXT NOT NULL, + FOREIGN KEY(apply_run_id) REFERENCES quote_template_apply_runs(id) + ) + """ + ) conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_apply_runs_form_response ON quote_template_apply_runs(form_response_uuid)") conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_apply_rows_run ON quote_template_apply_run_rows(run_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_remote_existing_form_response ON quote_template_remote_existing_incidents(form_response_uuid)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_quote_remote_existing_job_uuid ON quote_template_remote_existing_incidents(job_uuid)") conn.commit() @@ -153,7 +194,7 @@ def existing_created_for_form(form_response_uuid: str) -> int: if not STATE_DB_PATH.exists(): return 0 try: - with closing(sqlite3.connect(STATE_DB_PATH)) as conn: + with closing(sqlite3.connect(STATE_DB_PATH, timeout=30)) as conn: row = conn.execute( "select count(*) from generated_job_materials where form_response_uuid = ?", (form_response_uuid,), @@ -226,6 +267,42 @@ def record_apply_row( conn.commit() +def record_remote_existing_incident( + conn: sqlite3.Connection, + *, + form_response_uuid: str, + job_uuid: str, + apply_run_id: int, + desired_count: int, + remote_rows: List[Dict[str, Any]], + action: str, + reason: str, +) -> int: + remote_active_count = sum(1 for row in remote_rows if is_active_remote_job_material(row)) + cur = conn.execute( + """ + INSERT INTO quote_template_remote_existing_incidents ( + detected_at, form_response_uuid, job_uuid, apply_run_id, desired_count, + remote_count, remote_active_count, action, reason, remote_rows_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + utc_now(), + form_response_uuid, + job_uuid, + apply_run_id, + desired_count, + len(remote_rows), + remote_active_count, + action, + reason, + json.dumps(remote_rows, ensure_ascii=False, sort_keys=True), + ), + ) + conn.commit() + return int(cur.lastrowid) + + def list_pending(conn: sqlite3.Connection) -> List[Dict[str, Any]]: rows = conn.execute( """ @@ -263,6 +340,7 @@ def main() -> int: parser.add_argument("--db", default=str(POLL_DB_PATH), help="Poll DB path") parser.add_argument("--apply", action="store_true", help="Actually create ServiceM8 jobMaterial records. Default is dry-run.") parser.add_argument("--force", action="store_true", help="Allow apply even if generated_job_materials already contains rows for this form response") + parser.add_argument("--force-remote-existing", action="store_true", help="Allow apply even when ServiceM8 already has jobMaterial rows for the target job; records a forced incident before creating") parser.add_argument("--list", action="store_true", help="List parsed polled Quote Template responses and exit") parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON output") args = parser.parse_args() @@ -343,6 +421,45 @@ def main() -> int: session = requests.Session() session.headers.update({"X-Api-Key": api_key, "Accept": "application/json", "Content-Type": "application/json"}) + remote_existing_rows = list_remote_job_materials(session, job_uuid) + if remote_existing_rows: + remote_active_count = sum(1 for remote_row in remote_existing_rows if is_active_remote_job_material(remote_row)) + action = "forced" if args.force_remote_existing else "blocked" + reason = ( + f"Remote ServiceM8 job already has {len(remote_existing_rows)} jobMaterial row(s) " + f"({remote_active_count} active); no creates attempted" + if not args.force_remote_existing + else f"Remote ServiceM8 job already has {len(remote_existing_rows)} jobMaterial row(s) " + f"({remote_active_count} active); create forced by --force-remote-existing" + ) + incident_id = record_remote_existing_incident( + conn, + form_response_uuid=form_response_uuid, + job_uuid=job_uuid, + apply_run_id=run_id, + desired_count=len(desired_rows), + remote_rows=remote_existing_rows, + action=action, + reason=reason, + ) + result["remote_existing"] = { + "incident_id": incident_id, + "action": action, + "remote_count": len(remote_existing_rows), + "remote_active_count": remote_active_count, + "reason": reason, + } + if not args.force_remote_existing: + finish_apply_run(conn, run_id, status="blocked_remote_existing", created_count=0) + conn.execute( + "UPDATE quote_template_form_responses SET processed_at = ?, process_status = ?, process_error = NULL WHERE form_response_uuid = ?", + (utc_now(), "blocked_remote_existing", form_response_uuid), + ) + conn.commit() + result["status"] = "blocked_remote_existing" + print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False)) + return 0 + created_count = 0 for idx, row in enumerate(desired_rows, start=1): api_payload = build_payload(job_uuid, row) diff --git a/poll_and_apply_quote_templates.sh b/poll_and_apply_quote_templates.sh index 84d1a40..d46c359 100755 --- a/poll_and_apply_quote_templates.sh +++ b/poll_and_apply_quote_templates.sh @@ -12,7 +12,8 @@ set -euo pipefail # By default this wrapper applies unapplied parsed responses. Use --dry-run to # run the poll and preview each pending apply without writing to ServiceM8. # The apply script still refuses duplicate applies unless --force is explicitly -# passed through. +# passed through. It also checks ServiceM8 for existing jobMaterial rows on +# the target job and blocks the apply unless --force-remote-existing is passed. SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" POLL_SCRIPT="$SCRIPT_DIR/poll_form_responses_since.py" @@ -24,11 +25,13 @@ QUOTE_TEMPLATE_FORM_UUID="${SERVICEM8_QUOTE_TEMPLATE_FORM_UUID:-3621b6be-1d19-47 SINCE="" HOURS="24" FORCE="0" +FORCE_REMOTE_EXISTING="0" +RECHECK_REMOTE_EXISTING="0" DRY_RUN="0" usage() { cat < HTMLResponse: Polled form responses Polled quote templates Apply runs + Remote existing incidents Generated materials """ @@ -160,15 +161,22 @@ def dashboard(): "poll_runs": 0, "polled_form_responses": 0, "polled_quote_templates": 0, + "remote_existing_incidents": 0, } latest_poll_run = None latest_polled_form = None latest_polled_quote = None + latest_remote_existing_incident = None try: with closing(get_poll_conn()) as conn: poll_counts["poll_runs"] = conn.execute("select count(*) from poll_runs").fetchone()[0] poll_counts["polled_form_responses"] = conn.execute("select count(*) from form_responses_raw").fetchone()[0] poll_counts["polled_quote_templates"] = conn.execute("select count(*) from quote_template_form_responses").fetchone()[0] + try: + poll_counts["remote_existing_incidents"] = conn.execute("select count(*) from quote_template_remote_existing_incidents").fetchone()[0] + latest_remote_existing_incident = conn.execute("select detected_at from quote_template_remote_existing_incidents order by id desc limit 1").fetchone() + except sqlite3.Error: + pass latest_poll_run = conn.execute("select finished_at from poll_runs order by id desc limit 1").fetchone() latest_polled_form = conn.execute("select timestamp from form_responses_raw order by timestamp desc limit 1").fetchone() latest_polled_quote = conn.execute("select discovered_at from quote_template_form_responses order by discovered_at desc limit 1").fetchone() @@ -196,6 +204,7 @@ def dashboard():
Latest poll run
{escape(latest_poll_run[0] if latest_poll_run else '—')}
Latest polled form timestamp
{escape(latest_polled_form[0] if latest_polled_form else '—')}
Latest polled quote discovered
{escape(latest_polled_quote[0] if latest_polled_quote else '—')}
+
Latest remote-existing incident
{escape(latest_remote_existing_incident[0] if latest_remote_existing_incident else '—')}
@@ -208,6 +217,7 @@ def dashboard():
  • Browse parsed polled Quote Template responses
  • Browse poll runs
  • Browse dry-run/apply runs
  • +
  • Browse remote-existing incidents
  • Browse generated job-material state
  • @@ -542,6 +552,90 @@ def form_response_detail(row_id: int): return html_page(f"Form response {row_id}", body) +@app.get("/poll/remote-existing-incidents", response_class=HTMLResponse) +def list_remote_existing_incidents(page: int = Query(1, ge=1)): + offset = (page - 1) * PAGE_SIZE + try: + with closing(get_poll_conn()) as conn: + rows = conn.execute( + """ + select id, detected_at, form_response_uuid, job_uuid, apply_run_id, + desired_count, remote_count, remote_active_count, action, reason + from quote_template_remote_existing_incidents + order by id desc + limit ? offset ? + """, + (PAGE_SIZE, offset), + ).fetchall() + except sqlite3.Error as e: + return html_page("Remote existing incidents", f"
    Incident table unavailable: {escape(str(e))}
    ") + + table_rows = [] + for row in rows: + run_link = f"{row['apply_run_id']}" if row['apply_run_id'] else "" + table_rows.append( + f"{row['id']}" + f"{escape(row['detected_at'] or '')}{escape(row['action'] or '')}" + f"{escape(row['form_response_uuid'])}" + f"{escape(row['job_uuid'] or '')}{run_link}" + f"{row['desired_count']}{row['remote_count']}{row['remote_active_count']}" + f"{escape((row['reason'] or '')[:180])}" + ) + + body = f""" + + + {''.join(table_rows) or ""} +
    IDDetectedActionForm response UUIDJob UUIDApply runDesiredRemote rowsActiveReason
    No remote-existing incidents found.
    + + """ + return html_page("Remote existing incidents", body) + + +@app.get("/poll/remote-existing-incidents/{incident_id}", response_class=HTMLResponse) +def remote_existing_incident_detail(incident_id: int): + try: + with closing(get_poll_conn()) as conn: + row = conn.execute("select * from quote_template_remote_existing_incidents where id = ?", (incident_id,)).fetchone() + except sqlite3.Error as e: + return html_page("Remote existing incident", f"
    Incident table unavailable: {escape(str(e))}
    ") + if not row: + raise HTTPException(status_code=404, detail="Remote existing incident not found") + + remote_rows = parse_json_field(row["remote_rows_json"], []) or [] + material_rows = [] + if isinstance(remote_rows, list): + for item in remote_rows: + if isinstance(item, dict): + material_rows.append( + f"{escape(str(item.get('uuid', '')))}{escape(str(item.get('active', '')))}" + f"{escape(str(item.get('name', '')))}{escape(str(item.get('material_uuid', '')))}" + f"{escape(str(item.get('quantity', '')))}{escape(str(item.get('price', '')))}" + f"{escape(str(item.get('sort_order', '')))}" + ) + run_link = f"{row['apply_run_id']}" if row['apply_run_id'] else "" + body = f""" +
    +
    ID
    {row['id']}
    +
    Detected
    {escape(row['detected_at'] or '')}
    +
    Action
    {escape(row['action'] or '')}
    +
    Form response UUID
    {escape(row['form_response_uuid'])}
    +
    Job UUID
    {escape(row['job_uuid'] or '')}
    +
    Apply run
    {run_link}
    +
    Desired rows
    {row['desired_count']}
    +
    Remote rows
    {row['remote_count']}
    +
    Remote active rows
    {row['remote_active_count']}
    +
    Reason
    {escape(row['reason'] or '')}
    +
    +

    Remote jobMaterial rows

    {''.join(material_rows) or ""}
    UUIDActiveNameMaterial UUIDQtyPriceSort
    No remote rows captured.
    +

    Raw remote rows JSON

    {escape(pretty_json(remote_rows))}
    + """ + return html_page(f"Remote existing incident {incident_id}", body) + + @app.get("/poll/apply-runs", response_class=HTMLResponse) def list_apply_runs(page: int = Query(1, ge=1)): offset = (page - 1) * PAGE_SIZE @@ -592,11 +686,23 @@ def apply_run_detail(run_id: int): "select * from quote_template_apply_run_rows where run_id = ? order by row_index asc", (run_id,), ).fetchall() + incidents = conn.execute( + "select id, detected_at, action, remote_count, remote_active_count, reason from quote_template_remote_existing_incidents where apply_run_id = ? order by id desc", + (run_id,), + ).fetchall() except sqlite3.Error as e: return html_page("Apply run", f"
    Apply-run table unavailable: {escape(str(e))}
    ") if not run: raise HTTPException(status_code=404, detail="Apply run not found") + incident_rows = [] + for incident in incidents: + incident_rows.append( + f"{incident['id']}" + f"{escape(incident['detected_at'] or '')}{escape(incident['action'] or '')}" + f"{incident['remote_count']}{incident['remote_active_count']}{escape((incident['reason'] or '')[:180])}" + ) + table_rows = [] for row in rows: payload = parse_json_field(row['api_payload_json'], {}) or {} @@ -620,6 +726,7 @@ def apply_run_detail(run_id: int):
    Created
    {run['created_count']}
    Error
    {escape(run['error'] or '')}
    +

    Remote-existing incidents

    {''.join(incident_rows) or ""}
    IDDetectedActionRemote rowsActiveReason
    No remote-existing incidents for this run.

    Rows

    {''.join(table_rows) or ""}
    #ActionKindNameCreated UUIDSource questionError
    No rows recorded.
    """ return html_page(f"Apply run {run_id}", body) @@ -842,13 +949,23 @@ def polled_quote_template_detail(form_response_uuid: str): "select id, mode, status, started_at, finished_at, desired_count, created_count, error from quote_template_apply_runs where form_response_uuid = ? order by id desc limit 8", (row['form_response_uuid'],), ).fetchall() + recent_incidents = conn.execute( + "select id, detected_at, action, remote_count, remote_active_count, reason from quote_template_remote_existing_incidents where form_response_uuid = ? order by id desc limit 8", + (row['form_response_uuid'],), + ).fetchall() except sqlite3.Error: recent_runs = [] + recent_incidents = [] recent_run_rows = [] for run in recent_runs: recent_run_rows.append( f"{run['id']}{escape(run['mode'] or '')}{escape(run['status'] or '')}{escape(run['started_at'] or '')}{escape(run['finished_at'] or '')}{run['desired_count']}{run['created_count']}{escape((run['error'] or '')[:120])}" ) + recent_incident_rows = [] + for incident in recent_incidents: + recent_incident_rows.append( + f"{incident['id']}{escape(incident['detected_at'] or '')}{escape(incident['action'] or '')}{incident['remote_count']}{incident['remote_active_count']}{escape((incident['reason'] or '')[:120])}" + ) body = f"""
    @@ -877,6 +994,10 @@ def polled_quote_template_detail(form_response_uuid: str):

    Recent dry-run/apply runs for this response

    {''.join(recent_run_rows) or ""}
    IDModeStatusStartedFinishedDesiredCreatedError
    No dry-run/apply runs yet.
    +
    +

    Remote-existing incidents for this response

    + {''.join(recent_incident_rows) or ""}
    IDDetectedActionRemote rowsActiveReason
    No remote-existing incidents yet.
    +

    Desired jobMaterial rows

    {''.join(material_rows) or ""}
    SortKindNameMaterial UUIDSource question
    No desired jobMaterial rows.

    Parsed JSON

    {escape(pretty_json(parsed))}
    """ diff --git a/servicem8_quote_template_parser.py b/servicem8_quote_template_parser.py index 9c3826e..61c15ea 100644 --- a/servicem8_quote_template_parser.py +++ b/servicem8_quote_template_parser.py @@ -34,7 +34,7 @@ def clean_text(value: Any) -> str: def get_state_conn(db_path: Path = STATE_DB_PATH): - conn = sqlite3.connect(db_path) + conn = sqlite3.connect(db_path, timeout=30) conn.row_factory = sqlite3.Row return conn