"""Matrix Webhook utils.""" import logging from http import HTTPStatus from aiohttp import web from nio import AsyncClient from nio.exceptions import LocalProtocolError from nio.responses import RoomSendError, JoinError from . import conf ERROR_MAP = {"M_FORBIDDEN": HTTPStatus.FORBIDDEN} LOGGER = logging.getLogger("matrix_webhook.utils") CLIENT = AsyncClient(conf.MATRIX_URL, conf.MATRIX_ID) def create_json_response(status, ret): """Create a JSON response.""" LOGGER.debug(f"Creating json response: {status=}, {ret=}") response_data = {"status": status, "ret": ret} return web.json_response(response_data, status=status) async def join_room(room_id): """Try to join the room.""" LOGGER.debug(f"Join room {room_id=}") for _ in range(10): try: resp = await CLIENT.join(room_id) if isinstance(resp, JoinError): if resp.status_code == "M_UNKNOWN_TOKEN": LOGGER.warning("Reconnecting") await CLIENT.login(conf.MATRIX_PW) else: return create_json_response( ERROR_MAP[resp.status_code], resp.message ) else: return None except LocalProtocolError as e: LOGGER.error(f"Send error: {e}") LOGGER.warning("Trying again") return create_json_response(HTTPStatus.GATEWAY_TIMEOUT, "Homeserver not responding") async def send_room_message(room_id, content): """Send a message to a room.""" LOGGER.debug(f"Sending room message in {room_id=}: {content=}") for _ in range(10): try: resp = await CLIENT.room_send( room_id=room_id, message_type="m.room.message", content=content ) if isinstance(resp, RoomSendError): if resp.status_code == "M_UNKNOWN_TOKEN": LOGGER.warning("Reconnecting") await CLIENT.login(conf.MATRIX_PW) else: return create_json_response( ERROR_MAP[resp.status_code], resp.message ) else: return create_json_response(HTTPStatus.OK, "OK") except LocalProtocolError as e: LOGGER.error(f"Send error: {e}") LOGGER.warning("Trying again") return create_json_response(HTTPStatus.GATEWAY_TIMEOUT, "Homeserver not responding")