matrix-bot/matrix_bot/bot.py

578 lines
22 KiB
Python

import asyncio
import json
import os
import sys
import traceback
from typing import Awaitable, Optional

import click
import markdown
from nio import (AsyncClient, AsyncClientConfig, KeyVerificationCancel,
                 KeyVerificationEvent, KeyVerificationKey, KeyVerificationMac,
                 KeyVerificationStart, LocalProtocolError, LoginResponse,
                 MatrixRoom, RoomMessageText, SyncResponse, ToDeviceError)

from .config import (CONFIG_FILE, FIFO_PATH, HOMESERVER_URL, NEXT_BATCH_PATH,
                     STORE_PATH)
def write_details_to_disk(resp: LoginResponse, credentials: dict) -> None:
    """Write the required login details to disk.

    It will allow following logins to be made without password.

    The file contains the access token, so it is created with mode
    ``0o600`` (owner read/write only) instead of the umask default.  The
    mode only takes effect when the file is created; the permissions of
    an already existing file are left as they are.

    Arguments:
    ---------
    resp : LoginResponse - successful client login response
    credentials : dict - The credentials used to sign in; the keys
        'homeserver', 'device_name' and 'room_id' are read

    """
    details = {
        # e.g. "https://matrix.example.org"
        'homeserver': credentials['homeserver'],
        # e.g. 'matrix-bot'
        'device_name': credentials['device_name'],
        # e.g. '!yourRoomId:example.org'
        'room_id': credentials['room_id'],
        # e.g. '@user:example.org'
        'user_id': resp.user_id,
        # device ID, 10 uppercase letters
        'device_id': resp.device_id,
        # cryptogr. access token
        'access_token': resp.access_token,
    }
    # Open through os.open() so the permission bits can be chosen at
    # creation time; the plain open() builtin cannot do that.
    fd = os.open(CONFIG_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(details, f)
class Bot():
    """Matrix bot built around a matrix-nio ``AsyncClient``.

    The bot can run as a long-lived process (``run``), take part in an
    interactive emoji verification (``verify``) or send single messages
    (``send_text_to_room`` / ``send_notice_to_room``).

    ``client``, ``client_config`` and ``config`` are only set once
    ``_login`` has run; reading them earlier raises ``AttributeError``.
    """

    # Markdown reply sent for the "!help" room command.
    _HELP_TEXT = '##### Usage\n**!help** - displays this help message\n'

    def __init__(self) -> None:
        self._loop = asyncio.get_event_loop()

    @classmethod
    async def login(cls) -> 'Bot':
        """Create a bot instance and log it in.

        Returns:
            The new, logged-in bot.

        """
        bot = cls()
        await bot._login()
        return bot

    @property
    def client(self) -> AsyncClient:
        """The client attached by the last ``_login`` call."""
        return self._client

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The event loop obtained when the bot was constructed."""
        return self._loop

    @property
    def client_config(self) -> AsyncClientConfig:
        """The client configuration built by the last ``_login`` call."""
        return self._client_config

    @property
    def config(self) -> dict:
        """Login details (homeserver, user, device, room, token)."""
        return self._config

    async def send_message_to_room(
            self,
            room_id: Optional[str],
            content: dict,
            message_type: str = 'm.room.message') -> None:
        """Log in with a fresh client, send one event and disconnect.

        Arguments:
        ---------
        room_id : target room; ``None`` selects the configured default
            room (``config['room_id']``)
        content : event content passed to ``room_send`` unchanged
        message_type : event type, ``m.room.message`` by default

        """
        # Every send uses its own short-lived client.  It is only attached
        # to the bot when no client is attached yet, so a send issued while
        # the bot is running never replaces the long-lived syncing client
        # with one that is closed a moment later.
        client = await self._login(attach=not hasattr(self, '_client'))
        try:
            if room_id is None:
                room_id = self.config['room_id']
            await client.sync(timeout=30000, full_state=True)
            await client.room_send(
                room_id=room_id,
                message_type=message_type,
                content=content,
                ignore_unverified_devices=True,
            )
            await client.set_presence('offline')
        finally:
            # Close the connection even when syncing or sending failed.
            await client.close()

    async def send_text_to_room(self, room_id: Optional[str],
                                message: str) -> None:
        """Send ``message`` (markdown) to a room as an ``m.text`` event.

        Arguments:
        ---------
        room_id : target room, ``None`` for the configured default room
        message : markdown source of the message

        """
        await self.send_message_to_room(
            room_id, self._markdown_content('m.text', message))

    async def send_notice_to_room(self, room_id: Optional[str],
                                  message: str) -> None:
        """Send ``message`` (markdown) to a room as an ``m.notice`` event.

        Arguments:
        ---------
        room_id : target room, ``None`` for the configured default room
        message : markdown source of the message

        """
        await self.send_message_to_room(
            room_id, self._markdown_content('m.notice', message))

    @staticmethod
    def _markdown_content(msgtype: str, message: str) -> dict:
        """Build the event content for a markdown message.

        ``body`` carries the unrendered text as plain-text fallback next
        to the HTML rendering in ``formatted_body``.
        """
        return {
            'msgtype': msgtype,
            'body': message,
            'format': 'org.matrix.custom.html',
            'formatted_body': markdown.markdown(message),
        }

    async def _login(self, *, attach: bool = True) -> AsyncClient:
        """Login to the matrix homeserver defined in the config file.

        Without a credential file the user is asked for the login details
        and a password; otherwise the stored access token is restored.
        Afterwards the presence is set to 'online'.

        Arguments:
        ---------
        attach : store the client as ``self.client`` (the default); pass
            ``False`` to leave the currently attached client untouched

        Returns:
            The logged-in client.

        """
        self._client_config = AsyncClientConfig(
            max_limit_exceeded=0,
            max_timeouts=0,
            store_sync_tokens=True,
            encryption_enabled=True,
        )
        if os.path.exists(CONFIG_FILE):
            # The config file exists, so we'll use the stored credentials
            client = self._restore_login()
        else:
            # No previously-saved credentials, so we'll use the password
            client = await self._password_login()
        if attach:
            self._client = client
        await client.set_presence('online')
        return client

    async def _password_login(self) -> AsyncClient:
        """Ask for credentials, log in by password and store the result.

        Exits the process with status 1 when the login fails.
        """
        click.secho(
            'First time use. Did not find credential file. Asking '
            'for homeserver, user, and password to create '
            'credential file.\n',
            bold=True,
        )
        os.makedirs(STORE_PATH, exist_ok=True)
        credentials = self._ask_credentials()
        # Initialize the matrix client
        client = AsyncClient(
            credentials['homeserver'],
            credentials['user_id'],
            store_path=STORE_PATH,
            config=self.client_config,
        )
        pw = click.prompt(click.style('Your Password', bold=True),
                          hide_input=True)
        resp = await client.login(password=pw,
                                  device_name=credentials['device_name'])
        del pw
        # check that we logged in successfully
        if not isinstance(resp, LoginResponse):
            click.secho(
                f'homeserver = {credentials["homeserver"]}; '
                f' user = {credentials["user_id"]}',
                fg='red',
            )
            click.secho(f'Failed to log in: {resp}', fg='red')
            # Release the connection before leaving.
            await client.close()
            sys.exit(1)
        write_details_to_disk(resp, credentials)
        self._config = {
            'user_id': credentials['user_id'],
            'homeserver': credentials['homeserver'],
            'room_id': credentials['room_id'],
            'device_name': credentials['device_name'],
            'device_id': resp.device_id,
            'access_token': resp.access_token,
        }
        click.secho(
            'Logged in using a password. Credentials were stored. '
            'On next execution the stored login credentials will '
            'be used.',
            fg='green',
        )
        return client

    def _restore_login(self) -> AsyncClient:
        """Create a client from the credentials stored in the config file."""
        with open(CONFIG_FILE, 'r') as f:
            self._config = json.load(f)
        # Initialize the matrix client based on credentials from file
        client = AsyncClient(
            self.config['homeserver'],
            self.config['user_id'],
            device_id=self.config['device_id'],
            store_path=STORE_PATH,
            config=self.client_config,
        )
        client.restore_login(
            user_id=self.config['user_id'],
            device_id=self.config['device_id'],
            access_token=self.config['access_token'],
        )
        return client

    async def shutdown(self, sig: object) -> None:
        """Close the client, cancel all other tasks and stop the loop.

        Arguments:
        ---------
        sig : accepted for the caller's convenience; not used

        """
        await self.client.close()
        current = asyncio.current_task()
        pending = [t for t in asyncio.all_tasks() if t is not current]
        for task in pending:
            task.cancel()
        # Wait for all of them at once.  Awaiting a cancelled task directly
        # re-raises its CancelledError here, which would abort the shutdown
        # after the first task; return_exceptions=True collects the
        # cancellations instead.
        await asyncio.gather(*pending, return_exceptions=True)
        self.loop.stop()

    async def _verify(self) -> None:
        """Login and wait for and perform emoji verify."""
        client = await self._login()
        # Set up event callbacks
        client.add_to_device_callback(self._to_device_callback,
                                      (KeyVerificationEvent, ))
        # Sync encryption keys with the server
        # Required for participating in encrypted rooms
        if client.should_upload_keys:
            await client.keys_upload()
        click.secho('\nStarting verification process...', bold=True)
        click.secho(
            '\nThis program is ready and waiting for the other '
            'party to initiate an emoji verification with us by '
            'selecting "Verify by Emoji" in their Matrix '
            'client.',
            fg='green',
        )
        await client.sync_forever(timeout=30000, full_state=True)

    def verify(self) -> None:
        """Run the emoji verification flow until it is interrupted."""
        self._run_async(self._verify())

    def run(self) -> None:
        """Run the bot until it is interrupted."""
        self._run_async(self._run())

    async def _run(self) -> None:
        """Log in, then sync forever while reading commands from the FIFO."""
        # Always start with a fresh FIFO.
        if os.path.exists(FIFO_PATH):
            os.remove(FIFO_PATH)
        os.mkfifo(FIFO_PATH)
        pipe_fd = os.open(FIFO_PATH, os.O_RDONLY | os.O_NONBLOCK)
        client = await self._login()
        # we read the previously-written token...
        if os.path.exists(NEXT_BATCH_PATH):
            with open(NEXT_BATCH_PATH, 'r') as next_batch_token:
                # ... and tell the client to use it
                client.next_batch = next_batch_token.read()
        client.add_response_callback(self._sync_callback, SyncResponse)
        client.add_event_callback(self._room_text_message_callback,
                                  RoomMessageText)
        await asyncio.gather(
            client.sync_forever(timeout=30000, full_state=True),
            self._fifo_reader(pipe_fd),
        )

    def _run_async(self, future: Awaitable) -> None:
        """Run ``future`` on the bot's loop and turn the outcome into an exit.

        Cancellation and Ctrl+C exit with status 0; any other exception
        prints its traceback and exits with status 1.
        """
        try:
            self.loop.run_until_complete(future)
        except (asyncio.CancelledError, KeyboardInterrupt):
            # Listed first so cancellation is never reported as an error.
            sys.exit(0)
        except Exception:
            print(traceback.format_exc())
            sys.exit(1)

    def _ask_credentials(self) -> dict:
        """Ask the user for credentials.

        Returns:
            dict with the keys 'homeserver', 'user_id', 'device_name'
            and 'room_id'.  Aborting a prompt exits with status 0.

        """
        try:
            homeserver = click.prompt(
                click.style('Enter your homeserver URL', bold=True),
                default=HOMESERVER_URL,
            )
            # Only add a scheme when none was given; an explicit
            # "http://..." must not become "https://http://...".
            if not homeserver.startswith(('http://', 'https://')):
                homeserver = 'https://' + homeserver
            user_id = click.prompt(
                click.style('Enter your full user ID', bold=True),
                default='@user:gaja-group.com',
            )
            device_name = click.prompt(
                click.style('Choose a name for this device', bold=True),
                default='matrix-bot',
            )
            room_id = click.prompt(
                click.style('Enter a default room ID to send to', bold=True),
                default='!yourRoomId:gaja-group.com',
            )
        except click.exceptions.Abort:
            sys.exit(0)
        return {
            'homeserver': homeserver,
            'user_id': user_id,
            'device_name': device_name,
            'room_id': room_id,
        }

    async def _to_device_callback(self, event: KeyVerificationEvent) -> None:
        """Handle events sent to device.

        Dispatches the key verification events to the matching step
        handler.  Errors are printed and not re-raised.
        """
        try:
            if isinstance(event, KeyVerificationStart):  # first step
                await self._on_verification_start(event)
            elif isinstance(event, KeyVerificationCancel):  # anytime
                self._on_verification_cancel(event)
            elif isinstance(event, KeyVerificationKey):  # second step
                await self._on_verification_key(event)
            elif isinstance(event, KeyVerificationMac):  # third step
                await self._on_verification_mac(event)
            else:
                print(f'Received unexpected event type {type(event)}. '
                      f'Event is {event}. Event will be ignored.')
        except Exception:
            # Deliberately not BaseException: task cancellation and
            # interpreter exit must be able to pass through.
            print(traceback.format_exc())

    async def _on_verification_start(self,
                                     event: KeyVerificationStart) -> None:
        """First step: accept the verification and share our key.

        The event lists the methods offered by the other device in
        ``short_authentication_string`` (e.g. ['decimal', 'emoji']) and
        identifies the flow by ``transaction_id``.
        """
        client = self.client
        if 'emoji' not in event.short_authentication_string:
            click.secho(
                'Other device does not support emoji verification '
                f'{event.short_authentication_string}.',
                fg='red',
            )
            return
        resp = await client.accept_key_verification(event.transaction_id)
        if isinstance(resp, ToDeviceError):
            click.secho(f'accept_key_verification failed with {resp}',
                        fg='red')
        sas = client.key_verifications[event.transaction_id]
        todevice_msg = sas.share_key()
        resp = await client.to_device(todevice_msg)
        if isinstance(resp, ToDeviceError):
            click.secho(f'to_device failed with {resp}', fg='red')

    def _on_verification_cancel(self, event: KeyVerificationCancel) -> None:
        """At any time: report that the other party cancelled.

        There is no need to issue a
        client.cancel_key_verification(tx_id, reject=False)
        here. The SAS flow is already cancelled.
        We only need to inform the user.
        """
        click.secho(
            '\nVerification has been cancelled by '
            f'{event.sender} for reason "{event.reason}".',
            fg='yellow',
        )

    async def _on_verification_key(self, event: KeyVerificationKey) -> None:
        """Second step: show the emojis and ask the user to compare them.

        A match is confirmed, a mismatch rejects the verification and
        aborting the prompt cancels it.
        """
        client = self.client
        click.secho('\nEmoji verification initiated.\n')
        sas = client.key_verifications[event.transaction_id]
        emoji_list = [' '.join(e) for e in sas.get_emoji()]
        click.echo(', '.join(emoji_list))
        print()
        try:
            emojis_match = click.confirm(
                click.style('Do the emojis match?', bold=True))
        except click.exceptions.Abort:  # C or anything for cancel
            click.secho(
                'Cancelled by user! Verification will be '
                'cancelled.',
                fg='red',
            )
            resp = await client.cancel_key_verification(
                event.transaction_id, reject=False)
            if isinstance(resp, ToDeviceError):
                print(f'cancel_key_verification failed with {resp}')
            return
        if emojis_match:
            click.secho(
                '\nMatch! The verification for this '
                'device will be accepted.',
                fg='green',
            )
            resp = await client.confirm_short_auth_string(
                event.transaction_id)
            if isinstance(resp, ToDeviceError):
                click.secho(
                    'confirm_short_auth_string failed with '
                    f'{resp}',
                    fg='red',
                )
        else:  # no, don't match, reject
            click.secho(
                '\nNo match! Device will NOT be verified '
                'by rejecting verification.',
                fg='yellow',
            )
            resp = await client.cancel_key_verification(
                event.transaction_id, reject=True)
            if isinstance(resp, ToDeviceError):
                click.secho(
                    f'cancel_key_verification failed with {resp}',
                    fg='red',
                )

    async def _on_verification_mac(self, event: KeyVerificationMac) -> None:
        """Third step: answer the other party's MAC with our own."""
        client = self.client
        sas = client.key_verifications[event.transaction_id]
        try:
            todevice_msg = sas.get_mac()
        except LocalProtocolError as e:
            # e.g. it might have been cancelled by ourselves
            click.secho(
                f'Cancelled or protocol error: Reason: {e}.\n'
                f'Verification with {event.sender} not '
                'concluded. Try again?',
                fg='yellow',
            )
        else:
            resp = await client.to_device(todevice_msg)
            if isinstance(resp, ToDeviceError):
                print(f'to_device failed with {resp}')
            click.secho(
                'Emoji verification was successful! Please use Ctrl+C '
                'to exit.',
                fg='green')

    def _sync_callback(self, event: SyncResponse) -> None:
        """Store the sync token of ``event`` in the next-batch file."""
        with open(NEXT_BATCH_PATH, 'w') as next_batch_token:
            next_batch_token.write(event.next_batch)

    async def _room_text_message_callback(
            self, room: MatrixRoom, message: RoomMessageText) -> None:
        """Answer ``!help`` messages posted in the configured room."""
        in_default_room = room.room_id == self.config['room_id']
        if in_default_room and message.body.startswith('!help'):
            await self.send_notice_to_room(room.room_id, self._HELP_TEXT)

    async def _fifo_reader(self, pipe_fd: int) -> None:
        """Poll the FIFO once per second and execute the commands read.

        Commands are JSON objects separated by NUL characters.

        Arguments:
        ---------
        pipe_fd : file descriptor of the FIFO, opened for reading

        """
        with os.fdopen(pipe_fd) as file:
            while True:
                data = file.read()
                if data:
                    for chunk in data.split('\0'):
                        if chunk:
                            await self._handle_fifo_command(chunk)
                await asyncio.sleep(1)

    async def _handle_fifo_command(self, raw: str) -> None:
        """Parse one FIFO command and execute it.

        Only commands of type 'message' do something: their 'content' is
        sent as a notice to 'room_id', or to the configured default room
        when no 'room_id' is given.  Malformed commands are reported and
        skipped so that a single bad writer cannot stop the bot or make
        it drop the commands that follow.
        """
        try:
            cmd = json.loads(raw)
        except json.JSONDecodeError as e:
            print('JSON decode error:', e)
            return
        if not isinstance(cmd, dict):
            print('Invalid command (not a JSON object):', raw)
            return
        if cmd.get('type') != 'message':
            return
        content = cmd.get('content')
        if not isinstance(content, str):
            print('Invalid command (missing text "content"):', raw)
            return
        room_id = cmd.get('room_id', self.config['room_id'])
        await self.send_notice_to_room(room_id, content)
@click.group(invoke_without_command=True)
@click.option('-v', '--verbose', count=True)
@click.pass_context
def cli(ctx: click.Context, verbose: int) -> None:
    """Entry point of the command group.

    Invoked without a subcommand it starts the bot; otherwise it leaves
    the work to the chosen subcommand.  ``verbose`` is accepted but not
    used here.
    """
    if ctx.invoked_subcommand is not None:
        return
    Bot().run()
@cli.command('verify')
def verify_command() -> None:
    """Start the emoji verification flow of a new bot instance."""
    Bot().verify()
@cli.command('message')
@click.argument('message')
@click.option('-r', '--room', help='the room to send to')
def send_command(message: str, room: str) -> None:
    """Send MESSAGE as text to the room given with -r/--room."""
    sender = Bot()
    pending_send = sender.send_text_to_room(room, message)
    sender._run_async(pending_send)
@cli.command('run')
def run() -> None:
    """Start a new bot instance and keep it running."""
    Bot().run()
# Hand over to the click command group when executed as a script.
if __name__ == '__main__':
    cli()