Files
plumbing/servicem8_webhook_receiver.py
T

297 lines
9.3 KiB
Python
Executable File

import json
import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from fastapi import FastAPI, Request
from fastapi.responses import PlainTextResponse
from servicem8_quote_template_parser import (
QUOTE_TEMPLATE_FORM_UUID,
STATE_DB_PATH,
init_state_db as init_quote_state_db,
parse_quote_template_form_response,
)
# --- Runtime configuration (each value can be overridden via the environment) ---
# Path of the SQLite file that get_conn() opens for all webhook tables.
DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db")
# Bind address passed to uvicorn when this file is run as a script.
# The default "0.0.0.0" listens on every network interface.
APP_HOST = os.getenv("WEBHOOK_HOST", "0.0.0.0")
# Listening port; int() raises ValueError at import time if WEBHOOK_PORT is not numeric.
APP_PORT = int(os.getenv("WEBHOOK_PORT", "18354"))
# Root logging configuration: INFO level, "<time> <level> <message>" lines.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("servicem8-webhook-receiver")
# Application object; the route and startup decorators in this file register on it.
app = FastAPI(title="ServiceM8 Webhook Receiver", version="1.2.1")
def get_conn():
    """Open a fresh SQLite connection to ``DB_PATH``.

    Rows fetched through the returned connection are ``sqlite3.Row``
    objects. The caller owns the connection and must close it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the webhook tables if they do not exist yet.

    Executes one ``CREATE TABLE IF NOT EXISTS`` statement per table
    (``webhook_events``, ``webhook_objects``, ``webhook_form_responses``) on a
    single connection, commits, and then calls ``init_quote_state_db`` with
    ``STATE_DB_PATH`` (both imported from the quote-template parser module).
    """
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS webhook_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            received_at TEXT NOT NULL,
            client_host TEXT,
            method TEXT NOT NULL,
            path TEXT NOT NULL,
            headers_json TEXT,
            payload_json TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS webhook_objects (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            received_at TEXT NOT NULL,
            client_host TEXT,
            method TEXT NOT NULL,
            path TEXT NOT NULL,
            object_type TEXT,
            object_uuid TEXT,
            changed_fields_json TEXT,
            object_time_utc TEXT,
            resource_url TEXT,
            headers_json TEXT,
            payload_json TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS webhook_form_responses (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            received_at TEXT NOT NULL,
            client_host TEXT,
            method TEXT NOT NULL,
            path TEXT NOT NULL,
            headers_json TEXT,
            payload_json TEXT NOT NULL
        )
        """,
    )
    with closing(get_conn()) as db:
        for ddl in ddl_statements:
            db.execute(ddl)
        db.commit()
    init_quote_state_db(STATE_DB_PATH)
# NOTE(review): recent FastAPI releases deprecate ``on_event`` in favour of
# lifespan handlers; migrating would also touch the ``app = FastAPI(...)`` line.
@app.on_event("startup")
async def startup_event():
    """Application startup hook: ensure the SQLite schema exists, then log the DB path."""
    init_db()
    logger.info("SQLite DB ready at %s", DB_PATH)
@app.get("/health")
async def health():
    """Health-check endpoint; always returns ``{"ok": True}``."""
    status = {"ok": True}
    return status
def _jsonable_form_value(value):
    """Return a form field value in a shape ``json.dumps`` can serialise.

    Text fields pass through unchanged. Any other value (an uploaded-file
    object from a multipart body) is reduced to a small descriptor built from
    its ``filename`` and ``content_type`` attributes, when it has them.
    """
    if isinstance(value, str):
        return value
    return {
        "_upload_filename": getattr(value, "filename", None),
        "_upload_content_type": getattr(value, "content_type", None),
    }


async def parse_request_payload(request: Request):
    """Decode the request body into a JSON-serialisable payload.

    Dispatch is by ``Content-Type``:

    * ``application/json`` -> parsed JSON;
    * ``application/x-www-form-urlencoded`` / ``multipart/form-data`` -> a dict
      of form fields (non-text values are replaced by a descriptor dict so the
      result can be passed to ``json.dumps``);
    * anything else -> a JSON parse is attempted.

    Whenever parsing fails, the result is ``{"_raw_body": <body as text>}``,
    with undecodable bytes replaced, so the caller always gets something it
    can store.

    Args:
        request: The incoming request.

    Returns:
        The parsed payload (whatever JSON value the body holds, or a dict of
        form fields), or the ``_raw_body`` fallback dict.
    """
    # Read the raw body first. The request object caches it, so the JSON and
    # form parsers below reuse the cached bytes, and the fallback never has to
    # re-read a stream that a failed form parse has already consumed.
    raw_body = await request.body()

    content_type = request.headers.get("content-type", "").lower()
    is_json = "application/json" in content_type
    # JSON wins if both markers are somehow present, as in the original order.
    is_form = not is_json and (
        "application/x-www-form-urlencoded" in content_type
        or "multipart/form-data" in content_type
    )

    try:
        if is_form:
            form = await request.form()
            return {
                key: _jsonable_form_value(value)
                for key, value in dict(form).items()
            }
        # Explicit JSON, or an unknown/missing content type: try JSON.
        return await request.json()
    except Exception:
        # Deliberate best-effort: a webhook we cannot parse is still recorded
        # verbatim rather than rejected.
        return {"_raw_body": raw_body.decode("utf-8", errors="replace")}
def maybe_handle_challenge(payload, client_host):
    """Answer a subscription-verification request, if *payload* is one.

    A payload counts as a verification request when it is a dict whose
    ``mode`` equals ``"subscribe"`` and whose ``challenge`` is truthy.

    Args:
        payload: The parsed request payload (any type).
        client_host: Sender address, used only for the log line.

    Returns:
        A 200 plain-text response echoing the challenge as a string, or
        ``None`` when the payload is not a verification request.
    """
    if not isinstance(payload, dict):
        return None
    if payload.get("mode") != "subscribe":
        return None
    if not payload.get("challenge"):
        return None

    echoed = str(payload["challenge"])
    logger.info("ServiceM8 webhook verification received from %s", client_host)
    return PlainTextResponse(echoed, status_code=200)
# Tables that have exactly the six columns store_simple_event() writes.
_SIMPLE_EVENT_TABLES = frozenset({"webhook_events", "webhook_form_responses"})


def store_simple_event(table_name, request: Request, client_host: str, received_at: str, headers: dict, payload):
    """Insert one received webhook into a simple event table and commit.

    Args:
        table_name: Target table; must be one of ``_SIMPLE_EVENT_TABLES``.
        request: The incoming request; its method and URL path are stored.
        client_host: Sender address (may be ``None``).
        received_at: Timestamp string recorded in ``received_at``.
        headers: Request headers, stored as JSON text.
        payload: Parsed payload, stored as JSON text.

    Raises:
        ValueError: If *table_name* is not an allowed table.
        TypeError: If *headers* or *payload* cannot be serialised to JSON.
    """
    # SQL placeholders can only bind values, not identifiers, so the table
    # name has to be interpolated into the statement. Restrict it to known
    # tables so a bad caller can never inject arbitrary SQL.
    if table_name not in _SIMPLE_EVENT_TABLES:
        raise ValueError(f"Unsupported webhook table: {table_name!r}")

    # Serialise before opening the connection: a serialisation error then
    # costs no database work.
    row = (
        received_at,
        client_host,
        request.method,
        request.url.path,
        json.dumps(headers),
        json.dumps(payload),
    )
    with closing(get_conn()) as conn:
        conn.execute(
            f"""
            INSERT INTO {table_name} (
                received_at,
                client_host,
                method,
                path,
                headers_json,
                payload_json
            ) VALUES (?, ?, ?, ?, ?, ?)
            """,
            row,
        )
        conn.commit()
def queue_quote_template_jobmaterials(payload, received_at: str):
    """Append a queue record for quote-template form responses.

    The payload is run through ``parse_quote_template_form_response``. If the
    parser raises, a warning is logged and nothing is queued. If the parsed
    ``form_uuid`` is not ``QUOTE_TEMPLATE_FORM_UUID``, nothing is queued.
    Otherwise one JSON line is appended to
    ``quote-template-jobmaterials-queue.jsonl`` in the directory of ``DB_PATH``.

    Args:
        payload: Parsed form-response webhook payload.
        received_at: Timestamp string stored as ``queued_at``.
    """
    try:
        parsed = parse_quote_template_form_response(payload)
    except Exception as exc:
        logger.warning("Quote template parser failed: %s", exc)
        return

    form_uuid = parsed.get("form_uuid", "")
    if form_uuid != QUOTE_TEMPLATE_FORM_UUID:
        return

    # Build the record in a fixed key order so the emitted JSON line is stable.
    queue_record = {"queued_at": received_at, "form_uuid": form_uuid}
    for text_field in ("form_response_uuid", "job_uuid", "author_name", "description"):
        queue_record[text_field] = parsed.get(text_field, "")
    queue_record["desired_job_materials"] = parsed.get("desired_job_materials", [])

    queue_dir = os.path.dirname(DB_PATH)
    queue_path = os.path.join(queue_dir, "quote-template-jobmaterials-queue.jsonl")
    with open(queue_path, "a", encoding="utf-8") as queue_file:
        queue_file.write(json.dumps(queue_record, ensure_ascii=False) + "\n")

    logger.info(
        "Queued quote-template jobmaterials: form_response_uuid=%s job_uuid=%s rows=%s",
        queue_record["form_response_uuid"],
        queue_record["job_uuid"],
        len(queue_record["desired_job_materials"]),
    )
@app.post("/webhooks/servicem8-job-updated")
async def servicem8_event_webhook(request: Request):
    """Receive an event webhook and store it in ``webhook_events``.

    A subscription-verification payload is answered with its challenge and is
    not stored. Anything else is stored and acknowledged with ``OK``.
    """
    body = await parse_request_payload(request)
    header_map = dict(request.headers)
    sender = None
    if request.client:
        sender = request.client.host
    stamp = datetime.now(timezone.utc).isoformat()

    verification = maybe_handle_challenge(body, sender)
    if verification is not None:
        return verification

    store_simple_event("webhook_events", request, sender, stamp, header_map, body)
    logger.info("Event webhook received from %s and stored", sender)
    return PlainTextResponse("OK", status_code=200)
@app.post("/webhooks/servicem8/form-response")
async def servicem8_form_response_webhook(request: Request):
    """Receive a form-response webhook, store it, and queue quote-template work.

    A subscription-verification payload is answered with its challenge and is
    not stored. Anything else is written to ``webhook_form_responses``, passed
    to ``queue_quote_template_jobmaterials``, and acknowledged with ``OK``.
    """
    body = await parse_request_payload(request)
    header_map = dict(request.headers)
    sender = None
    if request.client:
        sender = request.client.host
    stamp = datetime.now(timezone.utc).isoformat()

    verification = maybe_handle_challenge(body, sender)
    if verification is not None:
        return verification

    # Persist the raw event first; queueing runs only after the row is committed.
    store_simple_event("webhook_form_responses", request, sender, stamp, header_map, body)
    queue_quote_template_jobmaterials(body, stamp)
    logger.info("Form response webhook received from %s and stored", sender)
    return PlainTextResponse("OK", status_code=200)
@app.post("/webhooks/servicem8-object")
async def servicem8_object_webhook(request: Request):
    """Receive an object webhook and store it in ``webhook_objects``.

    A subscription-verification payload is answered with its challenge and is
    not stored. Otherwise the payload's ``object`` and ``resource_url`` keys,
    plus ``uuid`` / ``changed_fields`` / ``time`` from the first element of its
    ``entry`` list, are copied into dedicated columns when present. The full
    payload and headers are stored as JSON text alongside them.
    """
    body = await parse_request_payload(request)
    header_map = dict(request.headers)
    sender = None
    if request.client:
        sender = request.client.host
    stamp = datetime.now(timezone.utc).isoformat()

    verification = maybe_handle_challenge(body, sender)
    if verification is not None:
        return verification

    # Every extracted column stays None unless the payload provides it.
    object_type = object_uuid = changed_fields = object_time_utc = resource_url = None
    if isinstance(body, dict):
        object_type = body.get("object")
        resource_url = body.get("resource_url")
        entries = body.get("entry")
        # Only the first entry is inspected, and only if it is a dict.
        if isinstance(entries, list) and entries and isinstance(entries[0], dict):
            head = entries[0]
            object_uuid = head.get("uuid")
            changed_fields = head.get("changed_fields")
            object_time_utc = head.get("time")

    if changed_fields is None:
        changed_fields_json = None
    else:
        changed_fields_json = json.dumps(changed_fields)

    with closing(get_conn()) as db:
        db.execute(
            """
            INSERT INTO webhook_objects (
                received_at,
                client_host,
                method,
                path,
                object_type,
                object_uuid,
                changed_fields_json,
                object_time_utc,
                resource_url,
                headers_json,
                payload_json
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                stamp,
                sender,
                request.method,
                request.url.path,
                object_type,
                object_uuid,
                changed_fields_json,
                object_time_utc,
                resource_url,
                json.dumps(header_map),
                json.dumps(body),
            ),
        )
        db.commit()

    logger.info(
        "Object webhook received from %s: object=%s uuid=%s",
        sender,
        object_type,
        object_uuid,
    )
    return PlainTextResponse("OK", status_code=200)
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on the configured
    # host/port. uvicorn is imported here, so it is only needed when the
    # file is run directly.
    import uvicorn

    uvicorn.run(
        "servicem8_webhook_receiver:app",
        host=APP_HOST,
        port=APP_PORT,
        reload=False,
    )