"""
Utilities for fetching Steam Workshop collection data using Steam Web API
"""

import logging
import re
from datetime import datetime
from datetime import timezone as dt_timezone
from typing import Dict, List, Optional, Tuple

import requests
from django.conf import settings
from django.db import transaction
from django.utils import timezone

logger = logging.getLogger(__name__)


def _aware_utc_from_timestamp(value) -> datetime:
    """
    Convert a Unix timestamp into a timezone-aware UTC datetime

    Building the value directly in UTC avoids the naive local-time round trip
    (``fromtimestamp`` followed by ``make_aware``), which depends on the
    process time zone matching Django's and is ambiguous around DST changes.

    Args:
        value: Seconds since the Unix epoch

    Returns:
        Timezone-aware datetime in UTC
    """
    return datetime.fromtimestamp(value, tz=dt_timezone.utc)


class SteamAPIClient:
    """Client for interacting with Steam Web API"""

    BASE_URL = "https://api.steampowered.com"

    def __init__(self, api_key: Optional[str] = None):
        """
        Create a client and resolve the API key to use

        Args:
            api_key: Explicit Steam Web API key; when omitted the key is
                looked up in the database and then in Django settings
        """
        # Priority: parameter > database > settings
        self.api_key = (
            api_key
            or self._get_api_key_from_db()
            or getattr(settings, "STEAM_API_KEY", None)
        )
        # A shared session reuses connections across requests.
        self.session = requests.Session()

        if not self.api_key:
            logger.warning("No Steam API key provided. Some features may be limited.")

    def _get_api_key_from_db(self) -> Optional[str]:
        """
        Get active API key from database

        Returns:
            The active key string, or None when no key is stored or the
            lookup fails for any reason
        """
        try:
            # Imported lazily, as in the rest of this module's model access.
            from .models import SteamAPIKey

            api_key_obj = SteamAPIKey.get_active_key()
            if api_key_obj:
                # Update last_used timestamp
                api_key_obj.last_used = timezone.now()
                api_key_obj.save(update_fields=["last_used"])
                return api_key_obj.api_key
        except Exception as e:
            # Best effort: a failed lookup falls through to the next key source.
            logger.debug(f"Could not fetch API key from database: {e}")
        return None

    def get_published_file_details(self, file_ids: List[str]) -> Dict:
        """
        Get details for published files (collections/items) using Steam Web API

        Args:
            file_ids: List of Steam Workshop file IDs

        Returns:
            API response data

        Raises:
            requests.RequestException: If the request fails or returns an
                HTTP error status
        """
        url = f"{self.BASE_URL}/ISteamRemoteStorage/GetPublishedFileDetails/v1/"

        # Prepare form data for POST request: a count plus one indexed field per ID
        data = {"itemcount": len(file_ids)}
        for i, file_id in enumerate(file_ids):
            data[f"publishedfileids[{i}]"] = file_id

        try:
            response = self.session.post(url, data=data, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            logger.error(f"Failed to fetch Steam API data: {e}")
            raise


class SteamCollectionFetcher:
    """Utility class for fetching Steam Workshop collection data using Steam API"""

    def __init__(self, api_key: Optional[str] = None):
        """
        Args:
            api_key: Optional Steam Web API key passed through to SteamAPIClient
        """
        self.api_client = SteamAPIClient(api_key)

    def extract_collection_id(self, url: str) -> Optional[str]:
        """
        Extract Steam collection ID from various URL formats

        Args:
            url: Steam Workshop collection URL

        Returns:
            Collection ID as string, or None if not found
        """
        # Handle different URL formats, most specific first
        patterns = [
            r"steamcommunity\.com/workshop/filedetails/\?id=(\d+)",
            r"steamcommunity\.com/sharedfiles/filedetails/\?id=(\d+)",
            # \b keeps the fallback from matching inside other parameter
            # names such as "appid=4000".
            r"\bid=(\d+)",
        ]

        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)

        return None

    def fetch_collection_data(self, url: str) -> Dict:
        """
        Fetch collection data from Steam Web API

        Args:
            url: Steam Workshop collection URL

        Returns:
            Dictionary containing collection data

        Raises:
            requests.RequestException: If API request fails
            ValueError: If collection ID cannot be extracted or data is invalid
        """
        collection_id = self.extract_collection_id(url)
        if not collection_id:
            raise ValueError(f"Cannot extract collection ID from URL: {url}")

        # Fetch collection details from Steam API
        api_response = self.api_client.get_published_file_details([collection_id])

        if "response" not in api_response:
            raise ValueError("Invalid API response format")

        response_data = api_response["response"]
        if (
            "publishedfiledetails" not in response_data
            or not response_data["publishedfiledetails"]
        ):
            raise ValueError("No collection data found in API response")

        collection_data = response_data["publishedfiledetails"][0]

        # Check if collection exists and is accessible
        if collection_data.get("result") != 1:
            raise ValueError(
                f"Collection not found or inaccessible (result: {collection_data.get('result')})"
            )

        return self._parse_api_collection_data(collection_data, collection_id, url)

    def _parse_api_collection_data(
        self, api_data: Dict, collection_id: str, url: str
    ) -> Dict:
        """
        Parse collection data from Steam API response

        Args:
            api_data: Steam API response data for the collection
            collection_id: Steam collection ID
            url: Original URL

        Returns:
            Dictionary containing parsed collection data
        """
        data = {
            "steam_id": collection_id,
            "url": url,
            "title": api_data.get("title", ""),
            "description": api_data.get("description", ""),
            "author_name": "",
            "author_steam_id": str(api_data.get("creator", "")),
            "total_items": 0,
            "unique_visitors": api_data.get("views", 0),
            "current_favorites": api_data.get("favorited", 0),
            "total_favorites": api_data.get("lifetime_favorited", 0),
            "steam_created_date": None,
            "steam_updated_date": None,
            "items": [],
        }

        # Parse timestamps as aware UTC datetimes
        if "time_created" in api_data:
            data["steam_created_date"] = _aware_utc_from_timestamp(
                api_data["time_created"]
            )
        if "time_updated" in api_data:
            data["steam_updated_date"] = _aware_utc_from_timestamp(
                api_data["time_updated"]
            )

        # Get author name if we have Steam ID
        data["author_name"] = self._lookup_author_name(data["author_steam_id"])

        # Fetch collection items using GetCollectionDetails API
        data["items"] = self._fetch_collection_items_via_api(collection_id)
        data["total_items"] = len(data["items"])

        return data

    def _lookup_author_name(
        self, steam_id: str, cache: Optional[Dict[str, str]] = None
    ) -> str:
        """
        Resolve a Steam ID to a display name, best effort

        Args:
            steam_id: Steam user ID; an empty value yields an empty name
            cache: Optional mapping of already resolved IDs to names; only
                successful lookups are stored, so failures are retried

        Returns:
            The user's persona name, or "" when it cannot be determined
        """
        if not steam_id:
            return ""
        if cache is not None and steam_id in cache:
            return cache[steam_id]

        name = ""
        try:
            author_info = self._get_user_info(steam_id)
            if author_info:
                name = author_info.get("personaname", "")
        except Exception as e:
            logger.debug(f"Could not fetch author info: {e}")

        if name and cache is not None:
            cache[steam_id] = name
        return name

    def _get_user_info(self, steam_id: str) -> Optional[Dict]:
        """
        Get user information from Steam API

        Args:
            steam_id: Steam user ID

        Returns:
            User info dictionary or None if not available
        """
        # This endpoint is called with the API key, so skip it when none is set.
        if not self.api_client.api_key:
            return None

        url = f"{self.api_client.BASE_URL}/ISteamUser/GetPlayerSummaries/v0002/"
        params = {"key": self.api_client.api_key, "steamids": steam_id}

        try:
            response = self.api_client.session.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            if (
                "response" in data
                and "players" in data["response"]
                and data["response"]["players"]
            ):
                return data["response"]["players"][0]
        except Exception as e:
            logger.debug(f"Failed to fetch user info for {steam_id}: {e}")

        return None

    def _fetch_collection_items_via_api(self, collection_id: str) -> List[Dict]:
        """
        Fetch collection items using GetCollectionDetails API

        Args:
            collection_id: Steam collection ID

        Returns:
            List of item dictionaries; empty when the request fails or the
            collection has no children
        """
        items: List[Dict] = []

        try:
            # Use GetCollectionDetails API to get collection items
            url = f"{self.api_client.BASE_URL}/ISteamRemoteStorage/GetCollectionDetails/v1/"
            payload = {
                "collectioncount": 1,
                "publishedfileids[0]": collection_id,
            }

            response = self.api_client.session.post(url, data=payload, timeout=30)
            if response.status_code != 200:
                # Surface the failure instead of returning an empty list silently.
                logger.warning(
                    f"GetCollectionDetails returned HTTP {response.status_code} "
                    f"for collection {collection_id}"
                )
                return items

            collection_response = response.json()
            details = collection_response.get("response", {}).get(
                "collectiondetails", []
            )

            for collection in details:
                if collection.get("result") != 1 or "children" not in collection:
                    continue

                # Sort by sort order to maintain collection order; sorted() is
                # stable, so children with equal sort order keep API order.
                ordered_children = sorted(
                    (
                        child
                        for child in collection["children"]
                        if "publishedfileid" in child
                    ),
                    key=lambda child: child.get("sortorder", 0),
                )
                item_ids = [str(child["publishedfileid"]) for child in ordered_children]

                if item_ids:
                    items = self._fetch_items_by_ids(item_ids)

        except Exception as e:
            logger.error(f"Failed to fetch collection items via API: {e}")

        return items

    def _fetch_items_by_ids(self, item_ids: List[str]) -> List[Dict]:
        """
        Fetch item details by their IDs

        Args:
            item_ids: Steam Workshop item IDs in collection order

        Returns:
            List of item dictionaries for the items Steam reported
            successfully; a batch that fails is logged and skipped
        """
        items: List[Dict] = []
        # Many items share an author; remember resolved names for this call so
        # each author costs at most one successful lookup.
        author_names: Dict[str, str] = {}

        # Fetch details for all items in batches (Steam API has limits)
        batch_size = 20  # Conservative batch size

        for i in range(0, len(item_ids), batch_size):
            batch_ids = item_ids[i : i + batch_size]

            try:
                api_response = self.api_client.get_published_file_details(batch_ids)
                details = api_response.get("response", {}).get(
                    "publishedfiledetails", []
                )

                for j, item_data in enumerate(details):
                    item_id = item_data.get("publishedfileid", "unknown")
                    result = item_data.get("result", 0)

                    if result != 1:
                        # Log failed items
                        logger.warning(f"Failed to fetch item {item_id}: result={result}, ban_reason={item_data.get('ban_reason', 'N/A')}")
                        continue

                    author_steam_id = str(item_data.get("creator", ""))
                    items.append(
                        {
                            "steam_item_id": str(item_id),
                            "title": item_data.get("title", ""),
                            "author_name": self._lookup_author_name(
                                author_steam_id, author_names
                            ),
                            "author_steam_id": author_steam_id,
                            "description": item_data.get("description", ""),
                            "tags": [
                                tag.get("tag", "")
                                for tag in item_data.get("tags", [])
                            ],
                            # Position in the overall collection, not in the batch
                            "order_index": i + j,
                        }
                    )

            except Exception as e:
                logger.error(f"Failed to fetch batch of collection items: {e}")
                continue

        return items


def fetch_steam_collection(url: str) -> Dict:
    """
    Convenience function to fetch Steam collection data

    Args:
        url: Steam Workshop collection URL

    Returns:
        Dictionary containing collection data
    """
    fetcher = SteamCollectionFetcher()
    return fetcher.fetch_collection_data(url)


def create_or_update_collection(url: str) -> Tuple["SteamCollection", bool]:
    """
    Create or update a Steam collection in the database

    Args:
        url: Steam Workshop collection URL

    Returns:
        Tuple of (SteamCollection instance, created_flag)

    Raises:
        ValueError: If collection cannot be fetched or parsed
    """
    from .models import SteamCollection, SteamCollectionItem

    # Fetch data from Steam before opening the transaction so no database
    # transaction is held open during network calls.
    data = fetch_steam_collection(url)

    # Apply all writes atomically: if re-creating the items fails, the delete
    # is rolled back and the previously stored items are kept.
    with transaction.atomic():
        # Create or update collection
        collection, created = SteamCollection.objects.update_or_create(
            steam_id=data["steam_id"],
            defaults={
                "url": data["url"],
                "title": data["title"],
                "description": data["description"],
                "author_name": data["author_name"],
                "author_steam_id": data["author_steam_id"],
                "total_items": data["total_items"],
                "unique_visitors": data["unique_visitors"],
                "current_favorites": data["current_favorites"],
                "total_favorites": data["total_favorites"],
                "steam_created_date": data["steam_created_date"],
                "steam_updated_date": data["steam_updated_date"],
                "last_fetched": timezone.now(),
                "fetch_error": "",  # Clear any previous errors
            },
        )

        # Update collection items
        # First, remove existing items
        collection.items.all().delete()

        # Add new items
        for item_data in data["items"]:
            SteamCollectionItem.objects.create(
                collection=collection,
                steam_item_id=item_data["steam_item_id"],
                title=item_data["title"],
                author_name=item_data["author_name"],
                author_steam_id=item_data["author_steam_id"],
                description=item_data["description"],
                tags=item_data["tags"],
                order_index=item_data["order_index"],
            )

    return collection, created