Updates to the inspector view to fix matching quote forms to be able to display and also the apply script which now checks for existing jobMaterials before writing.

This commit is contained in:
2026-05-12 20:22:50 +09:30
parent c425f45910
commit c4248eba76
2 changed files with 131 additions and 9 deletions
+117 -3
View File
@@ -33,6 +33,11 @@ DEV_QUOTE_MATERIAL_UUID = "f78b1d23-b9fa-40fe-a806-2425fe09cc0b"
QUOTE_INCLUDE_HEADER_MATERIAL_UUID = "1924893b-917f-474a-adaa-2093bd622d4b"
QUOTE_EXCLUDE_HEADER_MATERIAL_UUID = "4947bfd7-4875-48f7-9caf-2093b9751b9b"
DEV_QUOTE_TAX_RATE_UUID = "84e4dd28-06b3-452b-a796-1f58a20ac49b"
QUOTE_DESCRIPTION_PREFIX = "Thank you for the opportunity to quote to "
QUOTE_DESCRIPTION_SUFFIX = (
"Please find below a detailed breakdown of the proposed costs included in the quotation. "
"If you have any questions or concerns, please do not hesitate to contact us."
)
def utc_now() -> str:
@@ -66,6 +71,69 @@ def build_payload(job_uuid: str, row: dict) -> dict:
}
def clean_text(value: Any) -> str:
if value is None:
return ""
return str(value).replace("\r\n", "\n").replace("\r", "\n").strip()
def first_text(*values: Any) -> str:
for value in values:
text = clean_text(value)
if text:
return text
return ""
def format_job_address(job: Dict[str, Any]) -> str:
direct = first_text(
job.get("job_address"),
job.get("site_address"),
job.get("address"),
job.get("location_address"),
job.get("billing_address"),
)
if direct:
return direct
parts = [
first_text(job.get("street"), job.get("street_address"), job.get("address_1"), job.get("address1")),
first_text(job.get("suburb"), job.get("city")),
first_text(job.get("state")),
first_text(job.get("postcode"), job.get("postal_code"), job.get("zip")),
]
return " ".join(part for part in parts if part)
def build_quote_description_text(description: str, job: Dict[str, Any]) -> str:
description = clean_text(description)
if not description:
return ""
address = format_job_address(job) or "the job address"
return f"{QUOTE_DESCRIPTION_PREFIX} {description} at {address}. {QUOTE_DESCRIPTION_SUFFIX}"
def build_job_update_payload(description: str, job: Dict[str, Any]) -> dict:
quote_description = build_quote_description_text(description, job)
return {"job_description": quote_description} if quote_description else {}
def retrieve_job(session: requests.Session, job_uuid: str) -> Dict[str, Any]:
response = session.get(f"{BASE_URL}/job/{job_uuid}.json", timeout=REQUEST_TIMEOUT)
if not response.ok:
raise RuntimeError(f"Job retrieve failed: HTTP {response.status_code} :: {response.text[:1000]}")
data = response.json()
if not isinstance(data, dict):
raise RuntimeError(f"Job retrieve expected object response, got {type(data).__name__}")
return data
def update_job_description(session: requests.Session, job_uuid: str, payload: dict) -> None:
response = session.post(f"{BASE_URL}/job/{job_uuid}.json", json=payload, timeout=REQUEST_TIMEOUT)
if not response.ok:
raise RuntimeError(f"Job description update failed: HTTP {response.status_code} :: {response.text[:1000]}")
def create_job_material(session: requests.Session, payload: dict) -> str:
response = session.post(f"{BASE_URL}/jobmaterial.json", json=payload, timeout=REQUEST_TIMEOUT)
if not response.ok:
@@ -393,7 +461,40 @@ def main() -> int:
}
try:
api_key = load_api_key()
session = requests.Session()
session.headers.update({"X-Api-Key": api_key, "Accept": "application/json", "Content-Type": "application/json"})
quote_description_source = clean_text(quote["description"])
job_details = retrieve_job(session, job_uuid) if quote_description_source else {}
job_update_payload = build_job_update_payload(quote_description_source, job_details)
job_update_record_payload = {
"endpoint": f"/job/{job_uuid}.json",
"payload": job_update_payload,
"source_description": quote_description_source,
"job_address": format_job_address(job_details) if job_details else "",
}
job_update_row = {
"kind": "job_description",
"source_question": "Description of Works to be Quoted",
"name": job_update_payload.get("job_description", ""),
}
if not args.apply:
if job_update_payload:
record_apply_row(
conn,
run_id=run_id,
form_response_uuid=form_response_uuid,
job_uuid=job_uuid,
row_index=0,
row=job_update_row,
api_payload=job_update_record_payload,
action="would_update_job_description",
)
result["job_update"] = {"action": "would_update_job_description", **job_update_record_payload}
else:
result["job_update"] = {"action": "skipped", "reason": "Quote description is empty"}
for idx, row in enumerate(desired_rows, start=1):
api_payload = build_payload(job_uuid, row)
record_apply_row(
@@ -416,10 +517,7 @@ def main() -> int:
print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
return 0
api_key = load_api_key()
init_state_db(STATE_DB_PATH)
session = requests.Session()
session.headers.update({"X-Api-Key": api_key, "Accept": "application/json", "Content-Type": "application/json"})
remote_existing_rows = list_remote_job_materials(session, job_uuid)
if remote_existing_rows:
@@ -460,6 +558,22 @@ def main() -> int:
print(json.dumps(result, indent=2 if args.pretty else None, ensure_ascii=False))
return 0
if job_update_payload:
update_job_description(session, job_uuid, job_update_payload)
record_apply_row(
conn,
run_id=run_id,
form_response_uuid=form_response_uuid,
job_uuid=job_uuid,
row_index=0,
row=job_update_row,
api_payload=job_update_record_payload,
action="updated_job_description",
)
result["job_update"] = {"action": "updated_job_description", **job_update_record_payload}
else:
result["job_update"] = {"action": "skipped", "reason": "Quote description is empty"}
created_count = 0
for idx, row in enumerate(desired_rows, start=1):
api_payload = build_payload(job_uuid, row)
+14 -6
View File
@@ -93,6 +93,14 @@ def pretty_json(value):
return str(value)
def html_value(value):
if value is None:
return ""
if isinstance(value, (dict, list)):
return pretty_json(value)
return str(value)
def parse_json_field(value, default=None):
if value is None:
return default
@@ -520,10 +528,10 @@ def form_response_detail(row_id: int):
for item in sorted(field_data, key=lambda x: x.get("SortOrder", 0)):
field_rows.append(
f"<tr>"
f"<td>{escape(str(item.get('SortOrder', '')))}</td>"
f"<td>{escape(item.get('FieldType', ''))}</td>"
f"<td>{escape(item.get('Question', ''))}</td>"
f"<td>{escape(item.get('Response', ''))}</td>"
f"<td>{escape(html_value(item.get('SortOrder')))}</td>"
f"<td>{escape(html_value(item.get('FieldType')))}</td>"
f"<td>{escape(html_value(item.get('Question')))}</td>"
f"<td>{escape(html_value(item.get('Response')))}</td>"
f"</tr>"
)
@@ -851,8 +859,8 @@ def polled_form_response_detail(form_response_uuid: str):
for item in sorted(field_data, key=lambda x: x.get("SortOrder", 0) if isinstance(x, dict) else 0):
if isinstance(item, dict):
field_rows.append(
f"<tr><td>{escape(str(item.get('SortOrder', '')))}</td><td>{escape(item.get('FieldType', ''))}</td>"
f"<td>{escape(item.get('Question', ''))}</td><td>{escape(item.get('Response', ''))}</td></tr>"
f"<tr><td>{escape(html_value(item.get('SortOrder')))}</td><td>{escape(html_value(item.get('FieldType')))}</td>"
f"<td>{escape(html_value(item.get('Question')))}</td><td>{escape(html_value(item.get('Response')))}</td></tr>"
)
quote_link = f"<div><strong>Parsed quote</strong></div><div><a href='/poll/quote-template/{escape(form_response_uuid)}'>Open parsed Quote Template view</a></div>" if quote else ""