mattermost_bot/bot.py

427 lines
17 KiB
Python

from mattermostdriver import Driver
from typing import Optional
import json
import asyncio
import re
import os
import aiohttp
from askgpt import askGPT
from v3 import Chatbot
from bing import BingBot
from bard import Bardbot
from BingImageGen import ImageGenAsync
from log import getlogger
from pandora import Pandora
import uuid
logger = getlogger()
class Bot:
def __init__(
self,
server_url: str,
username: str,
access_token: Optional[str] = None,
login_id: Optional[str] = None,
password: Optional[str] = None,
openai_api_key: Optional[str] = None,
openai_api_endpoint: Optional[str] = None,
bing_api_endpoint: Optional[str] = None,
pandora_api_endpoint: Optional[str] = None,
pandora_api_model: Optional[str] = None,
bard_token: Optional[str] = None,
bing_auth_cookie: Optional[str] = None,
port: int = 443,
timeout: int = 30,
) -> None:
if server_url is None:
raise ValueError("server url must be provided")
if port is None:
self.port = 443
if timeout is None:
self.timeout = 30
# login relative info
if access_token is None and password is None:
raise ValueError("Either token or password must be provided")
if access_token is not None:
self.driver = Driver(
{
"token": access_token,
"url": server_url,
"port": self.port,
"request_timeout": self.timeout,
}
)
else:
self.driver = Driver(
{
"login_id": login_id,
"password": password,
"url": server_url,
"port": self.port,
"request_timeout": self.timeout,
}
)
# @chatgpt
if username is None:
raise ValueError("username must be provided")
else:
self.username = username
# openai_api_endpoint
if openai_api_endpoint is None:
self.openai_api_endpoint = "https://api.openai.com/v1/chat/completions"
else:
self.openai_api_endpoint = openai_api_endpoint
# aiohttp session
self.session = aiohttp.ClientSession()
self.openai_api_key = openai_api_key
# initialize chatGPT class
if self.openai_api_key is not None:
# request header for !gpt command
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.openai_api_key}",
}
self.askgpt = askGPT(
self.session,
self.openai_api_endpoint,
self.headers,
)
self.chatbot = Chatbot(api_key=self.openai_api_key)
else:
logger.warning(
"openai_api_key is not provided, !gpt and !chat command will not work"
)
self.bing_api_endpoint = bing_api_endpoint
# initialize bingbot
if self.bing_api_endpoint is not None:
self.bingbot = BingBot(
session=self.session,
bing_api_endpoint=self.bing_api_endpoint,
)
else:
logger.warning(
"bing_api_endpoint is not provided, !bing command will not work"
)
# initialize pandora
if pandora_api_endpoint is not None:
self.pandora_api_endpoint = pandora_api_endpoint
self.pandora = Pandora(
api_endpoint=pandora_api_endpoint,
clientSession=self.session
)
if pandora_api_model is None:
self.pandora_api_model = "text-davinci-002-render-sha-mobile"
else:
self.pandora_api_model = pandora_api_model
self.bard_token = bard_token
# initialize bard
if self.bard_token is not None:
self.bardbot = Bardbot(session_id=self.bard_token)
else:
logger.warning("bard_token is not provided, !bard command will not work")
self.bing_auth_cookie = bing_auth_cookie
# initialize image generator
if self.bing_auth_cookie is not None:
self.imagegen = ImageGenAsync(auth_cookie=self.bing_auth_cookie)
else:
logger.warning(
"bing_auth_cookie is not provided, !pic command will not work"
)
# regular expression to match keyword
self.gpt_prog = re.compile(r"^\s*!gpt\s*(.+)$")
self.chat_prog = re.compile(r"^\s*!chat\s*(.+)$")
self.bing_prog = re.compile(r"^\s*!bing\s*(.+)$")
self.bard_prog = re.compile(r"^\s*!bard\s*(.+)$")
self.pic_prog = re.compile(r"^\s*!pic\s*(.+)$")
self.help_prog = re.compile(r"^\s*!help\s*.*$")
self.talk_prog = re.compile(r"^\s*!talk\s*(.+)$")
self.goon_prog = re.compile(r"^\s*!goon\s*.*$")
self.new_prog = re.compile(r"^\s*!new\s*.*$")
self.pandora_data = {}
# close session
def __del__(self) -> None:
self.driver.disconnect()
def login(self) -> None:
self.driver.login()
def pandora_init(self, user_id: str) -> None:
self.pandora_data[user_id] = {
"conversation_id": None,
"parent_message_id": str(uuid.uuid4()),
"first_time": True
}
async def run(self) -> None:
await self.driver.init_websocket(self.websocket_handler)
# websocket handler
async def websocket_handler(self, message) -> None:
print(message)
response = json.loads(message)
if "event" in response:
event_type = response["event"]
if event_type == "posted":
raw_data = response["data"]["post"]
raw_data_dict = json.loads(raw_data)
user_id = raw_data_dict["user_id"]
channel_id = raw_data_dict["channel_id"]
sender_name = response["data"]["sender_name"]
raw_message = raw_data_dict["message"]
if user_id not in self.pandora_data:
self.pandora_init(user_id)
try:
asyncio.create_task(
self.message_callback(
raw_message, channel_id, user_id, sender_name
)
)
except Exception as e:
await asyncio.to_thread(self.send_message, channel_id, f"{e}")
# message callback
async def message_callback(
self, raw_message: str, channel_id: str, user_id: str, sender_name: str
) -> None:
# prevent command trigger loop
if sender_name != self.username:
message = raw_message
if self.openai_api_key is not None:
# !gpt command trigger handler
if self.gpt_prog.match(message):
prompt = self.gpt_prog.match(message).group(1)
try:
response = await self.gpt(prompt)
await asyncio.to_thread(
self.send_message, channel_id, f"{response}"
)
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
# !chat command trigger handler
elif self.chat_prog.match(message):
prompt = self.chat_prog.match(message).group(1)
try:
response = await self.chat(prompt)
await asyncio.to_thread(
self.send_message, channel_id, f"{response}"
)
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
if self.bing_api_endpoint is not None:
# !bing command trigger handler
if self.bing_prog.match(message):
prompt = self.bing_prog.match(message).group(1)
try:
response = await self.bingbot.ask_bing(prompt)
await asyncio.to_thread(
self.send_message, channel_id, f"{response}"
)
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
if self.pandora_api_endpoint is not None:
# !talk command trigger handler
if self.talk_prog.match(message):
prompt = self.talk_prog.match(message).group(1)
try:
if self.pandora_data[user_id]["conversation_id"] is not None:
data = {
"prompt": prompt,
"model": self.pandora_api_model,
"parent_message_id": self.pandora_data[user_id]["parent_message_id"],
"conversation_id": self.pandora_data[user_id]["conversation_id"],
"stream": False,
}
else:
data = {
"prompt": prompt,
"model": self.pandora_api_model,
"parent_message_id": self.pandora_data[user_id]["parent_message_id"],
"stream": False,
}
response = await self.pandora.talk(data)
self.pandora_data[user_id]["conversation_id"] = response['conversation_id']
self.pandora_data[user_id]["parent_message_id"] = response['message']['id']
content = response['message']['content']['parts'][0]
if self.pandora_data[user_id]["first_time"]:
self.pandora_data[user_id]["first_time"] = False
data = {
"model": self.pandora_api_model,
"message_id": self.pandora_data[user_id]["parent_message_id"],
}
await self.pandora.gen_title(data, self.pandora_data[user_id]["conversation_id"])
await asyncio.to_thread(
self.send_message, channel_id, f"{content}"
)
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
# !goon command trigger handler
if self.goon_prog.match(message) and self.pandora_data[user_id]["conversation_id"] is not None:
try:
data = {
"model": self.pandora_api_model,
"parent_message_id": self.pandora_data[user_id]["parent_message_id"],
"conversation_id": self.pandora_data[user_id]["conversation_id"],
"stream": False,
}
response = await self.pandora.goon(data)
self.pandora_data[user_id]["conversation_id"] = response['conversation_id']
self.pandora_data[user_id]["parent_message_id"] = response['message']['id']
content = response['message']['content']['parts'][0]
await asyncio.to_thread(
self.send_message, channel_id, f"{content}"
)
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
# !new command trigger handler
if self.new_prog.match(message):
self.pandora_init(user_id)
try:
await asyncio.to_thread(
self.send_message, channel_id, "New conversation created, please use !talk to start chatting!"
)
except Exception:
pass
if self.bard_token is not None:
# !bard command trigger handler
if self.bard_prog.match(message):
prompt = self.bard_prog.match(message).group(1)
try:
# response is dict object
response = await self.bard(prompt)
content = str(response["content"]).strip()
await asyncio.to_thread(
self.send_message, channel_id, f"{content}"
)
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
if self.bing_auth_cookie is not None:
# !pic command trigger handler
if self.pic_prog.match(message):
prompt = self.pic_prog.match(message).group(1)
# generate image
try:
links = await self.imagegen.get_images(prompt)
image_path = await self.imagegen.save_images(links, "images")
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
# send image
try:
await asyncio.to_thread(
self.send_file, channel_id, prompt, image_path
)
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
# !help command trigger handler
if self.help_prog.match(message):
try:
await asyncio.to_thread(self.send_message, channel_id, self.help())
except Exception as e:
logger.error(e, exc_info=True)
# send message to room
def send_message(self, channel_id: str, message: str) -> None:
self.driver.posts.create_post(
options={
"channel_id": channel_id,
"message": message
}
)
# send file to room
def send_file(self, channel_id: str, message: str, filepath: str) -> None:
filename = os.path.split(filepath)[-1]
try:
file_id = self.driver.files.upload_file(
channel_id=channel_id,
files={
"files": (filename, open(filepath, "rb")),
},
)["file_infos"][0]["id"]
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
try:
self.driver.posts.create_post(
options={
"channel_id": channel_id,
"message": message,
"file_ids": [file_id],
}
)
# remove image after posting
os.remove(filepath)
except Exception as e:
logger.error(e, exc_info=True)
raise Exception(e)
# !gpt command function
async def gpt(self, prompt: str) -> str:
return await self.askgpt.oneTimeAsk(prompt)
# !chat command function
async def chat(self, prompt: str) -> str:
return await self.chatbot.ask_async(prompt)
# !bing command function
async def bing(self, prompt: str) -> str:
return await self.bingbot.ask_bing(prompt)
# !bard command function
async def bard(self, prompt: str) -> str:
return await asyncio.to_thread(self.bardbot.ask, prompt)
# !help command function
def help(self) -> str:
help_info = (
"!gpt [content], generate response without context conversation\n"
+ "!chat [content], chat with context conversation\n"
+ "!bing [content], chat with context conversation powered by Bing AI\n"
+ "!bard [content], chat with Google's Bard\n"
+ "!pic [prompt], Image generation by Microsoft Bing\n"
+ "!talk [content], talk using chatgpt web\n"
+ "!goon, continue the incomplete conversation\n"
+ "!new, start a new conversation\n"
+ "!help, help message"
)
return help_info