matrix-webhook/matrix_webhook/utils.py
2021-10-12 16:23:24 +02:00

78 lines
2.7 KiB
Python

"""Matrix Webhook utils."""
import logging
from http import HTTPStatus
from aiohttp import web
from nio import AsyncClient
from nio.exceptions import LocalProtocolError
from nio.responses import RoomSendError, JoinError
from . import conf
# Matrix error codes that translate to a fixed HTTP status.
ERROR_MAP = {
"M_FORBIDDEN": HTTPStatus.FORBIDDEN,
"M_CONSENT_NOT_GIVEN": HTTPStatus.FORBIDDEN,
}
# Logger shared by the helpers of this module.
LOGGER = logging.getLogger("matrix_webhook.utils")
# Single Matrix client instance, created at import time from
# conf.MATRIX_URL and conf.MATRIX_ID and reused for every request.
CLIENT = AsyncClient(conf.MATRIX_URL, conf.MATRIX_ID)
def error_map(resp):
    """Map a Matrix error response to an HTTP status.

    Error codes listed in ``ERROR_MAP`` are translated through that table.
    For ``M_UNKNOWN`` and for any code missing from the table, the HTTP status
    of the underlying transport response is returned instead; when no
    transport status is available the result is
    ``HTTPStatus.INTERNAL_SERVER_ERROR``.

    Args:
        resp: error response exposing ``status_code`` and, when available,
            ``transport_response.status``.

    Returns:
        The HTTP status to report for this error.
    """
    code = resp.status_code
    # For M_UNKNOWN we should directly consider the HTTP status from the
    # response, so it never goes through the table.
    # ref. https://matrix.org/docs/spec/client_server/r0.6.1#api-standards
    if code != "M_UNKNOWN" and code in ERROR_MAP:
        return ERROR_MAP[code]

    # A plain ERROR_MAP[code] lookup raises KeyError for every code the table
    # does not list; fall back to what the homeserver answered over HTTP.
    transport = getattr(resp, "transport_response", None)
    status = getattr(transport, "status", None)
    if status is None:
        return HTTPStatus.INTERNAL_SERVER_ERROR
    return status
def create_json_response(status, ret):
    """Build the JSON response handed back to the webhook caller.

    The body is ``{"status": status, "ret": ret}`` and ``status`` is also used
    as the HTTP status of the response.
    """
    LOGGER.debug(f"Creating json response: {status=}, {ret=}")
    return web.json_response({"status": status, "ret": ret}, status=status)
async def join_room(room_id):
    """Try to join the room, making up to 10 attempts.

    Args:
        room_id: identifier of the room, passed as-is to ``CLIENT.join``.

    Returns:
        ``None`` as soon as a join attempt succeeds.  Otherwise a JSON
        response built by ``create_json_response``: the mapped error when the
        join is refused with anything other than ``M_UNKNOWN_TOKEN``, or
        ``GATEWAY_TIMEOUT`` once all attempts are used up.
    """
    LOGGER.debug(f"Join room {room_id=}")

    for _ in range(10):
        try:
            resp = await CLIENT.join(room_id)
            if not isinstance(resp, JoinError):
                return None
            if resp.status_code != "M_UNKNOWN_TOKEN":
                return create_json_response(error_map(resp), resp.message)
            # M_UNKNOWN_TOKEN: log in again with the configured password,
            # then let the loop retry the join.
            LOGGER.warning("Reconnecting")
            await CLIENT.login(conf.MATRIX_PW)
        except LocalProtocolError as e:
            # This is the join path: label the failure as such instead of
            # reusing the "Send error" wording of send_room_message.
            LOGGER.error(f"Join error: {e}")
        LOGGER.warning("Trying again")

    return create_json_response(HTTPStatus.GATEWAY_TIMEOUT, "Homeserver not responding")
async def send_room_message(room_id, content):
    """Send an ``m.room.message`` event to a room, making up to 10 attempts.

    Returns a JSON response built by ``create_json_response``: ``OK`` when the
    event was sent, the mapped error when sending is refused with anything
    other than ``M_UNKNOWN_TOKEN``, or ``GATEWAY_TIMEOUT`` once all attempts
    are used up.
    """
    LOGGER.debug(f"Sending room message in {room_id=}: {content=}")

    for _attempt in range(10):
        try:
            outcome = await CLIENT.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content=content,
            )
            if not isinstance(outcome, RoomSendError):
                return create_json_response(HTTPStatus.OK, "OK")
            if outcome.status_code != "M_UNKNOWN_TOKEN":
                return create_json_response(error_map(outcome), outcome.message)
            # M_UNKNOWN_TOKEN: log in again, then retry on the next iteration.
            LOGGER.warning("Reconnecting")
            await CLIENT.login(conf.MATRIX_PW)
        except LocalProtocolError as e:
            LOGGER.error(f"Send error: {e}")
        LOGGER.warning("Trying again")

    return create_json_response(HTTPStatus.GATEWAY_TIMEOUT, "Homeserver not responding")