minecraft-monitor/backend/main.py

109 lines
2.9 KiB
Python
Raw Normal View History

2024-03-02 10:54:27 +08:00
import json
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from utils import getJavaServerStatus, schedule_job, get_last_hour_date_info, get_current_day_time_utc8, parse_date_to_h_m_s
from contextlib import asynccontextmanager
from db import MCDB
from pydantic import BaseModel
import aiohttp
from pathlib import Path
class Password(BaseModel):
code: str
mcdb = MCDB()
with open(Path(__file__).parent.parent / "config.json", encoding="utf-8") as f:
config = json.load(f)
HOST = config["mc_host"]
PASSWORD = config["password"]
RCON_SERVER = config["rcon_host"]
scheduler = AsyncIOScheduler()
# 每30s记录一次
scheduler.add_job(schedule_job, 'interval', [HOST, mcdb], seconds=30)
origins = [
"*"
]
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.start()
yield
mcdb.cursor.close()
mcdb.conn.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def read_root():
try:
status = await getJavaServerStatus(HOST)
except Exception:
return {
"time": get_current_day_time_utc8(),
"description": "请求错误",
"latency": 10000,
"max": "请求错误",
"online": "请求错误"
}
return {
"time": get_current_day_time_utc8(),
"description": status.description,
"latency": round(status.latency, 2),
"max": status.players.max,
"online": status.players.online
}
@app.get("/get_latency")
async def get_latency_info():
# 获取一个小时前的时间
one_hour_ago = get_last_hour_date_info()
mcdb.cursor.execute("SELECT time, latency FROM mcstatus WHERE time >= ?", (one_hour_ago, ))
rows = mcdb.cursor.fetchall()
data = [{"time": parse_date_to_h_m_s(row[0]), "latency": row[1]} for row in rows]
return data
@app.post("/restart-server")
async def restart_mc_server(password: Password):
if PASSWORD != password.code:
return {
"status": "fail",
"msg": "密码错误"
}
try:
async with aiohttp.ClientSession() as asession:
async with asession.post(RCON_SERVER,
json={"code": password.code}) as resp1:
result1 = await resp1.json()
return {
"status": "成功",
"msg": "重启指令已发送请耐心等待1分钟或者查看图表观察服务器是否正常上线"
}
except Exception:
return {
"status": "失败",
"msg": "重启指令发送失败,请重试或者联系管理员"
}
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=10000)