from starlette.responses import HTMLResponse
from starlette.requests import Request
from starlette.routing import Route
import os
import json
import time
from sqlalchemy import select, update
from config import AUTHORIZATION_NEEDED, USE_REDIS_CACHE
from api.database import database, cache_database, ranking_cache, check_whitelist, check_blacklist, redis, result, daily_reward, user
from api.crypt import decrypt_fields, encryptAES
from api.templates import EXP_UNLOCKED_SONGS, TITLE_LISTS, SONG_LIST
from api.misc import inform_page, safe_int
async def ranking(request: Request):
decrypted_fields, _ = await decrypt_fields(request)
if not decrypted_fields:
return HTMLResponse("""
"""
# Loop leaderboard
for rank, (username, data) in enumerate(sorted_players, start=1):
html += f"""
#{rank}
{data['score']}
"""
else:
if cached and USE_REDIS_CACHE:
play_results = json.loads(cached)
elif cached:
play_results = cached
else:
query = select(result).where((result.c.id == song_id) & (result.c.mode == mode)).order_by(result.c.score.desc())
play_results = await database.fetch_all(query)
play_results = [dict(row) for row in play_results]
if USE_REDIS_CACHE:
await redis.set(cache_key, json.dumps(play_results), ex=300)
# log to cache db
query = ranking_cache.insert().values(
key=cache_key,
value=play_results,
expire_at=None # individual LB, no expiration needed
)
await cache_database.execute(query)
query = select(user).where(user.c.device_id == device_id)
user_result = await database.fetch_one(query)
query = select(daily_reward).where(daily_reward.c.device_id == device_id)
device_result = await database.fetch_one(query)
device_result = dict(device_result) if device_result else {"title": "1", "avatar": 1}
user_id = user_result[0] if user_result else None
username = user_result[1] if user_result else f"Guest({device_id[-6:]})"
play_record = None
if user_id:
play_record = next(
(record for record in play_results if safe_int(record[3]) == user_id),
None
)
if not play_record:
play_record = next((record for record in play_results if record['vid'] == device_id and record['sid'] is None), None)
player_rank = None
avatar_index = str(play_record['avatar']) if play_record else "1"
user_score = play_record['score'] if play_record else 0
for rank, result_obj in enumerate(play_results, start=1):
if user_result and safe_int(result_obj['sid']) == user_id:
player_rank = rank
break
elif result_obj['vid'] == device_id and result_obj['sid'] in (None, ''):
player_rank = rank
break
html += f"""
You
{"#" + str(player_rank) if player_rank else "N/A"}
{username}
{user_score}
"""
html += """
"""
for rank, record in enumerate(play_results, start=1):
username = f"Guest({record['vid'][-6:]})"
device_info = None
if record['sid']:
query = select(user.c.username).where(user.c.id == record['sid'])
user_data = await database.fetch_one(query)
if user_data:
username = user_data["username"]
query = select(daily_reward.c.title).where(daily_reward.c.device_id == record['vid'])
device_title = await database.fetch_one(query)
if device_title:
device_info = device_title["title"]
else:
device_info = "1"
avatar_id = record['avatar'] if record['avatar'] else 1
avatar_url = f"/files/image/icon/avatar/{avatar_id}.png"
score = record['score']
html += f"""
#{rank}
{score}
"""
html += "
"
encrypted_mass = encryptAES(("vid=" + device_id + "&dummy=").encode("utf-8"))
html += f"""
Go Back
"""
file_path = os.path.join("files", "ranking.html")
try:
with open(file_path, "r", encoding="utf-8") as file:
html_content = file.read().format(text=html)
except FileNotFoundError:
return HTMLResponse("""
Ranking file not found
""", status_code=500)
return HTMLResponse(html_content)
else:
return HTMLResponse("""
Access denied
""", status_code=403)
async def status(request: Request):
decrypted_fields, _ = await decrypt_fields(request)
if not decrypted_fields:
return HTMLResponse("""
Invalid request data
""", status_code=400)
should_serve = True
if AUTHORIZATION_NEEDED:
should_serve = await check_whitelist(decrypted_fields) and not await check_blacklist(decrypted_fields)
if should_serve:
device_id = decrypted_fields[b'vid'][0].decode()
set_title = int(decrypted_fields[b'set_title'][0].decode()) if b'set_title' in decrypted_fields else None
page_id = int(decrypted_fields[b'page_id'][0].decode()) if b'page_id' in decrypted_fields else 0
if set_title:
update_query = (
update(daily_reward)
.where(daily_reward.c.device_id == device_id)
.values(title=set_title)
)
await database.execute(update_query)
query = select(daily_reward).where(daily_reward.c.device_id == device_id)
user_data = await database.fetch_one(query)
user_name = f"Guest({device_id[-6:]})"
if user_data:
query = select(user.c.username).where(user.c.device_id == device_id)
user_result = await database.fetch_one(query)
if user_result:
user_name = user_result["username"]
html = ""
if user_data:
player_element = f"""
{user_name}
Level {user_data['lvl']}
"""
html += player_element
page_name = ["Special", "Normal", "Master", "God"]
buttons_html = '
'
html += f"
{buttons_html}
"
selected_titles = TITLE_LISTS.get(page_id, [])
titles_html = '
'
for index, num in enumerate(selected_titles):
if index % 2 == 0:
if index != 0:
titles_html += '
'
titles_html += '
'
if num == user_data["title"]:
titles_html += f"""

"""
else:
encrypted_mass = encryptAES(f"vid={device_id}&title_id={num}&page_id={page_id}&dummy=".encode("utf-8"))
titles_html += f"""
"""
titles_html += '
'
html += titles_html
file_path = os.path.join("files", "status.html")
try:
with open(file_path, "r", encoding="utf-8") as file:
html_content = file.read().format(text=html)
except FileNotFoundError:
return HTMLResponse("""