# matrix-bot/matrix_bot/bot.py (491 lines, 18 KiB, Python)

from nio import (
AsyncClient,
AsyncClientConfig,
LoginResponse,
KeyVerificationEvent,
KeyVerificationStart,
KeyVerificationCancel,
KeyVerificationKey,
KeyVerificationMac,
ToDeviceError,
LocalProtocolError,
)
import click
import traceback
import sys
import os
import json
import asyncio
import signal
# Homeserver offered as the default answer when prompting for credentials.
HOMESERVER_URL = 'https://matrix.gaja-group.com'

# Base directory that holds both the credentials file and the key store.
CONFIG_DIRECTORY = '.'

# JSON file with the saved login, so later runs need no password.
CONFIG_FILE = os.path.join(CONFIG_DIRECTORY, 'credentials.json')

# Directory with the persistent end-to-end encryption data.
STORE_PATH = os.path.join(CONFIG_DIRECTORY, 'store')
def write_details_to_disk(resp: 'LoginResponse', credentials: dict,
                          path: 'str | None' = None) -> None:
    """Write the required login details to disk.

    It will allow following logins to be made without password.

    Arguments:
    ---------
    resp : LoginResponse - successful client login response; its
        ``user_id``, ``device_id`` and ``access_token`` are stored
    credentials : dict - The credentials used to sign in; the keys
        'homeserver', 'device_name' and 'room_id' are stored
    path : str - file to write to; defaults to ``CONFIG_FILE``

    Raises:
    ------
    KeyError - when ``credentials`` lacks one of the stored keys
    OSError - when the file cannot be created or written
    """
    if path is None:
        path = CONFIG_FILE
    details = {
        'homeserver': credentials['homeserver'],
        'device_name': credentials['device_name'],
        'room_id': credentials['room_id'],
        'user_id': resp.user_id,
        'device_id': resp.device_id,
        'access_token': resp.access_token,
    }
    # The file contains an access token, so when it is created it must be
    # readable and writable by the owner only (instead of umask defaults).
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(details, f)
class Bot:
    """Matrix bot that sends text messages and performs emoji verification.

    The bot owns an asyncio event loop and, after logging in, a nio
    ``AsyncClient``. ``send_text_message_to_room`` and ``verify`` are
    blocking entry points that drive the loop themselves; ``Bot.login()``
    is the alternate constructor for code that already runs in a loop.
    """

    def __init__(self) -> None:
        """Bind to an event loop and install the shutdown signal handlers."""
        try:
            # Constructed from a coroutine (e.g. ``Bot.login()``): reuse
            # the loop that is already running.
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            # Synchronous caller: ``asyncio.get_event_loop()`` is deprecated
            # without a running loop, so create and register one explicitly.
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
        self._shutdown_task = None
        for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
            self._loop.add_signal_handler(sig, self._request_shutdown, sig)

    @classmethod
    async def login(cls) -> 'Bot':
        """Create a bot, log it in and return it."""
        bot = cls()
        await bot._login()
        return bot

    @property
    def client(self) -> 'AsyncClient':
        """The matrix client; only available after ``_login()`` ran."""
        return self._client

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The event loop this bot runs on."""
        return self._loop

    @property
    def client_config(self) -> 'AsyncClientConfig':
        """The client configuration created by ``_login()``."""
        return self._client_config

    @property
    def config(self) -> dict:
        """The login details (homeserver, user, device, room, token)."""
        return self._config

    def _request_shutdown(self, sig: object = None) -> None:
        """Schedule ``shutdown()`` as its own task, at most one at a time.

        Running it as a separate task lets it cancel every other task,
        including the one that asked for the shutdown. The task is kept
        on the instance so it is not garbage collected while pending.
        """
        pending = getattr(self, '_shutdown_task', None)
        if pending is not None and not pending.done():
            return
        self._shutdown_task = self._loop.create_task(self.shutdown(sig))

    def _run_until_done(self, coro) -> None:
        """Run *coro* on the bot's loop; a requested shutdown ends it cleanly.

        Any exception other than the cancellation caused by ``shutdown()``
        is propagated to the caller.
        """
        try:
            self._loop.run_until_complete(coro)
        except asyncio.CancelledError:
            # Only swallow the cancellation when shutdown() caused it.
            if getattr(self, '_shutdown_task', None) is None:
                raise
        pending = getattr(self, '_shutdown_task', None)
        if pending is not None and not pending.done():
            # The loop may stop before shutdown() ran its last step; let it
            # finish so the task is not destroyed while still pending.
            self._loop.run_until_complete(pending)

    def send_text_message_to_room(self, room_id: str, message: str) -> None:
        """Log in, send *message* as ``m.text`` to a room and disconnect.

        Blocks until the message was handed to the homeserver.

        Arguments:
        ---------
        room_id : str - target room; ``None`` selects the default room
            from the stored configuration
        message : str - the message body
        """
        async def send_message(room_id: str, message: str) -> None:
            content = {'msgtype': 'm.text', 'body': message}
            await self._login()
            try:
                if room_id is None:
                    room_id = self.config['room_id']
                await self.client.sync(timeout=30000, full_state=True)
                await self.client.room_send(
                    room_id=room_id,
                    message_type='m.room.message',
                    content=content,
                    ignore_unverified_devices=True,
                )
            finally:
                # Close the connection even when syncing or sending failed.
                await self.client.close()

        self._run_until_done(send_message(room_id, message))

    async def _login(self) -> 'AsyncClient':
        """Login to the matrix homeserver defined in the config file.

        On first use (no credentials file) the user is asked for the
        connection details and the password, and the resulting login is
        written to ``CONFIG_FILE``. Otherwise the login stored in that
        file is restored. Exits the process with status 1 when the
        password login is rejected.

        Returns the client, which is also available as ``self.client``.
        """
        self._client_config = AsyncClientConfig(
            max_limit_exceeded=0,
            max_timeouts=0,
            store_sync_tokens=True,
            encryption_enabled=True,
        )
        if not os.path.exists(CONFIG_FILE):
            # No previously saved credentials: log in with the password.
            click.secho(
                'First time use. Did not find credential file. Asking '
                'for homeserver, user, and password to create '
                'credential file.\n',
                bold=True,
            )
            os.makedirs(STORE_PATH, exist_ok=True)
            credentials = self._ask_credentials()
            client = AsyncClient(
                credentials['homeserver'],
                credentials['user_id'],
                store_path=STORE_PATH,
                config=self.client_config,
            )
            pw = click.prompt(click.style('Your Password', bold=True),
                              hide_input=True)
            resp = await client.login(password=pw,
                                      device_name=credentials['device_name'])
            del pw
            if not isinstance(resp, LoginResponse):
                click.secho(
                    f'homeserver = {credentials["homeserver"]}; '
                    f' user = {credentials["user_id"]}',
                    fg='red',
                )
                click.secho(f'Failed to log in: {resp}', fg='red')
                await client.close()
                sys.exit(1)
            write_details_to_disk(resp, credentials)
            self._config = {
                'user_id': credentials['user_id'],
                'homeserver': credentials['homeserver'],
                'room_id': credentials['room_id'],
                # The device name is what the user chose; the login
                # response is only read by attribute, never subscripted.
                'device_name': credentials['device_name'],
                'device_id': resp.device_id,
                'access_token': resp.access_token,
            }
            click.secho(
                'Logged in using a password. Credentials were stored. '
                'On next execution the stored login credentials will '
                'be used.',
                fg='green',
            )
        else:
            # The config file exists, so use the stored credentials.
            with open(CONFIG_FILE, 'r') as f:
                self._config = json.load(f)
            client = AsyncClient(
                self.config['homeserver'],
                self.config['user_id'],
                device_id=self.config['device_id'],
                store_path=STORE_PATH,
                config=self.client_config,
            )
            client.restore_login(
                user_id=self.config['user_id'],
                device_id=self.config['device_id'],
                access_token=self.config['access_token'],
            )
        self._client = client
        return client

    async def shutdown(self, sig: object = None) -> None:
        """Close the client, cancel all other tasks and stop the loop.

        Arguments:
        ---------
        sig : the signal that triggered the shutdown, if any (unused)
        """
        client = getattr(self, '_client', None)
        if client is not None:  # a signal may arrive before the login
            await client.close()
        current = asyncio.current_task()
        others = [t for t in asyncio.all_tasks() if t is not current]
        for task in others:
            task.cancel()
        # Collect the CancelledError of every task instead of re-raising
        # it here; otherwise the loop would never be stopped.
        await asyncio.gather(*others, return_exceptions=True)
        self.loop.stop()

    def verify(self) -> None:
        """Login and wait for and perform emoji verify.

        Blocks until a verification finished or a shutdown signal arrived.
        """
        async def run_verification() -> None:
            client = await self._login()
            client.add_to_device_callback(self.to_device_callback,
                                          (KeyVerificationEvent, ))
            # Sync encryption keys with the server
            # Required for participating in encrypted rooms
            if client.should_upload_keys:
                await client.keys_upload()
            click.secho('\nStarting verification process...', bold=True)
            click.secho(
                '\nThis program is ready and waiting for the other '
                'party to initiate an emoji verification with us by '
                'selecting "Verify by Emoji" in their Matrix '
                'client.',
                fg='green',
            )
            await client.sync_forever(timeout=30000, full_state=True)

        self._run_until_done(run_verification())

    def _ask_credentials(self) -> dict:
        """Ask the user for credentials.

        Returns a dict with the keys 'homeserver', 'user_id',
        'device_name' and 'room_id'.
        """
        homeserver = click.prompt(
            click.style('Enter your homeserver URL', bold=True),
            default=HOMESERVER_URL,
        )
        # Only add a scheme when none was given; an explicit http:// URL
        # must not be turned into 'https://http://...'.
        if not homeserver.startswith(('http://', 'https://')):
            homeserver = 'https://' + homeserver
        user_id = click.prompt(
            click.style('Enter your full user ID', bold=True),
            default='@user:gaja-group.com',
        )
        device_name = click.prompt(
            click.style('Choose a name for this device', bold=True),
            default='matrix-bot',
        )
        room_id = click.prompt(
            click.style('Enter a default room ID to send to', bold=True),
            default='!yourRoomId:gaja-group.com',
        )
        return {
            'homeserver': homeserver,
            'user_id': user_id,
            'device_name': device_name,
            'room_id': room_id,
        }

    async def to_device_callback(self, event) -> None:
        """Handle events sent to device.

        Dispatches each key verification event to the handler of its
        step. Errors are printed instead of raised so that one bad event
        does not abort the sync; cancellation is not intercepted.
        """
        try:
            if isinstance(event, KeyVerificationStart):  # first step
                await self._on_verification_start(event)
            elif isinstance(event, KeyVerificationCancel):  # anytime
                self._on_verification_cancel(event)
            elif isinstance(event, KeyVerificationKey):  # second step
                await self._on_verification_key(event)
            elif isinstance(event, KeyVerificationMac):  # third step
                await self._on_verification_mac(event)
            else:
                print(f'Received unexpected event type {type(event)}. '
                      f'Event is {event}. Event will be ignored.')
        except Exception:
            print(traceback.format_exc())

    async def _on_verification_start(self, event) -> None:
        """First step: accept the request and share our key.

        Example event:
            KeyVerificationStart(
                sender='@user2:example.org',
                transaction_id='SomeTxId',
                from_device='DEVICEIDXY',
                method='m.sas.v1',
                key_agreement_protocols=[
                    'curve25519-hkdf-sha256', 'curve25519'],
                hashes=['sha256'],
                message_authentication_codes=[
                    'hkdf-hmac-sha256', 'hmac-sha256'],
                short_authentication_string=['decimal', 'emoji'])
        """
        client = self.client
        if 'emoji' not in event.short_authentication_string:
            click.secho(
                'Other device does not support emoji verification '
                f'{event.short_authentication_string}.',
                fg='red',
            )
            return
        resp = await client.accept_key_verification(event.transaction_id)
        if isinstance(resp, ToDeviceError):
            click.secho(f'accept_key_verification failed with {resp}',
                        fg='red')
        sas = client.key_verifications[event.transaction_id]
        resp = await client.to_device(sas.share_key())
        if isinstance(resp, ToDeviceError):
            click.secho(f'to_device failed with {resp}', fg='red')

    def _on_verification_cancel(self, event) -> None:
        """At any time: tell the user the other side cancelled.

        There is no need to call ``cancel_key_verification`` here, the
        SAS flow is already cancelled.

        Example event:
            KeyVerificationCancel(
                sender='@user2:example.org',
                transaction_id='SomeTxId',
                code='m.mismatched_sas',
                reason='Mismatched short authentication string')
        """
        click.secho(
            '\nVerification has been cancelled by '
            f'{event.sender} for reason "{event.reason}".',
            fg='yellow',
        )

    async def _on_verification_key(self, event) -> None:
        """Second step: show the emojis and ask the user whether they match.

        Example event:
            KeyVerificationKey(
                sender='@user2:example.org',
                transaction_id='SomeTxId',
                key='SomeCryptoKey')
        """
        client = self.client
        click.secho('\nEmoji verification initiated.\n')
        sas = client.key_verifications[event.transaction_id]
        click.echo(', '.join(' '.join(e) for e in sas.get_emoji()))
        click.echo()
        try:
            matched = click.confirm(
                click.style('Do the emojis match?', bold=True))
        except click.exceptions.Abort:  # C or anything for cancel
            click.secho(
                'Cancelled by user! Verification will be '
                'cancelled.',
                fg='red',
            )
            resp = await client.cancel_key_verification(
                event.transaction_id, reject=False)
            if isinstance(resp, ToDeviceError):
                click.secho(f'cancel_key_verification failed with {resp}',
                            fg='red')
            return
        if matched:
            click.secho(
                '\nMatch! The verification for this '
                'device will be accepted.',
                fg='green',
            )
            resp = await client.confirm_short_auth_string(
                event.transaction_id)
            if isinstance(resp, ToDeviceError):
                click.secho(
                    'confirm_short_auth_string failed with '
                    f'{resp}',
                    fg='red',
                )
        else:  # no, don't match, reject
            click.secho(
                '\nNo match! Device will NOT be verified '
                'by rejecting verification.',
                fg='yellow',
            )
            resp = await client.cancel_key_verification(
                event.transaction_id, reject=True)
            if isinstance(resp, ToDeviceError):
                click.secho(
                    f'cancel_key_verification failed with {resp}',
                    fg='red',
                )

    async def _on_verification_mac(self, event) -> None:
        """Third step: send our MAC, report the result and shut down.

        Example event:
            KeyVerificationMac(
                sender='@user2:example.org',
                transaction_id='SomeTxId',
                mac={'ed25519:DEVICEIDXY': 'SomeKey1',
                     'ed25519:SomeKey2': 'SomeKey3'},
                keys='SomeCryptoKey4')
        """
        client = self.client
        sas = client.key_verifications[event.transaction_id]
        try:
            todevice_msg = sas.get_mac()
        except LocalProtocolError as e:
            # e.g. it might have been cancelled by ourselves
            click.secho(
                f'Cancelled or protocol error: Reason: {e}.\n'
                f'Verification with {event.sender} not '
                'concluded. Try again?',
                fg='yellow',
            )
            return
        resp = await client.to_device(todevice_msg)
        if isinstance(resp, ToDeviceError):
            click.secho(f'to_device failed with {resp}', fg='red')
        else:
            click.secho('Emoji verification was successful!',
                        fg='green')
        # Scheduled instead of awaited: shutdown() cancels every task
        # except its own, so it has to run outside the task that is
        # executing this callback.
        self._request_shutdown()
@click.group()
@click.option('-v', '--verbose', count=True)
def cli(verbose: int) -> None:
    # Root command group. The -v/--verbose count is accepted but is not
    # used by this function.
    return None
@cli.command('verify')
def verify_command() -> None:
    # `verify` sub-command: create a bot and run its verify() entry point.
    Bot().verify()
@cli.command('message')
@click.argument('message')
@click.option('-r', '--room', help='the room to send to')
def send_command(message: str, room: str) -> None:
    # `message` sub-command: pass the MESSAGE argument and the -r/--room
    # option value on to a fresh bot.
    Bot().send_text_message_to_room(room_id=room, message=message)
if __name__ == '__main__':
    # Executed as a script: hand control to the click command group.
    cli()