import asyncio import os from pathlib import Path import re import sys import traceback from typing import Union, Optional import uuid import aiohttp from nio import ( AsyncClient, AsyncClientConfig, InviteMemberEvent, JoinError, KeyVerificationCancel, KeyVerificationEvent, EncryptionError, KeyVerificationKey, KeyVerificationMac, KeyVerificationStart, LocalProtocolError, LoginResponse, MatrixRoom, MegolmEvent, RoomMessageText, ToDeviceError, ) from nio.store.database import SqliteStore from askgpt import askGPT from chatgpt_bing import GPTBOT from BingImageGen import ImageGenAsync from log import getlogger from send_image import send_room_image from send_message import send_room_message from bard import Bardbot from flowise import flowise_query from pandora_api import Pandora logger = getlogger() chatgpt_api_endpoint = "https://api.openai.com/v1/chat/completions" base_path = Path(os.path.dirname(__file__)).parent class Bot: def __init__( self, homeserver: str, user_id: str, device_id: str, api_endpoint: Optional[str] = None, openai_api_key: Union[str, None] = None, temperature: Union[float, None] = None, room_id: Union[str, None] = None, password: Union[str, None] = None, access_token: Union[str, None] = None, bard_token: Union[str, None] = None, jailbreakEnabled: Union[bool, None] = True, bing_auth_cookie: Union[str, None] = "", markdown_formatted: Union[bool, None] = False, output_four_images: Union[bool, None] = False, import_keys_path: Optional[str] = None, import_keys_password: Optional[str] = None, flowise_api_url: Optional[str] = None, flowise_api_key: Optional[str] = None, pandora_api_endpoint: Optional[str] = None, pandora_api_model: Optional[str] = None, ): if homeserver is None or user_id is None or device_id is None: logger.warning("homeserver && user_id && device_id is required") sys.exit(1) if password is None and access_token is None: logger.warning("password or access_toekn is required") sys.exit(1) self.homeserver = homeserver self.user_id = user_id self.password = password self.access_token = access_token self.bard_token = bard_token self.device_id = device_id self.room_id = room_id self.openai_api_key = openai_api_key self.bing_auth_cookie = bing_auth_cookie self.api_endpoint = api_endpoint self.import_keys_path = import_keys_path self.import_keys_password = import_keys_password self.flowise_api_url = flowise_api_url self.flowise_api_key = flowise_api_key self.pandora_api_endpoint = pandora_api_endpoint self.temperature = temperature self.session = aiohttp.ClientSession() if openai_api_key is not None: if not self.openai_api_key.startswith("sk-"): logger.warning("invalid openai api key") sys.exit(1) if jailbreakEnabled is None: self.jailbreakEnabled = True else: self.jailbreakEnabled = jailbreakEnabled if markdown_formatted is None: self.markdown_formatted = False else: self.markdown_formatted = markdown_formatted if output_four_images is None: self.output_four_images = False else: self.output_four_images = output_four_images # initialize AsyncClient object self.store_path = base_path self.config = AsyncClientConfig( store=SqliteStore, store_name="db", store_sync_tokens=True, encryption_enabled=True, ) self.client = AsyncClient( homeserver=self.homeserver, user=self.user_id, device_id=self.device_id, config=self.config, store_path=self.store_path, ) if self.access_token is not None: self.client.access_token = self.access_token # setup event callbacks self.client.add_event_callback(self.message_callback, (RoomMessageText,)) self.client.add_event_callback(self.decryption_failure, (MegolmEvent,)) self.client.add_event_callback(self.invite_callback, (InviteMemberEvent,)) self.client.add_to_device_callback( self.to_device_callback, (KeyVerificationEvent,) ) # regular expression to match keyword commands self.gpt_prog = re.compile(r"^\s*!gpt\s*(.+)$") self.chat_prog = re.compile(r"^\s*!chat\s*(.+)$") self.bing_prog = re.compile(r"^\s*!bing\s*(.+)$") self.bard_prog = re.compile(r"^\s*!bard\s*(.+)$") self.pic_prog = re.compile(r"^\s*!pic\s*(.+)$") self.lc_prog = re.compile(r"^\s*!lc\s*(.+)$") self.help_prog = re.compile(r"^\s*!help\s*.*$") self.talk_prog = re.compile(r"^\s*!talk\s*(.+)$") self.goon_prog = re.compile(r"^\s*!goon\s*.*$") self.new_prog = re.compile(r"^\s*!new\s*(.+)$") # initialize askGPT class self.askgpt = askGPT(self.session) # request header for !gpt command self.gptheaders = { "Content-Type": "application/json", "Authorization": f"Bearer {self.openai_api_key}", } # initialize bing and chatgpt if self.api_endpoint is not None: self.gptbot = GPTBOT(self.api_endpoint, self.session) self.chatgpt_data = {} self.bing_data = {} # initialize BingImageGenAsync if self.bing_auth_cookie != "": self.imageGen = ImageGenAsync(self.bing_auth_cookie, quiet=True) # initialize pandora if pandora_api_endpoint is not None: self.pandora = Pandora( api_endpoint=pandora_api_endpoint, clientSession=self.session ) if pandora_api_model is None: self.pandora_api_model = "text-davinci-002-render-sha-mobile" else: self.pandora_api_model = pandora_api_model self.pandora_data = {} # initialize bard self.bard_data = {} def __del__(self): try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(self._close()) async def _close(self): await self.session.close() def chatgpt_session_init(self, sender_id: str) -> None: self.chatgpt_data[sender_id] = { "first_time": True, } def bing_session_init(self, sender_id: str) -> None: self.bing_data[sender_id] = { "first_time": True, } def pandora_session_init(self, sender_id: str) -> None: self.pandora_data[sender_id] = { "conversation_id": None, "parent_message_id": str(uuid.uuid4()), "first_time": True, } async def bard_session_init(self, sender_id: str) -> None: self.bard_data[sender_id] = { "instance": await Bardbot.create(self.bard_token, 60), } # message_callback RoomMessageText event async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> None: if self.room_id is None: room_id = room.room_id else: # if event room id does not match the room id in config, return if room.room_id != self.room_id: return room_id = self.room_id # reply event_id reply_to_event_id = event.event_id # sender_id sender_id = event.sender # user_message raw_user_message = event.body # print info to console logger.info( f"Message received in room {room.display_name}\n" f"{room.user_name(event.sender)} | {raw_user_message}" ) # prevent command trigger loop if self.user_id != event.sender: # remove newline character from event.body content_body = re.sub("\r\n|\r|\n", " ", raw_user_message) # !gpt command if self.openai_api_key is not None: m = self.gpt_prog.match(content_body) if m: prompt = m.group(1) try: asyncio.create_task( self.gpt( room_id, reply_to_event_id, prompt, sender_id, raw_user_message, ) ) except Exception as e: logger.error(e, exc_info=True) if self.api_endpoint is not None: # chatgpt n = self.chat_prog.match(content_body) if n: if sender_id not in self.chatgpt_data: self.chatgpt_session_init(sender_id) prompt = n.group(1) if self.openai_api_key is not None: try: asyncio.create_task( self.chat( room_id, reply_to_event_id, prompt, sender_id, raw_user_message, ) ) except Exception as e: logger.error(e, exc_info=True) else: logger.warning("No API_KEY provided") await send_room_message( self.client, room_id, reply_message="API_KEY not provided" ) # bing ai # if self.bing_api_endpoint != "": # bing ai can be used without cookie b = self.bing_prog.match(content_body) if b: if sender_id not in self.bing_data: self.bing_session_init(sender_id) prompt = b.group(1) # raw_content_body used for construct formatted_body try: asyncio.create_task( self.bing( room_id, reply_to_event_id, prompt, sender_id, raw_user_message, ) ) except Exception as e: logger.error(e, exc_info=True) # Image Generation by Microsoft Bing if self.bing_auth_cookie != "": i = self.pic_prog.match(content_body) if i: prompt = i.group(1) try: asyncio.create_task(self.pic(room_id, prompt)) except Exception as e: logger.error(e, exc_info=True) # Google's Bard if self.bard_token is not None: if sender_id not in self.bard_data: await self.bard_session_init(sender_id) b = self.bard_prog.match(content_body) if b: prompt = b.group(1) try: asyncio.create_task( self.bard( room_id, reply_to_event_id, prompt, sender_id, raw_user_message, ) ) except Exception as e: logger.error(e, exc_info=True) # lc command if self.flowise_api_url is not None: m = self.lc_prog.match(content_body) if m: prompt = m.group(1) try: asyncio.create_task( self.lc( room_id, reply_to_event_id, prompt, sender_id, raw_user_message, ) ) except Exception as e: await send_room_message(self.client, room_id, reply_message={e}) logger.error(e, exc_info=True) # pandora if self.pandora_api_endpoint is not None: t = self.talk_prog.match(content_body) if t: if sender_id not in self.pandora_data: self.pandora_session_init(sender_id) prompt = t.group(1) try: asyncio.create_task( self.talk( room_id, reply_to_event_id, prompt, sender_id, raw_user_message, ) ) except Exception as e: logger.error(e, exc_info=True) g = self.goon_prog.match(content_body) if g: if sender_id not in self.pandora_data: self.pandora_session_init(sender_id) try: asyncio.create_task( self.goon( room_id, reply_to_event_id, sender_id, raw_user_message, ) ) except Exception as e: logger.error(e, exc_info=True) # !new command n = self.new_prog.match(content_body) if n: new_command_kind = n.group(1) try: asyncio.create_task( self.new( room_id, reply_to_event_id, sender_id, raw_user_message, new_command_kind, ) ) except Exception as e: logger.error(e, exc_info=True) # help command h = self.help_prog.match(content_body) if h: try: asyncio.create_task(self.help(room_id)) except Exception as e: logger.error(e, exc_info=True) # message_callback decryption_failure event async def decryption_failure(self, room: MatrixRoom, event: MegolmEvent) -> None: if not isinstance(event, MegolmEvent): return logger.error( f"Failed to decrypt message: {event.event_id} \ from {event.sender} in {room.room_id}\n" + "Please make sure the bot current session is verified" ) # invite_callback event async def invite_callback(self, room: MatrixRoom, event: InviteMemberEvent) -> None: """Handle an incoming invite event. If an invite is received, then join the room specified in the invite. code copied from: https://github.com/8go/matrix-eno-bot/blob/ad037e02bd2960941109e9526c1033dd157bb212/callbacks.py#L104 """ logger.debug(f"Got invite to {room.room_id} from {event.sender}.") # Attempt to join 3 times before giving up for attempt in range(3): result = await self.client.join(room.room_id) if type(result) == JoinError: logger.error( f"Error joining room {room.room_id} (attempt %d): %s", attempt, result.message, ) else: break else: logger.error("Unable to join room: %s", room.room_id) # Successfully joined room logger.info(f"Joined {room.room_id}") # to_device_callback event async def to_device_callback(self, event: KeyVerificationEvent) -> None: """Handle events sent to device. Specifically this will perform Emoji verification. It will accept an incoming Emoji verification requests and follow the verification protocol. code copied from: https://github.com/8go/matrix-eno-bot/blob/ad037e02bd2960941109e9526c1033dd157bb212/callbacks.py#L127 """ try: client = self.client logger.debug( f"Device Event of type {type(event)} received in " "to_device_cb()." ) if isinstance(event, KeyVerificationStart): # first step """first step: receive KeyVerificationStart KeyVerificationStart( source={'content': {'method': 'm.sas.v1', 'from_device': 'DEVICEIDXY', 'key_agreement_protocols': ['curve25519-hkdf-sha256', 'curve25519'], 'hashes': ['sha256'], 'message_authentication_codes': ['hkdf-hmac-sha256', 'hmac-sha256'], 'short_authentication_string': ['decimal', 'emoji'], 'transaction_id': 'SomeTxId' }, 'type': 'm.key.verification.start', 'sender': '@user2:example.org' }, sender='@user2:example.org', transaction_id='SomeTxId', from_device='DEVICEIDXY', method='m.sas.v1', key_agreement_protocols=[ 'curve25519-hkdf-sha256', 'curve25519'], hashes=['sha256'], message_authentication_codes=[ 'hkdf-hmac-sha256', 'hmac-sha256'], short_authentication_string=['decimal', 'emoji']) """ if "emoji" not in event.short_authentication_string: estr = ( "Other device does not support emoji verification " f"{event.short_authentication_string}. Aborting." ) logger.info(estr) return resp = await client.accept_key_verification(event.transaction_id) if isinstance(resp, ToDeviceError): estr = f"accept_key_verification() failed with {resp}" logger.info(estr) sas = client.key_verifications[event.transaction_id] todevice_msg = sas.share_key() resp = await client.to_device(todevice_msg) if isinstance(resp, ToDeviceError): estr = f"to_device() failed with {resp}" logger.info(estr) elif isinstance(event, KeyVerificationCancel): # anytime """at any time: receive KeyVerificationCancel KeyVerificationCancel(source={ 'content': {'code': 'm.mismatched_sas', 'reason': 'Mismatched authentication string', 'transaction_id': 'SomeTxId'}, 'type': 'm.key.verification.cancel', 'sender': '@user2:example.org'}, sender='@user2:example.org', transaction_id='SomeTxId', code='m.mismatched_sas', reason='Mismatched short authentication string') """ # There is no need to issue a # client.cancel_key_verification(tx_id, reject=False) # here. The SAS flow is already cancelled. # We only need to inform the user. estr = ( f"Verification has been cancelled by {event.sender} " f'for reason "{event.reason}".' ) logger.info(estr) elif isinstance(event, KeyVerificationKey): # second step """Second step is to receive KeyVerificationKey KeyVerificationKey( source={'content': { 'key': 'SomeCryptoKey', 'transaction_id': 'SomeTxId'}, 'type': 'm.key.verification.key', 'sender': '@user2:example.org' }, sender='@user2:example.org', transaction_id='SomeTxId', key='SomeCryptoKey') """ sas = client.key_verifications[event.transaction_id] logger.info(f"{sas.get_emoji()}") # don't log the emojis # The bot process must run in forground with a screen and # keyboard so that user can accept/reject via keyboard. # For emoji verification bot must not run as service or # in background. # yn = input("Do the emojis match? (Y/N) (C for Cancel) ") # automatic match, so we use y yn = "y" if yn.lower() == "y": estr = ( "Match! The verification for this " "device will be accepted." ) logger.info(estr) resp = await client.confirm_short_auth_string(event.transaction_id) if isinstance(resp, ToDeviceError): estr = "confirm_short_auth_string() " f"failed with {resp}" logger.info(estr) elif yn.lower() == "n": # no, don't match, reject estr = ( "No match! Device will NOT be verified " "by rejecting verification." ) logger.info(estr) resp = await client.cancel_key_verification( event.transaction_id, reject=True ) if isinstance(resp, ToDeviceError): estr = f"cancel_key_verification failed with {resp}" logger.info(estr) else: # C or anything for cancel estr = "Cancelled by user! Verification will be " "cancelled." logger.info(estr) resp = await client.cancel_key_verification( event.transaction_id, reject=False ) if isinstance(resp, ToDeviceError): estr = f"cancel_key_verification failed with {resp}" logger.info(estr) elif isinstance(event, KeyVerificationMac): # third step """Third step is to receive KeyVerificationMac KeyVerificationMac( source={'content': { 'mac': {'ed25519:DEVICEIDXY': 'SomeKey1', 'ed25519:SomeKey2': 'SomeKey3'}, 'keys': 'SomeCryptoKey4', 'transaction_id': 'SomeTxId'}, 'type': 'm.key.verification.mac', 'sender': '@user2:example.org'}, sender='@user2:example.org', transaction_id='SomeTxId', mac={'ed25519:DEVICEIDXY': 'SomeKey1', 'ed25519:SomeKey2': 'SomeKey3'}, keys='SomeCryptoKey4') """ sas = client.key_verifications[event.transaction_id] try: todevice_msg = sas.get_mac() except LocalProtocolError as e: # e.g. it might have been cancelled by ourselves estr = ( f"Cancelled or protocol error: Reason: {e}.\n" f"Verification with {event.sender} not concluded. " "Try again?" ) logger.info(estr) else: resp = await client.to_device(todevice_msg) if isinstance(resp, ToDeviceError): estr = f"to_device failed with {resp}" logger.info(estr) estr = ( f"sas.we_started_it = {sas.we_started_it}\n" f"sas.sas_accepted = {sas.sas_accepted}\n" f"sas.canceled = {sas.canceled}\n" f"sas.timed_out = {sas.timed_out}\n" f"sas.verified = {sas.verified}\n" f"sas.verified_devices = {sas.verified_devices}\n" ) logger.info(estr) estr = ( "Emoji verification was successful!\n" "Initiate another Emoji verification from " "another device or room if desired. " "Or if done verifying, hit Control-C to stop the " "bot in order to restart it as a service or to " "run it in the background." ) logger.info(estr) else: estr = ( f"Received unexpected event type {type(event)}. " f"Event is {event}. Event will be ignored." ) logger.info(estr) except BaseException: estr = traceback.format_exc() logger.info(estr) # !chat command async def chat( self, room_id, reply_to_event_id, prompt, sender_id, raw_user_message ): try: await self.client.room_typing(room_id, timeout=300000) if ( self.chatgpt_data[sender_id]["first_time"] or "conversationId" not in self.chatgpt_data[sender_id] ): self.chatgpt_data[sender_id]["first_time"] = False payload = { "message": prompt, } else: payload = { "message": prompt, "conversationId": self.chatgpt_data[sender_id]["conversationId"], "parentMessageId": self.chatgpt_data[sender_id]["parentMessageId"], } payload.update( { "clientOptions": { "clientToUse": "chatgpt", "openaiApiKey": self.openai_api_key, "modelOptions": { "temperature": self.temperature, }, } } ) resp = await self.gptbot.queryChatGPT(payload) content = resp["response"] self.chatgpt_data[sender_id]["conversationId"] = resp["conversationId"] self.chatgpt_data[sender_id]["parentMessageId"] = resp["messageId"] await send_room_message( self.client, room_id, reply_message=content, reply_to_event_id="", sender_id=sender_id, user_message=raw_user_message, markdown_formatted=self.markdown_formatted, ) except Exception as e: await send_room_message(self.client, room_id, reply_message=str(e)) # !gpt command async def gpt( self, room_id, reply_to_event_id, prompt, sender_id, raw_user_message ) -> None: try: # sending typing state await self.client.room_typing(room_id, timeout=30000) # timeout 300s text = await asyncio.wait_for( self.askgpt.oneTimeAsk( prompt, chatgpt_api_endpoint, self.gptheaders, self.temperature ), timeout=300, ) text = text.strip() await send_room_message( self.client, room_id, reply_message=text, reply_to_event_id="", sender_id=sender_id, user_message=raw_user_message, markdown_formatted=self.markdown_formatted, ) except Exception: await send_room_message( self.client, room_id, reply_message="Error encountered, please try again or contact admin.", ) # !bing command async def bing( self, room_id, reply_to_event_id, prompt, sender_id, raw_user_message ) -> None: try: # sending typing state await self.client.room_typing(room_id, timeout=300000) if ( self.bing_data[sender_id]["first_time"] or "conversationId" not in self.bing_data[sender_id] ): self.bing_data[sender_id]["first_time"] = False payload = { "message": prompt, "clientOptions": { "clientToUse": "bing", }, } else: payload = { "message": prompt, "clientOptions": { "clientToUse": "bing", }, "conversationSignature": self.bing_data[sender_id][ "conversationSignature" ], "conversationId": self.bing_data[sender_id]["conversationId"], "clientId": self.bing_data[sender_id]["clientId"], "invocationId": self.bing_data[sender_id]["invocationId"], } resp = await self.gptbot.queryBing(payload) content = "".join( [body["text"] for body in resp["details"]["adaptiveCards"][0]["body"]] ) self.bing_data[sender_id]["conversationSignature"] = resp[ "conversationSignature" ] self.bing_data[sender_id]["conversationId"] = resp["conversationId"] self.bing_data[sender_id]["clientId"] = resp["clientId"] self.bing_data[sender_id]["invocationId"] = resp["invocationId"] text = content.strip() await send_room_message( self.client, room_id, reply_message=text, reply_to_event_id="", sender_id=sender_id, user_message=raw_user_message, markdown_formatted=self.markdown_formatted, ) except Exception as e: await send_room_message(self.client, room_id, reply_message=str(e)) # !bard command async def bard( self, room_id, reply_to_event_id, prompt, sender_id, raw_user_message ) -> None: try: # sending typing state await self.client.room_typing(room_id) response = await self.bard_data[sender_id]["instance"].ask(prompt) content = str(response["content"]).strip() await send_room_message( self.client, room_id, reply_message=content, reply_to_event_id="", sender_id=sender_id, user_message=raw_user_message, markdown_formatted=self.markdown_formatted, ) except TimeoutError: await send_room_message(self.client, room_id, reply_message="TimeoutError") except Exception as e: await send_room_message( self.client, room_id, reply_message="Error calling Bard API, please contact admin.", ) # !lc command async def lc( self, room_id, reply_to_event_id, prompt, sender_id, raw_user_message ) -> None: try: # sending typing state await self.client.room_typing(room_id) if self.flowise_api_key is not None: headers = {"Authorization": f"Bearer {self.flowise_api_key}"} response = await flowise_query(self.flowise_api_url, prompt, self.session, headers) else: response = await flowise_query(self.flowise_api_url, prompt, self.session) await send_room_message( self.client, room_id, reply_message=response, reply_to_event_id="", sender_id=sender_id, user_message=raw_user_message, markdown_formatted=self.markdown_formatted, ) except Exception as e: await send_room_message( self.client, room_id, reply_message="Error calling flowise API, please contact admin.", ) # !talk command async def talk( self, room_id, reply_to_event_id, prompt, sender_id, raw_user_message ) -> None: try: if self.pandora_data[sender_id]["conversation_id"] is not None: data = { "prompt": prompt, "model": self.pandora_api_model, "parent_message_id": self.pandora_data[sender_id][ "parent_message_id" ], "conversation_id": self.pandora_data[sender_id]["conversation_id"], "stream": False, } else: data = { "prompt": prompt, "model": self.pandora_api_model, "parent_message_id": self.pandora_data[sender_id][ "parent_message_id" ], "stream": False, } # sending typing state await self.client.room_typing(room_id) response = await self.pandora.talk(data) self.pandora_data[sender_id]["conversation_id"] = response[ "conversation_id" ] self.pandora_data[sender_id]["parent_message_id"] = response["message"][ "id" ] content = response["message"]["content"]["parts"][0] if self.pandora_data[sender_id]["first_time"]: self.pandora_data[sender_id]["first_time"] = False data = { "model": self.pandora_api_model, "message_id": self.pandora_data[sender_id]["parent_message_id"], } await self.pandora.gen_title( data, self.pandora_data[sender_id]["conversation_id"] ) await send_room_message( self.client, room_id, reply_message=content, reply_to_event_id="", sender_id=sender_id, user_message=raw_user_message, markdown_formatted=self.markdown_formatted, ) except Exception as e: await send_room_message(self.client, room_id, reply_message=str(e)) # !goon command async def goon( self, room_id, reply_to_event_id, sender_id, raw_user_message ) -> None: try: # sending typing state await self.client.room_typing(room_id) data = { "model": self.pandora_api_model, "parent_message_id": self.pandora_data[sender_id]["parent_message_id"], "conversation_id": self.pandora_data[sender_id]["conversation_id"], "stream": False, } response = await self.pandora.goon(data) self.pandora_data[sender_id]["conversation_id"] = response[ "conversation_id" ] self.pandora_data[sender_id]["parent_message_id"] = response["message"][ "id" ] content = response["message"]["content"]["parts"][0] await send_room_message( self.client, room_id, reply_message=content, reply_to_event_id="", sender_id=sender_id, user_message=raw_user_message, markdown_formatted=self.markdown_formatted, ) except Exception as e: await send_room_message(self.client, room_id, reply_message=str(e)) # !new command async def new( self, room_id, reply_to_event_id, sender_id, raw_user_message, new_command_kind, ) -> None: try: if "talk" in new_command_kind: self.pandora_session_init(sender_id) content = ( "New conversation created, please use !talk to start chatting!" ) elif "chat" in new_command_kind: self.chatgpt_session_init(sender_id) content = ( "New conversation created, please use !chat to start chatting!" ) elif "bing" in new_command_kind: self.bing_session_init(sender_id) content = ( "New conversation created, please use !bing to start chatting!" ) elif "bard" in new_command_kind: await self.bard_session_init(sender_id) content = ( "New conversation created, please use !bard to start chatting!" ) else: content = "Unkown keyword, please use !help to see the usage!" await send_room_message( self.client, room_id, reply_message=content, reply_to_event_id="", sender_id=sender_id, user_message=raw_user_message, markdown_formatted=self.markdown_formatted, ) except Exception as e: await send_room_message(self.client, room_id, reply_message=str(e)) # !pic command async def pic(self, room_id, prompt): try: await self.client.room_typing(room_id, timeout=300000) # generate image links = await self.imageGen.get_images(prompt) image_path_list = await self.imageGen.save_images( links, base_path / "images", self.output_four_images ) # send image for image_path in image_path_list: await send_room_image(self.client, room_id, image_path) await self.client.room_typing(room_id, typing_state=False) except Exception as e: await send_room_message(self.client, room_id, reply_message=str(e)) # !help command async def help(self, room_id): help_info = ( "!gpt [prompt], generate a one time response without context conversation\n" + "!chat [prompt], chat with context conversation\n" + "!bing [prompt], chat with context conversation powered by Bing AI\n" + "!bard [prompt], chat with Google's Bard\n" + "!pic [prompt], Image generation by Microsoft Bing\n" + "!talk [content], talk using chatgpt web (pandora)\n" + "!goon, continue the incomplete conversation (pandora)\n" + "!new + [chat,bing,talk,bard], start a new conversation \n" + "!lc [prompt], chat using langchain api\n" + "!help, help message" ) # noqa: E501 await send_room_message(self.client, room_id, reply_message=help_info) # bot login async def login(self) -> None: if self.access_token is not None: logger.info("Login via access_token") else: logger.info("Login via password") try: resp = await self.client.login(password=self.password) if not isinstance(resp, LoginResponse): logger.error("Login Failed") sys.exit(1) except Exception as e: logger.error(f"Error: {e}", exc_info=True) # import keys async def import_keys(self): resp = await self.client.import_keys( self.import_keys_path, self.import_keys_password ) if isinstance(resp, EncryptionError): logger.error(f"import_keys failed with {resp}") else: logger.info( "import_keys success, please remove import_keys configuration!!!" ) # sync messages in the room async def sync_forever(self, timeout=30000, full_state=True) -> None: await self.client.sync_forever(timeout=timeout, full_state=full_state)