296 lines
9.3 KiB
Python
Executable File
296 lines
9.3 KiB
Python
Executable File
import json
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
from contextlib import closing
|
|
from datetime import datetime, timezone
|
|
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.responses import PlainTextResponse
|
|
from servicem8_quote_template_parser import (
|
|
QUOTE_TEMPLATE_FORM_UUID,
|
|
STATE_DB_PATH,
|
|
init_state_db as init_quote_state_db,
|
|
parse_quote_template_form_response,
|
|
)
|
|
|
|
DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db")
|
|
APP_HOST = os.getenv("WEBHOOK_HOST", "0.0.0.0")
|
|
APP_PORT = int(os.getenv("WEBHOOK_PORT", "18354"))
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
|
logger = logging.getLogger("servicem8-webhook-receiver")
|
|
|
|
app = FastAPI(title="ServiceM8 Webhook Receiver", version="1.2.1")
|
|
|
|
|
|
def get_conn():
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def init_db():
|
|
with closing(get_conn()) as conn:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS webhook_events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
received_at TEXT NOT NULL,
|
|
client_host TEXT,
|
|
method TEXT NOT NULL,
|
|
path TEXT NOT NULL,
|
|
headers_json TEXT,
|
|
payload_json TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS webhook_objects (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
received_at TEXT NOT NULL,
|
|
client_host TEXT,
|
|
method TEXT NOT NULL,
|
|
path TEXT NOT NULL,
|
|
object_type TEXT,
|
|
object_uuid TEXT,
|
|
changed_fields_json TEXT,
|
|
object_time_utc TEXT,
|
|
resource_url TEXT,
|
|
headers_json TEXT,
|
|
payload_json TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS webhook_form_responses (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
received_at TEXT NOT NULL,
|
|
client_host TEXT,
|
|
method TEXT NOT NULL,
|
|
path TEXT NOT NULL,
|
|
headers_json TEXT,
|
|
payload_json TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.commit()
|
|
|
|
init_quote_state_db(STATE_DB_PATH)
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
init_db()
|
|
logger.info("SQLite DB ready at %s", DB_PATH)
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"ok": True}
|
|
|
|
|
|
async def parse_request_payload(request: Request):
|
|
content_type = request.headers.get("content-type", "").lower()
|
|
|
|
if "application/json" in content_type:
|
|
try:
|
|
return await request.json()
|
|
except Exception:
|
|
body = await request.body()
|
|
return {"_raw_body": body.decode("utf-8", errors="replace")}
|
|
|
|
if "application/x-www-form-urlencoded" in content_type or "multipart/form-data" in content_type:
|
|
try:
|
|
form = await request.form()
|
|
return dict(form)
|
|
except Exception:
|
|
body = await request.body()
|
|
return {"_raw_body": body.decode("utf-8", errors="replace")}
|
|
|
|
try:
|
|
return await request.json()
|
|
except Exception:
|
|
body = await request.body()
|
|
return {"_raw_body": body.decode("utf-8", errors="replace")}
|
|
|
|
|
|
def maybe_handle_challenge(payload, client_host):
|
|
if isinstance(payload, dict) and payload.get("mode") == "subscribe" and payload.get("challenge"):
|
|
challenge = str(payload["challenge"])
|
|
logger.info("ServiceM8 webhook verification received from %s", client_host)
|
|
return PlainTextResponse(challenge, status_code=200)
|
|
return None
|
|
|
|
|
|
def store_simple_event(table_name, request: Request, client_host: str, received_at: str, headers: dict, payload):
|
|
with closing(get_conn()) as conn:
|
|
conn.execute(
|
|
f"""
|
|
INSERT INTO {table_name} (
|
|
received_at,
|
|
client_host,
|
|
method,
|
|
path,
|
|
headers_json,
|
|
payload_json
|
|
) VALUES (?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
received_at,
|
|
client_host,
|
|
request.method,
|
|
request.url.path,
|
|
json.dumps(headers),
|
|
json.dumps(payload),
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
def queue_quote_template_jobmaterials(payload, received_at: str):
|
|
try:
|
|
parsed = parse_quote_template_form_response(payload)
|
|
except Exception as exc:
|
|
logger.warning("Quote template parser failed: %s", exc)
|
|
return
|
|
|
|
form_uuid = parsed.get("form_uuid", "")
|
|
if form_uuid != QUOTE_TEMPLATE_FORM_UUID:
|
|
return
|
|
|
|
queue_path = os.path.join(os.path.dirname(DB_PATH), "quote-template-jobmaterials-queue.jsonl")
|
|
queue_record = {
|
|
"queued_at": received_at,
|
|
"form_uuid": form_uuid,
|
|
"form_response_uuid": parsed.get("form_response_uuid", ""),
|
|
"job_uuid": parsed.get("job_uuid", ""),
|
|
"description": parsed.get("description", ""),
|
|
"desired_job_materials": parsed.get("desired_job_materials", []),
|
|
}
|
|
with open(queue_path, "a", encoding="utf-8") as fh:
|
|
fh.write(json.dumps(queue_record, ensure_ascii=False) + "\n")
|
|
|
|
logger.info(
|
|
"Queued quote-template jobmaterials: form_response_uuid=%s job_uuid=%s rows=%s",
|
|
queue_record["form_response_uuid"],
|
|
queue_record["job_uuid"],
|
|
len(queue_record["desired_job_materials"]),
|
|
)
|
|
|
|
|
|
@app.post("/webhooks/servicem8-job-updated")
|
|
async def servicem8_event_webhook(request: Request):
|
|
payload = await parse_request_payload(request)
|
|
headers = dict(request.headers)
|
|
client_host = request.client.host if request.client else None
|
|
received_at = datetime.now(timezone.utc).isoformat()
|
|
|
|
challenge_response = maybe_handle_challenge(payload, client_host)
|
|
if challenge_response is not None:
|
|
return challenge_response
|
|
|
|
store_simple_event("webhook_events", request, client_host, received_at, headers, payload)
|
|
|
|
logger.info("Event webhook received from %s and stored", client_host)
|
|
return PlainTextResponse("OK", status_code=200)
|
|
|
|
|
|
@app.post("/webhooks/servicem8/form-response")
|
|
async def servicem8_form_response_webhook(request: Request):
|
|
payload = await parse_request_payload(request)
|
|
headers = dict(request.headers)
|
|
client_host = request.client.host if request.client else None
|
|
received_at = datetime.now(timezone.utc).isoformat()
|
|
|
|
challenge_response = maybe_handle_challenge(payload, client_host)
|
|
if challenge_response is not None:
|
|
return challenge_response
|
|
|
|
store_simple_event("webhook_form_responses", request, client_host, received_at, headers, payload)
|
|
queue_quote_template_jobmaterials(payload, received_at)
|
|
|
|
logger.info("Form response webhook received from %s and stored", client_host)
|
|
return PlainTextResponse("OK", status_code=200)
|
|
|
|
|
|
@app.post("/webhooks/servicem8-object")
|
|
async def servicem8_object_webhook(request: Request):
|
|
payload = await parse_request_payload(request)
|
|
headers = dict(request.headers)
|
|
client_host = request.client.host if request.client else None
|
|
received_at = datetime.now(timezone.utc).isoformat()
|
|
|
|
challenge_response = maybe_handle_challenge(payload, client_host)
|
|
if challenge_response is not None:
|
|
return challenge_response
|
|
|
|
object_type = None
|
|
object_uuid = None
|
|
changed_fields = None
|
|
object_time_utc = None
|
|
resource_url = None
|
|
|
|
if isinstance(payload, dict):
|
|
object_type = payload.get("object")
|
|
resource_url = payload.get("resource_url")
|
|
|
|
entry = payload.get("entry")
|
|
if isinstance(entry, list) and len(entry) > 0 and isinstance(entry[0], dict):
|
|
first_entry = entry[0]
|
|
object_uuid = first_entry.get("uuid")
|
|
changed_fields = first_entry.get("changed_fields")
|
|
object_time_utc = first_entry.get("time")
|
|
|
|
with closing(get_conn()) as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO webhook_objects (
|
|
received_at,
|
|
client_host,
|
|
method,
|
|
path,
|
|
object_type,
|
|
object_uuid,
|
|
changed_fields_json,
|
|
object_time_utc,
|
|
resource_url,
|
|
headers_json,
|
|
payload_json
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
received_at,
|
|
client_host,
|
|
request.method,
|
|
request.url.path,
|
|
object_type,
|
|
object_uuid,
|
|
json.dumps(changed_fields) if changed_fields is not None else None,
|
|
object_time_utc,
|
|
resource_url,
|
|
json.dumps(headers),
|
|
json.dumps(payload),
|
|
),
|
|
)
|
|
conn.commit()
|
|
|
|
logger.info(
|
|
"Object webhook received from %s: object=%s uuid=%s",
|
|
client_host,
|
|
object_type,
|
|
object_uuid,
|
|
)
|
|
return PlainTextResponse("OK", status_code=200)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
|
|
uvicorn.run("servicem8_webhook_receiver:app", host=APP_HOST, port=APP_PORT, reload=False)
|