Added the creation of jobmaterials script and the creation script ofr live updates to the ServiceM8 server (default is -dry-run). Altered inspector.py to now let us view jobmaterials DB.

This commit is contained in:
2026-04-28 16:41:06 +10:00
parent debc442081
commit 3472e770d6
3 changed files with 336 additions and 3 deletions
@@ -0,0 +1,134 @@
import argparse
import json
import os
import sys
from pathlib import Path
import requests
from servicem8_quote_template_parser import (
QUOTE_TEMPLATE_FORM_UUID,
STATE_DB_PATH,
init_state_db,
load_input_file,
parse_quote_template_form_response,
record_generated_job_material,
)
BASE_URL = os.getenv("SERVICEM8_BASE_URL", "https://api.servicem8.com/api_1.0")
ACCESS_TOKEN = os.getenv("SERVICEM8_ACCESS_TOKEN", "")
REQUEST_TIMEOUT = int(os.getenv("SERVICEM8_TIMEOUT", "30"))
def build_payload(job_uuid: str, row: dict) -> dict:
return {
"job_uuid": job_uuid,
"material_uuid": row["material_uuid"],
"name": row["name"],
"quantity": row["quantity"],
"price": row["price"],
"displayed_amount": row["displayed_amount"],
"displayed_amount_is_tax_inclusive": row["displayed_amount_is_tax_inclusive"],
"sort_order": row["sort_order"],
}
def create_job_material(session: requests.Session, payload: dict) -> str:
response = session.post(f"{BASE_URL}/jobmaterial.json", json=payload, timeout=REQUEST_TIMEOUT)
if not response.ok:
raise RuntimeError(f"Create failed: HTTP {response.status_code} :: {response.text}")
record_uuid = response.headers.get("x-record-uuid", "")
if not record_uuid:
raise RuntimeError("Create succeeded but x-record-uuid header was missing")
return record_uuid
def main():
parser = argparse.ArgumentParser(description="Create ServiceM8 jobMaterials from a Quote Template form response")
parser.add_argument("input", help="Path to JSON file containing full form response payload or a data object with field_data")
parser.add_argument("--apply", action="store_true", help="Actually create records in ServiceM8. Default is dry-run.")
parser.add_argument("--pretty", action="store_true", help="Pretty-print output JSON")
args = parser.parse_args()
init_state_db(STATE_DB_PATH)
payload = load_input_file(args.input)
parsed = parse_quote_template_form_response(payload)
form_uuid = parsed.get("form_uuid", "")
if form_uuid and form_uuid != QUOTE_TEMPLATE_FORM_UUID:
raise SystemExit(f"Not a Quote Template form response: {form_uuid}")
job_uuid = parsed.get("job_uuid", "")
form_response_uuid = parsed.get("form_response_uuid", "")
desired_rows = parsed.get("desired_job_materials", [])
if not job_uuid:
raise SystemExit("Missing job_uuid / regarding_object_uuid in form response")
result = {
"mode": "apply" if args.apply else "dry-run",
"job_uuid": job_uuid,
"form_response_uuid": form_response_uuid,
"count": len(desired_rows),
"rows": [],
"state_db_path": str(STATE_DB_PATH),
}
if not args.apply:
for row in desired_rows:
result["rows"].append(
{
"action": "would_create",
"kind": row["kind"],
"payload": build_payload(job_uuid, row),
"source_question": row.get("source_question", ""),
"source_field_uuid": row.get("source_field_uuid", ""),
}
)
print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
return
if not ACCESS_TOKEN:
raise SystemExit("SERVICEM8_ACCESS_TOKEN is required for --apply")
session = requests.Session()
session.headers.update(
{
"X-Api-Key": ACCESS_TOKEN,
"Accept": "application/json",
"Content-Type": "application/json",
}
)
for row in desired_rows:
api_payload = build_payload(job_uuid, row)
created_uuid = create_job_material(session, api_payload)
record_generated_job_material(
job_uuid=job_uuid,
form_response_uuid=form_response_uuid,
job_material_uuid=created_uuid,
kind=row.get("kind", ""),
source_field_uuid=row.get("source_field_uuid", ""),
source_question=row.get("source_question", ""),
source_text=row.get("name", ""),
db_path=STATE_DB_PATH,
)
result["rows"].append(
{
"action": "created",
"kind": row["kind"],
"job_material_uuid": created_uuid,
"payload": api_payload,
}
)
print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
if __name__ == "__main__":
try:
main()
except Exception as exc:
print(str(exc), file=sys.stderr)
sys.exit(1)
+112 -2
View File
@@ -1,5 +1,8 @@
import argparse import argparse
import json import json
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
@@ -10,6 +13,18 @@ QUOTE_INCLUDE_ITEM_MATERIAL_UUID = "8c00ca29-2178-403e-be76-241cfaddeedb"
QUOTE_EXCLUDE_HEADER_FIELD_UUID = "5e9aeda9-2c59-43db-ba64-241f0b7812bd" QUOTE_EXCLUDE_HEADER_FIELD_UUID = "5e9aeda9-2c59-43db-ba64-241f0b7812bd"
QUOTE_EXCLUDE_HEADER_MATERIAL_UUID = "4947bfd7-4875-48f7-9caf-2093b9751b9b" QUOTE_EXCLUDE_HEADER_MATERIAL_UUID = "4947bfd7-4875-48f7-9caf-2093b9751b9b"
QUOTE_EXCLUDE_ITEM_MATERIAL_UUID = "8c00ca29-2178-403e-be76-241cfaddeedb" QUOTE_EXCLUDE_ITEM_MATERIAL_UUID = "8c00ca29-2178-403e-be76-241cfaddeedb"
STATE_DB_PATH = Path(__file__).with_name("servicem8_quote_materials_state.db")
EXTRA_INCLUDE_FIELDS = [
"Number of trades needed",
"Labour Hours Required",
"Materials",
"Excavation",
"Number of hours required",
"Scaffolding?",
"Scaffolding requirements",
"Equipment Hire?",
"Equipment required",
]
def clean_text(value: Any) -> str: def clean_text(value: Any) -> str:
@@ -18,6 +33,81 @@ def clean_text(value: Any) -> str:
return str(value).replace("\r\n", "\n").replace("\r", "\n").strip() return str(value).replace("\r\n", "\n").replace("\r", "\n").strip()
def get_state_conn(db_path: Path = STATE_DB_PATH):
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
def init_state_db(db_path: Path = STATE_DB_PATH):
with closing(get_state_conn(db_path)) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS generated_job_materials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_uuid TEXT NOT NULL,
form_response_uuid TEXT,
job_material_uuid TEXT,
kind TEXT NOT NULL,
source_field_uuid TEXT,
source_question TEXT,
source_text TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_generated_job_materials_job_uuid ON generated_job_materials(job_uuid)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_generated_job_materials_form_response_uuid ON generated_job_materials(form_response_uuid)"
)
conn.commit()
def record_generated_job_material(
*,
job_uuid: str,
form_response_uuid: str,
job_material_uuid: str,
kind: str,
source_field_uuid: str,
source_question: str,
source_text: str,
db_path: Path = STATE_DB_PATH,
):
now = datetime.now(timezone.utc).isoformat()
with closing(get_state_conn(db_path)) as conn:
conn.execute(
"""
INSERT INTO generated_job_materials (
job_uuid,
form_response_uuid,
job_material_uuid,
kind,
source_field_uuid,
source_question,
source_text,
created_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
job_uuid,
form_response_uuid,
job_material_uuid,
kind,
source_field_uuid,
source_question,
source_text,
now,
now,
),
)
conn.commit()
def parse_field_data(field_data: Any) -> List[Dict[str, Any]]: def parse_field_data(field_data: Any) -> List[Dict[str, Any]]:
if isinstance(field_data, str): if isinstance(field_data, str):
return json.loads(field_data) return json.loads(field_data)
@@ -57,6 +147,7 @@ def parse_quote_template_field_rows(field_rows: List[Dict[str, Any]]) -> Dict[st
description = "" description = ""
include_items: List[Dict[str, Any]] = [] include_items: List[Dict[str, Any]] = []
exclude_items: List[Dict[str, Any]] = [] exclude_items: List[Dict[str, Any]] = []
extra_include_items: List[Dict[str, Any]] = []
for row in ordered: for row in ordered:
question = clean_text(row.get("Question")) question = clean_text(row.get("Question"))
@@ -92,10 +183,23 @@ def parse_quote_template_field_rows(field_rows: List[Dict[str, Any]]) -> Dict[st
) )
continue continue
if question in EXTRA_INCLUDE_FIELDS and response:
extra_include_items.append(
{
"question": question,
"response": f"{question}: {response}",
"field_uuid": field_uuid,
"sort_order": int(row.get("SortOrder", 0) or 0),
}
)
continue
desired_job_materials: List[Dict[str, Any]] = [] desired_job_materials: List[Dict[str, Any]] = []
next_sort = 100 next_sort = 100
if include_items: combined_include_items = include_items + extra_include_items
if combined_include_items:
desired_job_materials.append( desired_job_materials.append(
build_job_material_line( build_job_material_line(
kind="include_header", kind="include_header",
@@ -107,7 +211,7 @@ def parse_quote_template_field_rows(field_rows: List[Dict[str, Any]]) -> Dict[st
) )
) )
next_sort += 10 next_sort += 10
for item in include_items: for item in combined_include_items:
desired_job_materials.append( desired_job_materials.append(
build_job_material_line( build_job_material_line(
kind="include_item", kind="include_item",
@@ -150,6 +254,7 @@ def parse_quote_template_field_rows(field_rows: List[Dict[str, Any]]) -> Dict[st
return { return {
"description": description, "description": description,
"include_items": include_items, "include_items": include_items,
"extra_include_items": extra_include_items,
"exclude_items": exclude_items, "exclude_items": exclude_items,
"desired_job_materials": desired_job_materials, "desired_job_materials": desired_job_materials,
} }
@@ -181,10 +286,15 @@ def main():
parser = argparse.ArgumentParser(description="Parse ServiceM8 Quote Template form responses into desired Job Material rows") parser = argparse.ArgumentParser(description="Parse ServiceM8 Quote Template form responses into desired Job Material rows")
parser.add_argument("input", help="Path to JSON file containing full form response payload or a data object with field_data") parser.add_argument("input", help="Path to JSON file containing full form response payload or a data object with field_data")
parser.add_argument("--pretty", action="store_true", help="Pretty-print output JSON") parser.add_argument("--pretty", action="store_true", help="Pretty-print output JSON")
parser.add_argument("--init-db", action="store_true", help="Initialize the local generated-job-materials state DB")
args = parser.parse_args() args = parser.parse_args()
if args.init_db:
init_state_db()
payload = load_input_file(args.input) payload = load_input_file(args.input)
parsed = parse_quote_template_form_response(payload) parsed = parse_quote_template_form_response(payload)
parsed["state_db_path"] = str(STATE_DB_PATH)
if args.pretty: if args.pretty:
print(json.dumps(parsed, indent=2, ensure_ascii=False)) print(json.dumps(parsed, indent=2, ensure_ascii=False))
+90 -1
View File
@@ -10,6 +10,7 @@ from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db") DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db")
STATE_DB_PATH = os.getenv("WEBHOOK_STATE_DB_PATH", "./servicem8_quote_materials_state.db")
APP_HOST = os.getenv("WEBHOOK_INSPECTOR_HOST", "127.0.0.1") APP_HOST = os.getenv("WEBHOOK_INSPECTOR_HOST", "127.0.0.1")
APP_PORT = int(os.getenv("WEBHOOK_INSPECTOR_PORT", "18355")) APP_PORT = int(os.getenv("WEBHOOK_INSPECTOR_PORT", "18355"))
PAGE_SIZE = 50 PAGE_SIZE = 50
@@ -23,6 +24,12 @@ def get_conn():
return conn return conn
def get_state_conn():
conn = sqlite3.connect(STATE_DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def html_page(title: str, body: str) -> HTMLResponse: def html_page(title: str, body: str) -> HTMLResponse:
nav = """ nav = """
<nav> <nav>
@@ -30,6 +37,7 @@ def html_page(title: str, body: str) -> HTMLResponse:
<a href='/events'>Events</a> <a href='/events'>Events</a>
<a href='/objects'>Objects</a> <a href='/objects'>Objects</a>
<a href='/form-responses'>Form responses</a> <a href='/form-responses'>Form responses</a>
<a href='/generated-materials'>Generated materials</a>
</nav> </nav>
""" """
css = """ css = """
@@ -114,7 +122,7 @@ def link_with_params(path, **params):
@app.get("/health") @app.get("/health")
def health(): def health():
return {"ok": True, "db_path": DB_PATH} return {"ok": True, "db_path": DB_PATH, "state_db_path": STATE_DB_PATH}
@app.get("/", response_class=HTMLResponse) @app.get("/", response_class=HTMLResponse)
@@ -129,6 +137,17 @@ def dashboard():
latest_object = conn.execute("select received_at from webhook_objects order by id desc limit 1").fetchone() latest_object = conn.execute("select received_at from webhook_objects order by id desc limit 1").fetchone()
latest_form = conn.execute("select received_at from webhook_form_responses order by id desc limit 1").fetchone() latest_form = conn.execute("select received_at from webhook_form_responses order by id desc limit 1").fetchone()
state_count = 0
latest_generated = None
try:
with closing(get_state_conn()) as conn:
state_count = conn.execute("select count(*) from generated_job_materials").fetchone()[0]
latest_generated = conn.execute("select updated_at from generated_job_materials order by id desc limit 1").fetchone()
except sqlite3.Error:
pass
counts["generated_job_materials"] = state_count
cards = "".join( cards = "".join(
f"<div class='card'><div class='muted'>{escape(name)}</div><div class='big'>{count}</div></div>" f"<div class='card'><div class='muted'>{escape(name)}</div><div class='big'>{count}</div></div>"
for name, count in counts.items() for name, count in counts.items()
@@ -138,9 +157,11 @@ def dashboard():
<div class='card'> <div class='card'>
<div class='summary-grid'> <div class='summary-grid'>
<div><strong>DB path</strong></div><div><code>{escape(DB_PATH)}</code></div> <div><strong>DB path</strong></div><div><code>{escape(DB_PATH)}</code></div>
<div><strong>State DB path</strong></div><div><code>{escape(STATE_DB_PATH)}</code></div>
<div><strong>Latest event</strong></div><div>{escape(latest_event[0] if latest_event else '')}</div> <div><strong>Latest event</strong></div><div>{escape(latest_event[0] if latest_event else '')}</div>
<div><strong>Latest object</strong></div><div>{escape(latest_object[0] if latest_object else '')}</div> <div><strong>Latest object</strong></div><div>{escape(latest_object[0] if latest_object else '')}</div>
<div><strong>Latest form response</strong></div><div>{escape(latest_form[0] if latest_form else '')}</div> <div><strong>Latest form response</strong></div><div>{escape(latest_form[0] if latest_form else '')}</div>
<div><strong>Latest generated material row</strong></div><div>{escape(latest_generated[0] if latest_generated else '')}</div>
</div> </div>
</div> </div>
<div class='section'> <div class='section'>
@@ -149,6 +170,7 @@ def dashboard():
<li><a href='/events'>Browse webhook events</a></li> <li><a href='/events'>Browse webhook events</a></li>
<li><a href='/objects'>Browse object webhooks</a></li> <li><a href='/objects'>Browse object webhooks</a></li>
<li><a href='/form-responses'>Browse form responses</a></li> <li><a href='/form-responses'>Browse form responses</a></li>
<li><a href='/generated-materials'>Browse generated job-material state</a></li>
</ul> </ul>
</div> </div>
""" """
@@ -364,6 +386,73 @@ def list_form_responses(page: int = Query(1, ge=1)):
return html_page("Form responses", body) return html_page("Form responses", body)
@app.get("/generated-materials", response_class=HTMLResponse)
def list_generated_materials(page: int = Query(1, ge=1)):
offset = (page - 1) * PAGE_SIZE
try:
with closing(get_state_conn()) as conn:
rows = conn.execute(
"select id, job_uuid, form_response_uuid, job_material_uuid, kind, source_question, source_text, updated_at from generated_job_materials order by id desc limit ? offset ?",
(PAGE_SIZE, offset),
).fetchall()
except sqlite3.Error as e:
return html_page("Generated materials", f"<div class='card'>State DB unavailable: {escape(str(e))}</div>")
table_rows = []
for row in rows:
table_rows.append(
f"<tr>"
f"<td><a href='/generated-materials/{row['id']}'>{row['id']}</a></td>"
f"<td>{escape(row['job_uuid'] or '')}</td>"
f"<td>{escape(row['form_response_uuid'] or '')}</td>"
f"<td>{escape(row['job_material_uuid'] or '')}</td>"
f"<td>{escape(row['kind'] or '')}</td>"
f"<td>{escape(row['source_question'] or '')}</td>"
f"<td>{escape(row['updated_at'] or '')}</td>"
f"</tr>"
)
body = f"""
<table>
<thead><tr><th>ID</th><th>Job UUID</th><th>Form response UUID</th><th>Job material UUID</th><th>Kind</th><th>Source question</th><th>Updated</th></tr></thead>
<tbody>{''.join(table_rows) or "<tr><td colspan='7'>No rows found.</td></tr>"}</tbody>
</table>
<div class='pagination'>
{f"<a href='{link_with_params('/generated-materials', page=page-1)}'>← Prev</a>" if page > 1 else ''}
<a href='{link_with_params('/generated-materials', page=page+1)}'>Next →</a>
</div>
"""
return html_page("Generated materials", body)
@app.get("/generated-materials/{row_id}", response_class=HTMLResponse)
def generated_material_detail(row_id: int):
try:
with closing(get_state_conn()) as conn:
row = conn.execute("select * from generated_job_materials where id = ?", (row_id,)).fetchone()
except sqlite3.Error as e:
return html_page("Generated material detail", f"<div class='card'>State DB unavailable: {escape(str(e))}</div>")
if not row:
raise HTTPException(status_code=404, detail="Generated material row not found")
body = f"""
<div class='card summary-grid'>
<div><strong>ID</strong></div><div>{row['id']}</div>
<div><strong>Job UUID</strong></div><div>{escape(row['job_uuid'] or '')}</div>
<div><strong>Form response UUID</strong></div><div>{escape(row['form_response_uuid'] or '')}</div>
<div><strong>Job material UUID</strong></div><div>{escape(row['job_material_uuid'] or '')}</div>
<div><strong>Kind</strong></div><div>{escape(row['kind'] or '')}</div>
<div><strong>Source field UUID</strong></div><div>{escape(row['source_field_uuid'] or '')}</div>
<div><strong>Source question</strong></div><div>{escape(row['source_question'] or '')}</div>
<div><strong>Source text</strong></div><div>{escape(row['source_text'] or '')}</div>
<div><strong>Created</strong></div><div>{escape(row['created_at'] or '')}</div>
<div><strong>Updated</strong></div><div>{escape(row['updated_at'] or '')}</div>
</div>
"""
return html_page(f"Generated material {row_id}", body)
@app.get("/form-responses/{row_id}", response_class=HTMLResponse) @app.get("/form-responses/{row_id}", response_class=HTMLResponse)
def form_response_detail(row_id: int): def form_response_detail(row_id: int):
with closing(get_conn()) as conn: with closing(get_conn()) as conn: