Add Image Generation by Microsoft Bing

This commit is contained in:
hibobmaster 2023-03-22 22:28:22 +08:00
parent 599d186ba7
commit 1a12094f1c
Signed by: bobmaster
GPG key ID: FC3F06DF7EA00106
7 changed files with 303 additions and 53 deletions

3
.gitignore vendored
View file

@ -29,6 +29,9 @@ MANIFEST
bot
bot.log
# image generation folder
images/
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.

113
BingImageGen.py Normal file
View file

@ -0,0 +1,113 @@
"""
Code derived from:
https://github.com/acheong08/EdgeGPT/blob/f940cecd24a4818015a8b42a2443dd97c3c2a8f4/src/ImageGen.py
"""
from log import getlogger
from uuid import uuid4
import os
import urllib
import time
import requests
import regex
BING_URL = "https://www.bing.com"
logger = getlogger()
class ImageGen:
"""
Image generation by Microsoft Bing
Parameters:
auth_cookie: str
"""
def __init__(self, auth_cookie: str) -> None:
self.session: requests.Session = requests.Session()
self.session.headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "en-US,en;q=0.9",
"cache-control": "max-age=0",
"content-type": "application/x-www-form-urlencoded",
"referrer": "https://www.bing.com/images/create/",
"origin": "https://www.bing.com",
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.63",
}
self.session.cookies.set("_U", auth_cookie)
def get_images(self, prompt: str) -> list:
"""
Fetches image links from Bing
Parameters:
prompt: str
"""
print("Sending request...")
url_encoded_prompt = urllib.parse.quote(prompt)
# https://www.bing.com/images/create?q=<PROMPT>&rt=4&FORM=GENCRE
url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
response = self.session.post(url, allow_redirects=False)
if response.status_code != 302:
logger.error(f"ERROR: {response.text}")
return []
# Get redirect URL
redirect_url = response.headers["Location"]
request_id = redirect_url.split("id=")[-1]
self.session.get(f"{BING_URL}{redirect_url}")
# https://www.bing.com/images/create/async/results/{ID}?q={PROMPT}
polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
# Poll for results
print("Waiting for results...")
while True:
print(".", end="", flush=True)
response = self.session.get(polling_url)
if response.status_code != 200:
logger.error("Could not get results", exc_info=True)
return []
if response.text == "":
time.sleep(1)
continue
else:
break
# Use regex to search for src=""
image_links = regex.findall(r'src="([^"]+)"', response.text)
# Remove duplicates
return list(set(image_links))
def save_images(self, links: list, output_dir: str) -> str:
"""
Saves images to output directory
"""
print("\nDownloading images...")
try:
os.mkdir(output_dir)
except FileExistsError:
pass
# image name
image_name = str(uuid4())
# since matrix only support one media attachment per message, we just need one link
if links:
link = links.pop()
else:
logger.error("Get Image URL failed")
# return "" if there is no link
return ""
with self.session.get(link, stream=True) as response:
# save response to file
response.raise_for_status()
with open(f"{output_dir}/{image_name}.jpeg", "wb") as output_file:
for chunk in response.iter_content(chunk_size=8192):
output_file.write(chunk)
# image_num = 0
# for link in links:
# with self.session.get(link, stream=True) as response:
# # save response to file
# response.raise_for_status()
# with open(f"{output_dir}/{image_num}.jpeg", "wb") as output_file:
# for chunk in response.iter_content(chunk_size=8192):
# output_file.write(chunk)
#
# image_num += 1
# return image path
return f"{output_dir}/{image_name}.jpeg"

166
bot.py
View file

@ -10,6 +10,8 @@ from send_message import send_room_message
from v3 import Chatbot
from log import getlogger
from bing import BingBot
from BingImageGen import ImageGen
from send_image import send_room_image
"""
free api_endpoint from https://github.com/ayaka14732/ChatGPTAPIFree
"""
@ -32,6 +34,7 @@ class Bot:
bing_api_endpoint: Optional[str] = '',
access_token: Optional[str] = '',
jailbreakEnabled: Optional[bool] = False,
bing_auth_cookie: Optional[str] = '',
):
self.homeserver = homeserver
self.user_id = user_id
@ -41,6 +44,7 @@ class Bot:
self.api_key = api_key
self.bing_api_endpoint = bing_api_endpoint
self.jailbreakEnabled = jailbreakEnabled
self.bing_auth_cookie = bing_auth_cookie
# initialize AsyncClient object
self.store_path = os.getcwd()
self.config = AsyncClientConfig(store=SqliteStore,
@ -56,6 +60,9 @@ class Bot:
self.gpt_prog = re.compile(r"^\s*!gpt\s*(.+)$")
self.chat_prog = re.compile(r"^\s*!chat\s*(.+)$")
self.bing_prog = re.compile(r"^\s*!bing\s*(.+)$")
self.pic_prog = re.compile(r"^\s*!pic\s*(.+)$")
self.help_prog = re.compile(r"^\s*!help\s*.*$")
# initialize chatbot and chatgpt_api_endpoint
if self.api_key != '':
self.chatbot = Chatbot(api_key=self.api_key)
@ -76,6 +83,10 @@ class Bot:
if self.bing_api_endpoint != '':
self.bingbot = BingBot(bing_api_endpoint, jailbreakEnabled=self.jailbreakEnabled)
# initialize BingImageGen
if self.bing_auth_cookie != '':
self.imageGen = ImageGen(self.bing_auth_cookie)
# message_callback event
async def message_callback(self, room: MatrixRoom, event: RoomMessageText) -> None:
if self.room_id == '':
@ -95,62 +106,119 @@ class Bot:
f"{room.user_name(event.sender)} | {event.body}"
)
# remove newline character from event.body
event.body = re.sub("\r\n|\r|\n", " ", event.body)
if self.user_id != event.sender:
# remove newline character from event.body
event.body = re.sub("\r\n|\r|\n", " ", event.body)
# chatgpt
n = self.chat_prog.match(event.body)
if n:
if self.api_key != '':
# sending typing state
await self.client.room_typing(room_id)
# chatgpt
n = self.chat_prog.match(event.body)
if n:
prompt = n.group(1)
try:
# run synchronous function in different thread
text = await asyncio.to_thread(self.chatbot.ask, prompt)
text = text.strip()
await send_room_message(self.client, room_id, send_text=text,
reply_to_event_id=reply_to_event_id)
except Exception as e:
logger.error("Error", exc_info=True)
print(f"Error: {e}")
pass
else:
logger.warning("No API_KEY provided")
await send_room_message(self.client, room_id, send_text="API_KEY not provided")
if self.api_key != '':
await self.gpt(room_id, reply_to_event_id, prompt)
else:
logger.warning("No API_KEY provided")
await send_room_message(self.client, room_id, send_text="API_KEY not provided")
m = self.gpt_prog.match(event.body)
if m:
# sending typing state
await self.client.room_typing(room_id)
prompt = m.group(1)
try:
# timeout 60s
text = await asyncio.wait_for(ask(prompt, self.chatgpt_api_endpoint, self.headers), timeout=60)
except TimeoutError:
logger.error("timeoutException", exc_info=True)
text = "Timeout error"
m = self.gpt_prog.match(event.body)
if m:
prompt = m.group(1)
await self.chat(room_id, reply_to_event_id, prompt)
# bing ai
if self.bing_api_endpoint != '':
b = self.bing_prog.match(event.body)
if b:
prompt = b.group(1)
await self.bing(room_id, reply_to_event_id, prompt)
# Image Generation by Microsoft Bing
if self.bing_auth_cookie != '':
i = self.pic_prog.match(event.body)
if i:
prompt = i.group(1)
await self.pic(room_id, prompt)
# help command
h = self.help_prog.match(event.body)
if h:
await self.help(room_id)
# !gpt command
async def gpt(self, room_id, reply_to_event_id, prompt):
await self.client.room_typing(room_id)
try:
# run synchronous function in different thread
text = await asyncio.to_thread(self.chatbot.ask, prompt)
text = text.strip()
await send_room_message(self.client, room_id, send_text=text,
reply_to_event_id=reply_to_event_id)
except Exception as e:
logger.error("Error", exc_info=True)
print(f"Error: {e}")
# bing ai
if self.bing_api_endpoint != '':
b = self.bing_prog.match(event.body)
if b:
# sending typing state
await self.client.room_typing(room_id)
prompt = b.group(1)
try:
# timeout 120s
text = await asyncio.wait_for(self.bingbot.ask_bing(prompt), timeout=120)
except TimeoutError:
logger.error("timeoutException", exc_info=True)
text = "Timeout error"
text = text.strip()
await send_room_message(self.client, room_id, send_text=text,
reply_to_event_id=reply_to_event_id)
# !chat command
async def chat(self, room_id, reply_to_event_id, prompt):
try:
# sending typing state
await self.client.room_typing(room_id)
# timeout 120s
text = await asyncio.wait_for(ask(prompt, self.chatgpt_api_endpoint, self.headers), timeout=120)
except TimeoutError:
logger.error("timeoutException", exc_info=True)
text = "Timeout error"
text = text.strip()
try:
await send_room_message(self.client, room_id, send_text=text,
reply_to_event_id=reply_to_event_id)
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
# !bing command
async def bing(self, room_id, reply_to_event_id, prompt):
try:
# sending typing state
await self.client.room_typing(room_id)
# timeout 120s
text = await asyncio.wait_for(self.bingbot.ask_bing(prompt), timeout=120)
except TimeoutError:
logger.error("timeoutException", exc_info=True)
text = "Timeout error"
text = text.strip()
try:
await send_room_message(self.client, room_id, send_text=text,
reply_to_event_id=reply_to_event_id)
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
# !pic command
async def pic(self, room_id, prompt):
try:
# generate image
generated_image_path = self.imageGen.save_images(
self.imageGen.get_images(prompt),
"images",
)
# send image
if generated_image_path != "":
await send_room_image(self.client, room_id, generated_image_path)
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
# !help command
async def help(self, room_id):
try:
# sending typing state
await self.client.room_typing(room_id)
help_info = "!gpt [content], generate response without context conversation\n" + \
"!chat [content], chat with context conversation\n" + \
"!bing [content], chat with context conversation powered by Bing AI\n" + \
"!pic [prompt], Image generation by Microsoft Bing"
await send_room_message(self.client, room_id, send_text=help_info)
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
# bot login
async def login(self) -> None:
@ -161,7 +229,7 @@ class Bot:
print(f"Login Failed: {resp}")
sys.exit(1)
except Exception as e:
logger.error("Error Exception", exc_info=True)
logger.error(f"Error: {e}", exc_info=True)
# sync messages in the room
async def sync_forever(self, timeout=30000):

View file

@ -16,6 +16,7 @@ async def main():
bing_api_endpoint=config.get('bing_api_endpoint', ''),
access_token=config.get('access_token', ''),
jailbreakEnabled=config.get('jailbreakEnabled', False),
bing_auth_cookie=config.get('bing_auth_cookie', ''),
)
if config.get('access_token', '') == '':
await matrix_bot.login()

View file

@ -24,10 +24,12 @@ lxml==4.9.2
matrix-nio==0.20.1
multidict==6.0.4
peewee==3.16.0
Pillow==9.4.0
pycparser==2.21
pycryptodome==3.17
pycryptodomex==3.17
pyrsistent==0.19.3
python-magic==0.4.27
python-olm==3.1.3
python-socks==2.1.1
regex==2022.10.31

59
send_image.py Normal file
View file

@ -0,0 +1,59 @@
"""
code derived from:
https://matrix-nio.readthedocs.io/en/latest/examples.html#sending-an-image
"""
import os
import aiofiles.os
import magic
from PIL import Image
from nio import AsyncClient, UploadResponse
from log import getlogger
logger = getlogger()
async def send_room_image(client: AsyncClient,
room_id: str, image: str):
"""
image: image path
"""
mime_type = magic.from_file(image, mime=True) # e.g. "image/jpeg"
im = Image.open(image)
(width, height) = im.size # im.size returns (width,height) tuple
# first do an upload of image, then send URI of upload to room
file_stat = await aiofiles.os.stat(image)
async with aiofiles.open(image, "r+b") as f:
resp, maybe_keys = await client.upload(
f,
content_type=mime_type, # image/jpeg
filename=os.path.basename(image),
filesize=file_stat.st_size,
)
if not isinstance(resp, UploadResponse):
logger.warning(f"Failed to generate image. Failure response: {resp}")
await client.room_send(
room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": f"Failed to generate image. Failure response: {resp}", },
ignore_unverified_devices=True,
)
return
content = {
"body": os.path.basename(image), # descriptive title
"info": {
"size": file_stat.st_size,
"mimetype": mime_type,
"w": width, # width in pixel
"h": height, # height in pixel
},
"msgtype": "m.image",
"url": resp.content_uri,
}
try:
await client.room_send(room_id, message_type="m.room.message", content=content)
except Exception as e:
logger.error(f"Image send of file {image} failed.\n Error: {e}", exc_info=True)

View file

@ -3,13 +3,17 @@ from nio import AsyncClient
async def send_room_message(client: AsyncClient,
room_id: str,
reply_to_event_id: str,
send_text: str) -> None:
send_text: str,
reply_to_event_id: str = '') -> None:
if reply_to_event_id == '':
content = {"msgtype": "m.text", "body": f"{send_text}", }
else:
content={"msgtype": "m.text", "body": f"{send_text}",
"m.relates_to": {"m.in_reply_to": {"event_id": reply_to_event_id}}, }
await client.room_send(
room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": f"{send_text}",
"m.relates_to": {"m.in_reply_to": {"event_id": reply_to_event_id}}},
content=content,
ignore_unverified_devices=True,
)
await client.room_typing(room_id, typing_state=False)