Skip to content

dynamic_imports

Dynamic module imports with multiprocessing spawn support.

Enables importing Python packages from arbitrary paths while maintaining compatibility with multiprocessing spawn workers (used by PyTorch DataLoader with num_workers > 0).

Uses cloudpickle to serialize dynamically imported modules by value, embedding class definitions in the pickle stream rather than storing module paths that workers can't resolve.

The key insight is that we need cloudpickle for user-defined classes (to serialize them by value), but we must preserve ForkingPickler's special reducers for multiprocessing internals (pipes, queues, connections, file descriptors). We achieve this by creating a hybrid pickler that inherits from ForkingPickler but also includes cloudpickle's dispatch table.

_DynamicModuleFinder

Bases: MetaPathFinder

Meta path finder for submodules of dynamically imported packages.

Source code in src/lighter/utils/dynamic_imports.py
class _DynamicModuleFinder(importlib.abc.MetaPathFinder):
    """Meta path finder for submodules of dynamically imported packages."""

    def find_spec(
        self,
        fullname: str,
        path: Sequence[str | bytes] | None,
        target: ModuleType | None = None,
    ) -> importlib.machinery.ModuleSpec | None:
        root = _registry.find_root(fullname)
        if root is None:
            return None

        root_name, root_path = root
        file_path = self._resolve_path(fullname, root_name, root_path)
        if file_path is None or not file_path.is_file():
            return None

        return importlib.machinery.ModuleSpec(
            fullname,
            _DynamicModuleLoader(str(file_path), fullname),
            origin=str(file_path),
            is_package=file_path.name == "__init__.py",
        )

    def _resolve_path(self, fullname: str, root_name: str, root_path: Path) -> Path | None:
        """Resolve 'project.models.net' to '/path/to/project/models/net.py'."""
        if fullname == root_name:
            return root_path / "__init__.py"

        relative = fullname[len(root_name) + 1 :].replace(".", "/")

        # Try package first, then module
        package_init = root_path / relative / "__init__.py"
        if package_init.is_file():
            return package_init
        return root_path / f"{relative}.py"

_resolve_path(fullname, root_name, root_path)

Resolve 'project.models.net' to '/path/to/project/models/net.py'.

Source code in src/lighter/utils/dynamic_imports.py
def _resolve_path(self, fullname: str, root_name: str, root_path: Path) -> Path | None:
    """Resolve 'project.models.net' to '/path/to/project/models/net.py'."""
    if fullname == root_name:
        return root_path / "__init__.py"

    relative = fullname[len(root_name) + 1 :].replace(".", "/")

    # Try package first, then module
    package_init = root_path / relative / "__init__.py"
    if package_init.is_file():
        return package_init
    return root_path / f"{relative}.py"

_DynamicModuleLoader

Bases: Loader

Loader that registers modules with cloudpickle for by-value serialization.

Source code in src/lighter/utils/dynamic_imports.py
class _DynamicModuleLoader(importlib.abc.Loader):
    """Loader that registers modules with cloudpickle for by-value serialization."""

    def __init__(self, filepath: str, fullname: str) -> None:
        self._filepath = filepath
        self._fullname = fullname

    def create_module(self, spec: importlib.machinery.ModuleSpec) -> ModuleType | None:
        return None

    def exec_module(self, module: ModuleType) -> None:
        importlib.machinery.SourceFileLoader(self._fullname, self._filepath).exec_module(module)
        cloudpickle.register_pickle_by_value(module)

_HybridPickler

Bases: ForkingPickler

Pickler combining ForkingPickler's reducers with cloudpickle's by-value serialization.

ForkingPickler has special reducers for multiprocessing internals (pipes, queues, connections, file descriptors) that must be preserved. cloudpickle can serialize dynamically defined classes by value. This hybrid uses both: ForkingPickler's dispatch table takes priority (for multiprocessing internals), then cloudpickle's reducer_override handles user-defined classes.

Source code in src/lighter/utils/dynamic_imports.py
class _HybridPickler(ForkingPickler):
    """Pickler combining ForkingPickler's reducers with cloudpickle's by-value serialization.

    ForkingPickler has special reducers for multiprocessing internals (pipes, queues,
    connections, file descriptors) that must be preserved. cloudpickle can serialize
    dynamically defined classes by value. This hybrid uses both: ForkingPickler's
    dispatch table takes priority (for multiprocessing internals), then cloudpickle's
    reducer_override handles user-defined classes.
    """

    def __init__(self, file: IO[bytes], protocol: int | None = None) -> None:
        """Initialize over *file*, merging cloudpickle's dispatch beneath ours.

        Args:
            file: Binary stream the pickle output is written to.
            protocol: Pickle protocol number; None lets ForkingPickler choose.
        """
        super().__init__(file, protocol)
        # Merge cloudpickle's dispatch into our dispatch_table
        # ForkingPickler's reducers (from _extra_reducers) take priority
        # because update() applies self.dispatch_table on top of the copy.
        cloudpickle_dispatch = cloudpickle.CloudPickler.dispatch.copy()
        cloudpickle_dispatch.update(self.dispatch_table)
        self.dispatch_table = cloudpickle_dispatch

    def reducer_override(self, obj: Any) -> Any:
        """Use cloudpickle's reducer for objects not in dispatch_table."""
        # If it's in ForkingPickler's extra reducers, let standard dispatch handle it
        # _extra_reducers is a class attribute that exists at runtime but isn't typed
        extra_reducers = cast(dict[type, Any], getattr(ForkingPickler, "_extra_reducers", {}))
        if type(obj) in extra_reducers:
            return NotImplemented

        # For everything else, try cloudpickle's reducer_override
        # This handles dynamically imported classes registered with register_pickle_by_value
        # NOTE(review): a fresh throwaway CloudPickler supplies the reducer each call;
        # presumably its reducer_override is stateless for these objects — confirm upstream.
        pickler = cloudpickle.CloudPickler(BytesIO())
        return pickler.reducer_override(obj)

reducer_override(obj)

Use cloudpickle's reducer for objects not in dispatch_table.

Source code in src/lighter/utils/dynamic_imports.py
def reducer_override(self, obj: Any) -> Any:
    """Use cloudpickle's reducer for objects not in dispatch_table."""
    # If it's in ForkingPickler's extra reducers, let standard dispatch handle it
    # _extra_reducers is a class attribute that exists at runtime but isn't typed
    extra_reducers = cast(dict[type, Any], getattr(ForkingPickler, "_extra_reducers", {}))
    if type(obj) in extra_reducers:
        # NotImplemented tells pickle to fall back to normal dispatch, which
        # includes the multiprocessing-specific reducers.
        return NotImplemented

    # For everything else, try cloudpickle's reducer_override
    # This handles dynamically imported classes registered with register_pickle_by_value
    # NOTE(review): a fresh throwaway CloudPickler supplies the reducer each call;
    # presumably its reducer_override is stateless for these objects — confirm upstream.
    pickler = cloudpickle.CloudPickler(BytesIO())
    return pickler.reducer_override(obj)

_ModuleRegistry

Maps dynamically imported module names to their filesystem paths.

Source code in src/lighter/utils/dynamic_imports.py
class _ModuleRegistry:
    """Maps dynamically imported module names to their filesystem paths."""

    def __init__(self) -> None:
        self._modules: dict[str, Path] = {}

    def register(self, name: str, path: Path) -> None:
        self._modules[name] = path

    def get(self, name: str) -> Path | None:
        """Get the registered path for a module name."""
        return self._modules.get(name)

    def find_root(self, fullname: str) -> tuple[str, Path] | None:
        """Find the registered root module for an import name (e.g., 'project.sub' -> 'project')."""
        for name, path in self._modules.items():
            if fullname == name or fullname.startswith(f"{name}."):
                return name, path
        return None

find_root(fullname)

Find the registered root module for an import name (e.g., 'project.sub' -> 'project').

Source code in src/lighter/utils/dynamic_imports.py
def find_root(self, fullname: str) -> tuple[str, Path] | None:
    """Find the registered root module for an import name (e.g., 'project.sub' -> 'project')."""
    for name, path in self._modules.items():
        if fullname == name or fullname.startswith(f"{name}."):
            return name, path
    return None

get(name)

Get the registered path for a module name.

Source code in src/lighter/utils/dynamic_imports.py
def get(self, name: str) -> Path | None:
    """Get the registered path for a module name."""
    return self._modules.get(name)

_hybrid_dump(obj, file, protocol=None)

Replacement for reduction.dump using hybrid pickler.

Source code in src/lighter/utils/dynamic_imports.py
def _hybrid_dump(obj: Any, file: IO[bytes], protocol: int | None = None) -> None:
    """Replacement for reduction.dump using hybrid pickler."""
    # Route serialization through the hybrid pickler so multiprocessing's
    # reducers and cloudpickle's by-value registration both apply.
    pickler = _HybridPickler(file, protocol)
    pickler.dump(obj)

import_module_from_path(module_name, module_path)

Import a package from a filesystem path with multiprocessing support.

Parameters:

Name Type Description Default
module_name str

Name to assign to the module (e.g., "project").

required
module_path Path | str

Path to the package directory (must contain `__init__.py`).

required

Returns:

Type Description
ModuleType

The imported module.

Raises:

Type Description
FileNotFoundError

If module_path doesn't contain `__init__.py`.

ModuleNotFoundError

If the module cannot be loaded.

ValueError

If module_name was already imported from a different path.

Example

>>> import_module_from_path("project", "/path/to/project")
>>> from project.models import MyModel  # Works in DataLoader workers!

Source code in src/lighter/utils/dynamic_imports.py
def import_module_from_path(module_name: str, module_path: Path | str) -> ModuleType:
    """Import a package from a filesystem path with multiprocessing support.

    Args:
        module_name: Name to assign to the module (e.g., "project").
        module_path: Path to the package directory (must contain __init__.py).

    Returns:
        The imported module.

    Raises:
        FileNotFoundError: If module_path doesn't contain __init__.py.
        ModuleNotFoundError: If the module cannot be loaded.
        ValueError: If module_name was already imported from a different path.

    Example:
        >>> import_module_from_path("project", "/path/to/project")
        >>> from project.models import MyModel  # Works in DataLoader workers!
    """
    resolved = Path(module_path).resolve()

    # A repeated import of the same name is fine only if it targets the same path.
    if module_name in sys.modules:
        existing_path = _registry.get(module_name)
        if existing_path is not None and existing_path != resolved:
            raise ValueError(f"Module '{module_name}' was already imported from '{existing_path}'.")
        # Same path - return cached module (normal Python behavior)
        return sys.modules[module_name]

    init_file = resolved / "__init__.py"
    if not init_file.is_file():
        raise FileNotFoundError(f"No __init__.py in '{resolved}'.")

    spec = importlib.util.spec_from_file_location(module_name, str(init_file))
    if spec is None or spec.loader is None:
        raise ModuleNotFoundError(f"Could not load '{module_name}' from '{resolved}'.")

    module = importlib.util.module_from_spec(spec)
    # Publish before exec so the package can reference itself while executing.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
        # By-value registration lets spawn workers unpickle the module's classes.
        cloudpickle.register_pickle_by_value(module)
        _registry.register(module_name, resolved)
    except Exception:
        # Clean up on failure so retry sees a clean state
        sys.modules.pop(module_name, None)
        raise

    return module