2023-03-22 14:28:22 +00:00
|
|
|
"""
|
|
|
|
Code derived from:
|
|
|
|
https://github.com/acheong08/EdgeGPT/blob/f940cecd24a4818015a8b42a2443dd97c3c2a8f4/src/ImageGen.py
|
|
|
|
"""
|
|
|
|
from log import getlogger
|
2023-04-10 17:04:06 +00:00
|
|
|
|
|
|
|
from typing import Union
|
2023-03-22 14:28:22 +00:00
|
|
|
from uuid import uuid4
|
|
|
|
import os
|
2023-04-10 17:04:06 +00:00
|
|
|
import contextlib
|
|
|
|
import aiohttp
|
|
|
|
import asyncio
|
|
|
|
import random
|
2023-03-22 14:28:22 +00:00
|
|
|
import time
|
|
|
|
import requests
|
|
|
|
import regex
|
|
|
|
|
|
|
|
# Module-level logger obtained from `getlogger` in the `log` module imported above.
# NOTE(review): nothing in the visible part of this file references `logger` — confirm it is still needed.
logger = getlogger()
|
|
|
|
|
2023-04-10 17:04:06 +00:00
|
|
|
# Base address every request path in this module is appended to.
BING_URL = "https://www.bing.com"

# Random address inside 13.104.0.0/14, sent as the "x-forwarded-for" header.
FORWARDED_IP = "13.{}.{}.{}".format(
    random.randint(104, 107),
    random.randint(0, 255),
    random.randint(0, 255),
)

# Default request headers shared by the synchronous and asynchronous clients.
HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "accept-language": "en-US,en;q=0.9",
    "cache-control": "max-age=0",
    "content-type": "application/x-www-form-urlencoded",
    "referrer": "https://www.bing.com/images/create/",
    "origin": "https://www.bing.com",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.63",
    "x-forwarded-for": FORWARDED_IP,
}

# Texts used for raised exceptions and for debug-file entries.
error_timeout = "Your request has timed out."
error_redirect = "Redirect failed"
error_blocked_prompt = "Your prompt has been blocked by Bing. Try to change any bad words and try again."
error_noresults = "Could not get results"
error_unsupported_lang = "\nthis language is currently not supported by bing"
error_bad_images = "Bad images"
error_no_images = "No images"

# Progress texts shown to the user while a request is running.
sending_message = "Sending request..."
wait_message = "Waiting for results..."
download_message = "\nDownloading images..."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def debug(debug_file, text_var):
    """Append the string form of ``text_var`` to the file named by ``debug_file``.

    The file is opened in append mode for every call; no newline or other
    separator is written between successive entries.
    """
    log_path = f"{debug_file}"
    with open(log_path, mode="a") as handle:
        handle.write(str(text_var))
|
|
|
|
|
2023-03-22 14:28:22 +00:00
|
|
|
|
|
|
|
class ImageGen:
    """
    Image generation by Microsoft Bing (synchronous, `requests`-based client).

    Parameters:
        auth_cookie: str
            Value stored in the session's ``_U`` cookie.
        debug_file: str | None
            Optional path of a file that progress and error messages are
            appended to. When falsy, no debug output is written.
        quiet: bool
            When True, progress messages are not printed to stdout.
    """

    def __init__(
        self, auth_cookie: str, debug_file: Union[str, None] = None, quiet: bool = False
    ) -> None:
        # `partial` is not imported at module level, so it is imported here to
        # keep the debug_file code path from failing with a NameError.
        from functools import partial

        self.session: requests.Session = requests.Session()
        # Use a copy so per-session header changes never leak into the
        # module-level HEADERS dict shared with other instances.
        self.session.headers = dict(HEADERS)
        self.session.cookies.set("_U", auth_cookie)
        self.quiet = quiet
        self.debug_file = debug_file
        if self.debug_file:
            # self.debug(text) appends text to the configured debug file.
            self.debug = partial(debug, self.debug_file)

    def _log_error(self, message: str) -> None:
        """Append ``ERROR: <message>`` to the debug file when one is configured."""
        if self.debug_file:
            self.debug(f"ERROR: {message}")

    def get_images(self, prompt: str) -> list:
        """
        Fetches image links from Bing

        Parameters:
            prompt: str
                Text prompt; it is URL-quoted before being sent.

        Returns:
            List of distinct image URLs found in the result page, each cut
            at the first "?w=" and kept in order of first appearance.

        Raises:
            Exception: when the prompt is blocked, the language is not
                supported, neither creation request answers with a 302
                redirect, polling exceeds 200 seconds, polling returns a
                non-200 status, or no image link is found.
        """
        if not self.quiet:
            print(sending_message)
        if self.debug_file:
            self.debug(sending_message)
        url_encoded_prompt = requests.utils.quote(prompt)
        # https://www.bing.com/images/create?q=<PROMPT>&rt=4&FORM=GENCRE
        url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
        response = self.session.post(url, allow_redirects=False)
        # check for content warning message
        page_text = response.text.lower()
        if "this prompt has been blocked" in page_text:
            self._log_error(error_blocked_prompt)
            raise Exception(
                error_blocked_prompt,
            )
        if "we're working hard to offer image creator in more languages" in page_text:
            self._log_error(error_unsupported_lang)
            raise Exception(error_unsupported_lang)
        if response.status_code != 302:
            # if rt4 fails, try rt3
            url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
            response3 = self.session.post(url, allow_redirects=False, timeout=200)
            if response3.status_code != 302:
                self._log_error(error_redirect)
                print(f"ERROR: {response3.text}")
                raise Exception(error_redirect)
            response = response3
        # Get redirect URL
        redirect_url = response.headers["Location"].replace("&nfy=1", "")
        # The request id is whatever follows the last "id=" in the redirect URL.
        request_id = redirect_url.split("id=")[-1]
        # Visit the redirect target; its response is not used.
        self.session.get(f"{BING_URL}{redirect_url}")
        # https://www.bing.com/images/create/async/results/{ID}?q={PROMPT}
        polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
        # Poll for results
        if self.debug_file:
            self.debug("Polling and waiting for result")
        if not self.quiet:
            print(wait_message)
        start_wait = time.time()
        while True:
            # Give up once more than 200 whole seconds have elapsed.
            if int(time.time() - start_wait) > 200:
                self._log_error(error_timeout)
                raise Exception(error_timeout)
            if not self.quiet:
                print(".", end="", flush=True)
            response = self.session.get(polling_url)
            if response.status_code != 200:
                self._log_error(error_noresults)
                raise Exception(error_noresults)
            # A non-empty body without "errorMessage" is treated as the result page.
            if response.text and "errorMessage" not in response.text:
                break
            time.sleep(1)
        # Use regex to search for src=""
        image_links = regex.findall(r'src="([^"]+)"', response.text)
        # Remove size limit, then remove duplicates while keeping the order in
        # which the links first appeared (a plain set() would shuffle them).
        normal_image_links = list(
            dict.fromkeys(link.split("?w=")[0] for link in image_links)
        )
        # NOTE: the placeholder-image ("Bad images") check that ImageGenAsync
        # performs is intentionally left disabled in this class.
        # No images
        if not normal_image_links:
            raise Exception(error_no_images)
        return normal_image_links

    def save_images(self, links: list, output_dir: str) -> str:
        """
        Saves one image to the output directory

        Parameters:
            links: list
                Image URLs; the last one is removed from the list and downloaded.
            output_dir: str
                Target directory; created (including parents) when missing.

        Returns:
            Path of the written file: ``<output_dir>/<uuid4>.jpeg``.

        Raises:
            Exception: when ``links`` is empty, or when the link is rejected
                by requests as lacking a URL schema.
        """
        # Fail with a clear message instead of an unbound-variable error.
        if not links:
            raise Exception(error_no_images)
        # since matrix only supports one media attachment per message, we just need one link
        link = links.pop()
        # image name
        image_name = str(uuid4())
        image_path = os.path.join(output_dir, f"{image_name}.jpeg")
        os.makedirs(output_dir, exist_ok=True)
        try:
            with self.session.get(link, stream=True) as response:
                # save response to file
                response.raise_for_status()
                # image_path already contains output_dir; joining it again
                # would point at a non-existent nested directory.
                with open(image_path, "wb") as output_file:
                    for chunk in response.iter_content(chunk_size=8192):
                        output_file.write(chunk)
        except requests.exceptions.MissingSchema as url_exception:
            raise Exception(
                "Inappropriate contents found in the generated images. Please try again or try another prompt.",
            ) from url_exception
        return image_path
|
|
|
|
|
|
|
|
|
|
|
|
class ImageGenAsync:
    """
    Image generation by Microsoft Bing (asynchronous, `aiohttp`-based client).

    Instances can be used with ``async with``; leaving the context closes
    the underlying aiohttp session.

    Parameters:
        auth_cookie: str
            Value sent as the ``_U`` cookie.
        quiet: bool
            When True (the default), progress messages are not printed.
    """

    def __init__(self, auth_cookie: str, quiet: bool = True) -> None:
        self.session = aiohttp.ClientSession(
            headers=HEADERS,
            cookies={"_U": auth_cookie},
        )
        self.quiet = quiet

    async def __aenter__(self):
        return self

    async def __aexit__(self, *excinfo) -> None:
        await self.session.close()

    async def get_images(self, prompt: str) -> list:
        """
        Fetches image links from Bing

        Parameters:
            prompt: str
                Text prompt; it is URL-quoted before being sent.

        Returns:
            List of distinct image URLs found in the result page, each cut
            at the first "?w=" and kept in order of first appearance.

        Raises:
            Exception: when the prompt is blocked, neither creation request
                answers with a 302 redirect, polling exceeds 200 seconds,
                polling returns a non-200 status, a known placeholder image
                is among the results, or no image link is found.
        """
        if not self.quiet:
            print(sending_message)
        url_encoded_prompt = requests.utils.quote(prompt)
        # https://www.bing.com/images/create?q=<PROMPT>&rt=4&FORM=GENCRE
        url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
        async with self.session.post(url, allow_redirects=False) as response:
            content = await response.text()
            if "this prompt has been blocked" in content.lower():
                raise Exception(
                    error_blocked_prompt,
                )
            # Keep what is needed after the response has been released.
            status = response.status
            headers = response.headers
        if status != 302:
            # if rt4 fails, try rt3
            url = (
                f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
            )
            async with self.session.post(
                url,
                allow_redirects=False,
                timeout=200,
            ) as response3:
                if response3.status != 302:
                    # text() is a coroutine in aiohttp; it must be awaited to
                    # print the body rather than a bound-method repr.
                    print(f"ERROR: {await response3.text()}")
                    raise Exception(error_redirect)
                headers = response3.headers
        # Get redirect URL
        redirect_url = headers["Location"].replace("&nfy=1", "")
        # The request id is whatever follows the last "id=" in the redirect URL.
        request_id = redirect_url.split("id=")[-1]
        # Visit the redirect target; its body is not used, so the response is
        # released immediately instead of being left open.
        async with self.session.get(f"{BING_URL}{redirect_url}"):
            pass
        # https://www.bing.com/images/create/async/results/{ID}?q={PROMPT}
        polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
        # Poll for results
        if not self.quiet:
            print(wait_message)
        # Same overall 200 second budget as the synchronous ImageGen client,
        # so a request that never completes cannot poll forever.
        deadline = time.monotonic() + 200
        while True:
            if time.monotonic() > deadline:
                raise Exception(error_timeout)
            if not self.quiet:
                print(".", end="", flush=True)
            # By default, the per-request timeout is 300s, change as needed
            async with self.session.get(polling_url) as response:
                if response.status != 200:
                    raise Exception(error_noresults)
                content = await response.text()
            # A non-empty body without "errorMessage" is treated as the result page.
            if content and "errorMessage" not in content:
                break
            await asyncio.sleep(1)
        # Use regex to search for src=""
        image_links = regex.findall(r'src="([^"]+)"', content)
        # Remove size limit, then remove duplicates while keeping the order in
        # which the links first appeared (a plain set() would shuffle them).
        normal_image_links = list(
            dict.fromkeys(link.split("?w=")[0] for link in image_links)
        )
        # Bad images: URLs that are rejected when they show up in the results.
        bad_images = (
            "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
            "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
        )
        if any(im in bad_images for im in normal_image_links):
            raise Exception(error_bad_images)
        # No images
        if not normal_image_links:
            raise Exception(error_no_images)
        return normal_image_links

    async def save_images(self, links: list, output_dir: str) -> str:
        """
        Saves one image to the output directory

        Parameters:
            links: list
                Image URLs; the last one is removed from the list and downloaded.
            output_dir: str
                Target directory; created (including parents) when missing.

        Returns:
            Path of the written file: ``<output_dir>/<uuid4>.jpeg``.

        Raises:
            Exception: when ``links`` is empty, or when aiohttp rejects the
                link as an invalid URL.
        """
        if not self.quiet:
            print(download_message)
        # Fail with a clear message instead of an unbound-variable error.
        if not links:
            raise Exception(error_no_images)
        os.makedirs(output_dir, exist_ok=True)
        # since matrix only supports one media attachment per message, we just need one link
        link = links.pop()
        # image name
        image_name = str(uuid4())
        image_path = os.path.join(output_dir, f"{image_name}.jpeg")
        try:
            async with self.session.get(link, raise_for_status=True) as response:
                # save response to file
                with open(image_path, "wb") as output_file:
                    async for chunk in response.content.iter_chunked(8192):
                        output_file.write(chunk)
        except aiohttp.client_exceptions.InvalidURL as url_exception:
            raise Exception(
                "Inappropriate contents found in the generated images. Please try again or try another prompt.",
            ) from url_exception
        # Return exactly the path that was written.
        return image_path
|