mirror of
https://github.com/Omni-guides/Jackify.git
synced 2026-01-17 11:37:01 +01:00
259 lines
8.4 KiB
Python
259 lines
8.4 KiB
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Nexus Authentication Service

Unified service for Nexus authentication using OAuth or API key fallback.

This module defines ``NexusAuthService``, a facade that combines three
collaborators imported below: ``NexusOAuthService`` (authorization flow,
token refresh, user info), ``OAuthTokenHandler`` (stored OAuth token) and
``APIKeyService`` (legacy API key).
"""

import logging
from typing import Optional, Tuple
from .nexus_oauth_service import NexusOAuthService
from ..handlers.oauth_token_handler import OAuthTokenHandler
from .api_key_service import APIKeyService

# Module-level logger, named after this module, used by NexusAuthService.
logger = logging.getLogger(__name__)


class NexusAuthService:
    """
    Unified authentication service for Nexus Mods.

    Handles OAuth 2.0 (preferred) with API key fallback (legacy).

    Attributes (all created in ``__init__``):
        oauth_service: ``NexusOAuthService`` used to run the authorization
            flow, refresh tokens and look up user info.
        token_handler: ``OAuthTokenHandler`` used to query, save and delete
            the stored OAuth token.
        api_key_service: ``APIKeyService`` used to read, save, validate and
            clear the legacy API key.
    """

    # Refresh the OAuth token when it expires within this many minutes
    # (15 minute buffer for long installs).
    _REFRESH_BUFFER_MINUTES = 15

    def __init__(self):
        """Initialize authentication service and its collaborators."""
        self.oauth_service = NexusOAuthService()
        self.token_handler = OAuthTokenHandler()
        self.api_key_service = APIKeyService()
        logger.debug("NexusAuthService initialized")

    # ------------------------------------------------------------------
    # Credential resolution
    # ------------------------------------------------------------------

    def _resolve_auth(self) -> Tuple[Optional[str], Optional[str]]:
        """
        Pick the preferred available credential without logging the outcome.

        OAuth is tried first (which may refresh or delete the stored token,
        see ``_get_oauth_token``); the saved API key is the fallback.

        Returns:
            Tuple of (method, credential):
            - ('oauth', access_token) when an OAuth token is usable
            - ('api_key', api_key) when only a saved API key exists
            - (None, None) when neither is available
        """
        oauth_token = self._get_oauth_token()
        if oauth_token:
            return 'oauth', oauth_token

        api_key = self.api_key_service.get_saved_api_key()
        if api_key:
            return 'api_key', api_key

        return None, None

    def get_auth_token(self) -> Optional[str]:
        """
        Get authentication token, preferring OAuth over API key.

        Returns:
            Access token or API key, or None if no authentication available
        """
        method, credential = self._resolve_auth()

        if method == 'oauth':
            logger.debug("Using OAuth token for authentication")
        elif method == 'api_key':
            logger.debug("Using API key for authentication (OAuth not available)")
        else:
            logger.warning("No authentication available (neither OAuth nor API key)")

        return credential

    def _get_oauth_token(self) -> Optional[str]:
        """
        Get OAuth access token, refreshing if needed.

        Side effects: a successful refresh saves the new token data through
        the token handler; a failed refresh deletes the stored token.

        Returns:
            Valid access token or None
        """
        handler = self.token_handler

        if not handler.has_token():
            logger.debug("No OAuth token stored")
            return None

        # Token still good beyond the buffer window: use it as-is.
        if not handler.is_token_expired(buffer_minutes=self._REFRESH_BUFFER_MINUTES):
            return handler.get_access_token()

        logger.info("OAuth token expiring soon, attempting refresh")

        refresh_token = handler.get_refresh_token()
        if not refresh_token:
            # The stored token is intentionally left in place here.
            logger.warning("No refresh token available")
            return None

        new_token_data = self.oauth_service.refresh_token(refresh_token)
        if not new_token_data:
            logger.warning("Token refresh failed, OAuth token invalid")
            # Delete invalid token
            handler.delete_token()
            return None

        # Save refreshed token. A failed save is reported (as authorize_oauth
        # does) but the fresh access token is still returned so the current
        # operation can proceed.
        if handler.save_token({'oauth': new_token_data}):
            logger.info("OAuth token refreshed successfully")
        else:
            logger.warning("OAuth token refreshed but could not be saved")

        return new_token_data.get('access_token')

    # ------------------------------------------------------------------
    # Status queries
    # ------------------------------------------------------------------

    def is_authenticated(self) -> bool:
        """
        Check if user is authenticated via OAuth or API key.

        Returns:
            True if authenticated
        """
        return self.get_auth_token() is not None

    def get_auth_method(self) -> Optional[str]:
        """
        Get current authentication method.

        Returns:
            'oauth', 'api_key', or None
        """
        method, _credential = self._resolve_auth()
        return method

    def get_auth_status(self) -> Tuple[bool, str, Optional[str]]:
        """
        Get detailed authentication status.

        Returns:
            Tuple of (authenticated, method, username)
            - authenticated: True if authenticated
            - method: 'oauth', 'oauth_expired', 'api_key', or 'none'
            - username: Username if available (OAuth only), or None
        """
        # Snapshot token presence BEFORE trying OAuth: a failed refresh
        # deletes the stored token inside _get_oauth_token(), so asking
        # has_token() again afterwards would hide the failure and the
        # 'oauth_expired' status would never be reported for it.
        had_token = self.token_handler.has_token()

        if had_token:
            # Check if refresh token is likely expired (hasn't been
            # refreshed in 30+ days). Tolerate a missing/empty info mapping.
            token_info = self.token_handler.get_token_info() or {}
            if token_info.get('refresh_token_likely_expired'):
                logger.warning("Refresh token likely expired (30+ days old), user should re-authorize")
                return False, 'oauth_expired', None

        # Try OAuth
        oauth_token = self._get_oauth_token()
        if oauth_token:
            # Try to get username from userinfo
            user_info = self.oauth_service.get_user_info(oauth_token)
            username = user_info.get('name') if user_info else None
            return True, 'oauth', username

        if had_token:
            # Had token but couldn't get valid access token (refresh failed)
            logger.warning("OAuth token refresh failed, token may be invalid")
            return False, 'oauth_expired', None

        # Try API key
        if self.api_key_service.get_saved_api_key():
            return True, 'api_key', None

        return False, 'none', None

    # ------------------------------------------------------------------
    # OAuth management
    # ------------------------------------------------------------------

    def authorize_oauth(self, show_browser_message_callback=None) -> bool:
        """
        Perform OAuth authorization flow and store the resulting token.

        Args:
            show_browser_message_callback: Optional callback for browser
                messages; passed through to the OAuth service unchanged.

        Returns:
            True if authorization succeeded and the token was saved
        """
        logger.info("Starting OAuth authorization")

        token_data = self.oauth_service.authorize(show_browser_message_callback)
        if not token_data:
            logger.error("OAuth authorization failed")
            return False

        # Save token
        if not self.token_handler.save_token({'oauth': token_data}):
            logger.error("Failed to save OAuth token")
            return False

        logger.info("OAuth authorization completed successfully")
        return True

    def revoke_oauth(self) -> bool:
        """
        Revoke OAuth authorization by deleting stored token.

        Returns:
            True if revoked successfully
        """
        logger.info("Revoking OAuth authorization")
        return self.token_handler.delete_token()

    # ------------------------------------------------------------------
    # API key management (thin delegations to APIKeyService)
    # ------------------------------------------------------------------

    def save_api_key(self, api_key: str) -> bool:
        """
        Save API key (legacy fallback).

        Args:
            api_key: Nexus API key

        Returns:
            True if saved successfully
        """
        return self.api_key_service.save_api_key(api_key)

    def validate_api_key(self, api_key: Optional[str] = None) -> Tuple[bool, Optional[str]]:
        """
        Validate API key against Nexus API.

        Args:
            api_key: Optional API key to validate (uses stored if not provided)

        Returns:
            Tuple of (valid, username_or_error)
        """
        return self.api_key_service.validate_api_key(api_key)

    # ------------------------------------------------------------------
    # Convenience entry points
    # ------------------------------------------------------------------

    def ensure_valid_auth(self) -> Optional[str]:
        """
        Ensure we have valid authentication, refreshing if needed.

        This should be called before any Nexus operation.

        Returns:
            Valid auth token (OAuth access token or API key), or None
        """
        auth_token = self.get_auth_token()

        if not auth_token:
            logger.warning("No authentication available for Nexus operation")

        return auth_token

    def get_auth_for_engine(self) -> Optional[str]:
        """
        Get authentication token for jackify-engine.

        Same as ensure_valid_auth() - engine uses NEXUS_API_KEY env var for
        both OAuth and API keys (this matches upstream Wabbajack behavior).

        Returns:
            Valid auth token to pass via NEXUS_API_KEY environment variable,
            or None
        """
        return self.ensure_valid_auth()

    def clear_all_auth(self) -> bool:
        """
        Clear all authentication (both OAuth and API key).

        Useful for testing or switching accounts. Both stores are always
        asked to clear, regardless of the other's result.

        Returns:
            True if any auth was cleared
        """
        oauth_cleared = self.token_handler.delete_token()
        api_key_cleared = self.api_key_service.clear_api_key()

        if not (oauth_cleared or api_key_cleared):
            logger.debug("No authentication to clear")
            return False

        logger.info("Cleared all Nexus authentication")
        return True
|