SIGNING_SECRETの値が設定されている場合、各リクエストが攻撃者からではなく、sesinetdインスタンスから送信されたものであることが検証されます。
スクリプトを実行するには、mysqlユーザ名とmysqlパスワードを指定する必要があります。スクリプトがデータベースに使用するデフォルト名は、sidefx_webhookです。
python3 webhook_server.py <mysql username> <mysql password>
Note
署名シークレットを変更する度に、Webhookサーバーを更新して、sesinetdからのリクエストを正しく検証できるようにする必要があります。
import sys
import json
import hmac
import hashlib
import time
import os
import asyncio
import threading
import queue
import mysql.connector
from mysql.connector import errorcode
sys.path.append(os.path.expandvars("$HFS/houdini/python3.9libs/"))
import hwebserver
Response = hwebserver.Response
# Shared secret used to verify request signatures. While it is None the
# webhook handler skips signature verification; main() assigns the real value.
SIGNING_SECRET = None

# Name of the MySQL database that stores the received events.
DB_NAME = "sidefx_webhook"

# MySQL credentials; main() fills these in from the command line.
USER = None
PASSWORD = None

# CREATE TABLE statement for every event table, keyed by table name.
TABLES = {
    'checkin': (
        "CREATE TABLE `checkin` ("
        " `license_id` char(8) NOT NULL,"
        " `count` int(64) NOT NULL,"
        " `product` varchar(100) NOT NULL,"
        " `version` varchar(100) NOT NULL,"
        " `total_tokens` int(64) NOT NULL DEFAULT 0,"
        " `timestamp` int(11) NOT NULL"
        ")"
    ),
    'checkout': (
        "CREATE TABLE `checkout` ("
        " `license_id` char(8) NOT NULL,"
        " `count` int(64) NOT NULL,"
        " `product` varchar(100) NOT NULL,"
        " `version` varchar(100) NOT NULL,"
        " `total_tokens` int(64) NOT NULL DEFAULT 0,"
        " `timestamp` int(11) NOT NULL"
        ")"
    ),
    'heartbeat': (
        "CREATE TABLE `heartbeat` ("
        " `license_id` char(8) NOT NULL,"
        " `product` varchar(100) NOT NULL,"
        " `version` varchar(100) NOT NULL,"
        " `success_count` int(64) NOT NULL,"
        " `fail_count` int(64) NOT NULL,"
        " `timestamp` int(11) NOT NULL"
        ")"
    ),
}
def _compute_sidefx_signature(timestamp, body):
    """Return the signature expected for a request, as bytes.

    The signed message is ``v0:<timestamp>:<body>``. The result is ``v0=``
    followed by the hex-encoded HMAC-SHA256 of that message, keyed with
    SIGNING_SECRET.

    Args:
        timestamp: Value formatted into the signed message.
        body: Request body as bytes; it is decoded before being formatted
            into the message.
    """
    message = f"v0:{timestamp}:{body.decode()}".encode()
    mac = hmac.new(
        SIGNING_SECRET.encode(), msg=message, digestmod=hashlib.sha256
    )
    return b"v0=" + mac.hexdigest().encode()
def _sidefx_signature_errors(request):
"""Validate that the response was signed by sesinetd. See
https://sidefx.com/docs/houdini/licensing/webhooks/
"""
expected_signature = request.headers().get("X-SideFX-Signature")
if expected_signature is None:
return "Missing header X-SideFX-Signature"
expected_signature = expected_signature.encode()
timestamp = request.headers().get("X-SideFX-Request-Timestamp")
if timestamp is None:
return "Missing header X-SideFX-Request-Timestamp"
try:
float(timestamp)
except ValueError:
return "X-SideFX-Request-Timestamp is not a number"
# Watch out for replay attacks. Ignore anything that's more than five
# minutes old.
if abs(time.time() - float(timestamp)) > 300:
return "The timestamp is too old"
signature = _compute_sidefx_signature(timestamp, request.body())
if hasattr(hmac, "compare_digest"):
signature_matches = hmac.compare_digest(signature, expected_signature)
else:
signature_matches = signature == expected_signature
if not signature_matches:
return "Incorrect signature header"
return None
def _create_database(cursor):
    """Create the MySQL database that stores the received events.

    Prints the error and exits the process with status 1 if the statement
    fails with a mysql.connector error.
    """
    statement = f"CREATE DATABASE {DB_NAME} DEFAULT CHARACTER SET 'utf8'"
    try:
        cursor.execute(statement)
    except mysql.connector.Error as exc:
        print(f"Failed creating database: {exc}")
        sys.exit(1)
def _create_tables(cursor):
    """Select the event database and create every table in TABLES.

    If the database does not exist yet it is created first. Any other error
    while selecting the database is printed and the process exits with
    status 1. Tables that already exist are reported and left untouched;
    other table-creation errors are printed and the loop continues.

    Args:
        cursor: Open MySQL cursor used to run the statements.
    """
    try:
        cursor.execute(f"USE {DB_NAME}")
    except mysql.connector.Error as err:
        if err.errno != errorcode.ER_BAD_DB_ERROR:
            print(err)
            sys.exit(1)
        print(f"Database {DB_NAME} does not exist.")
        _create_database(cursor)
        print(f"Database {DB_NAME} created successfully.")
        # Select the new database through the cursor. The connection object
        # is not available in this function, so it cannot be used here.
        cursor.execute(f"USE {DB_NAME}")

    for table_name, table_description in TABLES.items():
        print(f"Creating table {table_name}: ", end='')
        try:
            cursor.execute(table_description)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                print("already exists")
            else:
                print(err.msg)
        else:
            print("OK")
def handle_checkin(cursor, info):
    """Store a checkin event as a row of the `checkin` table.

    Args:
        cursor: Database cursor used to execute the INSERT.
        info: Webhook payload. ``info["event_time"]`` supplies the timestamp
            and ``info["event"]`` the event fields; missing event fields
            fall back to "" or 0.

    Returns:
        True once the INSERT has been executed.

    Raises:
        KeyError: If `info` has no "event_time" or "event" entry.
    """
    timestamp = info["event_time"]
    ev = info["event"]
    values = {
        "license_id": ev.get("license_id", ""),
        "count": ev.get("count", 0),
        "product": ev.get("product", ""),
        "version": ev.get("version", ""),
        "total_tokens": ev.get("total_tokens", 0),
        "timestamp": timestamp,
    }
    # The column list and placeholders must match both the `checkin` table
    # definition and the keys of `values`.
    sql = (
        "INSERT INTO checkin (`license_id`, `count`, `product`, `version`, "
        "`total_tokens`, `timestamp`) VALUES (%(license_id)s, %(count)s, "
        "%(product)s, %(version)s, %(total_tokens)s, %(timestamp)s)"
    )
    cursor.execute(sql, values)
    return True
def handle_checkout(cursor, info):
    """Store a checkout event as a row of the `checkout` table.

    Args:
        cursor: Database cursor used to execute the INSERT.
        info: Webhook payload. ``info["event_time"]`` supplies the timestamp
            and ``info["event"]`` the event fields; missing event fields
            fall back to "" or 0.

    Returns:
        True once the INSERT has been executed.

    Raises:
        KeyError: If `info` has no "event_time" or "event" entry.
    """
    timestamp = info["event_time"]
    ev = info["event"]
    values = {
        "license_id": ev.get("license_id", ""),
        "count": ev.get("count", 0),
        "product": ev.get("product", ""),
        "version": ev.get("version", ""),
        "total_tokens": ev.get("total_tokens", 0),
        "timestamp": timestamp,
    }
    # The column list and placeholders must match both the `checkout` table
    # definition and the keys of `values`.
    sql = (
        "INSERT INTO checkout (`license_id`, `count`, `product`, `version`, "
        "`total_tokens`, `timestamp`) VALUES (%(license_id)s, %(count)s, "
        "%(product)s, %(version)s, %(total_tokens)s, %(timestamp)s)"
    )
    cursor.execute(sql, values)
    return True
def handle_heartbeat(cursor, info):
    """Store a heartbeat event as a row of the `heartbeat` table.

    Args:
        cursor: Database cursor used to execute the INSERT.
        info: Webhook payload. ``info["event_time"]`` supplies the timestamp
            and ``info["event"]`` the event fields; missing event fields
            fall back to "" or 0.

    Returns:
        True once the INSERT has been executed.

    Raises:
        KeyError: If `info` has no "event_time" or "event" entry.
    """
    timestamp = info["event_time"]
    ev = info["event"]
    values = {
        "license_id": ev.get("license_id", ""),
        "product": ev.get("product", ""),
        "version": ev.get("version", ""),
        "success_count": ev.get("success_count", 0),
        "fail_count": ev.get("fail_count", 0),
        "timestamp": timestamp,
    }
    # The column list and placeholders must match both the `heartbeat` table
    # definition and the keys of `values`.
    sql = (
        "INSERT INTO heartbeat (`license_id`, `product`, `version`, "
        "`success_count`, `fail_count`, `timestamp`) VALUES "
        "(%(license_id)s, %(product)s, %(version)s, %(success_count)s, "
        "%(fail_count)s, %(timestamp)s)"
    )
    cursor.execute(sql, values)
    return True
class StopToken:
    """Marker object put on the event queue to signal that event handling
    is finished."""
def handle_rate_limit(ev):
    """Report a rate-limit notification on stdout.

    Prints the event's "minute_rate_limited" value, or 0 when the event does
    not carry one.
    """
    limited = ev.get("minute_rate_limited", 0)
    print(f"Server has been rate limited: {limited}")
async def handle_webhook_events():
    """Drain the webhook event queue and record the events in MySQL.

    Connects to MySQL with USER/PASSWORD, makes sure the tables exist, then
    processes items from EVENT_QUEUE until a StopToken is received:

    * "event_callback" items are dispatched on their "event_id" to the
      matching handler, and the transaction is committed when the handler
      returns a true value;
    * "rate_limited" items are passed to handle_rate_limit();
    * anything else is ignored.

    An exception raised while processing one item is printed and does not
    stop the loop. The cursor and the connection are closed on exit.
    """
    handlers = {
        "E100": handle_checkin,
        "E101": handle_checkout,
        "E102": handle_heartbeat,
    }

    cnx = mysql.connector.connect(user=USER, password=PASSWORD)
    try:
        cursor = cnx.cursor()
        try:
            _create_tables(cursor)
            while True:
                # Blocking get. It sits outside the try below so that
                # task_done() is only ever called for an item that was
                # actually retrieved.
                ev = EVENT_QUEUE.get()
                try:
                    if isinstance(ev, StopToken):
                        break
                    ev_type = ev.get("type", "")
                    if ev_type == "event_callback":
                        fn = handlers.get(ev["event_id"])
                        if fn is not None and fn(cursor, ev):
                            cnx.commit()
                    elif ev_type == "rate_limited":
                        handle_rate_limit(ev)
                except Exception as err:
                    # One bad event must not stop the worker; report it and
                    # move on to the next item.
                    print(err)
                finally:
                    EVENT_QUEUE.task_done()
        finally:
            cursor.close()
    finally:
        # Release the database connection as well as the cursor.
        cnx.close()
def asyncio_handle_webhooks():
    """Run handle_webhook_events() to completion on a new event loop.

    A fresh loop is created and installed as the current event loop for the
    calling thread. It is closed once the coroutine finishes or raises.
    """
    loop = asyncio.DefaultEventLoopPolicy().new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(handle_webhook_events())
    finally:
        # Free the loop's resources instead of leaving it to be collected.
        loop.close()
@hwebserver.urlHandler("/sidefx.webhook")
def webhook(request):
    """Handle all incoming webhook events.

    To respond quickly, events are placed on a queue that a background
    thread handles. If the server takes too long, sesinetd will think the
    server is in a bad state and will stop sending events.

    Args:
        request: The incoming hwebserver request.

    Returns:
        A 400 response when the content type is not JSON, the signature
        check fails, or the body is not a JSON object; a JSON response
        echoing the challenge for "url_verification" requests; otherwise an
        empty 200 response after the event has been queued.
    """
    if request.contentType() != "application/json":
        # Content first, status second, like every other Response here.
        return Response("Expected JSON body", 400)

    if SIGNING_SECRET is not None:
        errors = _sidefx_signature_errors(request)
        if errors is not None:
            print(f"invalid signature: {errors}")
            return Response(errors, 400)

    try:
        data = json.loads(request.body())
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        return Response("Invalid JSON body", 400)
    if not isinstance(data, dict):
        return Response("Expected a JSON object", 400)

    if data.get("type") == "url_verification":
        resp_json = {"challenge": data["challenge"]}
        return Response(json.dumps(resp_json), 200, "application/json")

    # Handle the event in the background so that we can respond quickly. If
    # we take too long then sesinetd will think this server is in a bad state.
    EVENT_QUEUE.put(data)
    return Response(b"", 200)
def main():
    """Start the background event worker and run the web server.

    Expects the MySQL user name and password as the first two command-line
    arguments and exits with a usage message when they are missing. When the
    web server stops, a StopToken is queued so the worker thread finishes,
    and the thread is joined.
    """
    global SIGNING_SECRET, EVENT_QUEUE, USER, PASSWORD

    if len(sys.argv) < 3:
        sys.exit(
            "Usage: python3 webhook_server.py <mysql username> "
            "<mysql password>"
        )

    # NOTE(review): the signing secret is hard-coded in source; consider
    # loading it from configuration so it is not committed with the code.
    SIGNING_SECRET = "8fjP3P1EYJiUwU9ONjRGw00UxgbHNm"
    USER = sys.argv[1]
    PASSWORD = sys.argv[2]

    EVENT_QUEUE = queue.Queue()
    background_thread = threading.Thread(target=asyncio_handle_webhooks)
    background_thread.start()

    try:
        hwebserver.run(8080, debug=True, reload_source_changes=False)
    finally:
        # Add the stop token to the queue to inform the background worker
        # thread to stop handling events, then wait for it to finish.
        EVENT_QUEUE.put(StopToken())
        background_thread.join()


if __name__ == "__main__":
    main()