Intial new scripts for pivot logic approach
This commit is contained in:
Executable
+154
@@ -0,0 +1,154 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import json
|
||||
import sqlite3
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
DB_PATH = Path(__file__).with_name("servicem8_webhooks.db")
|
||||
STATE_PATH = Path(__file__).with_name(".quote_form_response_watch_state.json")
|
||||
QUOTE_TEMPLATE_FORM_UUID = "3621b6be-1d19-4756-9ab4-9d5e4120f6d9"
|
||||
|
||||
|
||||
def load_state(path: Path) -> Dict[str, Any]:
|
||||
if not path.exists():
|
||||
return {}
|
||||
try:
|
||||
return json.loads(path.read_text())
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def save_state(path: Path, state: Dict[str, Any]) -> None:
|
||||
path.write_text(json.dumps(state, indent=2, ensure_ascii=False) + "\n")
|
||||
|
||||
|
||||
def get_conn(db_path: Path) -> sqlite3.Connection:
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
||||
def query_latest_quote_response(db_path: Path) -> Optional[Dict[str, Any]]:
|
||||
sql = """
|
||||
SELECT
|
||||
id,
|
||||
received_at,
|
||||
json_extract(payload_json, '$.id') AS webhook_event_id,
|
||||
json_extract(payload_json, '$.type') AS event_type,
|
||||
json_extract(payload_json, '$.data.uuid') AS form_response_uuid,
|
||||
json_extract(payload_json, '$.data.form_uuid') AS form_uuid,
|
||||
json_extract(payload_json, '$.data.regarding_object_uuid') AS job_uuid,
|
||||
json_extract(payload_json, '$.data.edit_date') AS edit_date,
|
||||
json_extract(payload_json, '$.data.created_by_staff_uuid') AS created_by_staff_uuid,
|
||||
json_extract(payload_json, '$.data.status') AS status,
|
||||
payload_json
|
||||
FROM webhook_form_responses
|
||||
WHERE json_extract(payload_json, '$.data.form_uuid') = ?
|
||||
ORDER BY datetime(received_at) DESC, id DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
with get_conn(db_path) as conn:
|
||||
row = conn.execute(sql, (QUOTE_TEMPLATE_FORM_UUID,)).fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
|
||||
def build_summary(row: Dict[str, Any]) -> str:
|
||||
received_at = row.get("received_at") or "unknown"
|
||||
edit_date = row.get("edit_date") or "unknown"
|
||||
job_uuid = row.get("job_uuid") or "unknown"
|
||||
form_response_uuid = row.get("form_response_uuid") or "unknown"
|
||||
return (
|
||||
"New quote template form response received\n"
|
||||
f"- Received: {received_at}\n"
|
||||
f"- Edit date: {edit_date}\n"
|
||||
f"- Job UUID: {job_uuid}\n"
|
||||
f"- Form response UUID: {form_response_uuid}"
|
||||
)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Check for new quote template form responses in the ServiceM8 webhook DB")
|
||||
parser.add_argument("--db", default=str(DB_PATH), help="Path to servicem8_webhooks.db")
|
||||
parser.add_argument("--state", default=str(STATE_PATH), help="Path to local state file")
|
||||
parser.add_argument("--prime", action="store_true", help="Record the current latest event as seen without alerting")
|
||||
parser.add_argument("--json", action="store_true", help="Print machine-readable JSON")
|
||||
args = parser.parse_args()
|
||||
|
||||
db_path = Path(args.db)
|
||||
state_path = Path(args.state)
|
||||
|
||||
if not db_path.exists():
|
||||
print(f"Database not found: {db_path}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
latest = query_latest_quote_response(db_path)
|
||||
state = load_state(state_path)
|
||||
|
||||
if not latest:
|
||||
result = {
|
||||
"status": "no_quote_responses_found",
|
||||
"alert": False,
|
||||
"db_path": str(db_path),
|
||||
"state_path": str(state_path),
|
||||
}
|
||||
print(json.dumps(result, indent=2) if args.json else "No quote template form responses found in DB")
|
||||
return 0
|
||||
|
||||
current_marker = {
|
||||
"id": latest.get("id"),
|
||||
"received_at": latest.get("received_at"),
|
||||
"form_response_uuid": latest.get("form_response_uuid"),
|
||||
"job_uuid": latest.get("job_uuid"),
|
||||
"edit_date": latest.get("edit_date"),
|
||||
"checked_at": datetime.now().astimezone().isoformat(),
|
||||
}
|
||||
|
||||
if args.prime or not state:
|
||||
save_state(state_path, current_marker)
|
||||
result = {
|
||||
"status": "primed",
|
||||
"alert": False,
|
||||
"latest": current_marker,
|
||||
"db_path": str(db_path),
|
||||
"state_path": str(state_path),
|
||||
}
|
||||
print(json.dumps(result, indent=2) if args.json else f"Primed latest quote template event at {current_marker['received_at']}")
|
||||
return 0
|
||||
|
||||
last_seen_id = state.get("id")
|
||||
last_seen_form_response_uuid = state.get("form_response_uuid")
|
||||
is_new = (latest.get("id") != last_seen_id) or (
|
||||
latest.get("form_response_uuid") and latest.get("form_response_uuid") != last_seen_form_response_uuid
|
||||
)
|
||||
|
||||
if is_new:
|
||||
save_state(state_path, current_marker)
|
||||
result = {
|
||||
"status": "new_quote_response",
|
||||
"alert": True,
|
||||
"summary": build_summary(latest),
|
||||
"latest": current_marker,
|
||||
"previous": state,
|
||||
"db_path": str(db_path),
|
||||
"state_path": str(state_path),
|
||||
}
|
||||
print(json.dumps(result, indent=2) if args.json else result["summary"])
|
||||
return 0
|
||||
|
||||
result = {
|
||||
"status": "no_new_quote_response",
|
||||
"alert": False,
|
||||
"latest": current_marker,
|
||||
"previous": state,
|
||||
"db_path": str(db_path),
|
||||
"state_path": str(state_path),
|
||||
}
|
||||
print(json.dumps(result, indent=2) if args.json else f"No new quote template form response since {state.get('received_at')}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user