mirror of
https://github.com/Omni-guides/Jackify.git
synced 2026-01-17 11:37:01 +01:00
443 lines
14 KiB
Python
443 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
OAuth Token Handler
|
|
Handles encrypted storage and retrieval of OAuth tokens
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
import time
|
|
from typing import Optional, Dict
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OAuthTokenHandler:
|
|
"""
|
|
Handles OAuth token storage with simple encryption
|
|
Stores tokens in ~/.config/jackify/nexus-oauth.json
|
|
"""
|
|
|
|
def __init__(self, config_dir: Optional[str] = None):
|
|
"""
|
|
Initialize token handler
|
|
|
|
Args:
|
|
config_dir: Optional custom config directory (defaults to ~/.config/jackify)
|
|
"""
|
|
if config_dir:
|
|
self.config_dir = Path(config_dir)
|
|
else:
|
|
self.config_dir = Path.home() / ".config" / "jackify"
|
|
|
|
self.token_file = self.config_dir / "nexus-oauth.json"
|
|
|
|
# Ensure config directory exists
|
|
self.config_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Generate encryption key based on machine-specific data
|
|
self._encryption_key = self._generate_encryption_key()
|
|
|
|
def _generate_encryption_key(self) -> bytes:
|
|
"""
|
|
Generate encryption key based on machine-specific data using Fernet
|
|
|
|
Uses hostname + username + machine ID as key material, similar to DPAPI approach.
|
|
This provides proper symmetric encryption while remaining machine-specific.
|
|
|
|
Returns:
|
|
Fernet-compatible 32-byte encryption key
|
|
"""
|
|
import socket
|
|
import getpass
|
|
|
|
try:
|
|
hostname = socket.gethostname()
|
|
username = getpass.getuser()
|
|
|
|
# Try to get machine ID for additional entropy
|
|
machine_id = None
|
|
try:
|
|
# Linux machine-id
|
|
with open('/etc/machine-id', 'r') as f:
|
|
machine_id = f.read().strip()
|
|
except:
|
|
try:
|
|
# Alternative locations
|
|
with open('/var/lib/dbus/machine-id', 'r') as f:
|
|
machine_id = f.read().strip()
|
|
except:
|
|
pass
|
|
|
|
# Combine multiple sources of machine-specific data
|
|
if machine_id:
|
|
key_material = f"{hostname}:{username}:{machine_id}:jackify"
|
|
else:
|
|
key_material = f"{hostname}:{username}:jackify"
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to get machine info for encryption: {e}")
|
|
key_material = "jackify:default:key"
|
|
|
|
# Generate 32-byte key using SHA256 for Fernet
|
|
# Fernet requires base64-encoded 32-byte key
|
|
key_bytes = hashlib.sha256(key_material.encode('utf-8')).digest()
|
|
return base64.urlsafe_b64encode(key_bytes)
|
|
|
|
def _encrypt_data(self, data: str) -> str:
|
|
"""
|
|
Encrypt data using AES-GCM (authenticated encryption)
|
|
|
|
Uses pycryptodome for cross-platform compatibility.
|
|
AES-GCM provides authenticated encryption similar to Fernet.
|
|
|
|
Args:
|
|
data: Plain text data
|
|
|
|
Returns:
|
|
Encrypted data as base64 string (nonce:ciphertext:tag format)
|
|
"""
|
|
try:
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Random import get_random_bytes
|
|
|
|
# Derive 32-byte AES key from encryption_key (which is base64-encoded)
|
|
key = base64.urlsafe_b64decode(self._encryption_key)
|
|
|
|
# Generate random nonce (12 bytes for GCM)
|
|
nonce = get_random_bytes(12)
|
|
|
|
# Create AES-GCM cipher
|
|
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
|
|
|
|
# Encrypt and get authentication tag
|
|
data_bytes = data.encode('utf-8')
|
|
ciphertext, tag = cipher.encrypt_and_digest(data_bytes)
|
|
|
|
# Combine nonce:ciphertext:tag and base64 encode
|
|
combined = nonce + ciphertext + tag
|
|
return base64.b64encode(combined).decode('utf-8')
|
|
|
|
except ImportError:
|
|
logger.error("pycryptodome package not available for token encryption")
|
|
return ""
|
|
except Exception as e:
|
|
logger.error(f"Failed to encrypt data: {e}")
|
|
return ""
|
|
|
|
def _decrypt_data(self, encrypted_data: str) -> Optional[str]:
|
|
"""
|
|
Decrypt data using AES-GCM (authenticated encryption)
|
|
|
|
Args:
|
|
encrypted_data: Encrypted data string (base64-encoded nonce:ciphertext:tag)
|
|
|
|
Returns:
|
|
Decrypted plain text or None on failure
|
|
"""
|
|
try:
|
|
from Crypto.Cipher import AES
|
|
|
|
# Check if MODE_GCM is available (pycryptodome has it, old pycrypto doesn't)
|
|
if not hasattr(AES, 'MODE_GCM'):
|
|
logger.error("pycryptodome required for token decryption (pycrypto doesn't support MODE_GCM)")
|
|
return None
|
|
|
|
# Derive 32-byte AES key from encryption_key
|
|
key = base64.urlsafe_b64decode(self._encryption_key)
|
|
|
|
# Decode base64 and split nonce:ciphertext:tag
|
|
combined = base64.b64decode(encrypted_data.encode('utf-8'))
|
|
nonce = combined[:12]
|
|
tag = combined[-16:]
|
|
ciphertext = combined[12:-16]
|
|
|
|
# Create AES-GCM cipher
|
|
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
|
|
|
|
# Decrypt and verify authentication tag
|
|
plaintext = cipher.decrypt_and_verify(ciphertext, tag)
|
|
|
|
return plaintext.decode('utf-8')
|
|
|
|
except ImportError:
|
|
logger.error("pycryptodome package not available for token decryption")
|
|
return None
|
|
except AttributeError:
|
|
logger.error("pycryptodome required for token decryption (pycrypto doesn't support MODE_GCM)")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Failed to decrypt data: {e}")
|
|
return None
|
|
|
|
def save_token(self, token_data: Dict) -> bool:
|
|
"""
|
|
Save OAuth token to encrypted file with proper permissions
|
|
|
|
Args:
|
|
token_data: Token data dict from OAuth response
|
|
|
|
Returns:
|
|
True if saved successfully
|
|
"""
|
|
try:
|
|
# Add timestamp for tracking
|
|
token_data['_saved_at'] = int(time.time())
|
|
|
|
# Convert to JSON
|
|
json_data = json.dumps(token_data, indent=2)
|
|
|
|
# Encrypt using Fernet
|
|
encrypted = self._encrypt_data(json_data)
|
|
|
|
if not encrypted:
|
|
logger.error("Encryption failed, cannot save token")
|
|
return False
|
|
|
|
# Save to file with restricted permissions
|
|
# Write to temp file first, then move (atomic operation)
|
|
import tempfile
|
|
fd, temp_path = tempfile.mkstemp(dir=self.config_dir, prefix='.oauth_tmp_')
|
|
|
|
try:
|
|
with os.fdopen(fd, 'w') as f:
|
|
json.dump({'encrypted_data': encrypted}, f, indent=2)
|
|
|
|
# Set restrictive permissions (owner read/write only)
|
|
os.chmod(temp_path, 0o600)
|
|
|
|
# Atomic move
|
|
os.replace(temp_path, self.token_file)
|
|
|
|
logger.info(f"Saved encrypted OAuth token to {self.token_file}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
# Clean up temp file on error
|
|
try:
|
|
os.unlink(temp_path)
|
|
except:
|
|
pass
|
|
raise e
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to save OAuth token: {e}")
|
|
return False
|
|
|
|
def load_token(self) -> Optional[Dict]:
|
|
"""
|
|
Load OAuth token from encrypted file
|
|
|
|
Returns:
|
|
Token data dict or None if not found or invalid
|
|
"""
|
|
if not self.token_file.exists():
|
|
logger.debug("No OAuth token file found")
|
|
return None
|
|
|
|
try:
|
|
# Load encrypted data
|
|
with open(self.token_file, 'r') as f:
|
|
data = json.load(f)
|
|
|
|
encrypted = data.get('encrypted_data')
|
|
if not encrypted:
|
|
logger.error("Token file missing encrypted_data field")
|
|
return None
|
|
|
|
# Decrypt
|
|
decrypted = self._decrypt_data(encrypted)
|
|
if not decrypted:
|
|
logger.error("Failed to decrypt token data")
|
|
return None
|
|
|
|
# Parse JSON
|
|
token_data = json.loads(decrypted)
|
|
|
|
logger.debug("Successfully loaded OAuth token")
|
|
return token_data
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Token file contains invalid JSON: {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Failed to load OAuth token: {e}")
|
|
return None
|
|
|
|
def delete_token(self) -> bool:
|
|
"""
|
|
Delete OAuth token file
|
|
|
|
Returns:
|
|
True if deleted successfully
|
|
"""
|
|
try:
|
|
if self.token_file.exists():
|
|
self.token_file.unlink()
|
|
logger.info("Deleted OAuth token file")
|
|
return True
|
|
else:
|
|
logger.debug("No OAuth token file to delete")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete OAuth token: {e}")
|
|
return False
|
|
|
|
def has_token(self) -> bool:
|
|
"""
|
|
Check if OAuth token file exists
|
|
|
|
Returns:
|
|
True if token file exists
|
|
"""
|
|
return self.token_file.exists()
|
|
|
|
def is_token_expired(self, token_data: Optional[Dict] = None, buffer_minutes: int = 5) -> bool:
|
|
"""
|
|
Check if token is expired or close to expiring
|
|
|
|
Args:
|
|
token_data: Optional token data dict (loads from file if not provided)
|
|
buffer_minutes: Minutes before expiry to consider token expired (default 5)
|
|
|
|
Returns:
|
|
True if token is expired or will expire within buffer_minutes
|
|
"""
|
|
if token_data is None:
|
|
token_data = self.load_token()
|
|
|
|
if not token_data:
|
|
return True
|
|
|
|
# Extract OAuth data if nested
|
|
oauth_data = token_data.get('oauth', token_data)
|
|
|
|
# Get expiry information
|
|
expires_in = oauth_data.get('expires_in')
|
|
saved_at = token_data.get('_saved_at')
|
|
|
|
if not expires_in or not saved_at:
|
|
logger.debug("Token missing expiry information, assuming valid")
|
|
return False # Assume token is valid if no expiry info
|
|
|
|
# Calculate expiry time
|
|
expires_at = saved_at + expires_in
|
|
buffer_seconds = buffer_minutes * 60
|
|
now = int(time.time())
|
|
|
|
# Check if expired or within buffer
|
|
is_expired = (expires_at - buffer_seconds) < now
|
|
|
|
if is_expired:
|
|
remaining = expires_at - now
|
|
if remaining < 0:
|
|
logger.debug(f"Token expired {-remaining} seconds ago")
|
|
else:
|
|
logger.debug(f"Token expires in {remaining} seconds (within buffer)")
|
|
|
|
return is_expired
|
|
|
|
def get_access_token(self) -> Optional[str]:
|
|
"""
|
|
Get access token from storage
|
|
|
|
Returns:
|
|
Access token string or None if not found or expired
|
|
"""
|
|
token_data = self.load_token()
|
|
|
|
if not token_data:
|
|
return None
|
|
|
|
# Check if expired
|
|
if self.is_token_expired(token_data):
|
|
logger.debug("Stored token is expired")
|
|
return None
|
|
|
|
# Extract access token from OAuth structure
|
|
oauth_data = token_data.get('oauth', token_data)
|
|
access_token = oauth_data.get('access_token')
|
|
|
|
if not access_token:
|
|
logger.error("Token data missing access_token field")
|
|
return None
|
|
|
|
return access_token
|
|
|
|
def get_refresh_token(self) -> Optional[str]:
|
|
"""
|
|
Get refresh token from storage
|
|
|
|
Returns:
|
|
Refresh token string or None if not found
|
|
"""
|
|
token_data = self.load_token()
|
|
|
|
if not token_data:
|
|
return None
|
|
|
|
# Extract refresh token from OAuth structure
|
|
oauth_data = token_data.get('oauth', token_data)
|
|
refresh_token = oauth_data.get('refresh_token')
|
|
|
|
return refresh_token
|
|
|
|
def get_token_info(self) -> Dict:
|
|
"""
|
|
Get diagnostic information about current token
|
|
|
|
Returns:
|
|
Dict with token status information
|
|
"""
|
|
token_data = self.load_token()
|
|
|
|
if not token_data:
|
|
return {
|
|
'has_token': False,
|
|
'error': 'No token file found'
|
|
}
|
|
|
|
oauth_data = token_data.get('oauth', token_data)
|
|
expires_in = oauth_data.get('expires_in')
|
|
saved_at = token_data.get('_saved_at')
|
|
|
|
# Check if refresh token is likely expired (30 days since last auth)
|
|
# Nexus doesn't provide refresh token expiry, so we estimate conservatively
|
|
REFRESH_TOKEN_LIFETIME_DAYS = 30
|
|
now = int(time.time())
|
|
refresh_token_age_days = (now - saved_at) / 86400 if saved_at else 0
|
|
refresh_token_likely_expired = refresh_token_age_days > REFRESH_TOKEN_LIFETIME_DAYS
|
|
|
|
if expires_in and saved_at:
|
|
expires_at = saved_at + expires_in
|
|
remaining_seconds = expires_at - now
|
|
|
|
return {
|
|
'has_token': True,
|
|
'has_refresh_token': bool(oauth_data.get('refresh_token')),
|
|
'expires_in_seconds': remaining_seconds,
|
|
'expires_in_minutes': remaining_seconds / 60,
|
|
'expires_in_hours': remaining_seconds / 3600,
|
|
'is_expired': remaining_seconds < 0,
|
|
'expires_soon_5min': remaining_seconds < 300,
|
|
'expires_soon_15min': remaining_seconds < 900,
|
|
'saved_at': saved_at,
|
|
'expires_at': expires_at,
|
|
'refresh_token_age_days': refresh_token_age_days,
|
|
'refresh_token_likely_expired': refresh_token_likely_expired,
|
|
}
|
|
else:
|
|
return {
|
|
'has_token': True,
|
|
'has_refresh_token': bool(oauth_data.get('refresh_token')),
|
|
'refresh_token_age_days': refresh_token_age_days,
|
|
'refresh_token_likely_expired': refresh_token_likely_expired,
|
|
'error': 'Token missing expiry information'
|
|
}
|