split code in utils / handler / app

This commit is contained in:
Guilhem Saurel 2021-08-08 08:41:37 +02:00
parent 2b7b79971d
commit 7f20fb7ff9
4 changed files with 178 additions and 164 deletions

View file

@ -1,170 +1,9 @@
"""
Matrix Webhook.
Post a message to a matrix room with a simple HTTP POST
"""
import asyncio
import json
"""Matrix Webhook module entrypoint."""
import logging
from http import HTTPStatus
from signal import SIGINT, SIGTERM
from aiohttp import web
from markdown import markdown
from nio import AsyncClient
from nio.exceptions import LocalProtocolError
from nio.responses import RoomSendError
from . import conf, formatters
ERROR_MAP = {"M_FORBIDDEN": HTTPStatus.FORBIDDEN}
CLIENT = AsyncClient(conf.MATRIX_URL, conf.MATRIX_ID)
LOGGER = logging.getLogger("matrix-webhook")
async def handler(request):
"""
Coroutine given to the server, st. it knows what to do with an HTTP request.
This one handles a POST, checks its content, and forwards it to the matrix room.
"""
LOGGER.debug(f"Handling {request=}")
data = await request.read()
try:
data = json.loads(data.decode())
except json.decoder.JSONDecodeError:
return create_json_response(HTTPStatus.BAD_REQUEST, "Invalid JSON")
# legacy naming
if "text" in data and "body" not in data:
data["body"] = data["text"]
# allow key to be passed as a parameter
if "key" in request.rel_url.query and "key" not in data:
data["key"] = request.rel_url.query["key"]
if "formatter" in request.rel_url.query:
try:
data = getattr(formatters, request.rel_url.query["formatter"])(data)
except AttributeError:
return create_json_response(HTTPStatus.BAD_REQUEST, "Unknown formatter")
if "room_id" in request.rel_url.query and "room_id" not in data:
data["room_id"] = request.rel_url.query["room_id"]
if "room_id" not in data:
data["room_id"] = request.path.lstrip("/")
missing = []
for key in ["body", "key", "room_id"]:
if key not in data or not data[key]:
missing.append(key)
if missing:
return create_json_response(
HTTPStatus.BAD_REQUEST, f"Missing {', '.join(missing)}"
)
if data["key"] != conf.API_KEY:
return create_json_response(HTTPStatus.UNAUTHORIZED, "Invalid API key")
if "formatted_body" in data:
formatted_body = data["formatted_body"]
else:
formatted_body = markdown(str(data["body"]), extensions=["extra"])
content = {
"msgtype": "m.text",
"body": data["body"],
"format": "org.matrix.custom.html",
"formatted_body": formatted_body,
}
for _ in range(10):
try:
resp = await send_room_message(data["room_id"], content)
if isinstance(resp, RoomSendError):
if resp.status_code == "M_UNKNOWN_TOKEN":
LOGGER.warning("Reconnecting")
await CLIENT.login(conf.MATRIX_PW)
else:
return create_json_response(
ERROR_MAP[resp.status_code], resp.message
)
else:
break
except LocalProtocolError as e:
LOGGER.error(f"Send error: {e}")
LOGGER.warning("Trying again")
else:
return create_json_response(
HTTPStatus.GATEWAY_TIMEOUT, "Homeserver not responding"
)
return create_json_response(HTTPStatus.OK, "OK")
def create_json_response(status, ret):
"""Create a JSON response."""
LOGGER.debug(f"Creating json response: {status=}, {ret=}")
response_data = {"status": status, "ret": ret}
return web.json_response(response_data, status=status)
async def send_room_message(room_id, content):
"""Send a message to a room."""
LOGGER.debug(f"Sending room message in {room_id=}: {content=}")
return await CLIENT.room_send(
room_id=room_id, message_type="m.room.message", content=content
)
async def main(event):
"""
Launch main coroutine.
matrix client login & start web server
"""
LOGGER.info(f"Log in {conf.MATRIX_ID=} on {conf.MATRIX_URL=}")
await CLIENT.login(conf.MATRIX_PW)
server = web.Server(handler)
runner = web.ServerRunner(server)
await runner.setup()
LOGGER.info(f"Binding on {conf.SERVER_ADDRESS=}")
site = web.TCPSite(runner, *conf.SERVER_ADDRESS)
await site.start()
# Run until we get a shutdown request
await event.wait()
# Cleanup
await runner.cleanup()
await CLIENT.close()
def terminate(event, signal):
"""Close handling stuff."""
event.set()
asyncio.get_event_loop().remove_signal_handler(signal)
def run():
"""Launch everything."""
LOGGER.info("Starting...")
loop = asyncio.get_event_loop()
event = asyncio.Event()
for sig in (SIGINT, SIGTERM):
loop.add_signal_handler(sig, terminate, event, sig)
loop.run_until_complete(main(event))
LOGGER.info("Closing...")
loop.close()
from . import app, conf
if __name__ == "__main__":
log_format = "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
logging.basicConfig(level=50 - 10 * conf.VERBOSE, format=log_format)
run()
app.run()

56
matrix_webhook/app.py Normal file
View file

@ -0,0 +1,56 @@
"""Matrix Webhook app."""
import asyncio
import logging
from signal import SIGINT, SIGTERM
from aiohttp import web
from . import conf, handler, utils
LOGGER = logging.getLogger("matrix_webhook.app")
async def main(event):
"""
Launch main coroutine.
matrix client login & start web server
"""
LOGGER.info(f"Log in {conf.MATRIX_ID=} on {conf.MATRIX_URL=}")
await utils.CLIENT.login(conf.MATRIX_PW)
server = web.Server(handler.matrix_webhook)
runner = web.ServerRunner(server)
await runner.setup()
LOGGER.info(f"Binding on {conf.SERVER_ADDRESS=}")
site = web.TCPSite(runner, *conf.SERVER_ADDRESS)
await site.start()
# Run until we get a shutdown request
await event.wait()
# Cleanup
await runner.cleanup()
await utils.CLIENT.close()
def terminate(event, signal):
"""Close handling stuff."""
event.set()
asyncio.get_event_loop().remove_signal_handler(signal)
def run():
"""Launch everything."""
LOGGER.info("Starting...")
loop = asyncio.get_event_loop()
event = asyncio.Event()
for sig in (SIGINT, SIGTERM):
loop.add_signal_handler(sig, terminate, event, sig)
loop.run_until_complete(main(event))
LOGGER.info("Closing...")
loop.close()

72
matrix_webhook/handler.py Normal file
View file

@ -0,0 +1,72 @@
"""Matrix Webhook main request handler."""
import json
import logging
from http import HTTPStatus
from markdown import markdown
from . import conf, formatters, utils
LOGGER = logging.getLogger("matrix_webhook.handler")
async def matrix_webhook(request):
"""
Coroutine given to the server, st. it knows what to do with an HTTP request.
This one handles a POST, checks its content, and forwards it to the matrix room.
"""
LOGGER.debug(f"Handling {request=}")
data = await request.read()
try:
data = json.loads(data.decode())
except json.decoder.JSONDecodeError:
return utils.create_json_response(HTTPStatus.BAD_REQUEST, "Invalid JSON")
# legacy naming
if "text" in data and "body" not in data:
data["body"] = data["text"]
# allow key to be passed as a parameter
if "key" in request.rel_url.query and "key" not in data:
data["key"] = request.rel_url.query["key"]
if "formatter" in request.rel_url.query:
try:
data = getattr(formatters, request.rel_url.query["formatter"])(data)
except AttributeError:
return utils.create_json_response(
HTTPStatus.BAD_REQUEST, "Unknown formatter"
)
if "room_id" in request.rel_url.query and "room_id" not in data:
data["room_id"] = request.rel_url.query["room_id"]
if "room_id" not in data:
data["room_id"] = request.path.lstrip("/")
missing = []
for key in ["body", "key", "room_id"]:
if key not in data or not data[key]:
missing.append(key)
if missing:
return utils.create_json_response(
HTTPStatus.BAD_REQUEST, f"Missing {', '.join(missing)}"
)
if data["key"] != conf.API_KEY:
return utils.create_json_response(HTTPStatus.UNAUTHORIZED, "Invalid API key")
if "formatted_body" in data:
formatted_body = data["formatted_body"]
else:
formatted_body = markdown(str(data["body"]), extensions=["extra"])
content = {
"msgtype": "m.text",
"body": data["body"],
"format": "org.matrix.custom.html",
"formatted_body": formatted_body,
}
return await utils.send_room_message(data["room_id"], content)

47
matrix_webhook/utils.py Normal file
View file

@ -0,0 +1,47 @@
"""Matrix Webhook utils."""
import logging
from http import HTTPStatus
from aiohttp import web
from nio import AsyncClient
from nio.exceptions import LocalProtocolError
from nio.responses import RoomSendError
from . import conf
ERROR_MAP = {"M_FORBIDDEN": HTTPStatus.FORBIDDEN}
LOGGER = logging.getLogger("matrix_webhook.utils")
CLIENT = AsyncClient(conf.MATRIX_URL, conf.MATRIX_ID)
def create_json_response(status, ret):
"""Create a JSON response."""
LOGGER.debug(f"Creating json response: {status=}, {ret=}")
response_data = {"status": status, "ret": ret}
return web.json_response(response_data, status=status)
async def send_room_message(room_id, content):
"""Send a message to a room."""
LOGGER.debug(f"Sending room message in {room_id=}: {content=}")
for _ in range(10):
try:
resp = await CLIENT.room_send(
room_id=room_id, message_type="m.room.message", content=content
)
if isinstance(resp, RoomSendError):
if resp.status_code == "M_UNKNOWN_TOKEN":
LOGGER.warning("Reconnecting")
await CLIENT.login(conf.MATRIX_PW)
else:
return create_json_response(
ERROR_MAP[resp.status_code], resp.message
)
else:
return create_json_response(HTTPStatus.OK, "OK")
except LocalProtocolError as e:
LOGGER.error(f"Send error: {e}")
LOGGER.warning("Trying again")
return create_json_response(HTTPStatus.GATEWAY_TIMEOUT, "Homeserver not responding")