minecraft-monitor/backend/main.py
2024-03-02 10:56:03 +08:00

109 lines
No EOL
2.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from utils import getJavaServerStatus, schedule_job, get_last_hour_date_info, get_current_day_time_utc8, parse_date_to_h_m_s
from contextlib import asynccontextmanager
from db import MCDB
from pydantic import BaseModel
import aiohttp
from pathlib import Path
class Password(BaseModel):
code: str
mcdb = MCDB()
with open(Path(__file__).parent.parent / "config.json", encoding="utf-8") as f:
config = json.load(f)
HOST = config["mc_host"]
PASSWORD = config["password"]
RCON_SERVER = config["rcon_host"]
scheduler = AsyncIOScheduler()
# 每30s记录一次
scheduler.add_job(schedule_job, 'interval', [HOST, mcdb], seconds=30)
origins = [
"*"
]
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.start()
yield
mcdb.cursor.close()
mcdb.conn.close()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def read_root():
try:
status = await getJavaServerStatus(HOST)
except Exception:
return {
"time": get_current_day_time_utc8(),
"description": "请求错误",
"latency": 10000,
"max": "请求错误",
"online": "请求错误"
}
return {
"time": get_current_day_time_utc8(),
"description": status.description,
"latency": round(status.latency, 2),
"max": status.players.max,
"online": status.players.online
}
@app.get("/get_latency")
async def get_latency_info():
# 获取一个小时前的时间
one_hour_ago = get_last_hour_date_info()
mcdb.cursor.execute("SELECT time, latency FROM mcstatus WHERE time >= ?", (one_hour_ago, ))
rows = mcdb.cursor.fetchall()
data = [{"time": parse_date_to_h_m_s(row[0]), "latency": row[1]} for row in rows]
return data
@app.post("/restart-server")
async def restart_mc_server(password: Password):
if PASSWORD != password.code:
return {
"status": "fail",
"msg": "密码错误"
}
try:
async with aiohttp.ClientSession() as asession:
async with asession.post(RCON_SERVER,
json={"code": password.code}) as resp1:
result1 = await resp1.json()
return {
"status": "成功",
"msg": "重启指令已发送请耐心等待1分钟或者查看图表观察服务器是否正常上线"
}
except Exception:
return {
"status": "失败",
"msg": "重启指令发送失败,请重试或者联系管理员"
}
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=10000)