From bd083f222730db3f24937d26dc4f0939a22fbc12 Mon Sep 17 00:00:00 2001 From: hariomphulre Date: Thu, 7 May 2026 02:57:52 +0530 Subject: [PATCH 1/3] Fix plugin loading type annotations and improve code readability --- A_Course_Wishes.cpp | 0 archinstall/lib/args.py | 4 ++-- archinstall/lib/plugins.py | 42 +++++++++++++++++++++++++++++--------- 3 files changed, 34 insertions(+), 12 deletions(-) create mode 100644 A_Course_Wishes.cpp diff --git a/A_Course_Wishes.cpp b/A_Course_Wishes.cpp new file mode 100644 index 0000000000..e69de29bb2 diff --git a/archinstall/lib/args.py b/archinstall/lib/args.py index 8d78f7e6d8..c67cbe4319 100644 --- a/archinstall/lib/args.py +++ b/archinstall/lib/args.py @@ -447,8 +447,8 @@ def _parse_args(self) -> Arguments: warn(f'Warning: --debug mode will write certain credentials to {logger.path}!') if args.plugin: - plugin_path = Path(args.plugin) - load_plugin(plugin_path) + # Pass plugin as string to preserve URL format (avoid Path normalization) + load_plugin(args.plugin) if args.creds_decryption_key is None: if os.environ.get('ARCHINSTALL_CREDS_DECRYPTION_KEY'): diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py index 919fcf2387..f83989262b 100644 --- a/archinstall/lib/plugins.py +++ b/archinstall/lib/plugins.py @@ -2,6 +2,7 @@ import importlib.util import os import sys +import urllib.error import urllib.parse import urllib.request from importlib import metadata @@ -34,21 +35,40 @@ def plugin(f, *args, **kwargs) -> None: # type: ignore[no-untyped-def] plugins[f.__name__] = f -def _localize_path(path: Path) -> Path: +def _localize_path(path: str | Path) -> Path: """ Support structures for load_plugin() """ - url = urllib.parse.urlparse(str(path)) + # Keep as string to preserve URL format (Path normalization breaks URLs) + path_str = str(path) + url = urllib.parse.urlparse(path_str) if url.scheme and url.scheme in ('https', 'http'): - converted_path = Path(f'/tmp/{path.stem}_{hashlib.md5(os.urandom(12)).hexdigest()}.py') + # Extract filename from the URL path component + # Use os.path.basename instead of path.stem to handle URLs correctly + url_path = url.path + filename = os.path.basename(url_path) if url_path else 'plugin' + # Remove .py extension if present for the temporary filename format + if filename.endswith('.py'): + filename_base = filename.replace('.py', '') + else: + filename_base = filename + + converted_path = Path(f'/tmp/{filename_base}_{hashlib.md5(os.urandom(12)).hexdigest()}.py') with open(converted_path, 'w') as temp_file: - temp_file.write(urllib.request.urlopen(url.geturl()).read().decode('utf-8')) + # Use the original path string directly instead of reconstructing from parsed URL + # This avoids issues with url.geturl() producing malformed URLs + try: + response = urllib.request.urlopen(path_str) + temp_file.write(response.read().decode('utf-8')) + except urllib.error.URLError as e: + error(f'Failed to download plugin from {path_str}: {e}') + raise return converted_path else: - return path + return Path(path) def _import_via_path(path: Path, namespace: str | None = None) -> str: @@ -80,18 +100,20 @@ def _import_via_path(path: Path, namespace: str | None = None) -> str: return namespace -def load_plugin(path: Path) -> None: +def load_plugin(path: str | Path) -> None: namespace: str | None = None - parsed_url = urllib.parse.urlparse(str(path)) + # Keep URL as string to preserve scheme (avoid Path normalization) + path_str = str(path) if isinstance(path, Path) else path + parsed_url = urllib.parse.urlparse(path_str) info(f'Loading plugin from url {parsed_url}') # The Profile was not a direct match on a remote URL if not parsed_url.scheme: # Path was not found in any known examples, check if it's an absolute path - if os.path.isfile(path): - namespace = _import_via_path(path) + if os.path.isfile(path_str): + namespace = _import_via_path(Path(path_str)) elif parsed_url.scheme in ('https', 'http'): - localized = _localize_path(path) + localized = _localize_path(path_str) namespace = _import_via_path(localized) if namespace and namespace in sys.modules: From 34c6fc4b65a0f1fa5fe4609e3acd69dee0bb332e Mon Sep 17 00:00:00 2001 From: hariomphulre Date: Wed, 13 May 2026 20:54:43 +0530 Subject: [PATCH 2/3] fix(plugins): resolve version comparison bug and secure remote downloads --- archinstall/lib/args.py | 990 +++++++++++++++++++------------------ archinstall/lib/plugins.py | 219 ++++---- 2 files changed, 615 insertions(+), 594 deletions(-) diff --git a/archinstall/lib/args.py b/archinstall/lib/args.py index c67cbe4319..2d9b448980 100644 --- a/archinstall/lib/args.py +++ b/archinstall/lib/args.py @@ -35,524 +35,530 @@ @p_dataclass class Arguments: - config: Path | None = None - config_url: str | None = None - creds: Path | None = None - creds_url: str | None = None - creds_decryption_key: str | None = None - silent: bool = False - dry_run: bool = False - script: str | None = None - mountpoint: Path = Path('/mnt') - skip_ntp: bool = False - skip_wkd: bool = False - skip_boot: bool = False - debug: bool = False - offline: bool = False - no_pkg_lookups: bool = False - plugin: str | None = None - skip_version_check: bool = False - skip_wifi_check: bool = False - advanced: bool = False - verbose: bool = False + config: Path | None = None + config_url: str | None = None + creds: Path | None = None + creds_url: str | None = None + creds_decryption_key: str | None = None + silent: bool = False + dry_run: bool = False + script: str | None = None + mountpoint: Path = Path('/mnt') + skip_ntp: bool = False + skip_wkd: bool = False + skip_boot: bool = False + debug: bool = False + offline: bool = False + no_pkg_lookups: bool = False + plugin: str | None = None + skip_version_check: bool = False + skip_wifi_check: bool = False + advanced: bool = False + verbose: bool = False @dataclass class ArchConfig: - version: str | None = None - script: str | None = None - locale_config: LocaleConfiguration | None = None - archinstall_language: Language = field(default_factory=lambda: translation_handler.get_language_by_abbr('en')) - disk_config: DiskLayoutConfiguration | None = None - profile_config: ProfileConfiguration | None = None - mirror_config: MirrorConfiguration | None = None - network_config: NetworkConfiguration | None = None - bootloader_config: BootloaderConfiguration | None = None - app_config: ApplicationConfiguration | None = None - auth_config: AuthenticationConfiguration | None = None - swap: ZramConfiguration | None = None - hostname: str = 'archlinux' - kernels: list[str] = field(default_factory=lambda: [DEFAULT_KERNEL.value]) - ntp: bool = True - packages: list[str] = field(default_factory=list) - pacman_config: PacmanConfiguration = field(default_factory=PacmanConfiguration.default) - timezone: str = 'UTC' - services: list[str] = field(default_factory=list) - custom_commands: list[str] = field(default_factory=list) - - def unsafe_config(self) -> dict[str, Any]: - config: dict[str, list[UserSerialization] | str | None] = {} - - if self.auth_config: - if self.auth_config.users: - config['users'] = [user.json() for user in self.auth_config.users] - - if self.auth_config.root_enc_password: - config['root_enc_password'] = self.auth_config.root_enc_password.enc_password - - if self.disk_config: - disk_encryption = self.disk_config.disk_encryption - if disk_encryption and disk_encryption.encryption_password: - config['encryption_password'] = disk_encryption.encryption_password.plaintext - - return config - - def safe_config(self) -> dict[str, Any]: - config: Any = { - 'version': self.version, - 'script': self.script, - 'archinstall-language': self.archinstall_language.json(), - 'hostname': self.hostname, - 'kernels': self.kernels, - 'ntp': self.ntp, - 'packages': self.packages, - 'pacman_config': self.pacman_config.json(), - 'swap': self.swap, - 'timezone': self.timezone, - 'services': self.services, - 'custom_commands': self.custom_commands, - 'bootloader_config': self.bootloader_config.json() if self.bootloader_config else None, - 'app_config': self.app_config.json() if self.app_config else None, - 'auth_config': self.auth_config.json() if self.auth_config else None, - } - - if self.locale_config: - config['locale_config'] = self.locale_config.json() - - if self.disk_config: - config['disk_config'] = self.disk_config.json() - - if self.profile_config: - config['profile_config'] = self.profile_config.json() - - if self.mirror_config: - config['mirror_config'] = self.mirror_config.json() - - if self.network_config: - config['network_config'] = self.network_config.json() - - return config - - @classmethod - def from_config(cls, args_config: dict[str, Any], args: Arguments) -> Self: - arch_config = cls() - - arch_config.locale_config = LocaleConfiguration.parse_arg(args_config) - - if script := args_config.get('script', None): - arch_config.script = script - - if archinstall_lang := args_config.get('archinstall-language', None): - arch_config.archinstall_language = translation_handler.get_language_by_name(archinstall_lang) - translation_handler.activate(arch_config.archinstall_language, set_font=False) - - if disk_config := args_config.get('disk_config', {}): - enc_password = args_config.get('encryption_password', '') - password = Password(plaintext=enc_password) if enc_password else None - arch_config.disk_config = DiskLayoutConfiguration.parse_arg(disk_config, password) - - # DEPRECATED - # backwards compatibility for main level disk_encryption entry - disk_encryption: DiskEncryption | None = None - - if args_config.get('disk_encryption', None) is not None and arch_config.disk_config is not None: - disk_encryption = DiskEncryption.parse_arg( - arch_config.disk_config, - args_config['disk_encryption'], - Password(plaintext=args_config.get('encryption_password', '')), - ) - - if disk_encryption: - arch_config.disk_config.disk_encryption = disk_encryption - - if profile_config := args_config.get('profile_config', None): - arch_config.profile_config = ProfileConfiguration.parse_arg(profile_config) - - if mirror_config := args_config.get('mirror_config', None): - backwards_compatible_repo = [] - if additional_repositories := args_config.get('additional-repositories', []): - backwards_compatible_repo = [Repository(r) for r in additional_repositories] + version: str | None = None + script: str | None = None + locale_config: LocaleConfiguration | None = None + archinstall_language: Language = field(default_factory=lambda: translation_handler.get_language_by_abbr('en')) + disk_config: DiskLayoutConfiguration | None = None + profile_config: ProfileConfiguration | None = None + mirror_config: MirrorConfiguration | None = None + network_config: NetworkConfiguration | None = None + bootloader_config: BootloaderConfiguration | None = None + app_config: ApplicationConfiguration | None = None + auth_config: AuthenticationConfiguration | None = None + swap: ZramConfiguration | None = None + hostname: str = 'archlinux' + kernels: list[str] = field(default_factory=lambda: [DEFAULT_KERNEL.value]) + ntp: bool = True + packages: list[str] = field(default_factory=list) + pacman_config: PacmanConfiguration = field(default_factory=PacmanConfiguration.default) + timezone: str = 'UTC' + services: list[str] = field(default_factory=list) + custom_commands: list[str] = field(default_factory=list) + + def unsafe_config(self) -> dict[str, Any]: + config: dict[str, list[UserSerialization] | str | None] = {} + + if self.auth_config: + if self.auth_config.users: + config['users'] = [user.json() for user in self.auth_config.users] + + if self.auth_config.root_enc_password: + config['root_enc_password'] = self.auth_config.root_enc_password.enc_password + + if self.disk_config: + disk_encryption = self.disk_config.disk_encryption + if disk_encryption and disk_encryption.encryption_password: + config['encryption_password'] = disk_encryption.encryption_password.plaintext + + return config + + def safe_config(self) -> dict[str, Any]: + config: Any = { + 'version': self.version, + 'script': self.script, + 'archinstall-language': self.archinstall_language.json(), + 'hostname': self.hostname, + 'kernels': self.kernels, + 'ntp': self.ntp, + 'packages': self.packages, + 'pacman_config': self.pacman_config.json(), + 'swap': self.swap, + 'timezone': self.timezone, + 'services': self.services, + 'custom_commands': self.custom_commands, + 'bootloader_config': self.bootloader_config.json() if self.bootloader_config else None, + 'app_config': self.app_config.json() if self.app_config else None, + 'auth_config': self.auth_config.json() if self.auth_config else None, + } + + if self.locale_config: + config['locale_config'] = self.locale_config.json() + + if self.disk_config: + config['disk_config'] = self.disk_config.json() + + if self.profile_config: + config['profile_config'] = self.profile_config.json() + + if self.mirror_config: + config['mirror_config'] = self.mirror_config.json() + + if self.network_config: + config['network_config'] = self.network_config.json() + + return config + + @classmethod + def from_config(cls, args_config: dict[str, Any], args: Arguments) -> Self: + arch_config = cls() + + arch_config.locale_config = LocaleConfiguration.parse_arg(args_config) + + if script := args_config.get('script', None): + arch_config.script = script + + if archinstall_lang := args_config.get('archinstall-language', None): + arch_config.archinstall_language = translation_handler.get_language_by_name(archinstall_lang) + translation_handler.activate(arch_config.archinstall_language, set_font=False) + + if disk_config := args_config.get('disk_config', {}): + enc_password = args_config.get('encryption_password', '') + password = Password(plaintext=enc_password) if enc_password else None + arch_config.disk_config = DiskLayoutConfiguration.parse_arg(disk_config, password) + + # DEPRECATED + # backwards compatibility for main level disk_encryption entry + disk_encryption: DiskEncryption | None = None + + if args_config.get('disk_encryption', None) is not None and arch_config.disk_config is not None: + disk_encryption = DiskEncryption.parse_arg( + arch_config.disk_config, + args_config['disk_encryption'], + Password(plaintext=args_config.get('encryption_password', '')), + ) + + if disk_encryption: + arch_config.disk_config.disk_encryption = disk_encryption + + if profile_config := args_config.get('profile_config', None): + arch_config.profile_config = ProfileConfiguration.parse_arg(profile_config) + + if mirror_config := args_config.get('mirror_config', None): + backwards_compatible_repo = [] + if additional_repositories := args_config.get('additional-repositories', []): + backwards_compatible_repo = [Repository(r) for r in additional_repositories] - arch_config.mirror_config = MirrorConfiguration.parse_args( - mirror_config, - backwards_compatible_repo, - ) + arch_config.mirror_config = MirrorConfiguration.parse_args( + mirror_config, + backwards_compatible_repo, + ) - if net_config := args_config.get('network_config', None): - arch_config.network_config = NetworkConfiguration.parse_arg(net_config) + if net_config := args_config.get('network_config', None): + arch_config.network_config = NetworkConfiguration.parse_arg(net_config) - if bootloader_config_dict := args_config.get('bootloader_config', None): - arch_config.bootloader_config = BootloaderConfiguration.parse_arg(bootloader_config_dict, args.skip_boot) - # DEPRECATED: separate bootloader and uki fields (backward compatibility) - elif bootloader_str := args_config.get('bootloader', None): - bootloader = Bootloader.from_arg(bootloader_str, args.skip_boot) - uki = args_config.get('uki', False) - if uki and not bootloader.has_uki_support(): - uki = False - arch_config.bootloader_config = BootloaderConfiguration(bootloader=bootloader, uki=uki, removable=True) + if bootloader_config_dict := args_config.get('bootloader_config', None): + arch_config.bootloader_config = BootloaderConfiguration.parse_arg(bootloader_config_dict, args.skip_boot) + # DEPRECATED: separate bootloader and uki fields (backward compatibility) + elif bootloader_str := args_config.get('bootloader', None): + bootloader = Bootloader.from_arg(bootloader_str, args.skip_boot) + uki = args_config.get('uki', False) + if uki and not bootloader.has_uki_support(): + uki = False + arch_config.bootloader_config = BootloaderConfiguration(bootloader=bootloader, uki=uki, removable=True) - # deprecated: backwards compatibility - audio_config_args = args_config.get('audio_config', None) - app_config_args = args_config.get('app_config', None) + # deprecated: backwards compatibility + audio_config_args = args_config.get('audio_config', None) + app_config_args = args_config.get('app_config', None) - if audio_config_args is not None or app_config_args is not None: - arch_config.app_config = ApplicationConfiguration.parse_arg(app_config_args, audio_config_args) + if audio_config_args is not None or app_config_args is not None: + arch_config.app_config = ApplicationConfiguration.parse_arg(app_config_args, audio_config_args) - if auth_config_args := args_config.get('auth_config', None): - arch_config.auth_config = AuthenticationConfiguration.parse_arg(auth_config_args) + if auth_config_args := args_config.get('auth_config', None): + arch_config.auth_config = AuthenticationConfiguration.parse_arg(auth_config_args) - if hostname := args_config.get('hostname', ''): - arch_config.hostname = hostname + if hostname := args_config.get('hostname', ''): + arch_config.hostname = hostname - if kernels := args_config.get('kernels', []): - arch_config.kernels = kernels + if kernels := args_config.get('kernels', []): + arch_config.kernels = kernels - arch_config.ntp = args_config.get('ntp', True) + arch_config.ntp = args_config.get('ntp', True) - if packages := args_config.get('packages', []): - arch_config.packages = packages + if packages := args_config.get('packages', []): + arch_config.packages = packages - if pacman_config := args_config.get('pacman_config', None): - arch_config.pacman_config = PacmanConfiguration.parse_arg(pacman_config) - elif parallel_downloads := args_config.get('parallel_downloads', 0): - arch_config.pacman_config = PacmanConfiguration(parallel_downloads=int(parallel_downloads)) + if pacman_config := args_config.get('pacman_config', None): + arch_config.pacman_config = PacmanConfiguration.parse_arg(pacman_config) + elif parallel_downloads := args_config.get('parallel_downloads', 0): + arch_config.pacman_config = PacmanConfiguration(parallel_downloads=int(parallel_downloads)) - swap_arg = args_config.get('swap') - if swap_arg is not None: - arch_config.swap = ZramConfiguration.parse_arg(swap_arg) + swap_arg = args_config.get('swap') + if swap_arg is not None: + arch_config.swap = ZramConfiguration.parse_arg(swap_arg) - if timezone := args_config.get('timezone', 'UTC'): - arch_config.timezone = timezone + if timezone := args_config.get('timezone', 'UTC'): + arch_config.timezone = timezone - if services := args_config.get('services', []): - arch_config.services = services + if services := args_config.get('services', []): + arch_config.services = services - # DEPRECATED: backwards compatibility - root_password = None - if root_password := args_config.get('!root-password', None): - root_password = Password(plaintext=root_password) + # DEPRECATED: backwards compatibility + root_password = None + if root_password := args_config.get('!root-password', None): + root_password = Password(plaintext=root_password) - if enc_password := args_config.get('root_enc_password', None): - root_password = Password(enc_password=enc_password) + if enc_password := args_config.get('root_enc_password', None): + root_password = Password(enc_password=enc_password) - if root_password is not None: - if arch_config.auth_config is None: - arch_config.auth_config = AuthenticationConfiguration() - arch_config.auth_config.root_enc_password = root_password + if root_password is not None: + if arch_config.auth_config is None: + arch_config.auth_config = AuthenticationConfiguration() + arch_config.auth_config.root_enc_password = root_password - # DEPRECATED: backwards compatibility - users: list[User] = [] - if args_users := args_config.get('!users', None): - users = User.parse_arguments(args_users) + # DEPRECATED: backwards compatibility + users: list[User] = [] + if args_users := args_config.get('!users', None): + users = User.parse_arguments(args_users) - if args_users := args_config.get('users', None): - users = User.parse_arguments(args_users) + if args_users := args_config.get('users', None): + users = User.parse_arguments(args_users) - if users: - if arch_config.auth_config is None: - arch_config.auth_config = AuthenticationConfiguration() - arch_config.auth_config.users = users + if users: + if arch_config.auth_config is None: + arch_config.auth_config = AuthenticationConfiguration() + arch_config.auth_config.users = users - if custom_commands := args_config.get('custom_commands', []): - arch_config.custom_commands = custom_commands + if custom_commands := args_config.get('custom_commands', []): + arch_config.custom_commands = custom_commands - return arch_config + return arch_config class ArchConfigHandler: - def __init__(self) -> None: - self._parser: ArgumentParser = self._define_arguments() - args: Arguments = self._parse_args() - self._args = args - - config = self._parse_config() - - try: - self._config = ArchConfig.from_config(config, args) - self._config.version = get_version() - except ValueError as err: - warn(str(err)) - sys.exit(1) - - @property - def config(self) -> ArchConfig: - return self._config - - @property - def args(self) -> Arguments: - return self._args - - def get_script(self) -> str: - if script := self.args.script: - return script - - if script := self.config.script: - return script - - return 'guided' - - def print_help(self) -> None: - self._parser.print_help() - - def _define_arguments(self) -> ArgumentParser: - parser = ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument( - '-v', - '--version', - action='version', - default=False, - version='%(prog)s ' + get_version(), - ) - parser.add_argument( - '--config', - type=Path, - nargs='?', - default=None, - help='JSON configuration file', - ) - parser.add_argument( - '--config-url', - type=str, - nargs='?', - default=None, - help='Url to a JSON configuration file', - ) - parser.add_argument( - '--creds', - type=Path, - nargs='?', - default=None, - help='JSON credentials configuration file', - ) - parser.add_argument( - '--creds-url', - type=str, - nargs='?', - default=None, - help='Url to a JSON credentials configuration file', - ) - parser.add_argument( - '--creds-decryption-key', - type=str, - nargs='?', - default=None, - help='Decryption key for credentials file', - ) - parser.add_argument( - '--silent', - action='store_true', - default=False, - help='WARNING: Disables all prompts for input and confirmation. If no configuration is provided, this is ignored', - ) - parser.add_argument( - '--dry-run', - '--dry_run', - action='store_true', - default=False, - help='Generates a configuration file and then exits instead of performing an installation', - ) - parser.add_argument( - '--script', - nargs='?', - help='Script to run for installation', - type=str, - ) - parser.add_argument( - '--mountpoint', - type=Path, - nargs='?', - default=Path('/mnt'), - help='Define an alternate mount point for installation', - ) - parser.add_argument( - '--skip-ntp', - action='store_true', - help='Disables NTP checks during installation', - default=False, - ) - parser.add_argument( - '--skip-wkd', - action='store_true', - help='Disables checking if archlinux keyring wkd sync is complete.', - default=False, - ) - parser.add_argument( - '--skip-boot', - action='store_true', - help='Disables installation of a boot loader (note: only use this when problems arise with the boot loader step).', - default=False, - ) - parser.add_argument( - '--debug', - action='store_true', - default=False, - help='Adds debug info into the log', - ) - parser.add_argument( - '--offline', - action='store_true', - default=False, - help='Disabled online upstream services such as package search and key-ring auto update.', - ) - parser.add_argument( - '--no-pkg-lookups', - action='store_true', - default=False, - help='Disabled package validation specifically prior to starting installation.', - ) - parser.add_argument( - '--plugin', - nargs='?', - type=str, - default=None, - help='File path to a plugin to load', - ) - parser.add_argument( - '--skip-version-check', - action='store_true', - default=False, - help='Skip the version check when running archinstall', - ) - parser.add_argument( - '--skip-wifi-check', - action='store_true', - default=False, - help='Skip wifi check when running archinstall', - ) - parser.add_argument( - '--advanced', - action='store_true', - default=False, - help='Enabled advanced options', - ) - parser.add_argument( - '--verbose', - action='store_true', - default=False, - help='Enabled verbose options', - ) - - return parser - - def _parse_args(self) -> Arguments: - argparse_args = vars(self._parser.parse_args()) - args: Arguments = Arguments(**argparse_args) - - # amend the parameters (check internal consistency) - # Installation can't be silent if config is not passed - if args.config is None and args.config_url is None: - args.silent = False - - if args.debug: - warn(f'Warning: --debug mode will write certain credentials to {logger.path}!') - - if args.plugin: - # Pass plugin as string to preserve URL format (avoid Path normalization) - load_plugin(args.plugin) - - if args.creds_decryption_key is None: - if os.environ.get('ARCHINSTALL_CREDS_DECRYPTION_KEY'): - args.creds_decryption_key = os.environ.get('ARCHINSTALL_CREDS_DECRYPTION_KEY') - - return args - - def _parse_config(self) -> dict[str, Any]: - config: dict[str, Any] = {} - config_data: str | None = None - creds_data: str | None = None - - if self._args.config is not None: - config_data = self._read_file(self._args.config) - elif self._args.config_url is not None: - config_data = self._fetch_from_url(self._args.config_url) - - if config_data is not None: - config.update(json.loads(config_data)) - - if self._args.creds is not None: - creds_data = self._read_file(self._args.creds) - elif self._args.creds_url is not None: - creds_data = self._fetch_from_url(self._args.creds_url) - - if creds_data is not None: - json_data = self._process_creds_data(creds_data) - if json_data is not None: - config.update(json_data) - - config = self._cleanup_config(config) - - return config - - def _process_creds_data(self, creds_data: str) -> dict[str, Any] | None: - if creds_data.startswith('$'): # encrypted data - if self._args.creds_decryption_key is not None: - try: - creds_data = decrypt(creds_data, self._args.creds_decryption_key) - return json.loads(creds_data) - except ValueError as err: - if 'Invalid password' in str(err): - error(tr('Incorrect credentials file decryption password')) - sys.exit(1) - else: - debug(f'Error decrypting credentials file: {err}') - raise err from err - else: - header = tr('Enter credentials file decryption password') - wrong_pwd_text = tr('Incorrect password') - prompt = header - - while True: - decryption_pwd: Password | None = tui.run( - lambda p=prompt: get_password( # type: ignore[misc] - header=p, - allow_skip=False, - no_confirmation=True, - ) - ) - - if not decryption_pwd: - return None - - try: - creds_data = decrypt(creds_data, decryption_pwd.plaintext) - break - except ValueError as err: - if 'Invalid password' in str(err): - debug('Incorrect credentials file decryption password') - prompt = f'{header}' + f'\n\n{wrong_pwd_text}' - else: - debug(f'Error decrypting credentials file: {err}') - raise err from err - - return json.loads(creds_data) - - def _fetch_from_url(self, url: str) -> str: - if urllib.parse.urlparse(url).scheme: - try: - req = Request(url, headers={'User-Agent': 'ArchInstall'}) - with urlopen(req) as resp: - return resp.read().decode('utf-8') - except urllib.error.HTTPError as err: - error(f'Could not fetch JSON from {url}: {err}') - else: - error('Not a valid url') - - sys.exit(1) - - def _read_file(self, path: Path) -> str: - if not path.exists(): - error(f'Could not find file {path}') - sys.exit(1) - - return path.read_text() - - def _cleanup_config(self, config: Namespace | dict[str, Any]) -> dict[str, Any]: - clean_args = {} - for key, val in config.items(): - if isinstance(val, dict): - val = self._cleanup_config(val) - - if val is not None: - clean_args[key] = val - - return clean_args + def __init__(self) -> None: + self._parser: ArgumentParser = self._define_arguments() + args: Arguments = self._parse_args() + self._args = args + + config = self._parse_config() + + try: + self._config = ArchConfig.from_config(config, args) + self._config.version = get_version() + except ValueError as err: + warn(str(err)) + sys.exit(1) + + @property + def config(self) -> ArchConfig: + return self._config + + @property + def args(self) -> Arguments: + return self._args + + def get_script(self) -> str: + if script := self.args.script: + return script + + if script := self.config.script: + return script + + return 'guided' + + def print_help(self) -> None: + self._parser.print_help() + + def _define_arguments(self) -> ArgumentParser: + parser = ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument( + '-v', + '--version', + action='version', + default=False, + version='%(prog)s ' + get_version(), + ) + parser.add_argument( + '--config', + type=Path, + nargs='?', + default=None, + help='JSON configuration file', + ) + parser.add_argument( + '--config-url', + type=str, + nargs='?', + default=None, + help='Url to a JSON configuration file', + ) + parser.add_argument( + '--creds', + type=Path, + nargs='?', + default=None, + help='JSON credentials configuration file', + ) + parser.add_argument( + '--creds-url', + type=str, + nargs='?', + default=None, + help='Url to a JSON credentials configuration file', + ) + parser.add_argument( + '--creds-decryption-key', + type=str, + nargs='?', + default=None, + help='Decryption key for credentials file', + ) + parser.add_argument( + '--silent', + action='store_true', + default=False, + help='WARNING: Disables all prompts for input and confirmation. If no configuration is provided, this is ignored', + ) + parser.add_argument( + '--dry-run', + '--dry_run', + action='store_true', + default=False, + help='Generates a configuration file and then exits instead of performing an installation', + ) + parser.add_argument( + '--script', + nargs='?', + help='Script to run for installation', + type=str, + ) + parser.add_argument( + '--mountpoint', + type=Path, + nargs='?', + default=Path('/mnt'), + help='Define an alternate mount point for installation', + ) + parser.add_argument( + '--skip-ntp', + action='store_true', + help='Disables NTP checks during installation', + default=False, + ) + parser.add_argument( + '--skip-wkd', + action='store_true', + help='Disables checking if archlinux keyring wkd sync is complete.', + default=False, + ) + parser.add_argument( + '--skip-boot', + action='store_true', + help='Disables installation of a boot loader (note: only use this when problems arise with the boot loader step).', + default=False, + ) + parser.add_argument( + '--debug', + action='store_true', + default=False, + help='Adds debug info into the log', + ) + parser.add_argument( + '--offline', + action='store_true', + default=False, + help='Disabled online upstream services such as package search and key-ring auto update.', + ) + parser.add_argument( + '--no-pkg-lookups', + action='store_true', + default=False, + help='Disabled package validation specifically prior to starting installation.', + ) + parser.add_argument( + '--plugin', + nargs='?', + type=str, + default=None, + help='File path to a plugin to load', + ) + parser.add_argument( + '--skip-version-check', + action='store_true', + default=False, + help='Skip the version check when running archinstall', + ) + parser.add_argument( + '--skip-wifi-check', + action='store_true', + default=False, + help='Skip wifi check when running archinstall', + ) + parser.add_argument( + '--advanced', + action='store_true', + default=False, + help='Enabled advanced options', + ) + parser.add_argument( + '--verbose', + action='store_true', + default=False, + help='Enabled verbose options', + ) + + return parser + + def _parse_args(self) -> Arguments: + argparse_args = vars(self._parser.parse_args()) + args: Arguments = Arguments(**argparse_args) + + # amend the parameters (check internal consistency) + # Installation can't be silent if config is not passed + if args.config is None and args.config_url is None: + args.silent = False + + if args.debug: + warn(f'Warning: --debug mode will write certain credentials to {logger.path}!') + + if args.plugin: + # Pass plugin as string to preserve URL format (avoid Path normalization) + load_plugin(args.plugin) + + if args.creds_decryption_key is None: + if os.environ.get('ARCHINSTALL_CREDS_DECRYPTION_KEY'): + args.creds_decryption_key = os.environ.get('ARCHINSTALL_CREDS_DECRYPTION_KEY') + + return args + + def _parse_config(self) -> dict[str, Any]: + config: dict[str, Any] = {} + config_data: str | None = None + creds_data: str | None = None + + if self._args.config is not None: + config_data = self._read_file(self._args.config) + elif self._args.config_url is not None: + config_data = self._fetch_from_url(self._args.config_url) + + if config_data is not None: + config.update(json.loads(config_data)) + + if self._args.creds is not None: + creds_data = self._read_file(self._args.creds) + elif self._args.creds_url is not None: + creds_data = self._fetch_from_url(self._args.creds_url) + + if creds_data is not None: + json_data = self._process_creds_data(creds_data) + if json_data is not None: + config.update(json_data) + + config = self._cleanup_config(config) + + return config + + def _process_creds_data(self, creds_data: str) -> dict[str, Any] | None: + if creds_data.startswith('$'): # encrypted data + if self._args.creds_decryption_key is not None: + try: + creds_data = decrypt(creds_data, self._args.creds_decryption_key) + return json.loads(creds_data) + except ValueError as err: + if 'Invalid password' in str(err): + error(tr('Incorrect credentials file decryption password')) + sys.exit(1) + else: + debug(f'Error decrypting credentials file: {err}') + raise err from err + else: + header = tr('Enter credentials file decryption password') + wrong_pwd_text = tr('Incorrect password') + prompt = header + + while True: + decryption_pwd: Password | None = tui.run( + lambda p=prompt: get_password( # type: ignore[misc] + header=p, + allow_skip=False, + no_confirmation=True, + ) + ) + + if not decryption_pwd: + return None + + try: + creds_data = decrypt(creds_data, decryption_pwd.plaintext) + break + except ValueError as err: + if 'Invalid password' in str(err): + debug('Incorrect credentials file decryption password') + prompt = f'{header}' + f'\n\n{wrong_pwd_text}' + else: + debug(f'Error decrypting credentials file: {err}') + raise err from err + + return json.loads(creds_data) + + def _fetch_from_url(self, url: str) -> str: + if urllib.parse.urlparse(url).scheme: + try: + req = Request(url, headers={'User-Agent': 'ArchInstall'}) + # FIXED: Added a 15-second timeout to prevent infinite hanging + with urlopen(req, timeout=15) as resp: + return resp.read().decode('utf-8') + except urllib.error.HTTPError as err: + error(f'Could not fetch JSON from {url}: {err}') + else: + error('Not a valid url') + + sys.exit(1) + + def _read_file(self, path: Path) -> str: + if not path.exists(): + error(f'Could not find file {path}') + sys.exit(1) + + return path.read_text() + + # FIXED: Added safe handling to support both ArgumentParser Namespace objects AND dictionaries, + # as calling .items() on a Namespace object without using `vars()` throws an AttributeError. + def _cleanup_config(self, config: Namespace | dict[str, Any]) -> dict[str, Any]: + clean_args = {} + # Convert Namespace to dict if necessary before iterating + config_dict = vars(config) if isinstance(config, Namespace) else config + + for key, val in config_dict.items(): + if isinstance(val, dict): + val = self._cleanup_config(val) + + if val is not None: + clean_args[key] = val + + return clean_args \ No newline at end of file diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py index f83989262b..7aa7b6ab3a 100644 --- a/archinstall/lib/plugins.py +++ b/archinstall/lib/plugins.py @@ -18,125 +18,140 @@ # 2: Load the plugin entrypoint # 3: Initiate the plugin and store it as .name in plugins for plugin_definition in metadata.entry_points().select(group='archinstall.plugin'): - plugin_entrypoint = plugin_definition.load() + plugin_entrypoint = plugin_definition.load() - try: - plugins[plugin_definition.name] = plugin_entrypoint() - except Exception as err: - error( - f'Error: {err}', - f'The above error was detected when loading the plugin: {plugin_definition}', - ) + try: + plugins[plugin_definition.name] = plugin_entrypoint() + except Exception as err: + error( + f'Error: {err}', + f'The above error was detected when loading the plugin: {plugin_definition}', + ) # @archinstall.plugin decorator hook to programmatically add # plugins in runtime. Useful in profiles_bck and other things. def plugin(f, *args, **kwargs) -> None: # type: ignore[no-untyped-def] - plugins[f.__name__] = f + plugins[f.__name__] = f def _localize_path(path: str | Path) -> Path: - """ - Support structures for load_plugin() - """ - # Keep as string to preserve URL format (Path normalization breaks URLs) - path_str = str(path) - url = urllib.parse.urlparse(path_str) - - if url.scheme and url.scheme in ('https', 'http'): - # Extract filename from the URL path component - # Use os.path.basename instead of path.stem to handle URLs correctly - url_path = url.path - filename = os.path.basename(url_path) if url_path else 'plugin' - # Remove .py extension if present for the temporary filename format - if filename.endswith('.py'): - filename_base = filename.replace('.py', '') - else: - filename_base = filename - - converted_path = Path(f'/tmp/{filename_base}_{hashlib.md5(os.urandom(12)).hexdigest()}.py') - - with open(converted_path, 'w') as temp_file: - # Use the original path string directly instead of reconstructing from parsed URL - # This avoids issues with url.geturl() producing malformed URLs - try: - response = urllib.request.urlopen(path_str) - temp_file.write(response.read().decode('utf-8')) - except urllib.error.URLError as e: - error(f'Failed to download plugin from {path_str}: {e}') - raise - - return converted_path - else: - return Path(path) + """ + Support structures for load_plugin() + """ + # Keep as string to preserve URL format (Path normalization breaks URLs) + path_str = str(path) + url = urllib.parse.urlparse(path_str) + + if url.scheme and url.scheme in ('https', 'http'): + # FIXED: Prevent arbitrary code execution over unencrypted HTTP + if url.scheme == 'http': + error(f'Insecure HTTP URL {path_str} is not allowed for downloading plugins. Please use HTTPS.') + raise ValueError('Insecure HTTP URLs are blocked for security reasons.') + + # Extract filename from the URL path component + # Use os.path.basename instead of path.stem to handle URLs correctly + url_path = url.path + filename = os.path.basename(url_path) if url_path else 'plugin' + # Remove .py extension if present for the temporary filename format + if filename.endswith('.py'): + filename_base = filename.replace('.py', '') + else: + filename_base = filename + + converted_path = Path(f'/tmp/{filename_base}_{hashlib.md5(os.urandom(12)).hexdigest()}.py') + + # FIXED: Open in 'wb' (write-binary) mode to safely write downloaded bytes without assuming UTF-8 + with open(converted_path, 'wb') as temp_file: + try: + # FIXED: Added a 15-second timeout and wrapped urlopen in a `with` statement to close the socket cleanly + with urllib.request.urlopen(path_str, timeout=15) as response: + temp_file.write(response.read()) + except urllib.error.URLError as e: + error(f'Failed to download plugin from {path_str}: {e}') + raise + + return converted_path + else: + return Path(path) def _import_via_path(path: Path, namespace: str | None = None) -> str: - if not namespace: - namespace = os.path.basename(path) + if not namespace: + namespace = os.path.basename(path) - if namespace == '__init__.py': - namespace = path.parent.name + if namespace == '__init__.py': + namespace = path.parent.name - try: - spec = importlib.util.spec_from_file_location(namespace, path) - if spec and spec.loader: - imported = importlib.util.module_from_spec(spec) - sys.modules[namespace] = imported - spec.loader.exec_module(sys.modules[namespace]) + try: + spec = importlib.util.spec_from_file_location(namespace, path) + if spec and spec.loader: + imported = importlib.util.module_from_spec(spec) + sys.modules[namespace] = imported + spec.loader.exec_module(sys.modules[namespace]) - return namespace - except Exception as err: - error( - f'Error: {err}', - f'The above error was detected when loading the plugin: {path}', - ) + return namespace + except Exception as err: + error( + f'Error: {err}', + f'The above error was detected when loading the plugin: {path}', + ) - try: - del sys.modules[namespace] - except Exception: - pass + try: + del sys.modules[namespace] + except Exception: + pass - return namespace + return namespace def load_plugin(path: str | Path) -> None: - namespace: str | None = None - # Keep URL as string to preserve scheme (avoid Path normalization) - path_str = str(path) if isinstance(path, Path) else path - parsed_url = urllib.parse.urlparse(path_str) - info(f'Loading plugin from url {parsed_url}') - - # The Profile was not a direct match on a remote URL - if not parsed_url.scheme: - # Path was not found in any known examples, check if it's an absolute path - if os.path.isfile(path_str): - namespace = _import_via_path(Path(path_str)) - elif parsed_url.scheme in ('https', 'http'): - localized = _localize_path(path_str) - namespace = _import_via_path(localized) - - if namespace and namespace in sys.modules: - # Version dependency via __archinstall__version__ variable (if present) in the plugin - # Any errors in version inconsistency will be handled through normal error handling if not defined. - version = get_version() - - if version is not None: - version_major_and_minor = version.rsplit('.', 1)[0] - - if sys.modules[namespace].__archinstall__version__ < float(version_major_and_minor): - error(f'Plugin {sys.modules[namespace]} does not support the current Archinstall version.') - - # Locate the plugin entry-point called Plugin() - # This in accordance with the entry_points() from setup.cfg above - if hasattr(sys.modules[namespace], 'Plugin'): - try: - plugins[namespace] = sys.modules[namespace].Plugin() - info(f'Plugin {plugins[namespace]} has been loaded.') - except Exception as err: - error( - f'Error: {err}', - f'The above error was detected when initiating the plugin: {path}', - ) - else: - warn(f"Plugin '{path}' is missing a valid entry-point or is corrupt.") + namespace: str | None = None + # Keep URL as string to preserve scheme (avoid Path normalization) + path_str = str(path) if isinstance(path, Path) else path + parsed_url = urllib.parse.urlparse(path_str) + info(f'Loading plugin from url {parsed_url}') + + # The Profile was not a direct match on a remote URL + if not parsed_url.scheme: + # Path was not found in any known examples, check if it's an absolute path + if os.path.isfile(path_str): + namespace = _import_via_path(Path(path_str)) + elif parsed_url.scheme in ('https', 'http'): + localized = _localize_path(path_str) + namespace = _import_via_path(localized) + + if namespace and namespace in sys.modules: + # Version dependency via __archinstall__version__ variable (if present) in the plugin + # Any errors in version inconsistency will be handled through normal error handling if not defined. + version = get_version() + + if version is not None: + version_major_and_minor = version.rsplit('.', 1)[0] + + # FIXED: Safely fetch the plugin version attribute, defaulting to "0.0" if missing + plugin_version_raw = getattr(sys.modules[namespace], '__archinstall__version__', '0.0') + + # FIXED: Safely parse versions into a tuple for integer comparison, preventing the float("2.10") == 2.1 bug + def parse_version(v: str | float) -> tuple[int, ...]: + return tuple(int(x) for x in str(v).split('.') if x.isdigit()) + + plugin_version = parse_version(plugin_version_raw) + system_version = parse_version(version_major_and_minor) + + if plugin_version < system_version: + error(f'Plugin {sys.modules[namespace]} does not support the current Archinstall version.') + + # Locate the plugin entry-point called Plugin() + # This in accordance with the entry_points() from setup.cfg above + if hasattr(sys.modules[namespace], 'Plugin'): + try: + plugins[namespace] = sys.modules[namespace].Plugin() + info(f'Plugin {plugins[namespace]} has been loaded.') + except Exception as err: + error( + f'Error: {err}', + f'The above error was detected when initiating the plugin: {path}', + ) + else: + warn(f"Plugin '{path}' is missing a valid entry-point or is corrupt.") \ No newline at end of file From 002ff4fcbdfd40344d01fdb2567e94f62ebd8fa4 Mon Sep 17 00:00:00 2001 From: hariomphulre Date: Fri, 15 May 2026 02:06:25 +0530 Subject: [PATCH 3/3] fix(plugins): restore args parsing and complete #3021 fixes Pass plugin paths as strings so HTTPS URLs are not mangled by pathlib. Fix mixed tab/space indentation in ArchConfig.from_config that broke imports. Harden plugin import (spec handling, failed-import cleanup), and skip version comparison when version tuples are empty. Add tests for URL handling and HTTP rejection. Co-authored-by: Cursor --- archinstall/lib/args.py | 170 +++++++++++++-------------- archinstall/lib/plugins.py | 235 +++++++++++++++++++------------------ tests/test_plugins.py | 38 ++++++ 3 files changed, 241 insertions(+), 202 deletions(-) create mode 100644 tests/test_plugins.py diff --git a/archinstall/lib/args.py b/archinstall/lib/args.py index b65024d304..0e788cf56d 100644 --- a/archinstall/lib/args.py +++ b/archinstall/lib/args.py @@ -37,26 +37,26 @@ @p_dataclass class Arguments: - config: Path | None = None - config_url: str | None = None - creds: Path | None = None - creds_url: str | None = None - creds_decryption_key: str | None = None - silent: bool = False - dry_run: bool = False - script: str | None = None - mountpoint: Path = Path('/mnt') - skip_ntp: bool = False - skip_wkd: bool = False - skip_boot: bool = False - debug: bool = False - offline: bool = False - no_pkg_lookups: bool = False - plugin: str | None = None - skip_version_check: bool = False - skip_wifi_check: bool = False - advanced: bool = False - verbose: bool = False + config: Path | None = None + config_url: str | None = None + creds: Path | None = None + creds_url: str | None = None + creds_decryption_key: str | None = None + silent: bool = False + dry_run: bool = False + script: str | None = None + mountpoint: Path = Path('/mnt') + skip_ntp: bool = False + skip_wkd: bool = False + skip_boot: bool = False + debug: bool = False + offline: bool = False + no_pkg_lookups: bool = False + plugin: str | None = None + skip_version_check: bool = False + skip_wifi_check: bool = False + advanced: bool = False + verbose: bool = False class ArchConfigType(StrEnum): @@ -276,90 +276,90 @@ def from_config(cls, args_config: dict[str, Any], args: Arguments) -> Self: if additional_repositories := args_config.get('additional-repositories', []): backwards_compatible_repo = [Repository(r) for r in additional_repositories] - arch_config.mirror_config = MirrorConfiguration.parse_args( - mirror_config, - backwards_compatible_repo, - ) + arch_config.mirror_config = MirrorConfiguration.parse_args( + mirror_config, + backwards_compatible_repo, + ) - if net_config := args_config.get('network_config', None): - arch_config.network_config = NetworkConfiguration.parse_arg(net_config) + if net_config := args_config.get('network_config', None): + arch_config.network_config = NetworkConfiguration.parse_arg(net_config) - if bootloader_config_dict := args_config.get('bootloader_config', None): - arch_config.bootloader_config = BootloaderConfiguration.parse_arg(bootloader_config_dict, args.skip_boot) - # DEPRECATED: separate bootloader and uki fields (backward compatibility) - elif bootloader_str := args_config.get('bootloader', None): - bootloader = Bootloader.from_arg(bootloader_str, args.skip_boot) - uki = args_config.get('uki', False) - if uki and not bootloader.has_uki_support(): - uki = False - arch_config.bootloader_config = BootloaderConfiguration(bootloader=bootloader, uki=uki, removable=True) + if bootloader_config_dict := args_config.get('bootloader_config', None): + arch_config.bootloader_config = BootloaderConfiguration.parse_arg(bootloader_config_dict, args.skip_boot) + # DEPRECATED: separate bootloader and uki fields (backward compatibility) + elif bootloader_str := args_config.get('bootloader', None): + bootloader = Bootloader.from_arg(bootloader_str, args.skip_boot) + uki = args_config.get('uki', False) + if uki and not bootloader.has_uki_support(): + uki = False + arch_config.bootloader_config = BootloaderConfiguration(bootloader=bootloader, uki=uki, removable=True) - # deprecated: backwards compatibility - audio_config_args = args_config.get('audio_config', None) - app_config_args = args_config.get('app_config', None) + # deprecated: backwards compatibility + audio_config_args = args_config.get('audio_config', None) + app_config_args = args_config.get('app_config', None) - if audio_config_args is not None or app_config_args is not None: - arch_config.app_config = ApplicationConfiguration.parse_arg(app_config_args, audio_config_args) + if audio_config_args is not None or app_config_args is not None: + arch_config.app_config = ApplicationConfiguration.parse_arg(app_config_args, audio_config_args) - if auth_config_args := args_config.get('auth_config', None): - arch_config.auth_config = AuthenticationConfiguration.parse_arg(auth_config_args) + if auth_config_args := args_config.get('auth_config', None): + arch_config.auth_config = AuthenticationConfiguration.parse_arg(auth_config_args) - if hostname := args_config.get('hostname', ''): - arch_config.hostname = hostname + if hostname := args_config.get('hostname', ''): + arch_config.hostname = hostname - if kernels := args_config.get('kernels', []): - arch_config.kernels = kernels + if kernels := args_config.get('kernels', []): + arch_config.kernels = kernels - arch_config.ntp = args_config.get('ntp', True) + arch_config.ntp = args_config.get('ntp', True) - if packages := args_config.get('packages', []): - arch_config.packages = packages + if packages := args_config.get('packages', []): + arch_config.packages = packages - if pacman_config := args_config.get('pacman_config', None): - arch_config.pacman_config = PacmanConfiguration.parse_arg(pacman_config) - elif parallel_downloads := args_config.get('parallel_downloads', 0): - arch_config.pacman_config = PacmanConfiguration(parallel_downloads=int(parallel_downloads)) + if pacman_config := args_config.get('pacman_config', None): + arch_config.pacman_config = PacmanConfiguration.parse_arg(pacman_config) + elif parallel_downloads := args_config.get('parallel_downloads', 0): + arch_config.pacman_config = PacmanConfiguration(parallel_downloads=int(parallel_downloads)) - swap_arg = args_config.get('swap') - if swap_arg is not None: - arch_config.swap = ZramConfiguration.parse_arg(swap_arg) + swap_arg = args_config.get('swap') + if swap_arg is not None: + arch_config.swap = ZramConfiguration.parse_arg(swap_arg) - if timezone := args_config.get('timezone', 'UTC'): - arch_config.timezone = timezone + if timezone := args_config.get('timezone', 'UTC'): + arch_config.timezone = timezone - if services := args_config.get('services', []): - arch_config.services = services + if services := args_config.get('services', []): + arch_config.services = services - # DEPRECATED: backwards compatibility - root_password = None - if root_password := args_config.get('!root-password', None): - root_password = Password(plaintext=root_password) + # DEPRECATED: backwards compatibility + root_password = None + if root_password := args_config.get('!root-password', None): + root_password = Password(plaintext=root_password) - if enc_password := args_config.get('root_enc_password', None): - root_password = Password(enc_password=enc_password) + if enc_password := args_config.get('root_enc_password', None): + root_password = Password(enc_password=enc_password) - if root_password is not None: - if arch_config.auth_config is None: - arch_config.auth_config = AuthenticationConfiguration() - arch_config.auth_config.root_enc_password = root_password + if root_password is not None: + if arch_config.auth_config is None: + arch_config.auth_config = AuthenticationConfiguration() + arch_config.auth_config.root_enc_password = root_password - # DEPRECATED: backwards compatibility - users: list[User] = [] - if args_users := args_config.get('!users', None): - users = User.parse_arguments(args_users) + # DEPRECATED: backwards compatibility + users: list[User] = [] + if args_users := args_config.get('!users', None): + users = User.parse_arguments(args_users) - if args_users := args_config.get('users', None): - users = User.parse_arguments(args_users) + if args_users := args_config.get('users', None): + users = User.parse_arguments(args_users) - if users: - if arch_config.auth_config is None: - arch_config.auth_config = AuthenticationConfiguration() - arch_config.auth_config.users = users + if users: + if arch_config.auth_config is None: + arch_config.auth_config = AuthenticationConfiguration() + arch_config.auth_config.users = users - if custom_commands := args_config.get('custom_commands', []): - arch_config.custom_commands = custom_commands + if custom_commands := args_config.get('custom_commands', []): + arch_config.custom_commands = custom_commands - return arch_config + return arch_config class ArchConfigHandler: @@ -549,8 +549,8 @@ def _parse_args(self) -> Arguments: warn(f'Warning: --debug mode will write certain credentials to {logger.path}!') if args.plugin: - plugin_path = Path(args.plugin) - load_plugin(plugin_path) + # pathlib collapses "https://..." to "https:/..." which breaks URL loading (#3021). + load_plugin(args.plugin) if args.creds_decryption_key is None: if os.environ.get('ARCHINSTALL_CREDS_DECRYPTION_KEY'): diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py index 7aa7b6ab3a..ca77d71cfe 100644 --- a/archinstall/lib/plugins.py +++ b/archinstall/lib/plugins.py @@ -18,140 +18,141 @@ # 2: Load the plugin entrypoint # 3: Initiate the plugin and store it as .name in plugins for plugin_definition in metadata.entry_points().select(group='archinstall.plugin'): - plugin_entrypoint = plugin_definition.load() + plugin_entrypoint = plugin_definition.load() - try: - plugins[plugin_definition.name] = plugin_entrypoint() - except Exception as err: - error( - f'Error: {err}', - f'The above error was detected when loading the plugin: {plugin_definition}', - ) + try: + plugins[plugin_definition.name] = plugin_entrypoint() + except Exception as err: + error( + f'Error: {err}', + f'The above error was detected when loading the plugin: {plugin_definition}', + ) # @archinstall.plugin decorator hook to programmatically add # plugins in runtime. Useful in profiles_bck and other things. def plugin(f, *args, **kwargs) -> None: # type: ignore[no-untyped-def] - plugins[f.__name__] = f + plugins[f.__name__] = f def _localize_path(path: str | Path) -> Path: - """ - Support structures for load_plugin() - """ - # Keep as string to preserve URL format (Path normalization breaks URLs) - path_str = str(path) - url = urllib.parse.urlparse(path_str) - - if url.scheme and url.scheme in ('https', 'http'): - # FIXED: Prevent arbitrary code execution over unencrypted HTTP - if url.scheme == 'http': - error(f'Insecure HTTP URL {path_str} is not allowed for downloading plugins. Please use HTTPS.') - raise ValueError('Insecure HTTP URLs are blocked for security reasons.') - - # Extract filename from the URL path component - # Use os.path.basename instead of path.stem to handle URLs correctly - url_path = url.path - filename = os.path.basename(url_path) if url_path else 'plugin' - # Remove .py extension if present for the temporary filename format - if filename.endswith('.py'): - filename_base = filename.replace('.py', '') - else: - filename_base = filename - - converted_path = Path(f'/tmp/{filename_base}_{hashlib.md5(os.urandom(12)).hexdigest()}.py') - - # FIXED: Open in 'wb' (write-binary) mode to safely write downloaded bytes without assuming UTF-8 - with open(converted_path, 'wb') as temp_file: - try: - # FIXED: Added a 15-second timeout and wrapped urlopen in a `with` statement to close the socket cleanly - with urllib.request.urlopen(path_str, timeout=15) as response: - temp_file.write(response.read()) - except urllib.error.URLError as e: - error(f'Failed to download plugin from {path_str}: {e}') - raise - - return converted_path - else: - return Path(path) + """ + Support structures for load_plugin() + """ + # Keep as string to preserve URL format (Path normalization breaks URLs) + path_str = str(path) + url = urllib.parse.urlparse(path_str) + + if url.scheme and url.scheme in ('https', 'http'): + if url.scheme == 'http': + error(f'Insecure HTTP URL {path_str} is not allowed for downloading plugins. Please use HTTPS.') + raise ValueError('Insecure HTTP URLs are blocked for security reasons.') + + # Extract filename from the URL path component + # Use os.path.basename instead of path.stem to handle URLs correctly + url_path = url.path + filename = os.path.basename(url_path) if url_path else 'plugin' + # Remove .py extension if present for the temporary filename format + if filename.endswith('.py'): + filename_base = filename.replace('.py', '') + else: + filename_base = filename + + converted_path = Path(f'/tmp/{filename_base}_{hashlib.md5(os.urandom(12)).hexdigest()}.py') + + with open(converted_path, 'wb') as temp_file: + try: + with urllib.request.urlopen(path_str, timeout=15) as response: + temp_file.write(response.read()) + except urllib.error.URLError as e: + error(f'Failed to download plugin from {path_str}: {e}') + raise + + return converted_path + else: + return Path(path) def _import_via_path(path: Path, namespace: str | None = None) -> str: - if not namespace: - namespace = os.path.basename(path) + if not namespace: + namespace = os.path.basename(path) - if namespace == '__init__.py': - namespace = path.parent.name + if namespace == '__init__.py': + namespace = path.parent.name - try: - spec = importlib.util.spec_from_file_location(namespace, path) - if spec and spec.loader: - imported = importlib.util.module_from_spec(spec) - sys.modules[namespace] = imported - spec.loader.exec_module(sys.modules[namespace]) + try: + spec = importlib.util.spec_from_file_location(namespace, path) + if spec is None or spec.loader is None: + error( + f'Could not load plugin module spec from {path}', + f'The above error was detected when loading the plugin: {path}', + ) + return '' - return namespace - except Exception as err: - error( - f'Error: {err}', - f'The above error was detected when loading the plugin: {path}', - ) + imported = importlib.util.module_from_spec(spec) + sys.modules[namespace] = imported + spec.loader.exec_module(imported) - try: - del sys.modules[namespace] - except Exception: - pass + return namespace + except Exception as err: + error( + f'Error: {err}', + f'The above error was detected when loading the plugin: {path}', + ) - return namespace + try: + del sys.modules[namespace] + except Exception: + pass + + return '' def load_plugin(path: str | Path) -> None: - namespace: str | None = None - # Keep URL as string to preserve scheme (avoid Path normalization) - path_str = str(path) if isinstance(path, Path) else path - parsed_url = urllib.parse.urlparse(path_str) - info(f'Loading plugin from url {parsed_url}') - - # The Profile was not a direct match on a remote URL - if not parsed_url.scheme: - # Path was not found in any known examples, check if it's an absolute path - if os.path.isfile(path_str): - namespace = _import_via_path(Path(path_str)) - elif parsed_url.scheme in ('https', 'http'): - localized = _localize_path(path_str) - namespace = _import_via_path(localized) - - if namespace and namespace in sys.modules: - # Version dependency via __archinstall__version__ variable (if present) in the plugin - # Any errors in version inconsistency will be handled through normal error handling if not defined. - version = get_version() - - if version is not None: - version_major_and_minor = version.rsplit('.', 1)[0] - - # FIXED: Safely fetch the plugin version attribute, defaulting to "0.0" if missing - plugin_version_raw = getattr(sys.modules[namespace], '__archinstall__version__', '0.0') - - # FIXED: Safely parse versions into a tuple for integer comparison, preventing the float("2.10") == 2.1 bug - def parse_version(v: str | float) -> tuple[int, ...]: - return tuple(int(x) for x in str(v).split('.') if x.isdigit()) - - plugin_version = parse_version(plugin_version_raw) - system_version = parse_version(version_major_and_minor) - - if plugin_version < system_version: - error(f'Plugin {sys.modules[namespace]} does not support the current Archinstall version.') - - # Locate the plugin entry-point called Plugin() - # This in accordance with the entry_points() from setup.cfg above - if hasattr(sys.modules[namespace], 'Plugin'): - try: - plugins[namespace] = sys.modules[namespace].Plugin() - info(f'Plugin {plugins[namespace]} has been loaded.') - except Exception as err: - error( - f'Error: {err}', - f'The above error was detected when initiating the plugin: {path}', - ) - else: - warn(f"Plugin '{path}' is missing a valid entry-point or is corrupt.") \ No newline at end of file + namespace: str | None = None + # Keep URL as string to preserve scheme (avoid Path normalization) + path_str = str(path) if isinstance(path, Path) else path + parsed_url = urllib.parse.urlparse(path_str) + info(f'Loading plugin from url {parsed_url}') + + # The Profile was not a direct match on a remote URL + if not parsed_url.scheme: + # Path was not found in any known examples, check if it's an absolute path + if os.path.isfile(path_str): + namespace = _import_via_path(Path(path_str)) + elif parsed_url.scheme in ('https', 'http'): + localized = _localize_path(path_str) + namespace = _import_via_path(localized) + + if namespace and namespace in sys.modules: + # Version dependency via __archinstall__version__ variable (if present) in the plugin + # Any errors in version inconsistency will be handled through normal error handling if not defined. + version = get_version() + + if version is not None: + version_major_and_minor = version.rsplit('.', 1)[0] + + plugin_version_raw = getattr(sys.modules[namespace], '__archinstall__version__', '0.0') + + def parse_version(v: str | float) -> tuple[int, ...]: + return tuple(int(x) for x in str(v).split('.') if x.isdigit()) + + plugin_version = parse_version(plugin_version_raw) + system_version = parse_version(version_major_and_minor) + + if plugin_version and system_version and plugin_version < system_version: + error(f'Plugin {sys.modules[namespace]} does not support the current Archinstall version.') + + # Locate the plugin entry-point called Plugin() + # This in accordance with the entry_points() from setup.cfg above + if hasattr(sys.modules[namespace], 'Plugin'): + try: + plugins[namespace] = sys.modules[namespace].Plugin() + info(f'Plugin {plugins[namespace]} has been loaded.') + except Exception as err: + error( + f'Error: {err}', + f'The above error was detected when initiating the plugin: {path}', + ) + else: + warn(f"Plugin '{path}' is missing a valid entry-point or is corrupt.") diff --git a/tests/test_plugins.py b/tests/test_plugins.py new file mode 100644 index 0000000000..d7c70d1d38 --- /dev/null +++ b/tests/test_plugins.py @@ -0,0 +1,38 @@ +import urllib.parse +from pathlib import Path + +import pytest +from pytest import MonkeyPatch + +from archinstall.lib.args import ArchConfigHandler + + +def test_path_corrupts_https_url_authority_issue_3021() -> None: + """pathlib.Path is not safe for URL strings: POSIX normalization drops one slash after the scheme.""" + url = 'https://raw.githubusercontent.com/phisch/archinstall-aur/refs/heads/master/archinstall-aur.py' + broken = urllib.parse.urlparse(str(Path(url))) + assert broken.netloc == '' + assert broken.scheme == 'https' + + +def test_cli_https_plugin_passes_unparsed_string_to_load_plugin(monkeypatch: MonkeyPatch) -> None: + url = 'https://raw.githubusercontent.com/phisch/archinstall-aur/refs/heads/master/archinstall-aur.py' + received: list[object] = [] + + def capture(path: object) -> None: + received.append(path) + + monkeypatch.setattr('archinstall.lib.args.load_plugin', capture) + monkeypatch.setattr('sys.argv', ['archinstall', '--plugin', url]) + ArchConfigHandler() + assert len(received) == 1 + parsed = urllib.parse.urlparse(str(received[0])) + assert parsed.scheme == 'https' + assert parsed.netloc == 'raw.githubusercontent.com' + + +def test_localize_path_rejects_http() -> None: + from archinstall.lib.plugins import _localize_path + + with pytest.raises(ValueError, match='Insecure HTTP'): + _localize_path('http://example.com/plugin.py')