93 lines
3.0 KiB
Python
93 lines
3.0 KiB
Python
"""
|
|
Simple CAS 2.0 authentication backend - bare minimum implementation.
|
|
"""
|
|
|
|
import json
import logging

import requests
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import BaseBackend
|
|
|
|
|
|
logger = logging.getLogger(__name__)


class SimpleCASBackend(BaseBackend):
    """Django authentication backend that validates CAS service tickets.

    A ticket is checked against ``<settings.CAS_SERVER_URL>/serviceValidate``
    with ``format=JSON``. On success the local user is looked up by
    ``cas_user_id``, then by ``username``, and created if neither exists.

    The user model must provide a ``cas_user_id`` field and an
    ``update_cas_data(cas_user_id, attributes)`` method; both are used here
    but are defined outside this class.
    """

    # Seconds to wait for the CAS server before giving up on validation.
    # Subclasses may override this.
    validation_timeout = 10

    def authenticate(self, request, ticket=None, service=None, **kwargs):
        """Authenticate a user from a CAS ticket.

        Args:
            request: The current request (unused, required by Django).
            ticket: The CAS service ticket to validate.
            service: The service URL the ticket was issued for.
            **kwargs: Ignored; accepted for backend-signature compatibility.

        Returns:
            The matching (possibly newly created) user, or ``None`` when the
            ticket or service is missing or validation does not succeed.
        """
        if not ticket or not service:
            return None

        cas_user_id, attributes = self.validate_ticket(ticket, service)
        if not cas_user_id:
            return None

        # An overriding validate_ticket may still hand back None/non-dict.
        if not isinstance(attributes, dict):
            attributes = {}

        # Log attribute *names* only: the values are personal data.
        logger.debug(
            "CAS ticket validated for user %s (attributes: %s)",
            cas_user_id,
            sorted(attributes),
        )

        user = self._get_or_create_user(cas_user_id, attributes)

        # Always refresh the stored CAS data on login.
        user.update_cas_data(cas_user_id, attributes)
        return user

    @staticmethod
    def _attr(attributes, key, default=""):
        """Return a scalar attribute value, unwrapping list-valued entries."""
        value = attributes.get(key, default)
        # NOTE(review): some CAS servers appear to serialise attribute values
        # as JSON arrays; take the first element in that case - confirm
        # against the deployed server's response format.
        if isinstance(value, (list, tuple)):
            value = value[0] if value else default
        return default if value is None else value

    def _get_or_create_user(self, cas_user_id, attributes):
        """Find the user by CAS id, then by username, else create one."""
        User = get_user_model()

        # Fall back to the CAS id when the username attribute is absent or
        # empty, so create_user() never receives a blank username.
        username = str(self._attr(attributes, "username") or cas_user_id).lower()

        try:
            return User.objects.get(cas_user_id=cas_user_id)
        except User.DoesNotExist:
            pass

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            return User.objects.create_user(
                username=username,
                cas_user_id=cas_user_id,
                first_name=self._attr(attributes, "firstname"),
                last_name=self._attr(attributes, "lastname"),
                email=self._attr(attributes, "email"),
            )

        # NOTE(review): linking an existing local account purely by username
        # trusts the CAS username namespace to match ours - confirm intended.
        user.cas_user_id = cas_user_id
        user.save()
        return user

    def validate_ticket(self, ticket, service):
        """Validate a CAS ticket against the CAS server.

        Args:
            ticket: The CAS service ticket.
            service: The service URL the ticket was issued for.

        Returns:
            A ``(cas_user_id, attributes)`` tuple on success, where
            ``attributes`` is always a dict, or ``(None, None)`` when the
            request fails, the response is not valid JSON, or the response
            does not contain an ``authenticationSuccess`` object.
        """
        validate_url = f"{settings.CAS_SERVER_URL.rstrip('/')}/serviceValidate"
        params = {"ticket": ticket, "service": service, "format": "JSON"}

        try:
            response = requests.get(
                validate_url, params=params, timeout=self.validation_timeout
            )
            response.raise_for_status()
            data = json.loads(response.text)
        except requests.RequestException as exc:
            logger.warning("CAS validation request failed: %s", exc)
            return None, None
        except ValueError as exc:
            # json.JSONDecodeError is a ValueError subclass.
            logger.warning("CAS validation returned invalid JSON: %s", exc)
            return None, None

        service_response = (
            data.get("serviceResponse") if isinstance(data, dict) else None
        )
        if not isinstance(service_response, dict):
            logger.warning("CAS validation response has an unexpected shape")
            return None, None

        auth_success = service_response.get("authenticationSuccess")
        if not auth_success or not isinstance(auth_success, dict):
            logger.info("CAS ticket validation was not successful")
            return None, None

        cas_user_id = auth_success.get("user", "")
        attributes = auth_success.get("attributes")
        # Treat a missing, null or non-object "attributes" as no attributes.
        if not isinstance(attributes, dict):
            attributes = {}
        return cas_user_id, attributes

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or ``None``."""
        User = get_user_model()
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None
|