collections

This commit is contained in:
Loïc Gremaud 2025-10-29 01:26:48 +01:00
parent 91a9ebec20
commit 2a0f585c4f
17 changed files with 983 additions and 142 deletions

121
README.md
View File

@ -1,121 +0,0 @@
# Opus Magnum Submitter with CAS Authentication
A simple Django application that demonstrates CAS (Central Authentication Service) integration with PolyLAN's CAS server at https://polylan.ch/cas/.
## Features
- 🔐 CAS authentication with PolyLAN
- 👤 Automatic user creation with custom attributes
- 🏷️ CAS groups and permissions storage
- 🏠 Protected home page requiring authentication
- 🌍 Public page accessible without authentication
- 🚪 Clean login/logout functionality
- 📱 Responsive web interface
- 🛠️ Admin interface for user management
## Quick Start
1. **Install dependencies:**
```bash
pip install -e .
```
2. **Run database migrations:**
```bash
cd opus_submitter
uv run manage.py migrate
```
3. **Create a superuser (optional, for admin access):**
```bash
uv run manage.py createsuperuser
```
4. **Start the development server:**
```bash
uv run manage.py runserver localhost:7777
```
5. **Access the application:**
- Open your browser to http://localhost:7777/
- Try the public page: http://localhost:7777/public/
- Login with CAS to access protected features
- Admin interface: http://localhost:7777/admin/ (requires superuser)
## How It Works
### Authentication Flow
1. User visits a protected page (e.g., home page)
2. Django redirects to `/cas/login/`
3. CAS redirects to PolyLAN CAS server: `https://polylan.ch/cas/login`
4. User enters credentials on PolyLAN
5. CAS validates credentials and redirects back with a service ticket
6. Django validates the ticket with the CAS server
7. User is authenticated and redirected to the requested page
### Configuration
The CAS configuration is in `opus_submitter/settings.py`:
```python
# CAS Authentication Settings
CAS_SERVER_URL = 'https://polylan.ch/cas/'
CAS_VERSION = '3'
CAS_CREATE_USER = True
CAS_LOGOUT_COMPLETELY = True
```
### URLs
- `/` - Protected home page (requires authentication)
- `/public/` - Public page (no authentication required)
- `/cas/login/` - CAS login endpoint
- `/cas/logout/` - CAS logout endpoint
- `/admin/` - Django admin (requires staff privileges)
## Project Structure
```
opus_submitter/
├── manage.py
├── opus_submitter/
│ ├── __init__.py
│ ├── settings.py # Django settings with CAS configuration
│ ├── urls.py # URL routing with CAS endpoints
│ ├── wsgi.py
│ └── asgi.py
└── templates/
├── base.html # Base template with navigation
├── home.html # Protected home page
└── public.html # Public page
```
## Dependencies
- Django 5.2.7+
- django-cas-ng 5.0.1+ (CAS client for Django)
- requests 2.31.0+ (HTTP library for CAS communication)
## Development
To modify the CAS configuration:
1. Edit `CAS_SERVER_URL` in `settings.py` if using a different CAS server
2. Adjust `CAS_VERSION` if needed (supports CAS 1.0, 2.0, and 3.0)
3. Set `CAS_CREATE_USER = False` if you don't want automatic user creation
## Testing
1. Visit http://127.0.0.1:8000/public/ (should work without login)
2. Visit http://127.0.0.1:8000/ (should redirect to CAS login)
3. Login with your PolyLAN credentials
4. Verify you're redirected back and can see user information
5. Test logout functionality
## Notes
- This is a development setup with `DEBUG = True`
- For production, update `SECRET_KEY`, set `DEBUG = False`, and configure `ALLOWED_HOSTS`
- The application automatically creates Django users from CAS authentication
- User information is populated from CAS attributes when available

View File

@ -40,6 +40,7 @@ INSTALLED_APPS = [
"django.contrib.staticfiles", "django.contrib.staticfiles",
"django_vite", "django_vite",
"accounts", "accounts",
"submissions",
] ]
MIDDLEWARE = [ MIDDLEWARE = [
@ -131,6 +132,9 @@ AUTH_USER_MODEL = "accounts.CustomUser"
# Simple CAS Configuration # Simple CAS Configuration
CAS_SERVER_URL = "https://polylan.ch/cas/" CAS_SERVER_URL = "https://polylan.ch/cas/"
# Steam API Configuration
STEAM_API_KEY = os.environ.get('STEAM_API_KEY', None) # Set via environment variable
# Authentication backends # Authentication backends
AUTHENTICATION_BACKENDS = [ AUTHENTICATION_BACKENDS = [
"django.contrib.auth.backends.ModelBackend", "django.contrib.auth.backends.ModelBackend",

View File

View File

@ -0,0 +1,143 @@
from django.contrib import admin
from django.utils.html import format_html
from .models import SteamAPIKey, SteamCollection, SteamCollectionItem
@admin.register(SteamAPIKey)
class SteamAPIKeyAdmin(admin.ModelAdmin):
list_display = ["name", "masked_api_key", "is_active", "last_used", "created_at"]
list_filter = ["is_active", "created_at", "last_used"]
search_fields = ["name", "description"]
readonly_fields = ["created_at", "updated_at", "last_used", "masked_api_key"]
fieldsets = (
("Basic Information", {"fields": ("name", "description", "is_active")}),
(
"API Key",
{
"fields": ("api_key", "masked_api_key"),
"description": "Get your Steam API key from https://steamcommunity.com/dev/apikey",
},
),
(
"Metadata",
{
"fields": ("created_at", "updated_at", "last_used"),
"classes": ("collapse",),
},
),
)
def masked_api_key(self, obj):
"""Display masked API key in admin"""
if obj.api_key:
return format_html(
'<code style="background: #f8f9fa; padding: 2px 4px; border-radius: 3px;">{}</code>',
obj.masked_key,
)
return "No key set"
masked_api_key.short_description = "API Key (Masked)"
def get_queryset(self, request):
"""Only superusers can see API keys"""
qs = super().get_queryset(request)
if not request.user.is_superuser:
return qs.none()
return qs
def has_view_permission(self, request, obj=None):
"""Only superusers can view API keys"""
return request.user.is_superuser
def has_add_permission(self, request):
"""Only superusers can add API keys"""
return request.user.is_superuser
def has_change_permission(self, request, obj=None):
"""Only superusers can change API keys"""
return request.user.is_superuser
def has_delete_permission(self, request, obj=None):
"""Only superusers can delete API keys"""
return request.user.is_superuser
@admin.register(SteamCollection)
class SteamCollectionAdmin(admin.ModelAdmin):
list_display = [
"title",
"steam_id",
"author_name",
"total_items",
"current_favorites",
"last_fetched",
"is_active",
]
list_filter = ["is_active", "last_fetched", "created_at"]
search_fields = ["title", "steam_id", "author_name", "description"]
readonly_fields = ["steam_id", "created_at", "updated_at", "last_fetched"]
fieldsets = (
(
"Basic Information",
{"fields": ("steam_id", "url", "title", "description", "is_active")},
),
("Author Information", {"fields": ("author_name", "author_steam_id")}),
(
"Statistics",
{
"fields": (
"total_items",
"unique_visitors",
"current_favorites",
"total_favorites",
)
},
),
(
"Timestamps",
{
"fields": (
"steam_created_date",
"steam_updated_date",
"created_at",
"updated_at",
"last_fetched",
)
},
),
("Status", {"fields": ("fetch_error",)}),
)
@admin.register(SteamCollectionItem)
class SteamCollectionItemAdmin(admin.ModelAdmin):
list_display = [
"title",
"steam_item_id",
"collection",
"author_name",
"order_index",
]
list_filter = ["collection", "created_at"]
search_fields = ["title", "steam_item_id", "author_name", "description"]
readonly_fields = ["created_at", "updated_at"]
fieldsets = (
(
"Basic Information",
{
"fields": (
"collection",
"steam_item_id",
"title",
"description",
"order_index",
)
},
),
("Author Information", {"fields": ("author_name", "author_steam_id")}),
("Metadata", {"fields": ("tags",)}),
("Timestamps", {"fields": ("created_at", "updated_at")}),
)

View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class SubmissionsConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'submissions'

View File

@ -0,0 +1,92 @@
"""
Django management command to fetch Steam Workshop collections
"""
from django.core.management.base import BaseCommand, CommandError
from submissions.utils import create_or_update_collection
from submissions.models import SteamCollection
class Command(BaseCommand):
help = 'Fetch Steam Workshop collection data and save to database'
def add_arguments(self, parser):
parser.add_argument(
'url',
type=str,
help='Steam Workshop collection URL'
)
parser.add_argument(
'--api-key',
type=str,
help='Steam API key (optional, can also be set via STEAM_API_KEY environment variable)'
)
parser.add_argument(
'--force',
action='store_true',
help='Force refetch even if collection already exists'
)
def handle(self, *args, **options):
url = options['url']
api_key = options.get('api_key')
force = options['force']
self.stdout.write(f"Fetching Steam collection from: {url}")
try:
# Check if collection already exists
from submissions.utils import SteamCollectionFetcher
fetcher = SteamCollectionFetcher(api_key)
collection_id = fetcher.extract_collection_id(url)
if collection_id and not force:
existing = SteamCollection.objects.filter(steam_id=collection_id).first()
if existing:
self.stdout.write(
self.style.WARNING(
f"Collection {collection_id} already exists (ID: {existing.id}). "
"Use --force to refetch."
)
)
return
# Fetch and create/update collection
collection, created = create_or_update_collection(url)
if created:
self.stdout.write(
self.style.SUCCESS(
f"Successfully created collection: {collection.title} (ID: {collection.id})"
)
)
else:
self.stdout.write(
self.style.SUCCESS(
f"Successfully updated collection: {collection.title} (ID: {collection.id})"
)
)
# Display collection info
self.stdout.write("\nCollection Details:")
self.stdout.write(f" Steam ID: {collection.steam_id}")
self.stdout.write(f" Title: {collection.title}")
self.stdout.write(f" Author: {collection.author_name or 'Unknown'}")
self.stdout.write(f" Description: {collection.description[:100]}{'...' if len(collection.description) > 100 else ''}")
self.stdout.write(f" Total Items: {collection.total_items}")
self.stdout.write(f" Unique Visitors: {collection.unique_visitors}")
self.stdout.write(f" Current Favorites: {collection.current_favorites}")
self.stdout.write(f" Total Favorites: {collection.total_favorites}")
if collection.items.exists():
self.stdout.write(f"\nCollection Items ({collection.items.count()}):")
for item in collection.items.all()[:10]: # Show first 10 items
self.stdout.write(f" - {item.title} (Steam ID: {item.steam_item_id})")
if collection.items.count() > 10:
self.stdout.write(f" ... and {collection.items.count() - 10} more items")
else:
self.stdout.write("\nNo items found in collection.")
except Exception as e:
raise CommandError(f"Failed to fetch collection: {e}")

View File

@ -0,0 +1,72 @@
# Generated by Django 5.2.7 on 2025-10-29 00:11
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Collection',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('url', models.URLField()),
],
),
migrations.CreateModel(
name='SteamCollection',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('steam_id', models.CharField(help_text='Steam collection ID from URL', max_length=50, unique=True)),
('url', models.URLField(help_text='Full Steam Workshop collection URL')),
('title', models.CharField(blank=True, help_text='Collection title', max_length=255)),
('description', models.TextField(blank=True, help_text='Collection description')),
('author_name', models.CharField(blank=True, help_text='Steam username of collection creator', max_length=100)),
('author_steam_id', models.CharField(blank=True, help_text='Steam ID of collection creator', max_length=50)),
('total_items', models.PositiveIntegerField(default=0, help_text='Number of items in collection')),
('unique_visitors', models.PositiveIntegerField(default=0, help_text='Number of unique visitors')),
('current_favorites', models.PositiveIntegerField(default=0, help_text='Current number of favorites')),
('total_favorites', models.PositiveIntegerField(default=0, help_text='Total unique favorites')),
('steam_created_date', models.DateTimeField(blank=True, help_text='When collection was created on Steam', null=True)),
('steam_updated_date', models.DateTimeField(blank=True, help_text='When collection was last updated on Steam', null=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('last_fetched', models.DateTimeField(blank=True, help_text='When data was last fetched from Steam', null=True)),
('is_active', models.BooleanField(default=True, help_text='Whether this collection is actively tracked')),
('fetch_error', models.TextField(blank=True, help_text='Last error encountered when fetching data')),
],
options={
'verbose_name': 'Steam Collection',
'verbose_name_plural': 'Steam Collections',
'ordering': ['-created_at'],
},
),
migrations.CreateModel(
name='SteamCollectionItem',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('steam_item_id', models.CharField(help_text='Steam Workshop item ID', max_length=50)),
('title', models.CharField(blank=True, help_text='Item title', max_length=255)),
('author_name', models.CharField(blank=True, help_text='Steam username of item creator', max_length=100)),
('author_steam_id', models.CharField(blank=True, help_text='Steam ID of item creator', max_length=50)),
('description', models.TextField(blank=True, help_text='Item description')),
('tags', models.JSONField(blank=True, default=list, help_text='Item tags as JSON array')),
('order_index', models.PositiveIntegerField(default=0, help_text='Order of item in collection')),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('collection', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='items', to='submissions.steamcollection')),
],
options={
'verbose_name': 'Steam Collection Item',
'verbose_name_plural': 'Steam Collection Items',
'ordering': ['collection', 'order_index'],
'unique_together': {('collection', 'steam_item_id')},
},
),
]

View File

@ -0,0 +1,16 @@
# Generated by Django 5.2.7 on 2025-10-29 00:12
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('submissions', '0001_initial'),
]
operations = [
migrations.DeleteModel(
name='Collection',
),
]

View File

@ -0,0 +1,31 @@
# Generated by Django 5.2.7 on 2025-10-29 00:19
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('submissions', '0002_delete_collection'),
]
operations = [
migrations.CreateModel(
name='SteamAPIKey',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(help_text="Descriptive name for this API key (e.g., 'Production Key', 'Development Key')", max_length=100, unique=True)),
('api_key', models.CharField(help_text='Steam Web API key from https://steamcommunity.com/dev/apikey', max_length=64)),
('is_active', models.BooleanField(default=True, help_text='Whether this API key should be used')),
('description', models.TextField(blank=True, help_text='Optional description or notes about this API key')),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('last_used', models.DateTimeField(blank=True, help_text='When this API key was last used', null=True)),
],
options={
'verbose_name': 'Steam API Key',
'verbose_name_plural': 'Steam API Keys',
'ordering': ['-is_active', 'name'],
},
),
]

View File

@ -0,0 +1,198 @@
from django.db import models
from django.contrib.auth import get_user_model
from django.utils import timezone
from django.core.exceptions import ValidationError
User = get_user_model()
class SteamAPIKey(models.Model):
"""Model to store Steam API key configuration - Admin only"""
name = models.CharField(
max_length=100,
unique=True,
help_text="Descriptive name for this API key (e.g., 'Production Key', 'Development Key')"
)
api_key = models.CharField(
max_length=64,
help_text="Steam Web API key from https://steamcommunity.com/dev/apikey"
)
is_active = models.BooleanField(
default=True,
help_text="Whether this API key should be used"
)
description = models.TextField(
blank=True,
help_text="Optional description or notes about this API key"
)
# Metadata
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
last_used = models.DateTimeField(
null=True,
blank=True,
help_text="When this API key was last used"
)
class Meta:
verbose_name = "Steam API Key"
verbose_name_plural = "Steam API Keys"
ordering = ['-is_active', 'name']
def __str__(self):
status = "Active" if self.is_active else "Inactive"
return f"{self.name} ({status})"
def clean(self):
"""Validate the API key format"""
if self.api_key:
# Steam API keys are typically 32 characters of hexadecimal
if len(self.api_key) != 32:
raise ValidationError("Steam API key should be 32 characters long")
# Check if it's hexadecimal
try:
int(self.api_key, 16)
except ValueError:
raise ValidationError("Steam API key should contain only hexadecimal characters (0-9, A-F)")
def save(self, *args, **kwargs):
self.full_clean()
super().save(*args, **kwargs)
@classmethod
def get_active_key(cls):
"""Get the currently active API key"""
return cls.objects.filter(is_active=True).first()
@property
def masked_key(self):
"""Return a masked version of the API key for display"""
if not self.api_key:
return ""
return f"{self.api_key[:8]}{'*' * 16}{self.api_key[-8:]}"
class SteamCollection(models.Model):
"""Model representing a Steam Workshop collection"""
# Basic collection info
steam_id = models.CharField(
max_length=50, unique=True, help_text="Steam collection ID from URL"
)
url = models.URLField(help_text="Full Steam Workshop collection URL")
title = models.CharField(max_length=255, blank=True, help_text="Collection title")
description = models.TextField(blank=True, help_text="Collection description")
# Author information
author_name = models.CharField(
max_length=100, blank=True, help_text="Steam username of collection creator"
)
author_steam_id = models.CharField(
max_length=50, blank=True, help_text="Steam ID of collection creator"
)
# Collection metadata
total_items = models.PositiveIntegerField(
default=0, help_text="Number of items in collection"
)
unique_visitors = models.PositiveIntegerField(
default=0, help_text="Number of unique visitors"
)
current_favorites = models.PositiveIntegerField(
default=0, help_text="Current number of favorites"
)
total_favorites = models.PositiveIntegerField(
default=0, help_text="Total unique favorites"
)
# Timestamps
steam_created_date = models.DateTimeField(
null=True, blank=True, help_text="When collection was created on Steam"
)
steam_updated_date = models.DateTimeField(
null=True, blank=True, help_text="When collection was last updated on Steam"
)
# Local tracking
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
last_fetched = models.DateTimeField(
null=True, blank=True, help_text="When data was last fetched from Steam"
)
# Status
is_active = models.BooleanField(
default=True, help_text="Whether this collection is actively tracked"
)
fetch_error = models.TextField(
blank=True, help_text="Last error encountered when fetching data"
)
class Meta:
ordering = ["-created_at"]
verbose_name = "Steam Collection"
verbose_name_plural = "Steam Collections"
def __str__(self):
return f"{self.title or f'Collection {self.steam_id}'}"
@property
def steam_url(self):
"""Generate the Steam Workshop URL from steam_id"""
return f"https://steamcommunity.com/workshop/filedetails/?id={self.steam_id}"
class SteamCollectionItem(models.Model):
"""Model representing individual items within a Steam collection"""
# Relationships
collection = models.ForeignKey(
SteamCollection, on_delete=models.CASCADE, related_name="items"
)
# Item identification
steam_item_id = models.CharField(max_length=50, help_text="Steam Workshop item ID")
title = models.CharField(max_length=255, blank=True, help_text="Item title")
# Author information
author_name = models.CharField(
max_length=100, blank=True, help_text="Steam username of item creator"
)
author_steam_id = models.CharField(
max_length=50, blank=True, help_text="Steam ID of item creator"
)
# Item metadata
description = models.TextField(blank=True, help_text="Item description")
tags = models.JSONField(
default=list, blank=True, help_text="Item tags as JSON array"
)
# Position in collection
order_index = models.PositiveIntegerField(
default=0, help_text="Order of item in collection"
)
# Timestamps
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ["collection", "order_index"]
unique_together = ["collection", "steam_item_id"]
verbose_name = "Steam Collection Item"
verbose_name_plural = "Steam Collection Items"
def __str__(self):
return f"{self.title or f'Item {self.steam_item_id}'} (in {self.collection})"
@property
def steam_url(self):
"""Generate the Steam Workshop URL for this item"""
return (
f"https://steamcommunity.com/workshop/filedetails/?id={self.steam_item_id}"
)

View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

View File

@ -0,0 +1,415 @@
"""
Utilities for fetching Steam Workshop collection data using Steam Web API
"""
import re
import requests
from datetime import datetime
from django.utils import timezone
from django.conf import settings
from typing import Dict, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class SteamAPIClient:
"""Client for interacting with Steam Web API"""
BASE_URL = "https://api.steampowered.com"
def __init__(self, api_key: Optional[str] = None):
# Priority: parameter > database > settings > environment
self.api_key = api_key or self._get_api_key_from_db() or getattr(settings, "STEAM_API_KEY", None)
self.session = requests.Session()
if not self.api_key:
logger.warning("No Steam API key provided. Some features may be limited.")
def _get_api_key_from_db(self) -> Optional[str]:
"""Get active API key from database"""
try:
from .models import SteamAPIKey
api_key_obj = SteamAPIKey.get_active_key()
if api_key_obj:
# Update last_used timestamp
from django.utils import timezone
api_key_obj.last_used = timezone.now()
api_key_obj.save(update_fields=['last_used'])
return api_key_obj.api_key
except Exception as e:
logger.debug(f"Could not fetch API key from database: {e}")
return None
def get_published_file_details(self, file_ids: List[str]) -> Dict:
"""
Get details for published files (collections/items) using Steam Web API
Args:
file_ids: List of Steam Workshop file IDs
Returns:
API response data
"""
url = f"{self.BASE_URL}/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
# Prepare form data for POST request
data = {
"itemcount": len(file_ids),
}
# Add each file ID
for i, file_id in enumerate(file_ids):
data[f"publishedfileids[{i}]"] = file_id
try:
response = self.session.post(url, data=data, timeout=30)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logger.error(f"Failed to fetch Steam API data: {e}")
raise
class SteamCollectionFetcher:
"""Utility class for fetching Steam Workshop collection data using Steam API"""
def __init__(self, api_key: Optional[str] = None):
self.api_client = SteamAPIClient(api_key)
def extract_collection_id(self, url: str) -> Optional[str]:
"""
Extract Steam collection ID from various URL formats
Args:
url: Steam Workshop collection URL
Returns:
Collection ID as string, or None if not found
"""
# Handle different URL formats
patterns = [
r"steamcommunity\.com/workshop/filedetails/\?id=(\d+)",
r"steamcommunity\.com/sharedfiles/filedetails/\?id=(\d+)",
r"id=(\d+)",
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
def fetch_collection_data(self, url: str) -> Dict:
"""
Fetch collection data from Steam Web API
Args:
url: Steam Workshop collection URL
Returns:
Dictionary containing collection data
Raises:
requests.RequestException: If API request fails
ValueError: If collection ID cannot be extracted or data is invalid
"""
collection_id = self.extract_collection_id(url)
if not collection_id:
raise ValueError(f"Cannot extract collection ID from URL: {url}")
# Fetch collection details from Steam API
api_response = self.api_client.get_published_file_details([collection_id])
if "response" not in api_response:
raise ValueError("Invalid API response format")
response_data = api_response["response"]
if (
"publishedfiledetails" not in response_data
or not response_data["publishedfiledetails"]
):
raise ValueError("No collection data found in API response")
collection_data = response_data["publishedfiledetails"][0]
# Check if collection exists and is accessible
if collection_data.get("result") != 1:
raise ValueError(
f"Collection not found or inaccessible (result: {collection_data.get('result')})"
)
return self._parse_api_collection_data(collection_data, collection_id, url)
def _parse_api_collection_data(
self, api_data: Dict, collection_id: str, url: str
) -> Dict:
"""
Parse collection data from Steam API response
Args:
api_data: Steam API response data for the collection
collection_id: Steam collection ID
url: Original URL
Returns:
Dictionary containing parsed collection data
"""
data = {
"steam_id": collection_id,
"url": url,
"title": api_data.get("title", ""),
"description": api_data.get("description", ""),
"author_name": "",
"author_steam_id": str(api_data.get("creator", "")),
"total_items": 0,
"unique_visitors": api_data.get("views", 0),
"current_favorites": api_data.get("favorited", 0),
"total_favorites": api_data.get("lifetime_favorited", 0),
"steam_created_date": None,
"steam_updated_date": None,
"items": [],
}
# Parse timestamps
if "time_created" in api_data:
data["steam_created_date"] = timezone.make_aware(
datetime.fromtimestamp(api_data["time_created"])
)
if "time_updated" in api_data:
data["steam_updated_date"] = timezone.make_aware(
datetime.fromtimestamp(api_data["time_updated"])
)
# Get author name if we have Steam ID
if data["author_steam_id"]:
try:
author_info = self._get_user_info(data["author_steam_id"])
if author_info:
data["author_name"] = author_info.get("personaname", "")
except Exception as e:
logger.debug(f"Could not fetch author info: {e}")
# Fetch collection items using GetCollectionDetails API
data["items"] = self._fetch_collection_items_via_api(collection_id)
data["total_items"] = len(data["items"])
return data
def _get_user_info(self, steam_id: str) -> Optional[Dict]:
"""
Get user information from Steam API
Args:
steam_id: Steam user ID
Returns:
User info dictionary or None if not available
"""
if not self.api_client.api_key:
return None
url = f"{self.api_client.BASE_URL}/ISteamUser/GetPlayerSummaries/v0002/"
params = {"key": self.api_client.api_key, "steamids": steam_id}
try:
response = self.api_client.session.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
if (
"response" in data
and "players" in data["response"]
and data["response"]["players"]
):
return data["response"]["players"][0]
except Exception as e:
logger.debug(f"Failed to fetch user info for {steam_id}: {e}")
return None
def _fetch_collection_items_via_api(self, collection_id: str) -> List[Dict]:
"""
Fetch collection items using GetCollectionDetails API
Args:
collection_id: Steam collection ID
Returns:
List of item dictionaries
"""
items = []
try:
# Use GetCollectionDetails API to get collection items
url = f"{self.api_client.BASE_URL}/ISteamRemoteStorage/GetCollectionDetails/v1/"
data = {
'collectioncount': 1,
'publishedfileids[0]': collection_id
}
response = self.api_client.session.post(url, data=data, timeout=30)
if response.status_code == 200:
collection_response = response.json()
if 'response' in collection_response and 'collectiondetails' in collection_response['response']:
for collection in collection_response['response']['collectiondetails']:
if collection.get('result') == 1 and 'children' in collection:
# Extract item IDs with their sort order
child_items = []
for child in collection['children']:
if 'publishedfileid' in child:
child_items.append({
'id': str(child['publishedfileid']),
'sort_order': child.get('sortorder', 0)
})
# Sort by sort order to maintain collection order
child_items.sort(key=lambda x: x['sort_order'])
item_ids = [item['id'] for item in child_items]
if item_ids:
items = self._fetch_items_by_ids(item_ids)
except Exception as e:
logger.error(f"Failed to fetch collection items via API: {e}")
return items
def _fetch_items_by_ids(self, item_ids: List[str]) -> List[Dict]:
"""Fetch item details by their IDs"""
items = []
# Fetch details for all items in batches (Steam API has limits)
batch_size = 20 # Conservative batch size
for i in range(0, len(item_ids), batch_size):
batch_ids = item_ids[i : i + batch_size]
try:
api_response = self.api_client.get_published_file_details(batch_ids)
if (
"response" in api_response
and "publishedfiledetails" in api_response["response"]
):
for j, item_data in enumerate(
api_response["response"]["publishedfiledetails"]
):
item_id = item_data.get("publishedfileid", "unknown")
result = item_data.get("result", 0)
if result == 1: # Success
item_info = {
"steam_item_id": str(item_id),
"title": item_data.get("title", ""),
"author_name": "",
"author_steam_id": str(item_data.get("creator", "")),
"description": item_data.get("description", ""),
"tags": [
tag.get("tag", "")
for tag in item_data.get("tags", [])
],
"order_index": i + j,
}
# Get author name if available
if item_info["author_steam_id"]:
try:
author_info = self._get_user_info(
item_info["author_steam_id"]
)
if author_info:
item_info["author_name"] = author_info.get(
"personaname", ""
)
except Exception as e:
logger.debug(
f"Could not fetch item author info: {e}"
)
items.append(item_info)
else:
# Log failed items
logger.warning(f"Failed to fetch item {item_id}: result={result}, ban_reason={item_data.get('ban_reason', 'N/A')}")
except Exception as e:
logger.error(f"Failed to fetch batch of collection items: {e}")
continue
return items
def fetch_steam_collection(url: str) -> Dict:
"""
Convenience function to fetch Steam collection data
Args:
url: Steam Workshop collection URL
Returns:
Dictionary containing collection data
"""
fetcher = SteamCollectionFetcher()
return fetcher.fetch_collection_data(url)
def create_or_update_collection(url: str) -> Tuple["SteamCollection", bool]:
"""
Create or update a Steam collection in the database
Args:
url: Steam Workshop collection URL
Returns:
Tuple of (SteamCollection instance, created_flag)
Raises:
ValueError: If collection cannot be fetched or parsed
"""
from .models import SteamCollection, SteamCollectionItem
# Fetch data from Steam
data = fetch_steam_collection(url)
# Create or update collection
collection, created = SteamCollection.objects.update_or_create(
steam_id=data["steam_id"],
defaults={
"url": data["url"],
"title": data["title"],
"description": data["description"],
"author_name": data["author_name"],
"author_steam_id": data["author_steam_id"],
"total_items": data["total_items"],
"unique_visitors": data["unique_visitors"],
"current_favorites": data["current_favorites"],
"total_favorites": data["total_favorites"],
"steam_created_date": data["steam_created_date"],
"steam_updated_date": data["steam_updated_date"],
"last_fetched": timezone.now(),
"fetch_error": "", # Clear any previous errors
},
)
# Update collection items
# First, remove existing items
collection.items.all().delete()
# Add new items
for item_data in data["items"]:
SteamCollectionItem.objects.create(
collection=collection,
steam_item_id=item_data["steam_item_id"],
title=item_data["title"],
author_name=item_data["author_name"],
author_steam_id=item_data["author_steam_id"],
description=item_data["description"],
tags=item_data["tags"],
order_index=item_data["order_index"],
)
return collection, created

View File

@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.

View File

@ -1,21 +0,0 @@
import { defineConfig } from 'vite';
import vue from '@vitejs/plugin-vue';
import { resolve } from 'path';
import { fileURLToPath } from 'node:url';
// https://vitejs.dev/config/
export default defineConfig({
base: '/static/',
plugins: [vue()],
resolve: {
alias: {
'@': fileURLToPath(new URL('./src', import.meta.url)),
},
},
build: {
manifest: 'manifest.json',
outDir: resolve("./static/dist"),
rollupOptions: {
input: { main: resolve('./src/main.ts') }
}
}
});