Sync from development - prepare for v0.4.0

This commit is contained in:
Omni
2026-02-25 17:40:43 +00:00
parent 2eb54b9a36
commit 805718222a
324 changed files with 4914 additions and 4567 deletions

View File

@@ -8,250 +8,9 @@ import subprocess
logger = logging.getLogger(__name__)
def debug_print(message):
"""Log debug message only if debug mode is enabled"""
from jackify.backend.handlers.config_handler import ConfigHandler
config_handler = ConfigHandler()
if config_handler.get('debug_mode', False):
logger.debug(message)
class PrefixCreationMixin:
"""Mixin providing prefix creation methods for AutomatedPrefixService."""
def detect_actual_prefix_appid(self, initial_appid: int, shortcut_name: str) -> Optional[int]:
"""
After Steam restart, detect the actual prefix AppID that was created.
Uses direct VDF file reading to find the actual AppID.
Args:
initial_appid: The initial (negative) AppID from shortcuts.vdf
shortcut_name: Name of the shortcut for logging
Returns:
The actual (positive) AppID of the created prefix, or None if not found
"""
try:
logger.info(f"Using VDF to detect actual AppID for shortcut: {shortcut_name}")
# Wait up to 30 seconds for Steam to process the shortcut
for i in range(30):
try:
from ..handlers.shortcut_handler import ShortcutHandler
from ..handlers.path_handler import PathHandler
path_handler = PathHandler()
shortcuts_path = path_handler._find_shortcuts_vdf()
if shortcuts_path:
from ..handlers.vdf_handler import VDFHandler
shortcuts_data = VDFHandler.load(shortcuts_path, binary=True)
if shortcuts_data and 'shortcuts' in shortcuts_data:
for idx, shortcut in shortcuts_data['shortcuts'].items():
app_name = shortcut.get('AppName', shortcut.get('appname', '')).strip()
if app_name.lower() == shortcut_name.lower():
appid = shortcut.get('appid')
if appid:
actual_appid = int(appid) & 0xFFFFFFFF
logger.info(f"Found shortcut '{app_name}' in shortcuts.vdf")
logger.info(f" Initial AppID (signed): {initial_appid}")
logger.info(f" Actual AppID (unsigned): {actual_appid}")
return actual_appid
logger.debug(f"Shortcut '{shortcut_name}' not found in VDF yet (attempt {i+1}/30)")
time.sleep(1)
except Exception as e:
logger.warning(f"Error reading shortcuts.vdf on attempt {i+1}: {e}")
time.sleep(1)
logger.error(f"Shortcut '{shortcut_name}' not found in shortcuts.vdf after 30 seconds")
return None
except Exception as e:
logger.error(f"Error detecting actual prefix AppID: {e}")
return None
def launch_shortcut_to_trigger_prefix(self, initial_appid: int) -> bool:
"""
Launch the shortcut using rungameid to trigger prefix creation.
This follows the same pattern as the working test script.
Args:
initial_appid: The initial (negative) AppID from shortcuts.vdf
Returns:
True if successful, False otherwise
"""
try:
# Convert signed AppID to unsigned AppID (same as STL's generateSteamShortID)
unsigned_appid = self.generate_steam_short_id(initial_appid)
# Calculate rungameid using the unsigned AppID
rungameid = (unsigned_appid << 32) | 0x02000000
logger.info(f"Launching shortcut with rungameid: {rungameid}")
debug_print(f"[DEBUG] Launching shortcut with rungameid: {rungameid}")
debug_print(f"[DEBUG] Initial signed AppID: {initial_appid}")
debug_print(f"[DEBUG] Unsigned AppID: {unsigned_appid}")
# Launch using rungameid
cmd = ['steam', f'steam://rungameid/{rungameid}']
debug_print(f"[DEBUG] About to run launch command: {' '.join(cmd)}")
# Use subprocess.Popen to launch asynchronously (steam command returns immediately)
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# Wait a moment for the process to start
time.sleep(1)
# Check if the process is still running (steam command should exit quickly)
try:
return_code = process.poll()
if return_code is None:
# Process is still running, wait a bit more
time.sleep(2)
return_code = process.poll()
debug_print(f"[DEBUG] Steam launch process return code: {return_code}")
# Get any output
stdout, stderr = process.communicate(timeout=1)
if stdout:
debug_print(f"[DEBUG] Steam launch stdout: {stdout}")
if stderr:
debug_print(f"[DEBUG] Steam launch stderr: {stderr}")
except subprocess.TimeoutExpired:
debug_print("[DEBUG] Steam launch process timed out, but that's OK")
process.kill()
logger.info(f"Launch command executed: {' '.join(cmd)}")
# Give it a moment for the shortcut to actually start
time.sleep(5)
return True
except subprocess.TimeoutExpired:
logger.error("Launch command timed out")
debug_print("[DEBUG] Launch command timed out")
return False
except Exception as e:
logger.error(f"Error launching shortcut: {e}")
debug_print(f"[DEBUG] Error launching shortcut: {e}")
return False
def create_prefix_directly(self, appid: int, batch_file_path: str) -> Optional[Path]:
"""
Create prefix directly using Proton wrapper.
Args:
appid: The AppID from the shortcut
batch_file_path: Path to the temporary batch file
Returns:
Path to the created prefix, or None if failed
"""
proton_path = self.find_proton_experimental()
if not proton_path:
return None
# Steam uses negative AppIDs for non-Steam shortcuts, but we need the positive value for the prefix path
positive_appid = abs(appid)
logger.info(f"Using positive AppID {positive_appid} for prefix creation (original: {appid})")
# Create the prefix directory structure
prefix_path = self._get_compatdata_path_for_appid(positive_appid)
if not prefix_path:
logger.error(f"Could not determine compatdata path for AppID {positive_appid}")
return None
# Create the prefix directory structure
prefix_path.mkdir(parents=True, exist_ok=True)
pfx_dir = prefix_path / "pfx"
pfx_dir.mkdir(exist_ok=True)
# Set up environment
env = os.environ.copy()
env['STEAM_COMPAT_DATA_PATH'] = str(prefix_path)
env['STEAM_COMPAT_APP_ID'] = str(positive_appid) # Use positive AppID for environment
# Determine correct Steam root based on installation type
from ..handlers.path_handler import PathHandler
path_handler = PathHandler()
steam_library = path_handler.find_steam_library()
if steam_library and steam_library.name == "common":
# Extract Steam root from library path: .../Steam/steamapps/common -> .../Steam
steam_root = steam_library.parent.parent
env['STEAM_COMPAT_CLIENT_INSTALL_PATH'] = str(steam_root)
else:
# Fallback to legacy path if detection fails
env['STEAM_COMPAT_CLIENT_INSTALL_PATH'] = str(Path.home() / ".local/share/Steam")
# Build the command
cmd = [
str(proton_path / "proton"),
"run",
batch_file_path
]
logger.info(f"Creating prefix with command: {' '.join(cmd)}")
logger.info(f"Prefix path: {prefix_path}")
logger.info(f"Using AppID: {positive_appid} (original: {appid})")
try:
# Run the command with a timeout
result = subprocess.run(
cmd,
env=env,
capture_output=True,
text=True,
timeout=30
)
# Check if prefix was created
time.sleep(2) # Give it a moment to settle
prefix_created = prefix_path.exists()
pfx_exists = (prefix_path / "pfx").exists()
logger.info(f"Return code: {result.returncode}")
logger.info(f"Prefix created: {prefix_created}")
logger.info(f"pfx directory exists: {pfx_exists}")
if result.stderr:
logger.debug(f"stderr: {result.stderr.strip()}")
success = prefix_created and pfx_exists
if success:
logger.info(f"Prefix created successfully at: {prefix_path}")
return prefix_path
else:
logger.error("Failed to create prefix")
return None
except subprocess.TimeoutExpired:
logger.warning("Command timed out, but this might be normal")
# Check if prefix was created despite timeout
prefix_created = prefix_path.exists()
pfx_exists = (prefix_path / "pfx").exists()
if prefix_created and pfx_exists:
logger.info(f"Prefix created successfully despite timeout at: {prefix_path}")
return prefix_path
else:
logger.error("No prefix created")
return None
except Exception as e:
logger.error(f"Error creating prefix: {e}")
return None
def _get_compatdata_path_for_appid(self, appid: int) -> Optional[Path]:
"""
Get the compatdata path for a given AppID.

View File

@@ -151,6 +151,7 @@ class GameUtilsMixin:
game_dir_names = {
"skyrim": "Skyrim Special Edition",
"fnv": "FalloutNV",
"fo3": "Fallout3",
"fo4": "Fallout4",
"oblivion": "Oblivion",
"oblivion_remastered": "Oblivion Remastered",

View File

@@ -7,15 +7,6 @@ import vdf
logger = logging.getLogger(__name__)
def debug_print(message):
"""Log debug message only if debug mode is enabled"""
from jackify.backend.handlers.config_handler import ConfigHandler
config_handler = ConfigHandler()
if config_handler.get('debug_mode', False):
logger.debug(message)
class ProtonOperationsMixin:
"""Mixin providing Proton and compatibility tool methods for AutomatedPrefixService."""
@@ -112,7 +103,7 @@ class ProtonOperationsMixin:
# STL sets the compatibility tool in config.vdf, not shortcuts.vdf
# We know this works from manual testing, so just log that we're skipping this check
logger.info(f"Skipping Proton version check for '{shortcut_name}' - STL handles this correctly")
debug_print(f"[DEBUG] Skipping Proton version check for '{shortcut_name}' - STL handles this correctly")
logger.debug(f"[DEBUG] Skipping Proton version check for '{shortcut_name}' - STL handles this correctly")
def set_proton_version_for_shortcut(self, appid: int, proton_version: str) -> bool:
"""
@@ -165,7 +156,7 @@ class ProtonOperationsMixin:
os.fsync(f.fileno()) if hasattr(f, 'fileno') else None
logger.info(f"Set Proton version {proton_version} for AppID {appid}")
debug_print(f"[DEBUG] Set Proton version {proton_version} for AppID {appid} in config.vdf")
logger.debug(f"[DEBUG] Set Proton version {proton_version} for AppID {appid} in config.vdf")
# Small delay to ensure filesystem write completes
import time
@@ -175,7 +166,7 @@ class ProtonOperationsMixin:
with open(config_path, 'r') as f:
verify_data = vdf.load(f)
compat_mapping = verify_data.get('Software', {}).get('Valve', {}).get('Steam', {}).get('CompatToolMapping', {}).get(str(appid))
debug_print(f"[DEBUG] Verification: AppID {appid} -> {compat_mapping}")
logger.debug(f"[DEBUG] Verification: AppID {appid} -> {compat_mapping}")
return True
@@ -324,14 +315,14 @@ class ProtonOperationsMixin:
config_data['Software']['Valve']['Steam']['CompatToolMapping'][str(unsigned_appid)] = compat_entry
logger.info(f"Added compatibility tool entry: {str(unsigned_appid)} -> {compat_tool}")
debug_print(f"[DEBUG] Added compatibility tool entry: {str(unsigned_appid)} -> {compat_tool}")
logger.debug(f"[DEBUG] Added compatibility tool entry: {str(unsigned_appid)} -> {compat_tool}")
# Write back to file (text format)
with open(config_path, 'w') as f:
vdf.dump(config_data, f)
logger.info(f"Set compatibility tool STL-style: AppID {unsigned_appid} -> {compat_tool}")
debug_print(f"[DEBUG] Set compatibility tool STL-style: AppID {unsigned_appid} -> {compat_tool}")
logger.debug(f"[DEBUG] Set compatibility tool STL-style: AppID {unsigned_appid} -> {compat_tool}")
return True
@@ -564,7 +555,7 @@ class ProtonOperationsMixin:
f.writelines(lines)
logger.info(f"Updated localconfig.vdf: Signed AppID {signed_appid_int} -> OverlayAppEnable=1, DisableLaunchInVR=1")
debug_print(f"[DEBUG] Updated localconfig.vdf: Signed AppID {signed_appid_int} -> OverlayAppEnable=1, DisableLaunchInVR=1")
logger.debug(f"[DEBUG] Updated localconfig.vdf: Signed AppID {signed_appid_int} -> OverlayAppEnable=1, DisableLaunchInVR=1")
return True

View File

@@ -72,7 +72,12 @@ class RegistryOperationsMixin:
return False
def _apply_universal_dotnet_fixes(self, modlist_compatdata_path: str):
"""Apply universal dotnet4.x compatibility registry fixes to ALL modlists"""
"""Apply universal dotnet4.x compatibility registry fixes to ALL modlists.
Direct file editing is preferred over `wine reg add` — faster, no Wine
process overhead, and works even when Proton isn't on PATH. Falls back
to subprocess wine reg add when the reg files haven't been created yet.
"""
try:
prefix_path = os.path.join(modlist_compatdata_path, "pfx")
if not os.path.exists(prefix_path):
@@ -81,59 +86,99 @@ class RegistryOperationsMixin:
logger.info("Applying universal dotnet4.x compatibility registry fixes...")
# Find the appropriate Wine binary to use for registry operations
user_reg = os.path.join(prefix_path, "user.reg")
system_reg = os.path.join(prefix_path, "system.reg")
fix1 = fix2 = False
if os.path.exists(user_reg):
fix1 = self._reg_set_value(
user_reg,
"[Software\\\\Wine\\\\DllOverrides]",
'"*mscoree"',
'"native"',
)
if os.path.exists(system_reg):
fix2 = self._reg_set_value(
system_reg,
"[Software\\\\Microsoft\\\\.NETFramework]",
'"OnlyUseLatestCLR"',
"dword:00000001",
)
if fix1 and fix2:
logger.info("Universal dotnet4.x compatibility fixes applied via direct reg file editing")
return True
# Fall back to wine reg add when reg files are not present yet
logger.debug("Reg files not ready; falling back to wine reg add")
wine_binary = self._find_wine_binary_for_registry(modlist_compatdata_path)
if not wine_binary:
logger.error("Could not find Wine binary for registry operations")
logger.error("Could not find Wine binary for registry fallback")
return False
# Set environment for Wine registry operations
env = os.environ.copy()
env['WINEPREFIX'] = prefix_path
env['WINEDEBUG'] = '-all' # Suppress Wine debug output
env['WINEDEBUG'] = '-all'
# Registry fix 1: Set *mscoree=native DLL override (asterisk for full override)
# Use native .NET runtime instead of Wine's
logger.debug("Setting *mscoree=native DLL override...")
cmd1 = [
wine_binary, 'reg', 'add',
'HKEY_CURRENT_USER\\Software\\Wine\\DllOverrides',
'/v', '*mscoree', '/t', 'REG_SZ', '/d', 'native', '/f'
]
r1 = subprocess.run(
[wine_binary, 'reg', 'add',
'HKEY_CURRENT_USER\\Software\\Wine\\DllOverrides',
'/v', '*mscoree', '/t', 'REG_SZ', '/d', 'native', '/f'],
env=env, capture_output=True, text=True, errors='replace',
)
r2 = subprocess.run(
[wine_binary, 'reg', 'add',
'HKEY_LOCAL_MACHINE\\Software\\Microsoft\\.NETFramework',
'/v', 'OnlyUseLatestCLR', '/t', 'REG_DWORD', '/d', '1', '/f'],
env=env, capture_output=True, text=True, errors='replace',
)
result1 = subprocess.run(cmd1, env=env, capture_output=True, text=True, errors='replace')
if result1.returncode == 0:
logger.info("Successfully applied *mscoree=native DLL override")
ok = r1.returncode == 0 and r2.returncode == 0
if ok:
logger.info("Universal dotnet4.x fixes applied via wine reg add fallback")
else:
logger.warning(f"Failed to set *mscoree DLL override: {result1.stderr}")
# Registry fix 2: Set OnlyUseLatestCLR=1
# Use latest CLR to avoid .NET version conflicts
logger.debug("Setting OnlyUseLatestCLR=1 registry entry...")
cmd2 = [
wine_binary, 'reg', 'add',
'HKEY_LOCAL_MACHINE\\Software\\Microsoft\\.NETFramework',
'/v', 'OnlyUseLatestCLR', '/t', 'REG_DWORD', '/d', '1', '/f'
]
result2 = subprocess.run(cmd2, env=env, capture_output=True, text=True, errors='replace')
if result2.returncode == 0:
logger.info("Successfully applied OnlyUseLatestCLR=1 registry entry")
else:
logger.warning(f"Failed to set OnlyUseLatestCLR: {result2.stderr}")
# Both fixes applied - this should eliminate dotnet4.x installation requirements
if result1.returncode == 0 and result2.returncode == 0:
logger.info("Universal dotnet4.x compatibility fixes applied successfully")
return True
else:
logger.warning("Some dotnet4.x registry fixes failed, but continuing...")
return False
logger.warning("Some dotnet4.x registry fixes failed")
return ok
except Exception as e:
logger.error(f"Failed to apply universal dotnet4.x fixes: {e}")
return False
def _reg_set_value(self, reg_path: str, section: str, key: str, value: str) -> bool:
"""Set or add a key=value pair in a Wine .reg text file."""
try:
with open(reg_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
in_section = False
updated = False
for i, line in enumerate(lines):
stripped = line.strip()
if stripped.lower() == section.lower():
in_section = True
elif stripped.startswith('[') and in_section:
# Reached next section without finding key; insert before it
lines.insert(i, f'{key}={value}\n')
updated = True
break
elif in_section and stripped.startswith(key.lower()) or (in_section and stripped.lower().startswith(key.lower())):
lines[i] = f'{key}={value}\n'
updated = True
break
if not updated:
if not in_section:
lines.append(f'\n{section}\n')
lines.append(f'{key}={value}\n')
with open(reg_path, 'w', encoding='utf-8') as f:
f.writelines(lines)
return True
except Exception as e:
logger.debug(f"_reg_set_value failed for {reg_path}: {e}")
return False
def _find_wine_binary_for_registry(self, modlist_compatdata_path: str) -> Optional[str]:
"""Find the appropriate Wine binary for registry operations"""
try:
@@ -227,8 +272,41 @@ class RegistryOperationsMixin:
logger.debug(f"Error during recursive wine search in {proton_path}: {e}")
return None
def _create_canonical_game_symlink(self, pfx_path: Path, real_game_path: str) -> bool:
"""Symlink the real game dir into the prefix at the canonical Windows Steam path.
The Bethesda launcher validates that Installed Path looks like a proper
Windows Steam path (C:\\Program Files...). A raw Z:\\ or D:\\ path passes
the existence check on the user's own machine but fails for other users
whose Wine path translation differs. By symlinking the real directory into
drive_c/Program Files (x86)/Steam/steamapps/common/, we write a canonical
C:\\ path to the registry that satisfies the launcher, while NVSE follows
the symlink to reach the actual executable.
"""
try:
real_path = Path(real_game_path)
game_dir_name = real_path.name
symlink_parent = pfx_path / "drive_c" / "Program Files (x86)" / "Steam" / "steamapps" / "common"
symlink_parent.mkdir(parents=True, exist_ok=True)
symlink_path = symlink_parent / game_dir_name
if symlink_path.is_symlink():
symlink_path.unlink()
elif symlink_path.exists():
logger.warning(f"Real directory already exists at symlink target {symlink_path}, skipping")
return False
symlink_path.symlink_to(real_path)
logger.info(f"Created game symlink: {symlink_path} -> {real_path}")
return True
except Exception as e:
logger.warning(f"Failed to create canonical game symlink: {e}")
return False
def _inject_game_registry_entries(self, modlist_compatdata_path: str, special_game_type: str):
"""Detect and inject FNV/Enderal game paths and apply universal dotnet4.x compatibility fixes"""
"""Detect and inject FNV/FO3/Enderal game paths into the modlist prefix registry."""
system_reg_path = os.path.join(modlist_compatdata_path, "pfx", "system.reg")
if not os.path.exists(system_reg_path):
logger.warning("system.reg not found, skipping game path injection")
@@ -236,41 +314,74 @@ class RegistryOperationsMixin:
logger.info("Detecting game registry entries...")
# Universal dotnet4.x registry fixes applied in modlist_handler.py after .reg downloads
# Game configurations
games_config = {
"22380": { # Fallout New Vegas AppID
"name": "Fallout New Vegas",
"common_names": ["Fallout New Vegas", "FalloutNV"],
"registry_section": "[Software\\\\Wow6432Node\\\\bethesda softworks\\\\falloutnv]",
"path_key": "Installed Path"
"path_key": "Installed Path",
},
"22300": { # Fallout 3 AppID
"name": "Fallout 3",
"common_names": ["Fallout 3", "Fallout3", "Fallout 3 GOTY"],
"registry_section": "[Software\\\\Wow6432Node\\\\bethesda softworks\\\\fallout3]",
"path_key": "Installed Path",
},
"22370": { # Fallout 3 GOTY AppID alias
"name": "Fallout 3",
"common_names": ["Fallout 3 GOTY", "Fallout 3"],
"registry_section": "[Software\\\\Wow6432Node\\\\bethesda softworks\\\\fallout3]",
"path_key": "Installed Path",
},
"976620": { # Enderal Special Edition AppID
"name": "Enderal",
"common_names": ["Enderal: Forgotten Stories (Special Edition)", "Enderal Special Edition", "Enderal"],
"registry_section": "[Software\\\\Wow6432Node\\\\SureAI\\\\Enderal SE]",
"path_key": "installed path"
}
"registry_section": "[Software\\\\Wow6432Node\\\\SureAI\\\\Enderal SE]",
"path_key": "installed path",
},
}
# Detect and inject each game
pfx_path = Path(modlist_compatdata_path) / "pfx"
for app_id, config in games_config.items():
game_path = self._find_steam_game(app_id, config["common_names"])
if game_path:
logger.info(f"Detected {config['name']} at: {game_path}")
if not game_path:
logger.debug(f"{config['name']} not found in Steam libraries")
continue
logger.info(f"Detected {config['name']} at: {game_path}")
# Create a symlink inside the prefix at the canonical Windows Steam path so the
# Bethesda launcher sees a proper C:\ path while NVSE can still resolve the exe.
symlink_ok = self._create_canonical_game_symlink(pfx_path, game_path)
if symlink_ok:
game_dir_name = Path(game_path).name
canonical_win_path = f"C:\\Program Files (x86)\\Steam\\steamapps\\common\\{game_dir_name}"
wine_val = canonical_win_path.replace("\\", "\\\\") + "\\\\"
success = self._reg_set_value(
system_reg_path,
config["registry_section"],
f'"{config["path_key"]}"',
f'"{wine_val}"',
)
if success:
logger.info(f"Registry set to canonical path for {config['name']}: {canonical_win_path}")
else:
logger.warning(f"Failed to set canonical registry path for {config['name']}")
else:
# Symlink failed — fall back to writing the real Z:/D: path
logger.warning(f"Symlink failed for {config['name']}, writing real path to registry")
success = self._update_registry_path(
system_reg_path,
config["registry_section"],
config["registry_section"],
config["path_key"],
game_path
)
if success:
logger.info(f"Updated registry entry for {config['name']}")
logger.info(f"Updated registry entry for {config['name']} (real path fallback)")
else:
logger.warning(f"Failed to update registry entry for {config['name']}")
else:
logger.debug(f"{config['name']} not found in Steam libraries")
logger.info("Game registry injection completed")

View File

@@ -16,13 +16,6 @@ import vdf
logger = logging.getLogger(__name__)
def debug_print(message):
"""Log debug message only if debug mode is enabled"""
from jackify.backend.handlers.config_handler import ConfigHandler
config_handler = ConfigHandler()
if config_handler.get('debug_mode', False):
logger.debug(message)
from .automated_prefix_shortcuts import ShortcutOperationsMixin
from .automated_prefix_proton import ProtonOperationsMixin
from .automated_prefix_creation import PrefixCreationMixin
@@ -170,7 +163,6 @@ exit"""
logger.error(f"Error getting config path: {e}")
return None
def kill_running_processes(self) -> bool:
"""
Kill any running processes that might interfere with prefix creation.

View File

@@ -11,15 +11,6 @@ from .automated_prefix_shortcuts_cleanup import AutomatedPrefixShortcutsCleanupM
logger = logging.getLogger(__name__)
def debug_print(message):
"""Log debug message only if debug mode is enabled"""
from jackify.backend.handlers.config_handler import ConfigHandler
config_handler = ConfigHandler()
if config_handler.get('debug_mode', False):
logger.debug(message)
class ShortcutOperationsMixin(AutomatedPrefixShortcutsCleanupMixin):
"""Mixin providing shortcut operation methods for AutomatedPrefixService."""
@@ -148,10 +139,10 @@ class ShortcutOperationsMixin(AutomatedPrefixShortcutsCleanupMixin):
True if successful, False otherwise
"""
try:
debug_print(f"[DEBUG] create_shortcut_directly called for '{shortcut_name}' - this is the fallback method")
logger.debug(f"[DEBUG] create_shortcut_directly called for '{shortcut_name}' - this is the fallback method")
shortcuts_path = self._get_shortcuts_path()
if not shortcuts_path:
debug_print("[DEBUG] No shortcuts path found")
logger.debug("[DEBUG] No shortcuts path found")
return False
# Read current shortcuts
@@ -207,191 +198,6 @@ class ShortcutOperationsMixin(AutomatedPrefixShortcutsCleanupMixin):
logger.error(f"Error creating shortcut directly: {e}")
return False
def create_shortcut_directly_with_proton(self, shortcut_name: str, exe_path: str, modlist_install_dir: str) -> bool:
"""
Create a Steam shortcut with temporary batch file for invisible prefix creation.
This uses the CRC32-based AppID calculation for predictable results.
Args:
shortcut_name: Name for the shortcut
exe_path: Path to the final ModOrganizer.exe executable
modlist_install_dir: Directory where the modlist is installed
Returns:
True if successful, False otherwise
"""
try:
debug_print(f"[DEBUG] create_shortcut_directly_with_proton called for '{shortcut_name}' - using temporary batch file approach")
shortcuts_path = self._get_shortcuts_path()
if not shortcuts_path:
debug_print("[DEBUG] No shortcuts path found")
return False
# Calculate predictable AppID using CRC32 (based on FINAL exe_path)
from zlib import crc32
combined_string = exe_path + shortcut_name
crc = crc32(combined_string.encode('utf-8'))
appid = -(crc & 0x7FFFFFFF) # Make it negative and within 32-bit range (like other shortcuts)
debug_print(f"[DEBUG] Calculated AppID: {appid} from '{combined_string}'")
# Create temporary batch file for invisible prefix creation
batch_content = """@echo off
echo Creating Proton prefix...
timeout /t 3 /nobreak >nul
echo Prefix creation complete.
"""
from jackify.shared.paths import get_jackify_data_dir
batch_path = get_jackify_data_dir() / "temp_prefix_creation.bat"
batch_path.parent.mkdir(parents=True, exist_ok=True)
with open(batch_path, 'w') as f:
f.write(batch_content)
debug_print(f"[DEBUG] Created temporary batch file: {batch_path}")
# Read current shortcuts
with open(shortcuts_path, 'rb') as f:
shortcuts_data = vdf.binary_load(f)
shortcuts = shortcuts_data.get('shortcuts', {})
# Check if shortcut already exists (idempotent)
found = False
new_shortcuts_list = []
shortcuts_list = list(shortcuts.values())
for shortcut in shortcuts_list:
if shortcut.get('AppName') == shortcut_name:
debug_print(f"[DEBUG] Updating existing shortcut for '{shortcut_name}'")
# Update existing shortcut with temporary batch file
shortcut.update({
'Exe': f'"{batch_path}"', # Point to temporary batch file
'StartDir': f'"{batch_path.parent}"', # Batch file directory
'appid': appid,
'LaunchOptions': '', # Empty like working shortcuts
'tags': {}, # Empty tags like working shortcuts
'CompatTool': 'proton_experimental' # Set Proton version directly in shortcut
})
new_shortcuts_list.append(shortcut)
found = True
else:
new_shortcuts_list.append(shortcut)
if not found:
debug_print(f"[DEBUG] Creating new shortcut for '{shortcut_name}'")
# Create new shortcut entry pointing to temporary batch file
new_shortcut = {
'AppName': shortcut_name,
'Exe': f'"{batch_path}"', # Point to temporary batch file
'StartDir': f'"{batch_path.parent}"', # Batch file directory
'appid': appid,
'icon': '',
'ShortcutPath': '',
'LaunchOptions': '', # Empty like working shortcuts
'IsHidden': 0,
'AllowDesktopConfig': 1,
'AllowOverlay': 1,
'OpenVR': 0,
'Devkit': 0,
'DevkitGameID': '',
'LastPlayTime': 0,
'FlatpakAppID': '',
'tags': {}, # Empty tags like working shortcuts
'sortas': '',
'CompatTool': 'proton_experimental' # Set Proton version directly in shortcut
}
new_shortcuts_list.append(new_shortcut)
# Rebuild shortcuts dict with new order
shortcuts_data['shortcuts'] = {str(i): s for i, s in enumerate(new_shortcuts_list)}
# Write back to file
with open(shortcuts_path, 'wb') as f:
vdf.binary_dump(shortcuts_data, f)
logger.info(f"Created/updated shortcut with temporary batch file: {shortcut_name} with AppID {appid}")
debug_print(f"[DEBUG] Shortcut created/updated with temporary batch file, AppID {appid}")
# Set Proton version in config.vdf BEFORE creating shortcut
if self.set_proton_version_for_shortcut(appid, 'proton_experimental'):
logger.info(f"Set Proton Experimental for shortcut {shortcut_name}")
return True
else:
logger.warning(f"Failed to set Proton version for shortcut {shortcut_name}")
return False
except Exception as e:
logger.error(f"Error creating shortcut with temporary batch file: {e}")
return False
def replace_shortcut_with_final_exe(self, shortcut_name: str, final_exe_path: str, modlist_install_dir: str) -> bool:
"""
Replace the temporary batch file shortcut with the final ModOrganizer.exe.
This should be called after the prefix has been created.
Args:
shortcut_name: Name of the shortcut to update
final_exe_path: Path to the final ModOrganizer.exe executable
modlist_install_dir: Directory where the modlist is installed
Returns:
True if successful, False otherwise
"""
try:
debug_print(f"[DEBUG] replace_shortcut_with_final_exe called for '{shortcut_name}'")
shortcuts_path = self._get_shortcuts_path()
if not shortcuts_path:
debug_print("[DEBUG] No shortcuts path found")
return False
# Read current shortcuts
with open(shortcuts_path, 'rb') as f:
shortcuts_data = vdf.binary_load(f)
shortcuts = shortcuts_data.get('shortcuts', {})
# Find and update the shortcut
found = False
new_shortcuts_list = []
shortcuts_list = list(shortcuts.values())
for shortcut in shortcuts_list:
if shortcut.get('AppName') == shortcut_name:
debug_print(f"[DEBUG] Replacing temporary batch file with final exe for '{shortcut_name}'")
# Update shortcut to point to final ModOrganizer.exe
shortcut.update({
'Exe': f'"{final_exe_path}"', # Point to final ModOrganizer.exe
'StartDir': modlist_install_dir, # ModOrganizer directory
'LaunchOptions': '', # Empty like working shortcuts
'tags': {}, # Empty tags like working shortcuts
# Keep existing appid and CompatibilityTool
})
new_shortcuts_list.append(shortcut)
found = True
else:
new_shortcuts_list.append(shortcut)
if not found:
logger.error(f"Shortcut '{shortcut_name}' not found for replacement")
return False
# Rebuild shortcuts dict with new order
shortcuts_data['shortcuts'] = {str(i): s for i, s in enumerate(new_shortcuts_list)}
# Write back to file
with open(shortcuts_path, 'wb') as f:
vdf.binary_dump(shortcuts_data, f)
logger.info(f"Replaced shortcut with final exe: {shortcut_name}")
debug_print(f"[DEBUG] Shortcut replaced with final ModOrganizer.exe")
return True
except Exception as e:
logger.error(f"Error replacing shortcut with final exe: {e}")
return False
def modify_shortcut_to_final_exe(self, shortcut_name: str, final_exe_path: str,
final_start_dir: str) -> bool:
"""

View File

@@ -3,21 +3,10 @@ from pathlib import Path
from typing import Optional, Union, List, Dict, Tuple
import logging
import os
import time
import subprocess
import vdf
logger = logging.getLogger(__name__)
def debug_print(message):
"""Log debug message only if debug mode is enabled"""
from jackify.backend.handlers.config_handler import ConfigHandler
config_handler = ConfigHandler()
if config_handler.get('debug_mode', False):
logger.debug(message)
class WorkflowMixin:
"""Mixin providing workflow methods for AutomatedPrefixService."""
@@ -110,166 +99,6 @@ class WorkflowMixin:
return message
def run_complete_workflow(self, shortcut_name: str, modlist_install_dir: str,
final_exe_path: str, progress_callback=None) -> Tuple[bool, Optional[Path], Optional[int]]:
"""
Run the simple automated prefix creation workflow.
Args:
shortcut_name: Name for the Steam shortcut
modlist_install_dir: Directory where the modlist is installed
final_exe_path: Path to ModOrganizer.exe
Returns:
Tuple of (success, prefix_path, appid)
"""
debug_print(f"[DEBUG] run_complete_workflow called with shortcut_name={shortcut_name}, modlist_install_dir={modlist_install_dir}, final_exe_path={final_exe_path}")
logger.info("Starting simple automated prefix creation workflow")
# Initialize shared timing to continue from jackify-engine
from jackify.shared.timing import initialize_from_console_output
# TODO: Pass console output if available to continue timeline
initialize_from_console_output()
# Show immediate feedback to user
if progress_callback:
progress_callback("Starting automated Steam setup...")
try:
# Step 1: Create shortcut directly (NO STL needed!)
logger.info("Step 1: Creating shortcut directly to ModOrganizer.exe")
if progress_callback:
progress_callback("Creating Steam shortcut...")
if not self.create_shortcut_directly_with_proton(shortcut_name, final_exe_path, modlist_install_dir):
logger.error("Failed to create shortcut directly")
return False, None, None, None
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Steam shortcut created successfully")
logger.info("Step 1 completed: Shortcut created directly")
# Step 2: Calculate the predictable AppID and rungameid
logger.info("Step 2: Calculating predictable AppID")
if progress_callback:
progress_callback("Calculating AppID...")
# Calculate AppID using the same method as create_shortcut_directly_with_proton
from zlib import crc32
combined_string = final_exe_path + shortcut_name
crc = crc32(combined_string.encode('utf-8'))
initial_appid = -(crc & 0x7FFFFFFF) # Make it negative and within 32-bit range
# Calculate rungameid for launching
rungameid = (initial_appid << 32) | 0x02000000
# Convert AppID to positive prefix ID
expected_prefix_id = str(abs(initial_appid))
if progress_callback:
progress_callback("AppID calculated")
logger.info(f"Step 2 completed: AppID = {initial_appid}, rungameid = {rungameid}, expected_prefix_id = {expected_prefix_id}")
# Step 3: Restart Steam
logger.info("Step 3: Restarting Steam")
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Restarting Steam...")
if not self.restart_steam():
logger.error("Failed to restart Steam")
return False, None, None, None
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Steam restarted successfully")
logger.info("Step 3 completed: Steam restarted")
# Step 4: Launch temporary batch file to create prefix invisibly
logger.info("Step 4: Launching temporary batch file to create prefix")
debug_print(f"[DEBUG] About to launch temporary batch file with rungameid={rungameid}")
# Launch using rungameid (this will run the batch file invisibly)
try:
result = subprocess.run(['steam', f'steam://rungameid/{rungameid}'],
capture_output=True, text=True, timeout=5)
debug_print(f"[DEBUG] Launch result: return_code={result.returncode}")
if result.returncode != 0:
logger.error(f"Failed to launch temporary batch file: {result.stderr}")
return False, None, None, None
except subprocess.TimeoutExpired:
debug_print("[DEBUG] Launch timed out (expected)")
except Exception as e:
logger.error(f"Error launching temporary batch file: {e}")
return False, None, None, None
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Temporary batch file launched")
logger.info("Step 4 completed: Temporary batch file launched")
# Step 5: Wait for temporary batch file to complete (invisible)
logger.info("Step 5: Waiting for temporary batch file to complete")
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Creating Proton prefix (please wait)...")
# Wait for batch file to complete (3 seconds + buffer)
time.sleep(5)
logger.info("Step 5 completed: Temporary batch file completed")
# Step 6: Verify prefix was created
logger.info("Step 6: Verifying prefix creation")
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Verifying prefix creation...")
compatdata_path = Path.home() / ".local/share/Steam/steamapps/compatdata" / expected_prefix_id
if not compatdata_path.exists():
logger.error(f"Prefix not found at {compatdata_path}")
return False, None, None, None
logger.info(f"Step 6 completed: Prefix verified at {compatdata_path}")
# Step 7: Replace temporary batch file with final ModOrganizer.exe
logger.info("Step 7: Replacing temporary batch file with final ModOrganizer.exe")
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Updating shortcut...")
if not self.replace_shortcut_with_final_exe(shortcut_name, final_exe_path, modlist_install_dir):
logger.error("Failed to replace shortcut with final exe")
return False, None, None, None
logger.info("Step 7 completed: Shortcut updated with final ModOrganizer.exe")
# Step 8: Detect actual AppID using protontricks -l
logger.info("Step 8: Detecting actual AppID")
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Detecting actual AppID...")
actual_appid = self.detect_actual_prefix_appid(initial_appid, shortcut_name)
if actual_appid is None:
logger.error("Failed to detect actual AppID")
return False, None, None, None
logger.info(f"Step 8 completed: Actual AppID = {actual_appid}")
# Step 9: Verify prefix was created successfully
logger.info("Step 9: Verifying prefix creation")
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Verifying prefix creation...")
prefix_path = self._get_compatdata_path_for_appid(actual_appid)
if not prefix_path or not prefix_path.exists():
logger.error(f"Prefix path not found: {prefix_path}")
return False, None, None, None
if not self.verify_prefix_creation(prefix_path):
logger.error("Prefix verification failed")
return False, None, None, None
logger.info(f"Step 9 completed: Prefix verified at {prefix_path}")
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Steam Configuration complete!")
# Show Proton override notification if applicable
self._show_proton_override_notification(progress_callback)
logger.info(" Simple automated prefix creation workflow completed successfully")
return True, prefix_path, actual_appid
except Exception as e:
logger.error(f"Error in automated prefix creation workflow: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
return False, None, None, None
def run_working_workflow(self, shortcut_name: str, modlist_install_dir: str,
final_exe_path: str, progress_callback=None, steamdeck: Optional[bool] = None,
download_dir=None, auto_restart: bool = True) -> Tuple[bool, Optional[Path], Optional[int], Optional[str]]:
@@ -323,9 +152,9 @@ class WorkflowMixin:
modlist_handler = ModlistHandler()
special_game_type = modlist_handler.detect_special_game_type(modlist_install_dir)
# No launch options needed - both FNV and Enderal use registry injection
# No launch options needed - FNV, FO3 and Enderal use registry injection
custom_launch_options = None
if special_game_type in ["fnv", "enderal"]:
if special_game_type in ["fnv", "fo3", "enderal"]:
logger.info(f"Using registry injection approach for {special_game_type.upper()} modlist")
else:
logger.debug("Standard modlist - no special game handling needed")
@@ -372,7 +201,8 @@ class WorkflowMixin:
)
if not success:
logger.error("Failed to create shortcut with native Steam service")
return False, None, None, None
from jackify.shared.errors import shortcut_write_failed
raise shortcut_write_failed("create_shortcut_with_native_service returned failure")
logger.info(f"Step 1 completed: Shortcut created with native service, AppID: {appid}")
if progress_callback:
@@ -398,7 +228,8 @@ class WorkflowMixin:
logger.info("Step 2: restart_steam() returned %s", restart_ok)
if not restart_ok:
logger.error("Failed to start Steam")
return False, None, None, None
from jackify.shared.errors import steam_restart_failed
raise steam_restart_failed("Steam did not come back within the expected time")
logger.info("Step 2 completed: Steam started")
if progress_callback:
@@ -415,7 +246,8 @@ class WorkflowMixin:
if not self.create_prefix_with_proton_wrapper(appid):
logger.error("Failed to create Proton prefix")
return False, None, None, None
from jackify.shared.errors import prefix_creation_failed
raise prefix_creation_failed("create_prefix_with_proton_wrapper returned failure")
logger.info("Step 3 completed: Proton prefix created")
if progress_callback:
@@ -437,7 +269,7 @@ class WorkflowMixin:
# Get prefix path (needed for logging regardless of game type)
prefix_path = self.get_prefix_path(appid)
if special_game_type in ["fnv", "enderal"]:
if special_game_type in ["fnv", "fo3", "enderal"]:
logger.info(f"Step 5: Injecting {special_game_type.upper()} game registry entries")
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} Injecting {special_game_type.upper()} game registry entries...")
@@ -448,8 +280,6 @@ class WorkflowMixin:
logger.warning("Could not find prefix path for registry injection")
else:
logger.info("Step 5: Skipping registry injection for standard modlist")
if progress_callback:
progress_callback(f"{self._get_progress_timestamp()} No special game registry injection needed")
# Step 5.5: Pre-create game-specific directories for all modlists
logger.info(f"Step 5.5: Creating game-specific user directories")
@@ -477,10 +307,13 @@ class WorkflowMixin:
return True, prefix_path, appid, last_timestamp
except Exception as e:
logger.error(f"Error in working workflow: {e}")
logger.error(f"Error in working workflow: {e}", exc_info=True)
if progress_callback:
progress_callback(f"Error: {str(e)}")
return False, None, None, None
from jackify.shared.errors import JackifyError, prefix_creation_failed
if isinstance(e, JackifyError):
raise
raise prefix_creation_failed(str(e)) from e
def continue_workflow_after_conflict_resolution(self, shortcut_name: str, modlist_install_dir: str,
final_exe_path: str, appid: int, progress_callback=None) -> Tuple[bool, Optional[Path], Optional[int]]:
@@ -520,7 +353,8 @@ class WorkflowMixin:
if not self.create_prefix_with_proton_wrapper(appid):
logger.error("Failed to create Proton prefix")
return False, None, None, None
from jackify.shared.errors import prefix_creation_failed
raise prefix_creation_failed("create_prefix_with_proton_wrapper returned failure")
logger.info("Step 3 completed: Proton prefix created")
if progress_callback:

View File

@@ -0,0 +1,166 @@
"""
MO2 Setup Service
Downloads and configures a standalone Mod Organizer 2 instance:
- Fetches latest release from GitHub
- Extracts with 7z
- Creates a Steam shortcut and Proton prefix via AutomatedPrefixService
"""
import re
import shutil
import logging
import subprocess
from pathlib import Path
from typing import Callable, Optional, Tuple
import requests
logger = logging.getLogger(__name__)
def _is_dangerous_path(path: Path) -> bool:
home = Path.home().resolve()
dangerous = [Path('/'), Path('/home'), Path('/root'), home]
return any(path.resolve() == d for d in dangerous)
class MO2SetupService:
"""Download, extract, and configure a standalone MO2 instance."""
GITHUB_API = "https://api.github.com/repos/ModOrganizer2/modorganizer/releases/latest"
ASSET_PATTERN = re.compile(r"Mod\.Organizer-\d+\.\d+(\.\d+)?\.7z$")
def setup_mo2(
self,
install_dir: Path,
shortcut_name: str = "Mod Organizer 2",
progress_callback: Optional[Callable[[str], None]] = None,
should_cancel: Optional[Callable[[], bool]] = None,
) -> Tuple[bool, Optional[int], Optional[str]]:
"""
Download, extract, and configure MO2.
Returns (success, app_id, error_message).
"""
def _progress(msg: str):
logger.info(msg)
if progress_callback:
progress_callback(msg)
def _cancel_requested() -> bool:
try:
return bool(should_cancel and should_cancel())
except Exception:
return False
if not shutil.which('7z'):
return False, None, "7z not found. Install p7zip-full (or equivalent) first."
if _is_dangerous_path(install_dir):
return False, None, f"Refusing to install to dangerous path: {install_dir}"
# Create directory
try:
install_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
return False, None, f"Could not create directory: {e}"
# Fetch release info
_progress("Fetching latest MO2 release info...")
if _cancel_requested():
return False, None, "MO2 setup cancelled."
try:
resp = requests.get(self.GITHUB_API, timeout=15, verify=True)
resp.raise_for_status()
release = resp.json()
except Exception as e:
return False, None, f"Failed to fetch MO2 release info: {e}"
# Find asset
asset = None
for a in release.get('assets', []):
if self.ASSET_PATTERN.match(a['name']):
asset = a
break
if not asset:
return False, None, "Could not find main MO2 .7z asset in latest release."
# Download
archive_path = install_dir / asset['name']
_progress(f"Downloading {asset['name']}...")
if _cancel_requested():
return False, None, "MO2 setup cancelled."
try:
with requests.get(asset['browser_download_url'], stream=True, timeout=120, verify=True) as r:
r.raise_for_status()
with open(archive_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
if _cancel_requested():
return False, None, "MO2 setup cancelled."
f.write(chunk)
except Exception as e:
return False, None, f"Failed to download MO2 archive: {e}"
# Extract
_progress(f"Extracting to {install_dir}...")
if _cancel_requested():
return False, None, "MO2 setup cancelled."
try:
result = subprocess.run(
['7z', 'x', str(archive_path), f'-o{install_dir}'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=1200,
)
if result.returncode != 0:
err = result.stderr.decode(errors='ignore')
return False, None, f"Extraction failed: {err}"
except Exception as e:
return False, None, f"Extraction failed: {e}"
# Validate
mo2_exe = install_dir / "ModOrganizer.exe"
if not mo2_exe.exists():
# MO2 release archives usually extract into a single top-level folder.
# Limit search depth to direct children to avoid expensive recursive scans.
mo2_exe = None
for child in install_dir.iterdir():
candidate = child / "ModOrganizer.exe"
if candidate.exists():
mo2_exe = candidate
break
if not mo2_exe:
return False, None, "ModOrganizer.exe not found after extraction."
# Cleanup archive
try:
archive_path.unlink()
except Exception:
pass
_progress(f"MO2 installed at: {mo2_exe.parent}")
# Set up Steam shortcut and Proton prefix
_progress("Creating Steam shortcut and Proton prefix...")
if _cancel_requested():
return False, None, "MO2 setup cancelled."
try:
from .automated_prefix_service import AutomatedPrefixService
svc = AutomatedPrefixService()
success, prefix_path, app_id, _last_ts = svc.run_working_workflow(
shortcut_name=shortcut_name,
modlist_install_dir=str(install_dir),
final_exe_path=str(mo2_exe),
progress_callback=_progress,
)
except Exception as e:
logger.error(f"AutomatedPrefixService failed: {e}")
return False, None, f"Prefix setup failed: {e}"
if not success:
return False, None, "Failed to create Steam shortcut or Proton prefix."
_progress("MO2 setup complete.")
return True, app_id, None

View File

@@ -246,6 +246,7 @@ class ModlistService(ModlistServiceInstallationMixin):
'appid': getattr(context, 'app_id', None), # Use updated app_id from Steam
'engine_installed': getattr(context, 'engine_installed', False), # Path manipulation flag
'download_dir': str(context.download_dir) if getattr(context, 'download_dir', None) else None,
'modlist_source': getattr(context, 'modlist_source', None),
}
debug_callback(f"Configuration context built: {config_context}")
@@ -479,4 +480,4 @@ class ModlistService(ModlistServiceInstallationMixin):
logger.error("Game type is required")
return False
return True
return True

View File

@@ -0,0 +1,98 @@
"""Nexus Premium status detection service."""
import time
import logging
from typing import Tuple, Optional
import requests
logger = logging.getLogger(__name__)
NEXUS_VALIDATE_URL = "https://api.nexusmods.com/v1/users/validate.json"
NEXUS_OAUTH_USERINFO_URL = "https://users.nexusmods.com/oauth/userinfo"
_CACHE_TTL_SECONDS = 3600
class NexusPremiumService:
"""Check and cache Nexus Premium status for the authenticated user."""
def check_premium_status(
self, auth_token: str, is_oauth: bool = False
) -> Tuple[bool, Optional[str]]:
"""
Query Nexus API for premium status.
Args:
auth_token: Nexus API key or OAuth access token.
is_oauth: True when auth_token is an OAuth Bearer token.
Returns:
(is_premium, username) — both None/False on failure.
"""
cached = self._read_cache(auth_token, is_oauth=is_oauth)
if cached is not None:
return cached
result = self._fetch(auth_token, is_oauth=is_oauth)
if result[1] is not None:
self._write_cache(auth_token, result, is_oauth=is_oauth)
return result
def _fetch(self, auth_token: str, is_oauth: bool = False) -> Tuple[bool, Optional[str]]:
try:
if is_oauth:
# OAuth path: userinfo endpoint returns membership_roles array.
# The validate endpoint is for API keys only.
resp = requests.get(
NEXUS_OAUTH_USERINFO_URL,
headers={"Authorization": f"Bearer {auth_token}", "Accept": "application/json"},
timeout=8,
)
resp.raise_for_status()
data = resp.json()
roles = data.get("membership_roles") or []
is_premium = "premium" in roles
username = data.get("name") or data.get("sub")
else:
resp = requests.get(
NEXUS_VALIDATE_URL,
headers={"apikey": auth_token, "Accept": "application/json"},
timeout=8,
)
resp.raise_for_status()
data = resp.json()
is_premium = bool(data.get("is_premium") or data.get("is_supporter"))
username = data.get("name")
logger.debug(f"Nexus user: {username}, premium={is_premium}, roles={data.get('membership_roles')}")
return is_premium, username
except Exception as e:
logger.debug(f"Nexus premium check failed: {e}")
return False, None
def _cache_key(self, token: str, is_oauth: bool = False) -> str:
suffix = "oauth" if is_oauth else "apikey"
return f"nexus_premium_cache_{token[:8]}_{suffix}"
def _read_cache(self, token: str, is_oauth: bool = False) -> Optional[Tuple[bool, Optional[str]]]:
try:
from jackify.backend.handlers.config_handler import ConfigHandler
cfg = ConfigHandler()
entry = cfg.get(self._cache_key(token, is_oauth))
if not entry:
return None
if time.time() - entry.get("ts", 0) > _CACHE_TTL_SECONDS:
return None
return entry["is_premium"], entry.get("username")
except Exception:
return None
def _write_cache(self, token: str, result: Tuple[bool, Optional[str]], is_oauth: bool = False) -> None:
try:
from jackify.backend.handlers.config_handler import ConfigHandler
cfg = ConfigHandler()
cfg.set(self._cache_key(token, is_oauth), {
"is_premium": result[0],
"username": result[1],
"ts": time.time(),
})
except Exception:
pass

View File

@@ -200,6 +200,52 @@ def is_flatpak_steam() -> bool:
return False
def ensure_flatpak_steam_filesystem_access(path: "Path") -> bool:
"""Grant Flatpak Steam filesystem access to the parent of the given path.
Safe to call on non-Flatpak systems — returns True immediately.
Skips if the path is already covered by an existing override.
Returns True if access was already present or successfully granted, False on error.
"""
from pathlib import Path as _Path
if not is_flatpak_steam():
return True
flatpak_cmd = _get_flatpak_command()
if not flatpak_cmd:
logger.warning("Flatpak Steam detected but flatpak command not found — cannot grant filesystem access")
return False
grant_path = str(_Path(path).parent)
env = _get_clean_subprocess_env()
try:
# Check existing overrides to avoid redundant changes
result = subprocess.run(
[flatpak_cmd, "override", "--user", "--show", "com.valvesoftware.Steam"],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
text=True, timeout=10, env=env,
)
if result.returncode == 0:
for line in result.stdout.splitlines():
if "filesystems" in line.lower() and grant_path in line:
logger.debug(f"Flatpak Steam already has access to {grant_path}")
return True
except Exception as e:
logger.debug(f"Could not check existing Flatpak overrides: {e}")
try:
result = subprocess.run(
[flatpak_cmd, "override", "--user", f"--filesystem={grant_path}", "com.valvesoftware.Steam"],
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
text=True, timeout=15, env=env,
)
if result.returncode == 0:
logger.info(f"Granted Flatpak Steam filesystem access to {grant_path}")
return True
logger.warning(f"flatpak override failed (exit {result.returncode}): {result.stderr.strip()}")
return False
except Exception as e:
logger.warning(f"Failed to grant Flatpak Steam filesystem access: {e}")
return False
def _get_steam_executable(env=None):
"""Resolve steam executable path for native Steam. Prefer PATH, then common locations."""
env = env or os.environ

View File

@@ -31,6 +31,7 @@ class UpdateInfo:
release_date: str
changelog: str
download_url: str
source: str = "github"
file_size: Optional[int] = None
is_critical: bool = False
is_delta_update: bool = False
@@ -100,9 +101,17 @@ class UpdateService:
break
if download_url:
# Prefer Nexus CDN for Premium users when release embeds nexus_file_id
release_body = release_data.get('body', '')
nexus_url = self._try_nexus_download_url(release_body)
update_source = "github"
if nexus_url:
download_url = nexus_url
update_source = "nexus"
# Determine if this is a delta update
is_delta = '.delta' in download_url or 'delta' in download_url.lower()
# Safety checks to prevent segfault
try:
# Sanitize string fields
@@ -111,9 +120,9 @@ class UpdateService:
safe_date = str(release_data.get('published_at', ''))
safe_changelog = str(release_data.get('body', ''))[:1000] # Limit size
safe_url = str(download_url)
logger.debug(f"Creating UpdateInfo for version {safe_version}")
update_info = UpdateInfo(
version=safe_version,
tag_name=safe_tag,
@@ -121,7 +130,8 @@ class UpdateService:
changelog=safe_changelog,
download_url=safe_url,
file_size=file_size,
is_delta_update=is_delta
is_delta_update=is_delta,
source=update_source,
)
logger.debug(f"UpdateInfo created successfully")
@@ -142,6 +152,56 @@ class UpdateService:
logger.error(f"Unexpected error checking for updates: {e}")
return None
def _try_nexus_download_url(self, release_body: str) -> Optional[str]:
"""
If the user is Nexus Premium and the release body embeds nexus_file_id,
return a Nexus CDN download URL. Returns None on any failure.
Release body format expected:
nexus_mod_id: 12345
nexus_file_id: 67890
"""
import re
try:
mod_match = re.search(r'nexus_mod_id:\s*(\d+)', release_body, re.IGNORECASE)
file_match = re.search(r'nexus_file_id:\s*(\d+)', release_body, re.IGNORECASE)
if not file_match:
return None
nexus_file_id = int(file_match.group(1))
nexus_mod_id = int(mod_match.group(1)) if mod_match else None
from jackify.backend.services.nexus_auth_service import NexusAuthService
auth_service = NexusAuthService()
token = auth_service.get_auth_token()
if not token:
return None
from jackify.backend.services.nexus_premium_service import NexusPremiumService
is_premium, _ = NexusPremiumService().check_premium_status(token)
if not is_premium:
logger.debug("Nexus download skipped: user is not Premium")
return None
if nexus_mod_id is None:
return None
api_url = f"https://api.nexusmods.com/v1/games/site/mods/{nexus_mod_id}/files/{nexus_file_id}/download_link.json"
resp = requests.get(
api_url,
headers={"apikey": token, "Accept": "application/json"},
timeout=8,
)
resp.raise_for_status()
links = resp.json()
if isinstance(links, list) and links:
cdn_url = links[0].get("URI")
if cdn_url:
logger.debug(f"Using Nexus CDN URL for update")
return cdn_url
except Exception as e:
logger.debug(f"Nexus download URL lookup failed: {e}")
return None
def _is_newer_version(self, version: str) -> bool:
"""
Compare versions to determine if update is newer.

View File

@@ -114,7 +114,7 @@ def should_offer_vnv_automation(modlist_name: str, modlist_install_location: Opt
def run_vnv_automation_if_applicable(
modlist_name: str,
modlist_install_location: Path,
game_root: Path,
game_root: Optional[Path],
ttw_installer_path: Optional[Path] = None,
progress_callback: Optional[Callable[[str], None]] = None,
manual_file_callback: Optional[Callable[[str, str], Optional[Path]]] = None,
@@ -144,10 +144,27 @@ def run_vnv_automation_if_applicable(
logger.info(f"VNV detected: {modlist_name}")
# Resolve game root for Fallout New Vegas if caller didn't provide one.
# CLI flows may pass None and rely on auto-detection.
resolved_game_root = game_root
if resolved_game_root is None:
try:
from jackify.backend.handlers.path_handler import PathHandler
game_paths = PathHandler().find_vanilla_game_paths()
resolved_game_root = game_paths.get('Fallout New Vegas')
except Exception as detect_err:
logger.debug(f"VNV game root auto-detection failed: {detect_err}")
if resolved_game_root is None:
logger.warning("VNV detected but Fallout New Vegas game root could not be resolved")
if progress_callback:
progress_callback("VNV automation skipped: Fallout New Vegas path not found")
return False, None
# Initialize service
vnv_service = VNVPostInstallService(
modlist_install_location=modlist_install_location,
game_root=game_root,
game_root=resolved_game_root,
ttw_installer_path=ttw_installer_path
)

View File

@@ -16,7 +16,8 @@ from ..handlers.config_handler import ConfigHandler
from ..handlers.wine_utils import WineUtils
from .native_steam_service import NativeSteamService
from .steam_restart_service import (
start_steam, is_flatpak_steam, is_steam_deck, _get_clean_subprocess_env, robust_steam_restart
start_steam, is_flatpak_steam, is_steam_deck, _get_clean_subprocess_env, robust_steam_restart,
ensure_flatpak_steam_filesystem_access,
)
from .automated_prefix_service import AutomatedPrefixService
@@ -48,8 +49,14 @@ class WabbajackInstallerService:
if not steam_name.startswith('proton'):
steam_name = f"proton_{steam_name}"
return path, steam_name
path = self.handler.find_proton_experimental()
return path, "proton_experimental" if path else None
best = WineUtils.select_best_proton()
if best:
return Path(best['path']), best['steam_compat_name']
valve = WineUtils.select_best_valve_proton()
if valve:
return Path(valve['path']), valve.get('steam_compat_name', 'proton_experimental')
logger.error("No Proton version found")
return None, None
def install_wabbajack(
self,
@@ -93,6 +100,9 @@ class WabbajackInstallerService:
_is_steam_deck = is_steam_deck()
_is_flatpak = is_flatpak_steam()
if _is_flatpak:
ensure_flatpak_steam_filesystem_access(install_folder)
try:
# Step 1: Check requirements
update_progress("Checking requirements...", 1, 5)