Files
plumbing/servicem8_webhook_receiver-v1.py
T
2026-04-28 09:44:22 +10:00

98 lines
2.6 KiB
Python
Executable File

import json
import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, PlainTextResponse
DB_PATH = os.getenv('WEBHOOK_DB_PATH', './servicem8_webhooks.db')
APP_HOST = os.getenv('WEBHOOK_HOST', '0.0.0.0')
APP_PORT = int(os.getenv('WEBHOOK_PORT', '8000'))
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger('servicem8-webhook-receiver')
app = FastAPI(title='ServiceM8 Webhook Receiver', version='1.0.0')
def get_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
with closing(get_conn()) as conn:
conn.execute(
'''
CREATE TABLE IF NOT EXISTS webhook_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
client_host TEXT,
method TEXT NOT NULL,
path TEXT NOT NULL,
headers_json TEXT,
payload_json TEXT NOT NULL
)
'''
)
conn.commit()
@app.on_event('startup')
async def startup_event():
init_db()
logger.info('SQLite DB ready at %s', DB_PATH)
@app.get('/health')
async def health():
return {'ok': True}
@app.post('/webhooks/servicem8-job-updated')
async def servicem8_webhook(request: Request):
try:
payload = await request.json()
except Exception:
body = await request.body()
payload = {'_raw_body': body.decode('utf-8', errors='replace')}
headers = dict(request.headers)
client_host = request.client.host if request.client else None
received_at = datetime.now(timezone.utc).isoformat()
with closing(get_conn()) as conn:
conn.execute(
'''
INSERT INTO webhook_events (
received_at,
client_host,
method,
path,
headers_json,
payload_json
) VALUES (?, ?, ?, ?, ?, ?)
''',
(
received_at,
client_host,
request.method,
request.url.path,
json.dumps(headers),
json.dumps(payload),
),
)
conn.commit()
logger.info('Webhook received from %s and stored', client_host)
return PlainTextResponse('OK', status_code=200)
if __name__ == '__main__':
import uvicorn
uvicorn.run('servicem8_webhook_receiver:app', host=APP_HOST, port=APP_PORT, reload=False)