"""Small command-line Matrix bot built on matrix-nio.

Two sub-commands are provided:

* ``verify``  - log in and wait for another device to start an emoji (SAS)
  verification with this device.
* ``message`` - log in and send a plain-text message to a room.

On first use the user is prompted for homeserver, user ID, device name,
default room and password; the resulting access token is written to
``CONFIG_FILE`` so later runs can log in without a password.
"""
import asyncio
import contextlib
import json
import os
import signal
import sys
import traceback
from typing import Coroutine, Optional

import click
from nio import (
    AsyncClient,
    AsyncClientConfig,
    KeyVerificationCancel,
    KeyVerificationEvent,
    KeyVerificationKey,
    KeyVerificationMac,
    KeyVerificationStart,
    LocalProtocolError,
    LoginResponse,
    ToDeviceError,
)

# The directory containing the store and credentials file
CONFIG_DIRECTORY = '.'
# Login credentials JSON file; lets the program be run multiple times
# without asking for the password again.
CONFIG_FILE = os.path.join(CONFIG_DIRECTORY, 'credentials.json')
# Directory to store persistent data for end-to-end encryption
STORE_PATH = os.path.join(CONFIG_DIRECTORY, 'store')
# Default homeserver URL offered at the prompt
HOMESERVER_URL = 'https://matrix.gaja-group.com'


def write_details_to_disk(resp: LoginResponse, credentials: dict) -> None:
    """Write the required login details to disk.

    It will allow following logins to be made without password.

    Arguments:
    ---------
        resp : LoginResponse - successful client login response
        credentials : dict - The credentials used to sign in
            (must contain 'homeserver', 'device_name' and 'room_id')

    """
    details = {
        'homeserver': credentials['homeserver'],
        'device_name': credentials['device_name'],
        'room_id': credentials['room_id'],
        'user_id': resp.user_id,
        'device_id': resp.device_id,
        'access_token': resp.access_token,
    }
    # The file contains the access token, so create it readable/writable by
    # the owner only instead of relying on the process umask.
    fd = os.open(CONFIG_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w', encoding='utf-8') as f:
        json.dump(details, f)


class Bot:
    """Matrix client wrapper that can send messages and verify by emoji."""

    def __init__(self) -> None:
        """Bind to an event loop and install shutdown signal handlers."""
        # Reuse the running loop when constructed from a coroutine (see
        # ``Bot.login``); otherwise create one explicitly, because calling
        # ``asyncio.get_event_loop()`` without a loop is deprecated.
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)

        # Populated by ``_login``; ``None`` until then.
        self._client = None
        self._client_config = None
        self._config = None
        # Task running ``sync_forever`` while ``verify`` is active.
        self._sync_task = None

        for name in ('SIGHUP', 'SIGTERM', 'SIGINT'):
            sig = getattr(signal, name, None)  # SIGHUP is missing on Windows
            if sig is None:
                continue
            try:
                self._loop.add_signal_handler(
                    sig,
                    lambda sig=sig: self._loop.create_task(
                        self.shutdown(sig)),
                )
            except NotImplementedError:
                # This event loop implementation does not support signal
                # handlers at all (e.g. on Windows); nothing more to install.
                break

    @classmethod
    async def login(cls) -> 'Bot':
        """Create a bot and log it in. Must be awaited in a running loop."""
        bot = cls()
        await bot._login()
        return bot

    @property
    def client(self) -> AsyncClient:
        """The nio client created by ``_login`` (``None`` before login)."""
        return self._client

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The event loop this bot is bound to."""
        return self._loop

    @property
    def client_config(self) -> AsyncClientConfig:
        """The nio client configuration created by ``_login``."""
        return self._client_config

    @property
    def config(self) -> dict:
        """Login details (homeserver, user, device, room, access token)."""
        return self._config

    def _run(self, coro: Coroutine) -> None:
        """Run *coro* to completion on the bot's loop.

        A ``CancelledError`` is what a signal-triggered ``shutdown`` leaves
        behind in the main task, so it is treated as a normal exit.
        """
        with contextlib.suppress(asyncio.CancelledError):
            self.loop.run_until_complete(coro)

    def send_text_message_to_room(self, room_id: Optional[str],
                                  message: str) -> None:
        """Log in and send *message* as ``m.text`` to *room_id*.

        When *room_id* is ``None`` the default room from the stored
        configuration is used. Blocks until the message has been sent.
        """
        async def send_message() -> None:
            """Login, sync once and send the text message."""
            content = {'msgtype': 'm.text', 'body': message}
            client = await self._login()
            try:
                target = room_id
                if target is None:
                    target = self.config['room_id']
                await client.sync(timeout=30000, full_state=True)
                await client.room_send(
                    room_id=target,
                    message_type='m.room.message',
                    content=content,
                    ignore_unverified_devices=True,
                )
            finally:
                # Always release the HTTP session, even if sending failed.
                await client.close()

        self._run(send_message())

    async def _login(self) -> AsyncClient:
        """Login to the matrix homeserver defined in the config file.

        Without a credentials file the user is prompted for the login
        details and password, and the resulting token is stored. With a
        credentials file the stored token is restored instead.

        Returns the logged-in client. Exits the process with status 1 when
        the password login fails.
        """
        self._client_config = AsyncClientConfig(
            max_limit_exceeded=0,
            max_timeouts=0,
            store_sync_tokens=True,
            encryption_enabled=True,
        )
        os.makedirs(STORE_PATH, exist_ok=True)

        # If there are no previously-saved credentials, we'll use the password
        if not os.path.exists(CONFIG_FILE):
            click.secho(
                'First time use. Did not find credential file. Asking '
                'for homeserver, user, and password to create '
                'credential file.\n',
                bold=True,
            )
            credentials = self._ask_credentials()

            # Initialize the matrix client
            client = AsyncClient(
                credentials['homeserver'],
                credentials['user_id'],
                store_path=STORE_PATH,
                config=self.client_config,
            )
            pw = click.prompt(click.style('Your Password', bold=True),
                              hide_input=True)
            resp = await client.login(password=pw,
                                      device_name=credentials['device_name'])
            del pw

            # check that we logged in successfully
            if not isinstance(resp, LoginResponse):
                click.secho(
                    f'homeserver = {credentials["homeserver"]}; '
                    f' user = {credentials["user_id"]}',
                    fg='red',
                )
                click.secho(f'Failed to log in: {resp}', fg='red')
                await client.close()
                sys.exit(1)

            write_details_to_disk(resp, credentials)
            self._config = {
                'user_id': credentials['user_id'],
                'homeserver': credentials['homeserver'],
                'room_id': credentials['room_id'],
                # The login response does not carry the device name; it is
                # the one we asked the user for.
                'device_name': credentials['device_name'],
                'device_id': resp.device_id,
                'access_token': resp.access_token,
            }
            click.secho(
                'Logged in using a password. Credentials were stored. '
                'On next execution the stored login credentials will '
                'be used.',
                fg='green',
            )
        # Otherwise the config file exists, so we'll use the stored credentials
        else:
            with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                self._config = json.load(f)

            # Initialize the matrix client based on credentials from file
            client = AsyncClient(
                self.config['homeserver'],
                self.config['user_id'],
                device_id=self.config['device_id'],
                store_path=STORE_PATH,
                config=self.client_config,
            )
            client.restore_login(
                user_id=self.config['user_id'],
                device_id=self.config['device_id'],
                access_token=self.config['access_token'],
            )

        self._client = client
        return client

    async def shutdown(self, sig: Optional[signal.Signals] = None) -> None:
        """Close the client, cancel all other tasks and stop the loop.

        *sig* is the signal that triggered the shutdown; it is accepted for
        the signal handlers but not otherwise used.
        """
        if self._client is not None:
            await self._client.close()

        current = asyncio.current_task()
        pending = [t for t in asyncio.all_tasks() if t is not current]
        # Request cancellation of every task first, then wait for all of
        # them; collecting exceptions keeps one CancelledError from aborting
        # the shutdown half-way.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        self.loop.stop()

    def verify(self) -> None:
        """Log in and wait for another device to verify us by emoji.

        Blocks until a verification completes or the process is signalled.
        """
        async def verify() -> None:
            """Login and wait for and perform emoji verify."""
            client = await self._login()
            # Set up event callbacks
            client.add_to_device_callback(self.to_device_callback,
                                          (KeyVerificationEvent, ))
            try:
                # Sync encryption keys with the server
                # Required for participating in encrypted rooms
                if client.should_upload_keys:
                    await client.keys_upload()

                click.secho('\nStarting verification process...', bold=True)
                click.secho(
                    '\nThis program is ready and waiting for the other '
                    'party to initiate an emoji verification with us by '
                    'selecting "Verify by Emoji" in their Matrix '
                    'client.',
                    fg='green',
                )
                # Run the sync loop as its own task so the MAC handler can
                # end it by cancelling that task once verification is done.
                self._sync_task = asyncio.ensure_future(
                    client.sync_forever(timeout=30000, full_state=True))
                await self._sync_task
            finally:
                self._sync_task = None
                await client.close()

        self._run(verify())

    def _ask_credentials(self) -> dict:
        """Ask the user for credentials.

        Returns a dict with the keys 'homeserver', 'user_id', 'device_name'
        and 'room_id'.
        """
        homeserver = click.prompt(
            click.style('Enter your homeserver URL', bold=True),
            default=HOMESERVER_URL,
        )
        # Only add a scheme when none was given; an explicit http:// URL
        # must not be turned into 'https://http://...'.
        if not homeserver.startswith(('http://', 'https://')):
            homeserver = 'https://' + homeserver

        user_id = click.prompt(
            click.style('Enter your full user ID', bold=True),
            default='@user:gaja-group.com',
        )
        device_name = click.prompt(
            click.style('Choose a name for this device', bold=True),
            default='matrix-bot',
        )
        room_id = click.prompt(
            click.style('Enter a default room ID to send to', bold=True),
            default='!yourRoomId:gaja-group.com',
        )
        return {
            'homeserver': homeserver,
            'user_id': user_id,
            'device_name': device_name,
            'room_id': room_id,
        }

    async def to_device_callback(self, event):  # noqa
        """Handle events sent to device.

        Dispatches each key verification event to the handler for its step
        of the SAS flow. Errors are printed rather than raised so a single
        bad event does not break the sync loop; cancellation is not caught.
        """
        try:
            if isinstance(event, KeyVerificationStart):  # first step
                await self._on_verification_start(event)
            elif isinstance(event, KeyVerificationCancel):  # anytime
                self._on_verification_cancel(event)
            elif isinstance(event, KeyVerificationKey):  # second step
                await self._on_verification_key(event)
            elif isinstance(event, KeyVerificationMac):  # third step
                await self._on_verification_mac(event)
            else:
                print(f'Received unexpected event type {type(event)}. '
                      f'Event is {event}. Event will be ignored.')
        except Exception:
            print(traceback.format_exc())

    async def _on_verification_start(self, event) -> None:
        """First step: accept the verification and share our key.

        Example event:
            KeyVerificationStart(
                sender='@user2:example.org',
                transaction_id='SomeTxId',
                from_device='DEVICEIDXY',
                method='m.sas.v1',
                key_agreement_protocols=[
                    'curve25519-hkdf-sha256', 'curve25519'],
                hashes=['sha256'],
                message_authentication_codes=[
                    'hkdf-hmac-sha256', 'hmac-sha256'],
                short_authentication_string=['decimal', 'emoji'])
        """
        client = self.client
        if 'emoji' not in event.short_authentication_string:
            click.secho(
                'Other device does not support emoji verification '
                f'{event.short_authentication_string}.',
                fg='red',
            )
            return

        resp = await client.accept_key_verification(event.transaction_id)
        if isinstance(resp, ToDeviceError):
            click.secho(f'accept_key_verification failed with {resp}',
                        fg='red')
            # The other side never saw our accept, so sharing our key
            # would be pointless.
            return

        sas = client.key_verifications[event.transaction_id]
        todevice_msg = sas.share_key()
        resp = await client.to_device(todevice_msg)
        if isinstance(resp, ToDeviceError):
            click.secho(f'to_device failed with {resp}', fg='red')

    def _on_verification_cancel(self, event) -> None:
        """Any time: tell the user the other side cancelled.

        Example event:
            KeyVerificationCancel(
                sender='@user2:example.org',
                transaction_id='SomeTxId',
                code='m.mismatched_sas',
                reason='Mismatched short authentication string')
        """
        # There is no need to issue a
        # client.cancel_key_verification(tx_id, reject=False)
        # here. The SAS flow is already cancelled.
        # We only need to inform the user.
        click.secho(
            '\nVerification has been cancelled by '
            f'{event.sender} for reason "{event.reason}".',
            fg='yellow',
        )

    async def _on_verification_key(self, event) -> None:
        """Second step: show the emojis and confirm or reject the match.

        Example event:
            KeyVerificationKey(
                sender='@user2:example.org',
                transaction_id='SomeTxId',
                key='SomeCryptoKey')
        """
        client = self.client
        click.secho('\nEmoji verification initiated.\n')
        sas = client.key_verifications[event.transaction_id]
        emoji_list = [' '.join(e) for e in sas.get_emoji()]
        click.echo(', '.join(emoji_list))
        click.echo()

        try:
            # NOTE: this prompt blocks the event loop while waiting for the
            # user's answer.
            matched = click.confirm(
                click.style('Do the emojis match?', bold=True))
        except click.exceptions.Abort:  # C or anything for cancel
            click.secho(
                'Cancelled by user! Verification will be '
                'cancelled.',
                fg='red',
            )
            resp = await client.cancel_key_verification(
                event.transaction_id, reject=False)
            if isinstance(resp, ToDeviceError):
                click.secho(f'cancel_key_verification failed with {resp}',
                            fg='red')
            return

        if matched:
            click.secho(
                '\nMatch! The verification for this '
                'device will be accepted.',
                fg='green',
            )
            resp = await client.confirm_short_auth_string(
                event.transaction_id)
            if isinstance(resp, ToDeviceError):
                click.secho(
                    'confirm_short_auth_string failed with '
                    f'{resp}',
                    fg='red',
                )
        else:  # no, don't match, reject
            click.secho(
                '\nNo match! Device will NOT be verified '
                'by rejecting verification.',
                fg='yellow',
            )
            resp = await client.cancel_key_verification(
                event.transaction_id, reject=True)
            if isinstance(resp, ToDeviceError):
                click.secho(
                    f'cancel_key_verification failed with {resp}',
                    fg='red',
                )

    async def _on_verification_mac(self, event) -> None:
        """Third step: send our MAC and finish the verification.

        Example event:
            KeyVerificationMac(
                sender='@user2:example.org',
                transaction_id='SomeTxId',
                mac={'ed25519:DEVICEIDXY': 'SomeKey1',
                     'ed25519:SomeKey2': 'SomeKey3'},
                keys='SomeCryptoKey4')
        """
        client = self.client
        sas = client.key_verifications[event.transaction_id]
        try:
            todevice_msg = sas.get_mac()
        except LocalProtocolError as e:
            # e.g. it might have been cancelled by ourselves
            click.secho(
                f'Cancelled or protocol error: Reason: {e}.\n'
                f'Verification with {event.sender} not '
                'concluded. Try again?',
                fg='yellow',
            )
            return

        resp = await client.to_device(todevice_msg)
        if isinstance(resp, ToDeviceError):
            # Our MAC never reached the other device, so do not claim
            # success; keep running so the verification can be retried.
            click.secho(f'to_device failed with {resp}', fg='red')
            return

        click.secho('Emoji verification was successful!', fg='green')
        await self._finish_verification()

    async def _finish_verification(self) -> None:
        """Stop waiting for verification events after a successful run."""
        sync_task = self._sync_task
        if sync_task is not None:
            # Started through ``verify``: ending the sync task lets
            # ``verify`` close the client and return normally.
            if not sync_task.done():
                sync_task.cancel()
        else:
            # The callback was registered by some other caller; fall back
            # to a full shutdown of the loop.
            await self.shutdown()


@click.group()
@click.option('-v', '--verbose', count=True)
def cli(verbose: int) -> None:
    """Matrix bot command line interface."""
    # ``verbose`` is accepted but not used yet.


@cli.command('verify')
def verify_command() -> None:
    """Wait for and perform an emoji verification of this device."""
    bot = Bot()
    bot.verify()


@cli.command('message')
@click.argument('message')
@click.option('-r', '--room', help='the room to send to')
def send_command(message: str, room: Optional[str]) -> None:
    """Send MESSAGE to a room (the configured default room if omitted)."""
    bot = Bot()
    bot.send_text_message_to_room(room, message)


if __name__ == '__main__':
    cli()