opus-submitter/opus_submitter/simple_cas_backend.py

93 lines
3.0 KiB
Python

"""
Simple CAS 2.0 authentication backend - bare minimum implementation.
"""
import json
import logging

import requests
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import BaseBackend
logger = logging.getLogger(__name__)


class SimpleCASBackend(BaseBackend):
    """Simple CAS 2.0 authentication backend.

    Validates a CAS service ticket against ``settings.CAS_SERVER_URL`` and
    maps the authenticated CAS principal onto a local user, creating the
    user on first login.

    The configured user model must expose a ``cas_user_id`` field and an
    ``update_cas_data(cas_user_id, attributes)`` method; both are used
    below but defined elsewhere in the project.
    """

    # Seconds to wait for the CAS server when validating a ticket.
    # Subclasses may override this.
    validation_timeout = 10

    def authenticate(self, request, ticket=None, service=None, **kwargs):
        """Authenticate a user from a CAS service ticket.

        Args:
            request: The current HTTP request (unused here).
            ticket: Service ticket issued by the CAS server.
            service: Service URL the ticket was issued for.
            **kwargs: Ignored; accepted for backend-API compatibility.

        Returns:
            The matching (or newly created) user, or ``None`` when the
            credentials are missing or the ticket fails validation.
        """
        if not ticket or not service:
            return None

        cas_user_id, attributes = self.validate_ticket(ticket, service)
        if not cas_user_id:
            return None
        # Guard against an overridden validate_ticket() handing back None.
        attributes = attributes or {}

        # Debug level only: these values are personal data and must not end
        # up on stdout or in production logs by default.
        logger.debug("CAS User ID: %s", cas_user_id)
        logger.debug("CAS Attributes: %s", attributes)

        user = self._get_or_create_user(cas_user_id, attributes)

        # Always refresh the stored CAS data on login.
        user.update_cas_data(cas_user_id, attributes)
        return user

    def _get_or_create_user(self, cas_user_id, attributes):
        """Find the user by CAS ID, then by username, else create one."""
        User = get_user_model()

        try:
            return User.objects.get(cas_user_id=cas_user_id)
        except User.DoesNotExist:
            pass

        # Fall back to the CAS ID when the username attribute is missing,
        # null or empty; str() keeps .lower() safe for non-string values.
        username = str(
            self._first_value(attributes.get("username")) or cas_user_id
        ).lower()

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return User.objects.create_user(
                username=username,
                cas_user_id=cas_user_id,
                first_name=self._first_value(attributes.get("firstname")) or "",
                last_name=self._first_value(attributes.get("lastname")) or "",
                email=self._first_value(attributes.get("email")) or "",
            )

        # NOTE(review): linking purely on a username match lets a CAS
        # principal take over a pre-existing local account with the same
        # username. Confirm this is intended (e.g. a one-off migration).
        user.cas_user_id = cas_user_id
        # Only the CAS ID changed; avoid rewriting unrelated columns.
        user.save(update_fields=["cas_user_id"])
        return user

    @staticmethod
    def _first_value(value):
        """Return the first item of a list/tuple attribute, else the value.

        Some CAS servers serialise every attribute as a list, even when it
        holds a single value. An empty list yields ``None``.
        """
        if isinstance(value, (list, tuple)):
            return value[0] if value else None
        return value

    def validate_ticket(self, ticket, service):
        """Validate a CAS ticket and return the CAS user ID and attributes.

        Args:
            ticket: Service ticket to validate.
            service: Service URL the ticket was issued for.

        Returns:
            A ``(cas_user_id, attributes)`` tuple on success, where
            ``attributes`` is always a dict. ``(None, None)`` when the
            request fails, the response is not valid JSON, or the response
            does not contain an ``authenticationSuccess`` object.
        """
        validate_url = f"{settings.CAS_SERVER_URL.rstrip('/')}/serviceValidate"
        params = {"ticket": ticket, "service": service, "format": "JSON"}

        try:
            response = requests.get(
                validate_url, params=params, timeout=self.validation_timeout
            )
            response.raise_for_status()
            data = json.loads(response.text)
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers json.JSONDecodeError.
            logger.warning("CAS validation error: %s", exc)
            return None, None

        # Check the shape at every level so that a malformed payload is
        # rejected instead of raising AttributeError.
        service_response = data.get("serviceResponse") if isinstance(data, dict) else None
        if not isinstance(service_response, dict):
            return None, None

        auth_success = service_response.get("authenticationSuccess")
        if not auth_success or not isinstance(auth_success, dict):
            failure = service_response.get("authenticationFailure")
            if failure:
                logger.debug("CAS authentication failure: %s", failure)
            return None, None

        cas_user_id = auth_success.get("user") or ""
        attributes = auth_success.get("attributes")
        if not isinstance(attributes, dict):
            # Covers both a missing key and an explicit JSON null.
            attributes = {}
        return cas_user_id, attributes

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or ``None``."""
        User = get_user_model()
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None