opus-submitter/polylan_submitter/noita/api.py

247 lines
7.6 KiB
Python

from django.http import HttpRequest
from django.core.files.base import ContentFile
from django.db.models import (
F,
Case,
When,
Sum,
Count,
IntegerField,
Subquery,
OuterRef,
Window,
)
from django.db.models.functions import Rank
from ninja import Router, File
from ninja.files import UploadedFile
from noita.schemas import ObjectivOut, ResultsOut, LeaderboardOut
from noita.services.objectives import parse_objectives_and_store
from .models import LogfileSubmission, Objectiv, ObjectivPoint
from .schemas import NoitaSubmissionOut
# Ninja router that the endpoint functions below register themselves on
# through the @router.get / @router.post decorators.
router = Router()
@router.get("objectives", response=list[ObjectivOut])
def get_my_objectives(request: HttpRequest):
    """Return the requesting user's objectives, highest ``count`` first."""
    own_objectives = Objectiv.objects.filter(user=request.user)
    return own_objectives.order_by("-count")
@router.get("results", response=ResultsOut)
def get_results(request: HttpRequest):
    """
    Get the requesting user's score based on their objectives.

    For every objective the points are computed in the database as
    ``ObjectivPoint.point * min(max_count, count)``; a negative ``max_count``
    means the count is not capped. An objective without a matching
    ``ObjectivPoint`` row is worth 0 points.

    Returns a dict with ``total_score`` (sum over all objectives) and
    ``objectives`` (one entry per objective, highest ``total_points`` first).
    """
    # Local import, matching the function-scope import style already used in
    # this module, so the block does not rely on a file-level import change.
    from django.db.models.functions import Coalesce

    def point_column(column):
        # First value of ``column`` from the ObjectivPoint row that shares the
        # outer objective's objectiv_id (NULL when no such row exists).
        return Subquery(
            ObjectivPoint.objects.filter(
                objectiv_id=OuterRef("objectiv_id")
            ).values(column)[:1],
            output_field=IntegerField(),
        )

    user_objectives = (
        Objectiv.objects.filter(user=request.user)
        .annotate(
            points_per_objectiv=point_column("point"),
            max_objectives=point_column("max_count"),
        )
        .annotate(
            # Negative max_count means unlimited: cap at the count itself.
            effective_max=Case(
                When(max_objectives__lt=0, then=F("count")),
                default=F("max_objectives"),
                output_field=IntegerField(),
            )
        )
        .annotate(
            capped_count=Case(
                When(effective_max__lt=F("count"), then=F("effective_max")),
                default=F("count"),
                output_field=IntegerField(),
            ),
        )
        .annotate(
            # Without Coalesce, objectives lacking an ObjectivPoint row get a
            # NULL total; some backends (e.g. PostgreSQL) sort NULLs first in
            # descending order, which would list them above real scores even
            # though they are reported as 0.
            total_points=Coalesce(
                F("points_per_objectiv") * F("capped_count"),
                0,
                output_field=IntegerField(),
            ),
        )
    )

    # Sum() yields None when the user has no objectives at all.
    total_score = (
        user_objectives.aggregate(Sum("total_points"))["total_points__sum"] or 0
    )

    objectives_with_points = [
        {
            "objectiv_id": obj.objectiv_id,
            "count": obj.count,
            "points_per_objectiv": obj.points_per_objectiv or 0,
            "total_points": obj.total_points or 0,
        }
        for obj in user_objectives.order_by("-total_points")
    ]
    return {
        "total_score": total_score,
        "objectives": objectives_with_points,
    }
@router.get("leaderboard", response=LeaderboardOut)
def get_leaderboard(request: HttpRequest):
    """
    Get the global leaderboard for all users ranked by total score.

    Only users that have at least one objective are listed. Each user's score
    is the sum over their objectives of
    ``ObjectivPoint.point * min(max_count, count)`` (negative ``max_count``
    means uncapped). Ranking uses the ``Rank`` window function over the score
    in descending order, so equal scores share a rank.

    Returns a dict with a ``leaderboard`` list of entries holding ``rank``,
    ``username``, ``total_score`` and ``objectives_count``.
    """
    from django.contrib.auth import get_user_model
    from django.db.models.functions import Coalesce

    User = get_user_model()

    def point_column(column):
        # First value of ``column`` from the ObjectivPoint row that shares the
        # outer objective's objectiv_id (NULL when no such row exists).
        return Subquery(
            ObjectivPoint.objects.filter(
                objectiv_id=OuterRef("objectiv_id")
            ).values(column)[:1],
            output_field=IntegerField(),
        )

    # Every objective annotated with the points it is worth.
    all_objectives = (
        Objectiv.objects.annotate(
            points_per_objectiv=point_column("point"),
            max_objectives=point_column("max_count"),
        )
        .annotate(
            # Negative max_count means unlimited: cap at the count itself.
            effective_max=Case(
                When(max_objectives__lt=0, then=F("count")),
                default=F("max_objectives"),
                output_field=IntegerField(),
            )
        )
        .annotate(
            capped_count=Case(
                When(effective_max__lt=F("count"), then=F("effective_max")),
                default=F("count"),
                output_field=IntegerField(),
            ),
            total_points=F("points_per_objectiv") * F("capped_count"),
        )
    )

    # Per-user score totals, consumed below as a correlated subquery.
    user_totals = (
        all_objectives.values("user")
        .annotate(total_score=Sum("total_points"))
        .values("user", "total_score")
    )

    leaderboard = (
        User.objects.filter(objectiv__isnull=False)
        .distinct()
        .annotate(
            # The sum is NULL when none of a user's objectives has a matching
            # ObjectivPoint row. Coalescing to 0 before ranking keeps such
            # users from being ranked first on backends that sort NULLs first
            # in descending order (e.g. PostgreSQL), and makes their rank
            # agree with the 0 score that is reported for them.
            total_score=Coalesce(
                Subquery(
                    user_totals.filter(user=OuterRef("id")).values("total_score")[:1],
                    output_field=IntegerField(),
                ),
                0,
                output_field=IntegerField(),
            )
        )
        .annotate(objectives_count=Count("objectiv", distinct=True))
        .annotate(
            rank=Window(
                expression=Rank(),
                order_by=F("total_score").desc(),
            )
        )
        .values("rank", "username", "total_score", "objectives_count")
        .order_by("rank")
    )

    return {
        "leaderboard": [
            {
                "rank": entry["rank"],
                "username": entry["username"],
                "total_score": entry["total_score"] or 0,
                "objectives_count": entry["objectives_count"],
            }
            for entry in leaderboard
        ]
    }
@router.post("submit", response={200: NoitaSubmissionOut, 400: dict, 500: dict})
def submit_log_file(request: HttpRequest, file: UploadedFile = File(...)):
    """
    Submit a Noita log file (e.g. polylan_mod_log.txt).

    Accepts only uploads whose content type is ``text/plain`` or
    ``text/x-log``, up to 256 MB.

    The upload is stored as a ``LogfileSubmission`` (linked to the requesting
    user when authenticated) and then handed to ``parse_objectives_and_store``.
    Parsing is best effort: if it fails, the submission is still kept and
    returned, and the failure is logged.

    Returns:
        200 with the submission data on success,
        400 with a ``detail`` message for a rejected content type or size,
        500 with a ``detail`` message when the submission cannot be created.
    """
    import logging

    logger = logging.getLogger(__name__)

    # NOTE: content_type is taken from the upload as provided by the client.
    allowed_types = [
        "text/plain",
        "text/x-log",
    ]
    if file.content_type not in allowed_types:
        return 400, {
            "detail": f"Invalid file type: {file.content_type}. Allowed types: {', '.join(allowed_types)}"
        }

    # Validate file size (256MB limit)
    if file.size > 256 * 1024 * 1024:
        return 400, {"detail": "File too large (max 256MB)"}

    try:
        submission = LogfileSubmission.objects.create(
            user=request.user if request.user.is_authenticated else None,
            content_type=file.content_type,
            file_size=file.size,
        )
        submission.file.save(file.name, ContentFile(file.read()), save=True)

        try:
            parse_objectives_and_store(submission)
            submission.processed = True
            submission.save(update_fields=["processed"])
        except Exception:
            # Deliberately best effort: keep the stored submission, but leave
            # a trace instead of swallowing the failure silently.
            logger.exception(
                "Failed to parse objectives for submission %s", submission.id
            )

        return {
            "id": str(submission.id),
            "user_id": submission.user_id,
            "username": submission.user.username if submission.user else None,
            "file_size": submission.file_size,
            "content_type": submission.content_type,
            "created_at": submission.created_at,
            "processed": submission.processed,
        }
    except Exception as e:
        logger.exception("Error creating Noita log submission")
        # Status 500 must be declared in the decorator's ``response`` mapping,
        # otherwise Ninja rejects this return value instead of sending it.
        return 500, {"detail": f"Error creating submission: {str(e)}"}