Files
plumbing-dashy/backend/servicem8.py
T
2026-05-21 15:49:36 +10:00

113 lines
4.1 KiB
Python

import requests
import os
from datetime import datetime, date, timedelta
from sqlalchemy.orm import Session
from . import cally_models
from dotenv import load_dotenv
# Load variables from a .env file into the process environment. This must run
# before the os.getenv() call below so the token can be supplied via .env.
load_dotenv()
# ServiceM8 API key, sent as the "X-Api-Key" request header. None when the
# SERVICEM8_ACCESS_TOKEN environment variable is not set.
SM8_TOKEN = os.getenv("SERVICEM8_ACCESS_TOKEN")
# Prefix for every ServiceM8 request URL built in this module.
BASE_URL = "https://api.servicem8.com/api_1.0"
def get_sm8_headers():
    """Build the HTTP headers sent with each ServiceM8 API request.

    Returns:
        A new dict holding the API key (taken from the module-level
        ``SM8_TOKEN``) plus JSON ``Accept`` and ``Content-Type`` headers.
    """
    json_mime = "application/json"
    sm8_headers = {"X-Api-Key": SM8_TOKEN}
    # Both directions of the exchange are declared as JSON.
    sm8_headers["Accept"] = json_mime
    sm8_headers["Content-Type"] = json_mime
    return sm8_headers
# Seconds to wait on any single ServiceM8 request before giving up. Without an
# explicit timeout, `requests` may block indefinitely on a stalled connection.
_SM8_TIMEOUT_SECONDS = 30


def _sync_staff(db: Session, headers: dict) -> None:
    """Upsert every active, schedule-visible ServiceM8 staff member."""
    staff_resp = requests.get(
        f"{BASE_URL}/staff.json",
        headers=headers,
        timeout=_SM8_TIMEOUT_SECONDS,
    )
    if staff_resp.status_code != 200:
        # Best effort: a failed staff fetch does not stop the schedule sync.
        return

    for s in staff_resp.json():
        # We only care about active staff who are shown on the schedule
        if s.get('active') == 1 and s.get('hide_from_schedule') == 0:
            # `or ''` also covers keys that are present but null, which would
            # otherwise be rendered as the literal text "None" in the name.
            first = s.get('first') or ''
            last = s.get('last') or ''
            db_staff = cally_models.Sm8Staff(
                uuid=s.get('uuid'),
                name=f"{first} {last}".strip() or "Unknown Staff",
                active=True,
            )
            # merge() inserts or updates depending on whether the row exists.
            db.merge(db_staff)
    db.commit()


def _group_activities_by_day(activities):
    """Group scheduled activities into {staff_uuid: {date: [job_uuids]}}.

    Activities missing a staff uuid, start date or job uuid are skipped, as
    are activities whose start date cannot be parsed. Each job uuid appears
    at most once per staff member per day.

    Returns:
        A ``(busy_map, unique_job_uuids)`` tuple, where ``unique_job_uuids``
        is the set of every job uuid placed into ``busy_map``.
    """
    busy_map = {}
    unique_job_uuids = set()
    for a in activities:
        staff_uuid = a.get('staff_uuid')
        start_dt_str = a.get('start_date')
        job_uuid = a.get('job_uuid')
        if not staff_uuid or not start_dt_str or not job_uuid:
            continue

        # Extract date from 'YYYY-MM-DD HH:MM:SS'
        try:
            activity_date = date.fromisoformat(start_dt_str.split(' ')[0])
        except ValueError:
            # Validate here, before anything is written, so one malformed or
            # placeholder date cannot abort the rebuild of the schedule.
            continue

        day_jobs = busy_map.setdefault(staff_uuid, {}).setdefault(activity_date, [])
        if job_uuid not in day_jobs:
            day_jobs.append(job_uuid)
        unique_job_uuids.add(job_uuid)
    return busy_map, unique_job_uuids


def _sync_job_details(db: Session, headers: dict, job_uuids) -> None:
    """Fetch and store details for each job uuid not yet stored locally."""
    for job_uuid in job_uuids:
        # Check if we already have this job detail
        existing_job = (
            db.query(cally_models.Sm8Job)
            .filter(cally_models.Sm8Job.uuid == job_uuid)
            .first()
        )
        if existing_job:
            continue

        job_resp = requests.get(
            f"{BASE_URL}/job/{job_uuid}.json",
            headers=headers,
            timeout=_SM8_TIMEOUT_SECONDS,
        )
        if job_resp.status_code == 200:
            j = job_resp.json()
            db.add(cally_models.Sm8Job(
                uuid=job_uuid,
                job_id=j.get('generated_job_id', 'Unknown'),
                address=j.get('job_address', ''),
            ))
    db.commit()


def sync_from_servicem8(db: Session) -> bool:
    """Refresh locally stored staff, job details and schedule from ServiceM8.

    Steps, in order:
      1. Upsert active staff who are shown on the schedule.
      2. Fetch scheduled job activities starting after midnight 31 days ago.
         The filter has no upper bound.
      3. Fetch details for any referenced job that is not stored yet.
      4. Replace the stored ``ScheduleDay`` rows with one row per staff member
         per day.

    Args:
        db: SQLAlchemy session used for all reads and writes. This function
            commits on it.

    Returns:
        True when the activity fetch succeeded and the schedule was rebuilt.
        False when the activity request returned a non-200 status, in which
        case the stored schedule is left as it was.

    Raises:
        requests.RequestException: on network failure, or when a request
            exceeds the timeout. Not caught here.
    """
    headers = get_sm8_headers()

    # 1. Sync Staff
    _sync_staff(db, headers)

    # 2. Sync Schedule (everything starting after midnight 31 days ago)
    start_date = (date.today() - timedelta(days=31)).isoformat()
    # ServiceM8 API v1.0 OData does not support 'ge', but supports 'gt'
    # We use 'and' to combine filters
    params = {
        "$filter": f"activity_was_scheduled eq 1 and start_date gt '{start_date} 00:00:00'"
    }
    activity_resp = requests.get(
        f"{BASE_URL}/jobactivity.json",
        headers=headers,
        params=params,
        timeout=_SM8_TIMEOUT_SECONDS,
    )
    if activity_resp.status_code != 200:
        return False

    busy_map, unique_job_uuids = _group_activities_by_day(activity_resp.json())

    # 3. Sync Job Details for found activities
    # We need human-readable IDs (#123) from the 'job' table
    _sync_job_details(db, headers, unique_job_uuids)

    # 4. Save Schedule to DB
    # The old rows are removed only now, after every network call has
    # finished, and the delete is committed together with the replacement
    # rows. A failure earlier on therefore cannot leave the table empty.
    # NOTE(review): this assumes `db` is a normal transactional Session, not
    # one in autocommit mode - confirm.
    db.query(cally_models.ScheduleDay).delete()
    for staff_uuid, days in busy_map.items():
        for activity_date, job_list in days.items():
            db.add(cally_models.ScheduleDay(
                staff_uuid=staff_uuid,
                date=activity_date,
                is_busy=True,
                job_count=len(job_list),
                job_uuids=",".join(job_list),
            ))
    db.commit()
    return True