Initial commit

This commit is contained in:
2026-04-28 09:44:22 +10:00
commit 87bfe26890
11 changed files with 1252 additions and 0 deletions
+21
View File
@@ -0,0 +1,21 @@
### Servicem8 personal dashboard
Project is to create a simple task list style dashboard for each staff member which is clean and shows them what tasks they need to tend to in a less cluttered way then the default full ServiceM8 user interface and “dispatch board”.
Initially there may only be one actionable queue for each staff member - but we may update this later to have multiple queues/lists for each staff member, maybe organised by date that it arrived for them
We will also likely want to add the ability for staff to send/transfer any individual task to another staff member.
- Reference the ServiceM8 webhooks documentation as that is how our system will subscribe to event changes - https://developer.servicem8.com/docs/webhooks-overview
- We will subscribe to certain events that we want to know about
- This means we will need a webhook server listening which will initiate the operation on our end. (tech and platform to be decided, but needs to be lightweight and timely)
- (This is the process which is triggered following the webhook notification)
We will then need to retrieve details relating to the update using the REST API which can be referenced at - https://developer.servicem8.com/docs/rest-overview
- We will query this service when we receive webhook notifications for the full details
- When using the REST API, filters will need to be URL encoded/escaped before sending them
- Following this retrieval, the whole task assignment process (or task change process) will be guided from a workflow document which details which state change results in where/who the task should be assigned to.
- We will likely need to initially (on first start up or initiation) pull the details of all objects that we are interested in monitoring in order to do the state change analysis - so that we know what state the object was in and what it has changed to - we need this to understand and then act depending on the change.
- We will need a lightweight DB storage for backend job, activity, quote, audit trail etc.
- Task lists will be populated with the jobs or tasks or quotes that have come from the one ServiceM8 account
- We WILL NOT push any changes back to the ServiceM8 account at this point - we are an external repo only!
-
+249
View File
@@ -0,0 +1,249 @@
# List Webhook Subscriptions
List all your current webhook subscriptions
# OpenAPI definition
```json
{
"openapi": "3.1.0",
"info": {
"title": "Webhooks API",
"description": "API for managing webhook subscriptions. This specification includes 25 dynamically registered webhook events.",
"version": "1.0.0"
},
"servers": [
{
"url": "https://api.servicem8.com"
}
],
"paths": {
"/webhook_subscriptions": {
"get": {
"operationId": "get_webhook_subscriptions",
"tags": [
"Webhook Subscription"
],
"summary": "List Webhook Subscriptions",
"description": "List all your current webhook subscriptions\n",
"parameters": [
{
"in": "query",
"name": "status",
"schema": {
"type": "string",
"enum": [
"active",
"inactive",
"all"
],
"default": "active"
},
"description": "Filter subscriptions by status. Use `all` to include deactivated subscriptions."
}
],
"responses": {
"200": {
"description": "An array of Webhook Subscriptions",
"content": {
"application/json": {
"schema": {
"type": "array",
"items": {
"$ref": "#/components/schemas/Webhook"
}
},
"examples": {
"example-1": {
"value": [
{
"type": "object",
"object": "job",
"callback_url": "https://example.com/hooks/JobChanged",
"fields": [
"uuid",
"status"
],
"unique_id": "integration-a",
"active": true
},
{
"type": "event",
"event": "job.created",
"callback_url": "https://example.com/hooks/JobCreated",
"unique_id": "integration-a",
"active": true
},
{
"type": "object",
"object": "job",
"callback_url": "https://example.com/hooks/JobChanged",
"fields": [
"uuid",
"status"
],
"unique_id": "integration-a",
"active": false,
"last_failure_reason": "Webhook request failed for over 12 hours",
"last_failure_at": "2026-04-14 03:25:00"
}
]
}
}
}
}
}
}
}
}
},
"components": {
"schemas": {
"Webhook": {
"oneOf": [
{
"$ref": "#/components/schemas/ObjectWebhook"
},
{
"$ref": "#/components/schemas/EventWebhook"
}
]
},
"ObjectWebhook": {
"type": "object",
"required": [
"type",
"object",
"callback_url",
"fields",
"active"
],
"properties": {
"type": {
"type": "string",
"enum": [
"object"
],
"description": "Type of webhook subscription."
},
"object": {
"type": "string",
"description": "Object type for this subscription (e.g. job, company)."
},
"callback_url": {
"type": "string",
"format": "uri",
"description": "The URL that will receive the webhook when an update is triggered."
},
"fields": {
"type": "array",
"items": {
"type": "string"
},
"description": "List of fields watched for changes."
},
"unique_id": {
"type": "string",
"description": "Optional unique identifier for grouping subscriptions."
},
"active": {
"type": "boolean",
"description": "Whether the subscription is active."
},
"last_failure_reason": {
"type": [
"string",
"null"
],
"description": "The most recent recorded failure reason for this subscription, if any."
},
"last_failure_at": {
"type": [
"string",
"null"
],
"description": "Timestamp of the most recent recorded failure for this subscription, if any."
}
}
},
"EventWebhook": {
"type": "object",
"required": [
"type",
"event",
"callback_url",
"active"
],
"properties": {
"type": {
"type": "string",
"enum": [
"event"
],
"description": "Type of webhook subscription."
},
"event": {
"type": "string",
"description": "Event name for this subscription. See CreateEventWebhookRequest for the full list of supported events.",
"enum": [
"company.created",
"company.updated",
"form.response_created",
"inbox.message_received",
"job.badge_added",
"job.badge_removed",
"job.checked_in",
"job.checked_out",
"job.completed",
"job.created",
"job.invoice_paid",
"job.invoice_sent",
"job.note_added",
"job.photo_added",
"job.queued",
"job.quote_accepted",
"job.quote_sent",
"job.review_received",
"job.status_changed",
"job.updated",
"job.video_added",
"proposal.sent",
"proposal.viewed",
"staff.clocked_off",
"staff.clocked_on"
]
},
"callback_url": {
"type": "string",
"format": "uri",
"description": "The URL that will receive the webhook when the event occurs."
},
"unique_id": {
"type": "string",
"description": "Optional unique identifier for grouping subscriptions."
},
"active": {
"type": "boolean",
"description": "Whether the subscription is active."
},
"last_failure_reason": {
"type": [
"string",
"null"
],
"description": "The most recent recorded failure reason for this subscription, if any."
},
"last_failure_at": {
"type": [
"string",
"null"
],
"description": "Timestamp of the most recent recorded failure for this subscription, if any."
}
}
}
}
}
}
```
+148
View File
@@ -0,0 +1,148 @@
# Overview
# What is a Webhook?
A Webhook is a tool for retrieving and storing data from a certain event. They allow you to register an http\:// or https\:// URL where the event data can be received in JSON formats.
ServiceM8 Webhooks are commonly used for:
* Receiving scheduling changes
* Updating item/materials price changes in your app
* Notifying staff with real-time job changes
* Collecting data for data-warehousing
* Integrating your accounting software
Think of it this way, if you would otherwise have to poll for a substantial amount of data, you should be using webhooks.
> 📘 Note that updates only indicate that a particular field has changed, they do not include the value of the field. When your application receives a webhook update from ServiceM8, you will need to use the [REST API](/documentation/rest-api/overview) to retrieve the record and get the field values.
# Creating and Updating Subscriptions
Webhooks are subscribed to by using the Webhooks API endpoint: `https://api.servicem8.com/webhook_subscriptions`
The basic operations are:
* Add or modify a subscription.
* List each of your existing subscriptions.
* Delete subscriptions.
# Setting Up your Callback URL
First youll need to prepare the page that will act as your callback URL. This URL will need to be accessible by ServiceM8 servers, and be able to receive form-encoded POST data that is sent when an update happens, and in order to verify subscriptions.
This URL should always return a HTTP 200 response when invoked by ServiceM8.
# Handling Verification Requests
> 🚧 Public Applications Only
>
> Public applications (OAuth 2) must complete a challenge request to subscribe to webhooks.
>
> Note that API key requests are not required to complete the challenge request process.
When you add a new subscription, or modify an existing one, ServiceM8 servers will make a HTTP POST request to your callback URL in order to verify the validity of the callback server. The request will include the following POST parameters.
| Parameter | Value |
| :-------- | :--------------- |
| mode | subscribe |
| challenge | \<random string> |
When your server receives one of these requests, it needs to render a response to the request that includes only the challenge value. This confirms that this server is configured to accept callbacks, and is used for security verification on ServiceM8s side.
Here's a simple example of how to implement the verification step.
```php
<?php
if ($_REQUEST['mode'] == 'subscribe' && $_REQUEST['challenge']) {
echo $_REQUEST['challenge'];
} // else: handle webhook POST data
```
# Handling Webhooks
After your callback URL has been verified, ServiceM8 will send a HTTP POST to that URL each time one of the fields on the subscribed object changes. The data POSTed to your callback URL will be JSON in the following format:
```
{
"object": "job",
"entry": [{
"changed_fields": ["badges","generated_job_id"],
"time": "2015-01-01 00:00:00",
"uuid": "de305d54-75b4-431b-adb2-eb6b9e546013"
}],
"resource_url": "https://api.servicem8.com/api_1.0/job/de305d54-75b4-431b-adb2-eb6b9e546013.json"
}
```
The **entry** parameter is an array which contains a single object. To read the values you'll need to use code similar to `myValue = entry[0].time`.
Note that the data POSTed to your callback URL does not contain the object itself — only the UUID and a list of fields that changed. You can request the URL specified by resource\_url to obtain the object itself. The timestamp provided will be in UTC.
> 📘 Most timestamps in ServiceM8 are in the account's local timezone. Webhooks are different, as they use UTC timestamps.
## Callback URL Response Codes
| Code | Title | Description |
| :------ | :-------- | :------------------------------------------------------------------------------------------------------------ |
| 200 | Success | Webhook has completed successfully |
| 410 | Gone | Webhook will be unsubscribed (the same as if you had called the webhook subscription endpoint and removed it) |
| 429 | Throttled | Webhook request volume will be gradually throttled for 15 minutes for this account |
| 4xx/5xx | Error | Webhook will be retried for up to 72 hours, before automatically being cancelled |
## Troubleshooting Failed Webhooks
`GET /webhook_subscriptions` now supports a `status` query parameter so you can inspect deactivated subscriptions as part of debugging.
* `status=active` returns active subscriptions only. This is the default.
* `status=inactive` returns deactivated subscriptions only.
* `status=all` returns both active and inactive subscriptions.
If a webhook has stopped firing, request the inactive subscriptions and inspect the last recorded failure details:
```http
GET /webhook_subscriptions?status=inactive
```
Each returned subscription now includes these troubleshooting fields:
* `active` indicates whether the subscription is currently active.
* `last_failure_reason` contains the most recent failure reason recorded for the subscription, or `null` if none has been recorded.
* `last_failure_at` contains the UTC timestamp of the most recent recorded failure, or `null` if none has been recorded.
Example response:
```json
[
{
"type": "object",
"object": "job",
"callback_url": "https://example.com/hooks/JobChanged",
"fields": ["uuid", "status"],
"unique_id": "integration-a",
"active": false,
"last_failure_reason": "Webhook request failed for over 12 hours",
"last_failure_at": "2026-04-14 03:25:00"
}
]
```
Use `status=all` if you want to compare active and inactive subscriptions in the same response. When a subscription is successfully reactivated, its stored failure snapshot is cleared, so inspect the failure details before reactivating if you need them for debugging.
## How to Use the Webhooks API
Refer to the [Webhooks API Reference](https://developer.servicem8.com/reference/webhook-subscription) for details of the Webhooks endpoint.
## Which objects support webhooks?
The following objects/endpoints support webhooks:
* Job Activity
* Job
* Job Payment
* Note
* Task
* Material
* Company
* Attachment
* Form Response
+47
View File
@@ -0,0 +1,47 @@
import json
import os
import sys
import requests
BASE_URL = os.getenv('SERVICEM8_BASE_URL', 'https://api.servicem8.com')
ENDPOINT = '/webhook_subscriptions/event'
ACCESS_TOKEN = os.getenv('SERVICEM8_ACCESS_TOKEN', 'smk-ac525b-99c4b96305a49c7c-fe4dd3e705b647ea')
EVENT_NAME = os.getenv('SERVICEM8_EVENT', 'form.response_created')
CALLBACK_URL = os.getenv('SERVICEM8_CALLBACK_URL', 'https://nps-dev.coast2cloud.net/webhooks/servicem8/form-response')
UNIQUE_ID = os.getenv('SERVICEM8_UNIQUE_ID', 'dev-form-response')
def pretty_print_json(data):
print(json.dumps(data, indent=2, sort_keys=True))
def create_webhook():
url = f"{BASE_URL}{ENDPOINT}"
headers = {
'X-API-Key': f'{ACCESS_TOKEN}',
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
}
form_data = {
'event': EVENT_NAME,
'callback_url': CALLBACK_URL,
'unique_id': UNIQUE_ID,
}
response = requests.post(url, headers=headers, data=form_data, timeout=30)
print(f'HTTP {response.status_code}')
try:
response_json = response.json()
pretty_print_json(response_json)
except ValueError:
print('Response was not valid JSON:')
print(response.text)
sys.exit(1)
if not response.ok:
sys.exit(1)
if __name__ == '__main__':
create_webhook()
+65
View File
@@ -0,0 +1,65 @@
import json
import os
import sys
import requests
BASE_URL = os.getenv('SERVICEM8_BASE_URL', 'https://api.servicem8.com')
ENDPOINT = '/webhook_subscriptions'
ACCESS_TOKEN = os.getenv('SERVICEM8_ACCESS_TOKEN', 'smk-ac525b-99c4b96305a49c7c-fe4dd3e705b647ea')
OBJECT_NAME = os.getenv('SERVICEM8_OBJECT', 'job')
FIELDS = os.getenv('SERVICEM8_FIELDS', 'uuid,status,date,queue_uuid,queue_expiry_date,work_order_date,active,edit_date,generated_job_id')
CALLBACK_URL = os.getenv('SERVICEM8_CALLBACK_URL', 'https://nps-dev.coast2cloud.net/webhooks/servicem8-object')
UNIQUE_ID = os.getenv('SERVICEM8_UNIQUE_ID', 'dev-job-object')
def pretty_print_json(data):
print(json.dumps(data, indent=2, sort_keys=True))
def parse_fields(value):
return [field.strip() for field in value.split(',') if field.strip()]
def create_webhook():
if not ACCESS_TOKEN:
print('SERVICEM8_ACCESS_TOKEN is required')
sys.exit(1)
fields = parse_fields(FIELDS)
if not fields:
print('SERVICEM8_FIELDS must contain at least one field')
sys.exit(1)
url = f"{BASE_URL}{ENDPOINT}"
headers = {
'X-API-Key': ACCESS_TOKEN,
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
}
form_data = [
('type', 'object'),
('object', OBJECT_NAME),
('callback_url', CALLBACK_URL),
('unique_id', UNIQUE_ID),
]
for field in fields:
form_data.append(('fields', field))
response = requests.post(url, headers=headers, data=form_data, timeout=30)
print(f'HTTP {response.status_code}')
try:
response_json = response.json()
pretty_print_json(response_json)
except ValueError:
print('Response was not valid JSON:')
print(response.text)
sys.exit(1)
if not response.ok:
sys.exit(1)
if __name__ == '__main__':
create_webhook()
+47
View File
@@ -0,0 +1,47 @@
import json
import os
import sys
import requests
BASE_URL = os.getenv('SERVICEM8_BASE_URL', 'https://api.servicem8.com')
ENDPOINT = '/webhook_subscriptions/event'
ACCESS_TOKEN = os.getenv('SERVICEM8_ACCESS_TOKEN', '')
EVENT_NAME = os.getenv('SERVICEM8_EVENT', 'job.updated')
CALLBACK_URL = os.getenv('SERVICEM8_CALLBACK_URL', 'https://nps-dev.coast2cloud.net/webhooks/servicem8-job-updated')
UNIQUE_ID = os.getenv('SERVICEM8_UNIQUE_ID', 'dev-job-updated')
def pretty_print_json(data):
print(json.dumps(data, indent=2, sort_keys=True))
def create_webhook():
url = f"{BASE_URL}{ENDPOINT}"
headers = {
'X-API-Key': f'{ACCESS_TOKEN}',
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
}
form_data = {
'event': EVENT_NAME,
'callback_url': CALLBACK_URL,
'unique_id': UNIQUE_ID,
}
response = requests.post(url, headers=headers, data=form_data, timeout=30)
print(f'HTTP {response.status_code}')
try:
response_json = response.json()
pretty_print_json(response_json)
except ValueError:
print('Response was not valid JSON:')
print(response.text)
sys.exit(1)
if not response.ok:
sys.exit(1)
if __name__ == '__main__':
create_webhook()
+114
View File
@@ -0,0 +1,114 @@
import os
import sys
from typing import Any, Dict, List
import requests
BASE_URL = os.getenv('SERVICEM8_BASE_URL', 'https://api.servicem8.com')
ACCESS_TOKEN = os.getenv('SERVICEM8_ACCESS_TOKEN', 'smk-ac525b-99c4b96305a49c7c-fe4dd3e705b647ea')
STATUS = os.getenv('SERVICEM8_WEBHOOK_STATUS', 'all')
ENDPOINT = '/webhook_subscriptions'
def normalize(item: Dict[str, Any]) -> Dict[str, Any]:
sub_type = item.get('type', '-')
target = item.get('event') if sub_type == 'event' else item.get('object', '-')
fields = ','.join(item.get('fields', [])) if sub_type == 'object' else '-'
active = bool(item.get('active', False))
failure_reason = item.get('last_failure_reason') or ''
failure_at = item.get('last_failure_at') or ''
callback_url = item.get('callback_url', '-')
unique_id = item.get('unique_id') or '-'
status_label = 'ACTIVE' if active else 'INACTIVE'
issue = 'FAIL' if failure_reason else ''
sort_bucket = 0 if (not active or failure_reason) else 1
return {
'status': status_label,
'issue': issue,
'type': sub_type,
'target': target or '-',
'fields': fields or '-',
'unique_id': unique_id,
'callback_url': callback_url,
'failure_at': failure_at or '-',
'failure_reason': failure_reason or '-',
'_sort_bucket': sort_bucket,
}
def truncate(value: str, width: int) -> str:
value = str(value)
return value if len(value) <= width else value[: width - 3] + '...'
def render_table(rows: List[Dict[str, Any]]) -> None:
columns = [
('status', 'STATUS', 9),
('issue', 'ISSUE', 5),
('type', 'TYPE', 8),
('target', 'TARGET', 24),
('fields', 'FIELDS', 24),
('unique_id', 'UNIQUE_ID', 18),
('failure_at', 'FAILURE_AT', 19),
('callback_url', 'CALLBACK_URL', 42),
]
header = ' | '.join(title.ljust(width) for _, title, width in columns)
divider = '-+-'.join('-' * width for _, _, width in columns)
print(header)
print(divider)
for row in rows:
line = ' | '.join(truncate(row[key], width).ljust(width) for key, _, width in columns)
print(line)
def main() -> None:
if STATUS not in {'active', 'inactive', 'all'}:
print('SERVICEM8_WEBHOOK_STATUS must be one of: active, inactive, all')
sys.exit(1)
url = f'{BASE_URL}{ENDPOINT}'
headers = {
'X-API-Key': f'{ACCESS_TOKEN}',
'Accept': 'application/json',
}
params = {'status': STATUS}
response = requests.get(url, headers=headers, params=params, timeout=30)
print(f'HTTP {response.status_code}')
try:
data = response.json()
except ValueError:
print('Response was not valid JSON:')
print(response.text)
sys.exit(1)
if not response.ok:
print(data)
sys.exit(1)
rows = [normalize(item) for item in data]
rows.sort(key=lambda r: (r['_sort_bucket'], r['status'], r['type'], r['target']))
print(f'Subscriptions returned: {len(rows)}')
print(f'Filter: {STATUS}')
print()
if not rows:
print('No webhook subscriptions found.')
return
render_table(rows)
failures = [r for r in rows if r['failure_reason'] != '-']
if failures:
print('\nFailure details:')
for row in failures:
print(f"- {row['type']} {row['target']} -> {row['failure_reason']} (at {row['failure_at']})")
if __name__ == '__main__':
main()
+97
View File
@@ -0,0 +1,97 @@
import json
import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, PlainTextResponse
DB_PATH = os.getenv('WEBHOOK_DB_PATH', './servicem8_webhooks.db')
APP_HOST = os.getenv('WEBHOOK_HOST', '0.0.0.0')
APP_PORT = int(os.getenv('WEBHOOK_PORT', '8000'))
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger('servicem8-webhook-receiver')
app = FastAPI(title='ServiceM8 Webhook Receiver', version='1.0.0')
def get_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
with closing(get_conn()) as conn:
conn.execute(
'''
CREATE TABLE IF NOT EXISTS webhook_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
client_host TEXT,
method TEXT NOT NULL,
path TEXT NOT NULL,
headers_json TEXT,
payload_json TEXT NOT NULL
)
'''
)
conn.commit()
@app.on_event('startup')
async def startup_event():
init_db()
logger.info('SQLite DB ready at %s', DB_PATH)
@app.get('/health')
async def health():
return {'ok': True}
@app.post('/webhooks/servicem8-job-updated')
async def servicem8_webhook(request: Request):
try:
payload = await request.json()
except Exception:
body = await request.body()
payload = {'_raw_body': body.decode('utf-8', errors='replace')}
headers = dict(request.headers)
client_host = request.client.host if request.client else None
received_at = datetime.now(timezone.utc).isoformat()
with closing(get_conn()) as conn:
conn.execute(
'''
INSERT INTO webhook_events (
received_at,
client_host,
method,
path,
headers_json,
payload_json
) VALUES (?, ?, ?, ?, ?, ?)
''',
(
received_at,
client_host,
request.method,
request.url.path,
json.dumps(headers),
json.dumps(payload),
),
)
conn.commit()
logger.info('Webhook received from %s and stored', client_host)
return PlainTextResponse('OK', status_code=200)
if __name__ == '__main__':
import uvicorn
uvicorn.run('servicem8_webhook_receiver:app', host=APP_HOST, port=APP_PORT, reload=False)
+209
View File
@@ -0,0 +1,209 @@
import json
import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from fastapi import FastAPI, Request
from fastapi.responses import PlainTextResponse
DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db")
APP_HOST = os.getenv("WEBHOOK_HOST", "0.0.0.0")
APP_PORT = int(os.getenv("WEBHOOK_PORT", "18354"))
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("servicem8-webhook-receiver")
app = FastAPI(title="ServiceM8 Webhook Receiver", version="1.1.0")
def get_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
with closing(get_conn()) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
client_host TEXT,
method TEXT NOT NULL,
path TEXT NOT NULL,
headers_json TEXT,
payload_json TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_objects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
client_host TEXT,
method TEXT NOT NULL,
path TEXT NOT NULL,
object_type TEXT,
object_uuid TEXT,
changed_fields_json TEXT,
object_time_utc TEXT,
resource_url TEXT,
headers_json TEXT,
payload_json TEXT NOT NULL
)
"""
)
conn.commit()
@app.on_event("startup")
async def startup_event():
init_db()
logger.info("SQLite DB ready at %s", DB_PATH)
@app.get("/health")
async def health():
return {"ok": True}
async def parse_request_payload(request: Request):
content_type = request.headers.get("content-type", "").lower()
if "application/json" in content_type:
try:
return await request.json()
except Exception:
body = await request.body()
return {"_raw_body": body.decode("utf-8", errors="replace")}
if "application/x-www-form-urlencoded" in content_type or "multipart/form-data" in content_type:
try:
form = await request.form()
return dict(form)
except Exception:
body = await request.body()
return {"_raw_body": body.decode("utf-8", errors="replace")}
try:
return await request.json()
except Exception:
body = await request.body()
return {"_raw_body": body.decode("utf-8", errors="replace")}
@app.post("/webhooks/servicem8-job-updated")
async def servicem8_event_webhook(request: Request):
payload = await parse_request_payload(request)
headers = dict(request.headers)
client_host = request.client.host if request.client else None
received_at = datetime.now(timezone.utc).isoformat()
with closing(get_conn()) as conn:
conn.execute(
"""
INSERT INTO webhook_events (
received_at,
client_host,
method,
path,
headers_json,
payload_json
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
received_at,
client_host,
request.method,
request.url.path,
json.dumps(headers),
json.dumps(payload),
),
)
conn.commit()
logger.info("Event webhook received from %s and stored", client_host)
return PlainTextResponse("OK", status_code=200)
@app.post("/webhooks/servicem8-object")
async def servicem8_object_webhook(request: Request):
payload = await parse_request_payload(request)
headers = dict(request.headers)
client_host = request.client.host if request.client else None
received_at = datetime.now(timezone.utc).isoformat()
if isinstance(payload, dict) and payload.get("mode") == "subscribe" and payload.get("challenge"):
challenge = str(payload["challenge"])
logger.info("ServiceM8 webhook verification received from %s", client_host)
return PlainTextResponse(challenge, status_code=200)
object_type = None
object_uuid = None
changed_fields = None
object_time_utc = None
resource_url = None
if isinstance(payload, dict):
object_type = payload.get("object")
resource_url = payload.get("resource_url")
entry = payload.get("entry")
if isinstance(entry, list) and len(entry) > 0 and isinstance(entry[0], dict):
first_entry = entry[0]
object_uuid = first_entry.get("uuid")
changed_fields = first_entry.get("changed_fields")
object_time_utc = first_entry.get("time")
with closing(get_conn()) as conn:
conn.execute(
"""
INSERT INTO webhook_objects (
received_at,
client_host,
method,
path,
object_type,
object_uuid,
changed_fields_json,
object_time_utc,
resource_url,
headers_json,
payload_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
received_at,
client_host,
request.method,
request.url.path,
object_type,
object_uuid,
json.dumps(changed_fields) if changed_fields is not None else None,
object_time_utc,
resource_url,
json.dumps(headers),
json.dumps(payload),
),
)
conn.commit()
logger.info(
"Object webhook received from %s: object=%s uuid=%s",
client_host,
object_type,
object_uuid,
)
return PlainTextResponse("OK", status_code=200)
if __name__ == "__main__":
import uvicorn
uvicorn.run("servicem8_webhook_receiver:app", host=APP_HOST, port=APP_PORT, reload=False)
+255
View File
@@ -0,0 +1,255 @@
import json
import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from fastapi import FastAPI, Request
from fastapi.responses import PlainTextResponse
DB_PATH = os.getenv("WEBHOOK_DB_PATH", "./servicem8_webhooks.db")
APP_HOST = os.getenv("WEBHOOK_HOST", "0.0.0.0")
APP_PORT = int(os.getenv("WEBHOOK_PORT", "18354"))
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("servicem8-webhook-receiver")
app = FastAPI(title="ServiceM8 Webhook Receiver", version="1.2.1")
def get_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
with closing(get_conn()) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
client_host TEXT,
method TEXT NOT NULL,
path TEXT NOT NULL,
headers_json TEXT,
payload_json TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_objects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
client_host TEXT,
method TEXT NOT NULL,
path TEXT NOT NULL,
object_type TEXT,
object_uuid TEXT,
changed_fields_json TEXT,
object_time_utc TEXT,
resource_url TEXT,
headers_json TEXT,
payload_json TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS webhook_form_responses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
received_at TEXT NOT NULL,
client_host TEXT,
method TEXT NOT NULL,
path TEXT NOT NULL,
headers_json TEXT,
payload_json TEXT NOT NULL
)
"""
)
conn.commit()
@app.on_event("startup")
async def startup_event():
init_db()
logger.info("SQLite DB ready at %s", DB_PATH)
@app.get("/health")
async def health():
return {"ok": True}
async def parse_request_payload(request: Request):
content_type = request.headers.get("content-type", "").lower()
if "application/json" in content_type:
try:
return await request.json()
except Exception:
body = await request.body()
return {"_raw_body": body.decode("utf-8", errors="replace")}
if "application/x-www-form-urlencoded" in content_type or "multipart/form-data" in content_type:
try:
form = await request.form()
return dict(form)
except Exception:
body = await request.body()
return {"_raw_body": body.decode("utf-8", errors="replace")}
try:
return await request.json()
except Exception:
body = await request.body()
return {"_raw_body": body.decode("utf-8", errors="replace")}
def maybe_handle_challenge(payload, client_host):
if isinstance(payload, dict) and payload.get("mode") == "subscribe" and payload.get("challenge"):
challenge = str(payload["challenge"])
logger.info("ServiceM8 webhook verification received from %s", client_host)
return PlainTextResponse(challenge, status_code=200)
return None
def store_simple_event(table_name, request: Request, client_host: str, received_at: str, headers: dict, payload):
with closing(get_conn()) as conn:
conn.execute(
f"""
INSERT INTO {table_name} (
received_at,
client_host,
method,
path,
headers_json,
payload_json
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
received_at,
client_host,
request.method,
request.url.path,
json.dumps(headers),
json.dumps(payload),
),
)
conn.commit()
@app.post("/webhooks/servicem8-job-updated")
async def servicem8_event_webhook(request: Request):
payload = await parse_request_payload(request)
headers = dict(request.headers)
client_host = request.client.host if request.client else None
received_at = datetime.now(timezone.utc).isoformat()
challenge_response = maybe_handle_challenge(payload, client_host)
if challenge_response is not None:
return challenge_response
store_simple_event("webhook_events", request, client_host, received_at, headers, payload)
logger.info("Event webhook received from %s and stored", client_host)
return PlainTextResponse("OK", status_code=200)
@app.post("/webhooks/servicem8/form-response")
async def servicem8_form_response_webhook(request: Request):
payload = await parse_request_payload(request)
headers = dict(request.headers)
client_host = request.client.host if request.client else None
received_at = datetime.now(timezone.utc).isoformat()
challenge_response = maybe_handle_challenge(payload, client_host)
if challenge_response is not None:
return challenge_response
store_simple_event("webhook_form_responses", request, client_host, received_at, headers, payload)
logger.info("Form response webhook received from %s and stored", client_host)
return PlainTextResponse("OK", status_code=200)
@app.post("/webhooks/servicem8-object")
async def servicem8_object_webhook(request: Request):
payload = await parse_request_payload(request)
headers = dict(request.headers)
client_host = request.client.host if request.client else None
received_at = datetime.now(timezone.utc).isoformat()
challenge_response = maybe_handle_challenge(payload, client_host)
if challenge_response is not None:
return challenge_response
object_type = None
object_uuid = None
changed_fields = None
object_time_utc = None
resource_url = None
if isinstance(payload, dict):
object_type = payload.get("object")
resource_url = payload.get("resource_url")
entry = payload.get("entry")
if isinstance(entry, list) and len(entry) > 0 and isinstance(entry[0], dict):
first_entry = entry[0]
object_uuid = first_entry.get("uuid")
changed_fields = first_entry.get("changed_fields")
object_time_utc = first_entry.get("time")
with closing(get_conn()) as conn:
conn.execute(
"""
INSERT INTO webhook_objects (
received_at,
client_host,
method,
path,
object_type,
object_uuid,
changed_fields_json,
object_time_utc,
resource_url,
headers_json,
payload_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
received_at,
client_host,
request.method,
request.url.path,
object_type,
object_uuid,
json.dumps(changed_fields) if changed_fields is not None else None,
object_time_utc,
resource_url,
json.dumps(headers),
json.dumps(payload),
),
)
conn.commit()
logger.info(
"Object webhook received from %s: object=%s uuid=%s",
client_host,
object_type,
object_uuid,
)
return PlainTextResponse("OK", status_code=200)
if __name__ == "__main__":
import uvicorn
uvicorn.run("servicem8_webhook_receiver:app", host=APP_HOST, port=APP_PORT, reload=False)
BIN
View File
Binary file not shown.