opus-submitter/opus_submitter/submissions/utils.py
2025-10-29 01:26:48 +01:00

416 lines
15 KiB
Python

"""
Utilities for fetching Steam Workshop collection data using Steam Web API
"""
import re
import requests
from datetime import datetime
from django.utils import timezone
from django.conf import settings
from typing import Dict, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class SteamAPIClient:
"""Client for interacting with Steam Web API"""
BASE_URL = "https://api.steampowered.com"
def __init__(self, api_key: Optional[str] = None):
# Priority: parameter > database > settings > environment
self.api_key = api_key or self._get_api_key_from_db() or getattr(settings, "STEAM_API_KEY", None)
self.session = requests.Session()
if not self.api_key:
logger.warning("No Steam API key provided. Some features may be limited.")
def _get_api_key_from_db(self) -> Optional[str]:
"""Get active API key from database"""
try:
from .models import SteamAPIKey
api_key_obj = SteamAPIKey.get_active_key()
if api_key_obj:
# Update last_used timestamp
from django.utils import timezone
api_key_obj.last_used = timezone.now()
api_key_obj.save(update_fields=['last_used'])
return api_key_obj.api_key
except Exception as e:
logger.debug(f"Could not fetch API key from database: {e}")
return None
def get_published_file_details(self, file_ids: List[str]) -> Dict:
"""
Get details for published files (collections/items) using Steam Web API
Args:
file_ids: List of Steam Workshop file IDs
Returns:
API response data
"""
url = f"{self.BASE_URL}/ISteamRemoteStorage/GetPublishedFileDetails/v1/"
# Prepare form data for POST request
data = {
"itemcount": len(file_ids),
}
# Add each file ID
for i, file_id in enumerate(file_ids):
data[f"publishedfileids[{i}]"] = file_id
try:
response = self.session.post(url, data=data, timeout=30)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logger.error(f"Failed to fetch Steam API data: {e}")
raise
class SteamCollectionFetcher:
"""Utility class for fetching Steam Workshop collection data using Steam API"""
def __init__(self, api_key: Optional[str] = None):
self.api_client = SteamAPIClient(api_key)
def extract_collection_id(self, url: str) -> Optional[str]:
"""
Extract Steam collection ID from various URL formats
Args:
url: Steam Workshop collection URL
Returns:
Collection ID as string, or None if not found
"""
# Handle different URL formats
patterns = [
r"steamcommunity\.com/workshop/filedetails/\?id=(\d+)",
r"steamcommunity\.com/sharedfiles/filedetails/\?id=(\d+)",
r"id=(\d+)",
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
def fetch_collection_data(self, url: str) -> Dict:
"""
Fetch collection data from Steam Web API
Args:
url: Steam Workshop collection URL
Returns:
Dictionary containing collection data
Raises:
requests.RequestException: If API request fails
ValueError: If collection ID cannot be extracted or data is invalid
"""
collection_id = self.extract_collection_id(url)
if not collection_id:
raise ValueError(f"Cannot extract collection ID from URL: {url}")
# Fetch collection details from Steam API
api_response = self.api_client.get_published_file_details([collection_id])
if "response" not in api_response:
raise ValueError("Invalid API response format")
response_data = api_response["response"]
if (
"publishedfiledetails" not in response_data
or not response_data["publishedfiledetails"]
):
raise ValueError("No collection data found in API response")
collection_data = response_data["publishedfiledetails"][0]
# Check if collection exists and is accessible
if collection_data.get("result") != 1:
raise ValueError(
f"Collection not found or inaccessible (result: {collection_data.get('result')})"
)
return self._parse_api_collection_data(collection_data, collection_id, url)
def _parse_api_collection_data(
self, api_data: Dict, collection_id: str, url: str
) -> Dict:
"""
Parse collection data from Steam API response
Args:
api_data: Steam API response data for the collection
collection_id: Steam collection ID
url: Original URL
Returns:
Dictionary containing parsed collection data
"""
data = {
"steam_id": collection_id,
"url": url,
"title": api_data.get("title", ""),
"description": api_data.get("description", ""),
"author_name": "",
"author_steam_id": str(api_data.get("creator", "")),
"total_items": 0,
"unique_visitors": api_data.get("views", 0),
"current_favorites": api_data.get("favorited", 0),
"total_favorites": api_data.get("lifetime_favorited", 0),
"steam_created_date": None,
"steam_updated_date": None,
"items": [],
}
# Parse timestamps
if "time_created" in api_data:
data["steam_created_date"] = timezone.make_aware(
datetime.fromtimestamp(api_data["time_created"])
)
if "time_updated" in api_data:
data["steam_updated_date"] = timezone.make_aware(
datetime.fromtimestamp(api_data["time_updated"])
)
# Get author name if we have Steam ID
if data["author_steam_id"]:
try:
author_info = self._get_user_info(data["author_steam_id"])
if author_info:
data["author_name"] = author_info.get("personaname", "")
except Exception as e:
logger.debug(f"Could not fetch author info: {e}")
# Fetch collection items using GetCollectionDetails API
data["items"] = self._fetch_collection_items_via_api(collection_id)
data["total_items"] = len(data["items"])
return data
def _get_user_info(self, steam_id: str) -> Optional[Dict]:
"""
Get user information from Steam API
Args:
steam_id: Steam user ID
Returns:
User info dictionary or None if not available
"""
if not self.api_client.api_key:
return None
url = f"{self.api_client.BASE_URL}/ISteamUser/GetPlayerSummaries/v0002/"
params = {"key": self.api_client.api_key, "steamids": steam_id}
try:
response = self.api_client.session.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
if (
"response" in data
and "players" in data["response"]
and data["response"]["players"]
):
return data["response"]["players"][0]
except Exception as e:
logger.debug(f"Failed to fetch user info for {steam_id}: {e}")
return None
def _fetch_collection_items_via_api(self, collection_id: str) -> List[Dict]:
"""
Fetch collection items using GetCollectionDetails API
Args:
collection_id: Steam collection ID
Returns:
List of item dictionaries
"""
items = []
try:
# Use GetCollectionDetails API to get collection items
url = f"{self.api_client.BASE_URL}/ISteamRemoteStorage/GetCollectionDetails/v1/"
data = {
'collectioncount': 1,
'publishedfileids[0]': collection_id
}
response = self.api_client.session.post(url, data=data, timeout=30)
if response.status_code == 200:
collection_response = response.json()
if 'response' in collection_response and 'collectiondetails' in collection_response['response']:
for collection in collection_response['response']['collectiondetails']:
if collection.get('result') == 1 and 'children' in collection:
# Extract item IDs with their sort order
child_items = []
for child in collection['children']:
if 'publishedfileid' in child:
child_items.append({
'id': str(child['publishedfileid']),
'sort_order': child.get('sortorder', 0)
})
# Sort by sort order to maintain collection order
child_items.sort(key=lambda x: x['sort_order'])
item_ids = [item['id'] for item in child_items]
if item_ids:
items = self._fetch_items_by_ids(item_ids)
except Exception as e:
logger.error(f"Failed to fetch collection items via API: {e}")
return items
def _fetch_items_by_ids(self, item_ids: List[str]) -> List[Dict]:
"""Fetch item details by their IDs"""
items = []
# Fetch details for all items in batches (Steam API has limits)
batch_size = 20 # Conservative batch size
for i in range(0, len(item_ids), batch_size):
batch_ids = item_ids[i : i + batch_size]
try:
api_response = self.api_client.get_published_file_details(batch_ids)
if (
"response" in api_response
and "publishedfiledetails" in api_response["response"]
):
for j, item_data in enumerate(
api_response["response"]["publishedfiledetails"]
):
item_id = item_data.get("publishedfileid", "unknown")
result = item_data.get("result", 0)
if result == 1: # Success
item_info = {
"steam_item_id": str(item_id),
"title": item_data.get("title", ""),
"author_name": "",
"author_steam_id": str(item_data.get("creator", "")),
"description": item_data.get("description", ""),
"tags": [
tag.get("tag", "")
for tag in item_data.get("tags", [])
],
"order_index": i + j,
}
# Get author name if available
if item_info["author_steam_id"]:
try:
author_info = self._get_user_info(
item_info["author_steam_id"]
)
if author_info:
item_info["author_name"] = author_info.get(
"personaname", ""
)
except Exception as e:
logger.debug(
f"Could not fetch item author info: {e}"
)
items.append(item_info)
else:
# Log failed items
logger.warning(f"Failed to fetch item {item_id}: result={result}, ban_reason={item_data.get('ban_reason', 'N/A')}")
except Exception as e:
logger.error(f"Failed to fetch batch of collection items: {e}")
continue
return items
def fetch_steam_collection(url: str) -> Dict:
"""
Convenience function to fetch Steam collection data
Args:
url: Steam Workshop collection URL
Returns:
Dictionary containing collection data
"""
fetcher = SteamCollectionFetcher()
return fetcher.fetch_collection_data(url)
def create_or_update_collection(url: str) -> Tuple["SteamCollection", bool]:
"""
Create or update a Steam collection in the database
Args:
url: Steam Workshop collection URL
Returns:
Tuple of (SteamCollection instance, created_flag)
Raises:
ValueError: If collection cannot be fetched or parsed
"""
from .models import SteamCollection, SteamCollectionItem
# Fetch data from Steam
data = fetch_steam_collection(url)
# Create or update collection
collection, created = SteamCollection.objects.update_or_create(
steam_id=data["steam_id"],
defaults={
"url": data["url"],
"title": data["title"],
"description": data["description"],
"author_name": data["author_name"],
"author_steam_id": data["author_steam_id"],
"total_items": data["total_items"],
"unique_visitors": data["unique_visitors"],
"current_favorites": data["current_favorites"],
"total_favorites": data["total_favorites"],
"steam_created_date": data["steam_created_date"],
"steam_updated_date": data["steam_updated_date"],
"last_fetched": timezone.now(),
"fetch_error": "", # Clear any previous errors
},
)
# Update collection items
# First, remove existing items
collection.items.all().delete()
# Add new items
for item_data in data["items"]:
SteamCollectionItem.objects.create(
collection=collection,
steam_item_id=item_data["steam_item_id"],
title=item_data["title"],
author_name=item_data["author_name"],
author_steam_id=item_data["author_steam_id"],
description=item_data["description"],
tags=item_data["tags"],
order_index=item_data["order_index"],
)
return collection, created