If the SIGNING_SECRET value is set then each request will be verified that it
came from your sesinetd instance and not from an attacker.
To run the script the mysql username and mysql password must be provided. The
default name the script uses for the database is sidefx_webhook.
python3 webhook_server.py <mysql username> <mysql password>
Note
Anytime the signing secret changes the webhook server must be updated so that it can correctly verify the requests from sesinetd.
import sys
import json
import hmac
import hashlib
import time
import os
import asyncio
import threading
import queue
import mysql.connector
from mysql.connector import errorcode
sys.path.append(os.path.expandvars("$HFS/houdini/python3.9libs/"))
import hwebserver
Response = hwebserver.Response
SIGNING_SECRET = None
DB_NAME = "sidefx_webhook"
USER = None
PASSWORD = None
TABLES = {}
TABLES['checkin'] = (
"CREATE TABLE `checkin` ("
" `license_id` char(8) NOT NULL,"
" `count` int(64) NOT NULL,"
" `product` varchar(100) NOT NULL,"
" `version` varchar(100) NOT NULL,"
" `total_tokens` int(64) NOT NULL DEFAULT 0,"
" `timestamp` int(11) NOT NULL"
")"
)
TABLES['checkout'] = (
"CREATE TABLE `checkout` ("
" `license_id` char(8) NOT NULL,"
" `count` int(64) NOT NULL,"
" `product` varchar(100) NOT NULL,"
" `version` varchar(100) NOT NULL,"
" `total_tokens` int(64) NOT NULL DEFAULT 0,"
" `timestamp` int(11) NOT NULL"
")"
)
TABLES['heartbeat'] = (
"CREATE TABLE `heartbeat` ("
" `license_id` char(8) NOT NULL,"
" `product` varchar(100) NOT NULL,"
" `version` varchar(100) NOT NULL,"
" `success_count` int(64) NOT NULL,"
" `fail_count` int(64) NOT NULL,"
" `timestamp` int(11) NOT NULL"
")"
)
def _compute_sidefx_signature(timestamp, body):
base_string = u"v0:{}:{}".format(timestamp, body.decode())
return b"v0=" + hmac.new(SIGNING_SECRET.encode(), msg=base_string.encode(),
digestmod=hashlib.sha256).hexdigest().encode()
def _sidefx_signature_errors(request):
"""Validate that the response was signed by sesinetd. See
https://sidefx.com/docs/houdini/licensing/webhooks/
"""
expected_signature = request.headers().get("X-SideFX-Signature")
if expected_signature is None:
return "Missing header X-SideFX-Signature"
expected_signature = expected_signature.encode()
timestamp = request.headers().get("X-SideFX-Request-Timestamp")
if timestamp is None:
return "Missing header X-SideFX-Request-Timestamp"
try:
float(timestamp)
except ValueError:
return "X-SideFX-Request-Timestamp is not a number"
# Watch out for replay attacks. Ignore anything that's more than five
# minutes old.
if abs(time.time() - float(timestamp)) > 300:
return "The timestamp is too old"
signature = _compute_sidefx_signature(timestamp, request.body())
if hasattr(hmac, "compare_digest"):
signature_matches = hmac.compare_digest(signature, expected_signature)
else:
signature_matches = signature == expected_signature
if not signature_matches:
return "Incorrect signature header"
return None
def _create_database(cursor):
"""Create the mysql db used to store all events the webserver receives."""
try:
cursor.execute(f"CREATE DATABASE {DB_NAME} DEFAULT CHARACTER SET 'utf8'")
except mysql.connector.Error as err:
print(f"Failed creating database: {err}")
sys.exit(1)
def _create_tables(cursor):
"""Create all tables for every event that the webserver may receive."""
try:
cursor.execute(f"USE {DB_NAME}")
except mysql.connector.Error as err:
print(f"Database {DB_NAME} does not exist.")
if err.errno == errorcode.ER_BAD_DB_ERROR:
_create_database(cursor)
print(f"Database {DB_NAME} created successfully.")
cnx.database = DB_NAME
else:
print(err)
sys.exit(1)
for table_name in TABLES:
table_description = TABLES[table_name]
try:
print(f"Creating table {table_name}: ", end='')
cursor.execute(table_description)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
print("already exists")
else:
print(err.msg)
else:
print("OK")
def handle_checkin(cursor, info):
"""Handle all checkin events by saving them into the `checkin` sql
table."""
sql = "INSERT INTO checkin (license_id, seat_id, user_machine, product, \
version, available, total_tokens, timestamp) VALUES (%(license_id)s, \
%(seat_id)s, %(user_machine)s, %(product)s, %(version)s, \
%(available)s, %(total_tokens)s, %(timestamp)s)"
timestamp = info["event_time"]
ev = info["event"]
values = {}
values["license_id"] = ev.get("license_id", "")
values["count"] = ev.get("count", 0)
values["product"] = ev.get("product", "")
values["version"] = ev.get("version", "")
values["total_tokens"] = ev.get("total_tokens", 0)
values["timestamp"] = timestamp
cursor.execute(sql, values)
return True
def handle_checkout(cursor, info):
"""Handle all checkout events by saving them into the `checkout` sql
table."""
sql = "INSERT INTO checkout (license_id, seat_id, user_machine, product, \
version, available, total_tokens, timestamp) VALUES (%(license_id)s, \
%(seat_id)s, %(user_machine)s, %(product)s, %(version)s, \
%(available)s, %(total_tokens)s, %(timestamp)s)"
timestamp = info["event_time"]
ev = info["event"]
values = {}
values["license_id"] = ev.get("license_id", "")
values["count"] = ev.get("count", 0)
values["product"] = ev.get("product", "")
values["version"] = ev.get("version", "")
values["total_tokens"] = ev.get("total_tokens", 0)
values["timestamp"] = timestamp
cursor.execute(sql, values)
return True
def handle_heartbeat(cursor, info):
"""Handle all heartbeat events by saving them into the `heartbeat` sql
table."""
sql = "INSERT INTO heartbeat (license_id, seat_id, user_machine, product, \
success, timestamp) VALUES (%(license_id)s, %(seat_id)s, \
%(user_machine)s, %(product)s, %(success)s, %(timestamp)s)"
timestamp = info["event_time"]
ev = info["event"]
values = {}
values["license_id"] = ev.get("license_id", "")
values["product"] = ev.get("product", "")
values["version"] = ev.get("version", "")
values["success_count"] = ev.get("success_count", 0)
values["fail_count"] = ev.get("fail_count", 0)
values["timestamp"] = timestamp
cursor.execute(sql, values)
return True
class StopToken():
"""Stop token to indicate to the queue we are done handling events."""
def __init__(self):
pass
def handle_rate_limit(ev):
"""Handle events that inform the server they have been rate limited."""
minute_rate_limited = ev.get("minute_rate_limited", 0)
print(f"Server has been rate limited: {minute_rate_limited}")
async def handle_webhook_events():
"""Handle the queue of webhook events that the webserver received."""
handlers = dict()
handlers["E100"] = handle_checkin
handlers["E101"] = handle_checkout
handlers["E102"] = handle_heartbeat
cnx = mysql.connector.connect(user=USER, password=PASSWORD)
cursor = cnx.cursor()
#_create_database(cursor)
_create_tables(cursor)
try:
while True:
try:
ev = EVENT_QUEUE.get()
if isinstance(ev, StopToken):
break
ev_type = ev.get("type", "")
if ev_type == "event_callback":
event_id = ev["event_id"]
fn = handlers.get(event_id)
if fn is not None:
if fn(cursor, ev):
cnx.commit()
elif ev_type == "rate_limited":
handle_rate_limit(ev)
except Exception as err:
print(err)
finally:
EVENT_QUEUE.task_done()
finally:
if cursor is not None:
cursor.close()
def asyncio_handle_webhooks():
"""Kick off handling the webhook events."""
loop = asyncio.DefaultEventLoopPolicy().new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(handle_webhook_events())
@hwebserver.urlHandler("/sidefx.webhook")
def webhook(request):
"""Handle all incoming webhook events. To ensure we handle these events
quickly we place all events onto a queue that a background thread handles.
If the server takes to long sesinetd will think the server is in a bad
state and will stop sending events."""
global EVENT_QUEUE
if request.contentType() != "application/json":
return Response(400, "Expected JSON body")
if SIGNING_SECRET is not None:
errors = _sidefx_signature_errors(request)
if errors is not None:
print(f"invalid signature: {errors}")
return Response(errors, 400)
data = json.loads(request.body())
if data["type"] == "url_verification":
resp_json = dict()
resp_json["challenge"] = data["challenge"]
return Response(json.dumps(resp_json), 200, "application/json")
# Handle the event in the background so that we can respond quickly. If we
# take to long then sesinetd will think this server is in a bad state.
EVENT_QUEUE.put(data)
return Response(b"", 200)
def main():
global SIGNING_SECRET, EVENT_QUEUE, USER, PASSWORD
SIGNING_SECRET = "8fjP3P1EYJiUwU9ONjRGw00UxgbHNm"
USER = sys.argv[1]
PASSWORD = sys.argv[2]
EVENT_QUEUE = queue.Queue()
background_thread = threading.Thread(target=asyncio_handle_webhooks)
background_thread.start()
try:
hwebserver.run(8080, debug=True, reload_source_changes=False)
finally:
# Add the stop token to the queue to inform the background worker
# thread to stop handling events.
EVENT_QUEUE.put(StopToken())
if background_thread is not None:
background_thread.join()
if __name__ == "__main__":
main()