"""Ninja API endpoints for Noita results, the leaderboard and log submission."""

import logging
from collections import defaultdict

from django.http import HttpRequest
from django.core.files.base import ContentFile
from django.core.cache import cache
from django.db.models import (
    F,
    Case,
    When,
    Count,
    IntegerField,
    Subquery,
    OuterRef,
)
from ninja import Router, File
from ninja.files import UploadedFile

from noita.schemas import ResultsOut, LeaderboardOut
from noita.services.objectives import parse_objectives_and_store

from .models import LogfileSubmission, Objectiv, ObjectivPoint, DeathCounter
from .schemas import NoitaSubmissionOut

logger = logging.getLogger(__name__)

router = Router()

# Cached responses live for five minutes unless a submission invalidates them.
CACHE_TTL_SECONDS = 300
LEADERBOARD_CACHE_KEY = "api:noita:leaderboard"

# Upload constraints enforced by ``submit_log_file``.
ALLOWED_CONTENT_TYPES = ("text/plain", "text/x-log")
MAX_UPLOAD_BYTES = 256 * 1024 * 1024


def _results_cache_key(user_id):
    """Return the cache key holding the ``get_results`` payload of one user."""
    return f"api:noita:results:{user_id}"


def _annotate_points(grouped_objectives):
    """Annotate grouped ``Objectiv`` rows with their capped point totals.

    ``grouped_objectives`` must be a ``values()`` queryset that exposes
    ``objectiv_id`` and a ``count`` annotation. The returned queryset adds:

    * ``points_per_objectiv`` / ``max_objectives`` -- ``point`` and
      ``max_count`` of the matching ``ObjectivPoint`` row (NULL if none).
    * ``effective_max`` -- ``max_objectives``, or ``count`` when
      ``max_objectives`` is negative (negative means unlimited).
    * ``capped_count`` -- the smaller of ``count`` and ``effective_max``.
    * ``total_points`` -- ``points_per_objectiv * capped_count``.
    """
    point_rows = ObjectivPoint.objects.filter(objectiv_id=OuterRef("objectiv_id"))
    return (
        grouped_objectives.annotate(
            points_per_objectiv=Subquery(
                point_rows.values("point")[:1],
                output_field=IntegerField(),
            ),
            max_objectives=Subquery(
                point_rows.values("max_count")[:1],
                output_field=IntegerField(),
            ),
        )
        .annotate(
            # Negative max_count means "no cap": cap at the count itself.
            effective_max=Case(
                When(max_objectives__lt=0, then=F("count")),
                default=F("max_objectives"),
                output_field=IntegerField(),
            )
        )
        .annotate(
            capped_count=Case(
                When(effective_max__lt=F("count"), then=F("effective_max")),
                default=F("count"),
                output_field=IntegerField(),
            ),
            total_points=F("points_per_objectiv") * F("capped_count"),
        )
    )


@router.get("results", response=ResultsOut)
def get_results(request: HttpRequest):
    """
    Get the user's score based on their objectives.

    Calculates points as:
        ObjectivPoint.point * min(max_count, count) for each objective
    where a negative ``max_count`` means the count is not capped.

    The response lists every ``ObjectivPoint`` (objectives the user has not
    reached are reported with a count of 0), the summed score and the number
    of ``DeathCounter`` rows of the user. The payload is cached per user for
    ``CACHE_TTL_SECONDS``.
    """
    cache_key = _results_cache_key(request.user.id)
    cached_data = cache.get(cache_key)
    if cached_data is not None:
        return cached_data

    # Group the user's objectives by objectiv_id and score each group.
    user_objectives = _annotate_points(
        Objectiv.objects.filter(user=request.user)
        .values("objectiv_id")
        .annotate(count=Count("id"))
    )

    # NOTE(review): seed / first_seen_at are non-aggregate columns added after
    # values().annotate(Count). Django adds selected columns to GROUP BY, so
    # this probably yields one row per (objectiv_id, seed, first_seen_at)
    # instead of one row per objectiv_id, which would bypass the max_count cap
    # and make the numbers disagree with the leaderboard. Confirm against the
    # Objectiv model; an aggregate (e.g. Min) or a Subquery is likely intended.
    user_objectives = user_objectives.annotate(
        seed=F("seed"),
        first_seen_at=F("first_seen_at"),
    )

    # Start from every known objective so unreached ones appear with zeros.
    with_points = {
        definition.objectiv_id: {
            "objectiv_id": definition.objectiv_id,
            "display_string": definition.display_string,
            "count": 0,
            "max_count": definition.max_count,
            "points_per_objectiv": definition.point,
            "total_points": 0,
            "first_seen_at": None,
            "seed": None,
        }
        for definition in ObjectivPoint.objects.all()
    }

    total_score = 0
    for row in user_objectives.order_by("-total_points"):
        entry = with_points.get(row["objectiv_id"])
        if entry is None:
            # No ObjectivPoint defines this objective; its total_points is
            # NULL (worth 0), so skipping it leaves the score unchanged.
            continue
        points = row["total_points"] or 0
        entry.update(
            {
                "count": row["count"],
                "total_points": points,
                "first_seen_at": row["first_seen_at"],
                "seed": row["seed"],
            }
        )
        total_score += points

    deaths_count = DeathCounter.objects.filter(user=request.user).count()

    data = {
        "total_score": total_score,
        "deaths_count": deaths_count,
        "objectives": list(with_points.values()),
    }
    cache.set(cache_key, data, CACHE_TTL_SECONDS)
    return data


@router.get("leaderboard", response=LeaderboardOut)
def get_leaderboard(request: HttpRequest):
    """
    Get the global leaderboard for all users ranked by total score.

    Scores are computed per (user, objective) in the database and summed per
    user in Python; users are then sorted by total score in descending order
    and ranked 1..n (ties keep query order and receive consecutive ranks).
    Objectives without a user, or whose user no longer exists, are left out.
    The payload is cached for ``CACHE_TTL_SECONDS``.
    """
    cached_data = cache.get(LEADERBOARD_CACHE_KEY)
    if cached_data is not None:
        return cached_data

    from django.contrib.auth import get_user_model

    User = get_user_model()

    # One row per (user, objectiv_id) with its capped point total.
    all_objectives = _annotate_points(
        Objectiv.objects.values("user", "objectiv_id").annotate(count=Count("id"))
    )

    totals = defaultdict(int)
    objectives_seen = defaultdict(set)
    for row in all_objectives:
        user_id = row["user"]
        if user_id is None:
            continue
        totals[user_id] += row["total_points"] or 0
        objectives_seen[user_id].add(row["objectiv_id"])

    # Fetch users and death counts in bulk instead of issuing three queries
    # per leaderboard entry.
    users = User.objects.in_bulk(list(totals))
    deaths = {
        row["user_id"]: row["deaths"]
        for row in DeathCounter.objects.values("user_id")
        .annotate(deaths=Count("pk"))
        .order_by()  # drop any default ordering so it cannot widen GROUP BY
    }

    # sorted() is stable, so equal scores keep their first-seen order.
    ranked_ids = sorted(
        (user_id for user_id in totals if user_id in users),
        key=totals.__getitem__,
        reverse=True,
    )

    leaderboard = [
        {
            "rank": rank,
            "username": users[user_id].username,
            "is_staff": users[user_id].is_staff,
            "total_score": totals[user_id],
            "objectives_count": len(objectives_seen[user_id]),
            "deaths_count": deaths.get(user_id, 0),
        }
        for rank, user_id in enumerate(ranked_ids, start=1)
    ]

    data = {"leaderboard": leaderboard}
    cache.set(LEADERBOARD_CACHE_KEY, data, CACHE_TTL_SECONDS)
    return data


# 500 is declared because the handler returns ``500, {...}`` on failure; ninja
# rejects status codes that have no response schema.
@router.post("submit", response={200: NoitaSubmissionOut, 400: dict, 500: dict})
def submit_log_file(request: HttpRequest, file: UploadedFile = File(...)):
    """
    Submit a Noita run log file.

    Accepts plain-text uploads only (content type ``text/plain`` or
    ``text/x-log``), e.g. polylan_mod_log.txt.

    Max file size: 256 MB

    The file is stored as a ``LogfileSubmission`` and then parsed for
    objectives. Parsing is best-effort: if it fails the submission is kept
    with ``processed`` left unchanged and the error is logged.

    Returns the stored submission on success, ``400`` with a ``detail``
    message for a rejected upload, or ``500`` with a ``detail`` message if the
    submission could not be created.
    """
    if file.content_type not in ALLOWED_CONTENT_TYPES:
        return 400, {
            "detail": f"Invalid file type: {file.content_type}. Allowed types: {', '.join(ALLOWED_CONTENT_TYPES)}"
        }

    if file.size > MAX_UPLOAD_BYTES:
        return 400, {"detail": "File too large (max 256MB)"}

    try:
        submission = LogfileSubmission.objects.create(
            user=request.user if request.user.is_authenticated else None,
            content_type=file.content_type,
            file_size=file.size,
        )
        submission.file.save(file.name, ContentFile(file.read()), save=True)

        try:
            parse_objectives_and_store(submission)
        except Exception:
            # Keep the upload even when parsing fails, but leave a trace.
            logger.exception(
                "Failed to parse objectives for submission %s", submission.id
            )
        else:
            submission.processed = True
            submission.save(update_fields=["processed"])

        # Drop cached responses that this submission may have made stale.
        if submission.user:
            cache.delete(_results_cache_key(submission.user.id))
        cache.delete(LEADERBOARD_CACHE_KEY)

        return {
            "id": str(submission.id),
            "user_id": submission.user_id,
            "username": submission.user.username if submission.user else None,
            "file_size": submission.file_size,
            "content_type": submission.content_type,
            "created_at": submission.created_at,
            "processed": submission.processed,
        }
    except Exception as e:
        logger.exception("Error creating submission")
        return 500, {"detail": f"Error creating submission: {str(e)}"}