Skip to content

Index

sparkwheel: A powerful YAML-based configuration system with references, expressions, and dynamic instantiation.

Uses YAML format only.

BaseError

Bases: Exception

Base exception for sparkwheel with rich error context.

Attributes:

Name Type Description
message

The error message

source_location

Optional location in config file where error occurred

suggestion

Optional helpful suggestion for fixing the error

Source code in src/sparkwheel/utils/exceptions.py
class BaseError(Exception):
    """Base exception for sparkwheel with rich error context.

    Attributes:
        message: The error message
        source_location: Optional location in config file where error occurred
        suggestion: Optional helpful suggestion for fixing the error
    """

    def __init__(
        self,
        message: str,
        source_location: SourceLocation | None = None,
        suggestion: str | None = None,
    ) -> None:
        self.source_location = source_location
        self.suggestion = suggestion
        self._original_message = message
        super().__init__(self._format_message())

    def _format_message(self) -> str:
        """Format error message with source location and suggestions.

        Critical info (file:line) is on the first line for Rich compatibility,
        since Rich's traceback only shows the first line of exception messages.
        """
        parts = []

        # Put file:line on the FIRST line for Rich visibility
        if self.source_location:
            location = f"{self.source_location.filepath}:{self.source_location.line}"
            if self.source_location.id:
                parts.append(f"[{location} @ {self.source_location.id}] {self._original_message}")
            else:
                parts.append(f"[{location}] {self._original_message}")
        else:
            parts.append(self._original_message)

        # Add code snippet on subsequent lines (will be visible in full traceback)
        if self.source_location:
            snippet = self._get_config_snippet()
            if snippet:
                parts.append(f"\n\n{snippet}")

        if self.suggestion:
            parts.append(f"\n\n  💡 {self.suggestion}")

        return "".join(parts)

    def _get_config_snippet(self) -> str:
        """Extract and format a code snippet from the config file."""
        if not self.source_location:
            return ""

        try:
            filepath = Path(self.source_location.filepath)
            if not filepath.exists():
                return ""

            with open(filepath) as f:
                lines = f.readlines()

            # Show 2 lines before and 1 line after the error
            line_num = self.source_location.line
            start = max(0, line_num - 3)
            end = min(len(lines), line_num + 2)

            snippet_lines = []
            for i in range(start, end):
                marker = "→" if i == line_num - 1 else " "
                # Use 4-digit line numbers for alignment
                snippet_lines.append(f"  {marker} {i + 1:4d} │ {lines[i].rstrip()}")

            return "\n".join(snippet_lines)
        except Exception:
            # If we can't read the file, just skip the snippet
            return ""

_format_message()

Format error message with source location and suggestions.

Critical info (file:line) is on the first line for Rich compatibility, since Rich's traceback only shows the first line of exception messages.

Source code in src/sparkwheel/utils/exceptions.py
def _format_message(self) -> str:
    """Format error message with source location and suggestions.

    Critical info (file:line) is on the first line for Rich compatibility,
    since Rich's traceback only shows the first line of exception messages.
    """
    parts = []

    # Put file:line on the FIRST line for Rich visibility
    if self.source_location:
        location = f"{self.source_location.filepath}:{self.source_location.line}"
        if self.source_location.id:
            parts.append(f"[{location} @ {self.source_location.id}] {self._original_message}")
        else:
            parts.append(f"[{location}] {self._original_message}")
    else:
        parts.append(self._original_message)

    # Add code snippet on subsequent lines (will be visible in full traceback)
    if self.source_location:
        snippet = self._get_config_snippet()
        if snippet:
            parts.append(f"\n\n{snippet}")

    if self.suggestion:
        parts.append(f"\n\n  💡 {self.suggestion}")

    return "".join(parts)

_get_config_snippet()

Extract and format a code snippet from the config file.

Source code in src/sparkwheel/utils/exceptions.py
def _get_config_snippet(self) -> str:
    """Extract and format a code snippet from the config file."""
    if not self.source_location:
        return ""

    try:
        filepath = Path(self.source_location.filepath)
        if not filepath.exists():
            return ""

        with open(filepath) as f:
            lines = f.readlines()

        # Show 2 lines before and 1 line after the error
        line_num = self.source_location.line
        start = max(0, line_num - 3)
        end = min(len(lines), line_num + 2)

        snippet_lines = []
        for i in range(start, end):
            marker = "→" if i == line_num - 1 else " "
            # Use 4-digit line numbers for alignment
            snippet_lines.append(f"  {marker} {i + 1:4d} │ {lines[i].rstrip()}")

        return "\n".join(snippet_lines)
    except Exception:
        # If we can't read the file, just skip the snippet
        return ""

CircularReferenceError

Bases: BaseError

Raised when circular references are detected in config.

Source code in src/sparkwheel/utils/exceptions.py
class CircularReferenceError(BaseError):
    """Raised when circular references are detected in config."""

    pass

Component

Bases: Item, Instantiable

Component that can be instantiated from configuration.

Uses a dictionary with string keys to represent a Python class or function that can be dynamically instantiated. Other keys are passed as arguments to the target component.

Example
from sparkwheel import Component
from collections import Counter

config = {
    "_target_": "collections.Counter",
    "iterable": [1, 2, 2, 3, 3, 3]
}

component = Component(config, id="counter")
counter = component.instantiate()
print(counter)  # Counter({3: 3, 2: 2, 1: 1})

Parameters:

Name Type Description Default
config Any

Configuration content

required
id str

Identifier for this config item, defaults to ""

''
Note

Special configuration keys:

  • _target_: Full module path (e.g., "collections.Counter")
  • _requires_: Dependencies to evaluate/instantiate first
  • _disabled_: Skip instantiation if True
  • _mode_: Instantiation mode:
    • "default": Returns component(**kwargs)
    • "callable": Returns functools.partial(component, **kwargs)
    • "debug": Returns pdb.runcall(component, **kwargs)
Source code in src/sparkwheel/items.py
class Component(Item, Instantiable):
    """Component that can be instantiated from configuration.

    Uses a dictionary with string keys to represent a Python class or function
    that can be dynamically instantiated. Other keys are passed as arguments
    to the target component.

    Example:
        ```python
        from sparkwheel import Component
        from collections import Counter

        config = {
            "_target_": "collections.Counter",
            "iterable": [1, 2, 2, 3, 3, 3]
        }

        component = Component(config, id="counter")
        counter = component.instantiate()
        print(counter)  # Counter({3: 3, 2: 2, 1: 1})
        ```

    Args:
        config: Configuration content
        id: Identifier for this config item, defaults to ""

    Note:
        Special configuration keys:

        - `_target_`: Full module path (e.g., "collections.Counter")
        - `_requires_`: Dependencies to evaluate/instantiate first
        - `_disabled_`: Skip instantiation if True
        - `_mode_`: Instantiation mode:
            - `"default"`: Returns component(**kwargs)
            - `"callable"`: Returns functools.partial(component, **kwargs)
            - `"debug"`: Returns pdb.runcall(component, **kwargs)
    """

    non_arg_keys = {"_target_", "_disabled_", "_requires_", "_mode_"}

    def __init__(self, config: Any, id: str = "", source_location: SourceLocation | None = None) -> None:
        super().__init__(config=config, id=id, source_location=source_location)

    @staticmethod
    def is_instantiable(config: Any) -> bool:
        """
        Check whether this config represents a `class` or `function` that is to be instantiated.

        Args:
            config: input config content to check.
        """
        return isinstance(config, Mapping) and "_target_" in config

    def resolve_module_name(self):
        """Resolve the target module name from configuration.

        Requires full module path (e.g., "collections.Counter").
        No automatic module discovery is performed.

        Returns:
            str or callable: The module path or callable from _target_
        """
        config = dict(self.get_config())
        target = config.get("_target_")
        if not isinstance(target, str):
            return target  # for cases where _target_ is already a callable

        # No ComponentLocator - just return the target as-is (must be full path)
        return target

    def resolve_args(self):
        """
        Utility function used in `instantiate()` to resolve the arguments from current config content.
        """
        config = self.get_config()
        if not isinstance(config, Mapping):
            raise TypeError(
                f"Expected config to be a Mapping (dict-like), but got {type(config).__name__}. "
                f"Cannot resolve arguments from non-mapping config."
            )
        return {k: v for k, v in config.items() if k not in self.non_arg_keys}

    def is_disabled(self) -> bool:
        """
        Utility function used in `instantiate()` to check whether to skip the instantiation.
        """
        _is_disabled = self.get_config().get("_disabled_", False)
        return _is_disabled.lower().strip() == "true" if isinstance(_is_disabled, str) else bool(_is_disabled)

    def instantiate(self, **kwargs: Any) -> object:
        """
        Instantiate component based on ``self.config`` content.
        The target component must be a `class` or a `function`, otherwise, return `None`.

        Args:
            kwargs: args to override / add the config args when instantiation.
        """
        if not self.is_instantiable(self.get_config()) or self.is_disabled():
            # if not a class or function or marked as `disabled`, skip parsing and return `None`
            return None

        modname = self.resolve_module_name()
        mode = self.get_config().get("_mode_", CompInitMode.DEFAULT)
        args = self.resolve_args()
        args.update(kwargs)

        try:
            return instantiate(modname, mode, **args)
        except ModuleNotFoundError as e:
            # Re-raise with source location and suggestions
            suggestion = self._suggest_similar_modules(modname) if isinstance(modname, str) else None
            raise ModuleNotFoundError(
                f"Cannot locate class or function: '{modname}'",
                source_location=self.source_location,
                suggestion=suggestion,
            ) from e
        except Exception as e:
            # Wrap other errors with location context (points to _target_ line)
            raise InstantiationError(
                f"Failed to instantiate '{modname}': {type(e).__name__}: {e}",
                source_location=self.source_location,
            ) from e

    def _suggest_similar_modules(self, target: str) -> str | None:
        """Suggest similar valid module names using fuzzy matching.

        Args:
            target: The module path that couldn't be found (e.g., 'torch.optim.Adamfad')

        Returns:
            A helpful suggestion string, or None if no good suggestions found.
        """
        if not isinstance(target, str) or "." not in target:
            return None

        try:
            from pydoc import locate

            from .utils import damerau_levenshtein_distance

            # Split into module path and attribute name
            parts = target.rsplit(".", 1)
            base_module, attr_name = parts[0], parts[1]

            # Try to import the base module
            base = locate(base_module)
            if base is None:
                return None

            # Find similar attribute names in the module
            similar = []
            for name in dir(base):
                if name.startswith("_"):
                    continue
                distance = damerau_levenshtein_distance(name, attr_name)
                if distance <= 2:  # Allow up to 2 character edits
                    similar.append((distance, name))

            if similar:
                # Sort by distance and return the closest match
                similar.sort(key=lambda x: x[0])
                closest = similar[0][1]
                return f"Did you mean '{base_module}.{closest}'?"
        except Exception:
            # If anything fails during suggestion generation, just skip it
            pass

        return None

_suggest_similar_modules(target)

Suggest similar valid module names using fuzzy matching.

Parameters:

Name Type Description Default
target str

The module path that couldn't be found (e.g., 'torch.optim.Adamfad')

required

Returns:

Type Description
str | None

A helpful suggestion string, or None if no good suggestions found.

Source code in src/sparkwheel/items.py
def _suggest_similar_modules(self, target: str) -> str | None:
    """Suggest similar valid module names using fuzzy matching.

    Args:
        target: The module path that couldn't be found (e.g., 'torch.optim.Adamfad')

    Returns:
        A helpful suggestion string, or None if no good suggestions found.
    """
    if not isinstance(target, str) or "." not in target:
        return None

    try:
        from pydoc import locate

        from .utils import damerau_levenshtein_distance

        # Split into module path and attribute name
        parts = target.rsplit(".", 1)
        base_module, attr_name = parts[0], parts[1]

        # Try to import the base module
        base = locate(base_module)
        if base is None:
            return None

        # Find similar attribute names in the module
        similar = []
        for name in dir(base):
            if name.startswith("_"):
                continue
            distance = damerau_levenshtein_distance(name, attr_name)
            if distance <= 2:  # Allow up to 2 character edits
                similar.append((distance, name))

        if similar:
            # Sort by distance and return the closest match
            similar.sort(key=lambda x: x[0])
            closest = similar[0][1]
            return f"Did you mean '{base_module}.{closest}'?"
    except Exception:
        # If anything fails during suggestion generation, just skip it
        pass

    return None

instantiate(**kwargs)

Instantiate component based on self.config content. The target component must be a class or a function, otherwise, return None.

Parameters:

Name Type Description Default
kwargs Any

args to override / add the config args when instantiation.

{}
Source code in src/sparkwheel/items.py
def instantiate(self, **kwargs: Any) -> object:
    """
    Instantiate component based on ``self.config`` content.
    The target component must be a `class` or a `function`, otherwise, return `None`.

    Args:
        kwargs: args to override / add the config args when instantiation.
    """
    if not self.is_instantiable(self.get_config()) or self.is_disabled():
        # if not a class or function or marked as `disabled`, skip parsing and return `None`
        return None

    modname = self.resolve_module_name()
    mode = self.get_config().get("_mode_", CompInitMode.DEFAULT)
    args = self.resolve_args()
    args.update(kwargs)

    try:
        return instantiate(modname, mode, **args)
    except ModuleNotFoundError as e:
        # Re-raise with source location and suggestions
        suggestion = self._suggest_similar_modules(modname) if isinstance(modname, str) else None
        raise ModuleNotFoundError(
            f"Cannot locate class or function: '{modname}'",
            source_location=self.source_location,
            suggestion=suggestion,
        ) from e
    except Exception as e:
        # Wrap other errors with location context (points to _target_ line)
        raise InstantiationError(
            f"Failed to instantiate '{modname}': {type(e).__name__}: {e}",
            source_location=self.source_location,
        ) from e

is_disabled()

Utility function used in instantiate() to check whether to skip the instantiation.

Source code in src/sparkwheel/items.py
def is_disabled(self) -> bool:
    """
    Utility function used in `instantiate()` to check whether to skip the instantiation.
    """
    _is_disabled = self.get_config().get("_disabled_", False)
    return _is_disabled.lower().strip() == "true" if isinstance(_is_disabled, str) else bool(_is_disabled)

is_instantiable(config) staticmethod

Check whether this config represents a class or function that is to be instantiated.

Parameters:

Name Type Description Default
config Any

input config content to check.

required
Source code in src/sparkwheel/items.py
@staticmethod
def is_instantiable(config: Any) -> bool:
    """
    Check whether this config represents a `class` or `function` that is to be instantiated.

    Args:
        config: input config content to check.
    """
    return isinstance(config, Mapping) and "_target_" in config

resolve_args()

Utility function used in instantiate() to resolve the arguments from current config content.

Source code in src/sparkwheel/items.py
def resolve_args(self):
    """
    Utility function used in `instantiate()` to resolve the arguments from current config content.
    """
    config = self.get_config()
    if not isinstance(config, Mapping):
        raise TypeError(
            f"Expected config to be a Mapping (dict-like), but got {type(config).__name__}. "
            f"Cannot resolve arguments from non-mapping config."
        )
    return {k: v for k, v in config.items() if k not in self.non_arg_keys}

resolve_module_name()

Resolve the target module name from configuration.

Requires full module path (e.g., "collections.Counter"). No automatic module discovery is performed.

Returns:

Type Description

str or callable: The module path or callable from target

Source code in src/sparkwheel/items.py
def resolve_module_name(self):
    """Resolve the target module name from configuration.

    Requires full module path (e.g., "collections.Counter").
    No automatic module discovery is performed.

    Returns:
        str or callable: The module path or callable from _target_
    """
    config = dict(self.get_config())
    target = config.get("_target_")
    if not isinstance(target, str):
        return target  # for cases where _target_ is already a callable

    # No ComponentLocator - just return the target as-is (must be full path)
    return target

Config

Configuration management with continuous validation, coercion, resolved references, and instantiation.

Main entry point for loading, managing, and resolving configurations. Supports YAML files with resolved references (@), raw references (%), expressions ($), and dynamic instantiation (target).

Example
from sparkwheel import Config

# Create and load from file
config = Config(schema=MySchema).update("config.yaml")

# Or chain multiple sources
config = (Config(schema=MySchema)
          .update("base.yaml")
          .update("override.yaml")
          .update({"model::lr": 0.001}))

# Access raw values
lr = config.get("model::lr")

# Set values (validates automatically if schema provided)
config.set("model::dropout", 0.1)

# Freeze to prevent modifications
config.freeze()

# Resolve references and instantiate
model = config.resolve("model")
everything = config.resolve()

Parameters:

Name Type Description Default
globals dict[str, Any] | None

Pre-imported packages for expressions (e.g., {"torch": "torch"})

None
schema type | None

Dataclass schema for continuous validation

None
coerce bool

Auto-convert compatible types (default: True)

True
strict bool

Reject fields not in schema (default: True)

True
allow_missing bool

Allow MISSING sentinel values (default: False)

False
Source code in src/sparkwheel/config.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
class Config:
    """Configuration management with continuous validation, coercion, resolved references, and instantiation.

    Main entry point for loading, managing, and resolving configurations.
    Supports YAML files with resolved references (@), raw references (%), expressions ($),
    and dynamic instantiation (_target_).

    Example:
        ```python
        from sparkwheel import Config

        # Create and load from file
        config = Config(schema=MySchema).update("config.yaml")

        # Or chain multiple sources
        config = (Config(schema=MySchema)
                  .update("base.yaml")
                  .update("override.yaml")
                  .update({"model::lr": 0.001}))

        # Access raw values
        lr = config.get("model::lr")

        # Set values (validates automatically if schema provided)
        config.set("model::dropout", 0.1)

        # Freeze to prevent modifications
        config.freeze()

        # Resolve references and instantiate
        model = config.resolve("model")
        everything = config.resolve()
        ```

    Args:
        globals: Pre-imported packages for expressions (e.g., {"torch": "torch"})
        schema: Dataclass schema for continuous validation
        coerce: Auto-convert compatible types (default: True)
        strict: Reject fields not in schema (default: True)
        allow_missing: Allow MISSING sentinel values (default: False)
    """

    def __init__(
        self,
        data: dict[str, Any] | None = None,  # Internal/testing use only
        *,  # Rest are keyword-only
        globals: dict[str, Any] | None = None,
        schema: type | None = None,
        coerce: bool = True,
        strict: bool = True,
        allow_missing: bool = False,
    ):
        """Initialize Config container.

        Normally starts empty - use update() to load data.

        Args:
            data: Initial data (internal/testing use only, not validated)
            globals: Pre-imported packages for expression evaluation
            schema: Dataclass schema for continuous validation
            coerce: Auto-convert compatible types
            strict: Reject fields not in schema
            allow_missing: Allow MISSING sentinel values

        Examples:
            >>> config = Config(schema=MySchema)
            >>> config.update("config.yaml")

            >>> # Chaining
            >>> config = Config(schema=MySchema).update("config.yaml")
        """
        self._data: dict[str, Any] = data or {}  # Start with provided data or empty
        self._metadata = MetadataRegistry()
        self._resolver = Resolver()
        self._is_parsed = False
        self._frozen = False  # Set via freeze() method later

        # Schema validation state
        self._schema: type | None = schema
        self._coerce: bool = coerce
        self._strict: bool = strict
        self._allow_missing: bool = allow_missing

        # Process globals (import string module paths)
        self._globals: dict[str, Any] = {}
        if isinstance(globals, dict):
            for k, v in globals.items():
                self._globals[k] = optional_import(v)[0] if isinstance(v, str) else v

        self._loader = Loader()
        self._preprocessor = Preprocessor(self._loader, self._globals)

    def get(self, id: str = "", default: Any = None) -> Any:
        """Get raw config value (unresolved).

        Args:
            id: Configuration path (use :: for nesting, e.g., "model::lr")
                Empty string returns entire config
            default: Default value if id not found

        Returns:
            Raw configuration value (resolved references not resolved, raw references not expanded)

        Example:
            >>> config = Config.load({"model": {"lr": 0.001, "ref": "@model::lr"}})
            >>> config.get("model::lr")
            0.001
            >>> config.get("model::ref")
            "@model::lr"  # Unresolved resolved reference
        """
        try:
            return self._get_by_id(id)
        except (KeyError, IndexError, ValueError):
            return default

    def set(self, id: str, value: Any) -> None:
        """Set config value, creating paths as needed.

        Args:
            id: Configuration path (use :: for nesting)
            value: Value to set

        Raises:
            FrozenConfigError: If config is frozen

        Example:
            >>> config = Config()
            >>> config.set("model::lr", 0.001)
            >>> config.get("model::lr")
            0.001
        """
        from .utils.exceptions import FrozenConfigError

        # Check frozen state
        if self._frozen:
            raise FrozenConfigError("Cannot modify frozen config", field_path=id)

        if id == "":
            self._data = value
            self._invalidate_resolution()
            return

        keys = split_id(id)

        # Ensure root is dict
        if not isinstance(self._data, dict):
            self._data = {}  # type: ignore[unreachable]

        # Create missing intermediate paths
        current = self._data
        for k in keys[:-1]:
            if k not in current:
                current[k] = {}
            elif not isinstance(current[k], dict):
                current[k] = {}
            current = current[k]

        # Set final value
        current[keys[-1]] = value
        self._invalidate_resolution()

    def validate(self, schema: type) -> None:
        """Validate configuration against a dataclass schema.

        Args:
            schema: Dataclass type defining the expected structure and types

        Raises:
            ValidationError: If configuration doesn't match schema
            TypeError: If schema is not a dataclass

        Example:
            >>> from dataclasses import dataclass
            >>> @dataclass
            ... class ModelConfig:
            ...     hidden_size: int
            ...     dropout: float
            >>> config = Config.load({"hidden_size": 512, "dropout": 0.1})
            >>> config.validate(ModelConfig)  # Passes
            >>> bad_config = Config.load({"hidden_size": "not an int"})
            >>> bad_config.validate(ModelConfig)  # Raises ValidationError
        """
        from .schema import validate as validate_schema

        validate_schema(self._data, schema, metadata=self._metadata)

    def freeze(self) -> None:
        """Freeze config to prevent further modifications.

        After freezing:
        - set() raises FrozenConfigError
        - update() raises FrozenConfigError
        - resolve() still works (read-only)
        - get() still works (read-only)

        Example:
            >>> config = Config(schema=MySchema).update("config.yaml")
            >>> config.freeze()
            >>> config.set("model::lr", 0.001)  # Raises FrozenConfigError
        """
        self._frozen = True

    def unfreeze(self) -> None:
        """Unfreeze config to allow modifications."""
        self._frozen = False

    def is_frozen(self) -> bool:
        """Check if config is frozen.

        Returns:
            True if frozen, False otherwise
        """
        return self._frozen

    def update(self, source: PathLike | dict[str, Any] | "Config" | str) -> "Config":
        """Update configuration with changes from another source.

        Auto-detects strings as either file paths or CLI overrides:
        - Strings with '=' are parsed as overrides (e.g., "key=value", "=key=value", "~key")
        - Strings without '=' are treated as file paths
        - Dicts and Config instances work as before

        Args:
            source: File path, override string, dict, or Config instance to update from

        Returns:
            self (for chaining)

        Operators:
            - key=value      - Compose (default): merge dict or extend list
            - =key=value     - Replace operator: completely replace value
            - ~key           - Remove operator: delete key (idempotent)

        Examples:
            >>> # Update from file
            >>> config.update("base.yaml")

            >>> # Update from override string (auto-detected)
            >>> config.update("model::lr=0.001")

            >>> # Chain multiple updates (mixed files and overrides)
            >>> config = (Config(schema=MySchema)
            ...           .update("base.yaml")
            ...           .update("exp.yaml")
            ...           .update("optimizer::lr=0.01")
            ...           .update("=model={'_target_': 'MyModel'}")
            ...           .update("~debug"))

            >>> # Update from dict
            >>> config.update({"model": {"dropout": 0.1}})

            >>> # Update from another Config instance
            >>> config1 = Config()
            >>> config2 = Config().update({"model::lr": 0.001})
            >>> config1.update(config2)

            >>> # CLI integration pattern (just loop!)
            >>> for item in cli_args:
            ...     config.update(item)
        """
        from .utils.exceptions import FrozenConfigError

        if self._frozen:
            raise FrozenConfigError("Cannot update frozen config")

        if isinstance(source, Config):
            self._update_from_config(source)
        elif isinstance(source, dict):
            if self._uses_nested_paths(source):
                self._apply_path_updates(source)
            else:
                self._apply_structural_update(source)
        elif isinstance(source, str) and ("=" in source or source.startswith("~")):
            # Auto-detect override string (key=value, =key=value, ~key)
            self._update_from_override_string(source)
        else:
            self._update_from_file(source)

        # Validate after update if schema exists
        if self._schema:
            from .schema import validate as validate_schema

            validate_schema(
                self._data,
                self._schema,
                metadata=self._metadata,
                allow_missing=self._allow_missing,
                strict=self._strict,
            )

        return self  # Enable chaining

    def _update_from_config(self, source: "Config") -> None:
        """Update from another Config instance."""
        self._data = apply_operators(self._data, source._data)
        self._metadata.merge(source._metadata)
        self._invalidate_resolution()

    def _uses_nested_paths(self, source: dict[str, Any]) -> bool:
        """Check if dict uses :: path syntax."""
        return any(ID_SEP_KEY in str(k).lstrip(REPLACE_KEY).lstrip(REMOVE_KEY) for k in source.keys())

    def _apply_path_updates(self, source: dict[str, Any]) -> None:
        """Apply nested path updates (e.g., model::lr=value, =model=replace, ~old::param=null)."""
        for key, value in source.items():
            if not isinstance(key, str):
                self.set(str(key), value)  # type: ignore[unreachable]
                continue

            if key.startswith(REPLACE_KEY):
                # Replace operator: =key (explicit override)
                actual_key = key[1:]
                self.set(actual_key, value)

            elif key.startswith(REMOVE_KEY):
                # Delete operator: ~key (idempotent)
                actual_key = key[1:]
                _validate_delete_operator(actual_key, value)

                if actual_key in self:
                    self._delete_nested_key(actual_key)

            else:
                # Default: compose (merge dict or extend list)
                if key in self and isinstance(self[key], dict) and isinstance(value, dict):
                    merged = apply_operators(self[key], value)
                    self.set(key, merged)
                elif key in self and isinstance(self[key], list) and isinstance(value, list):
                    self.set(key, self[key] + value)
                else:
                    # Normal set (handles nested paths with ::)
                    self.set(key, value)

    def _delete_nested_key(self, key: str) -> None:
        """Delete a key, supporting nested paths with ::."""
        if ID_SEP_KEY in key:
            keys = split_id(key)
            parent_id = ID_SEP_KEY.join(keys[:-1])
            parent = self[parent_id] if parent_id else self._data
            if isinstance(parent, dict) and keys[-1] in parent:
                del parent[keys[-1]]
        else:
            # Top-level key
            if isinstance(self._data, dict) and key in self._data:
                del self._data[key]
        self._invalidate_resolution()

    def _apply_structural_update(self, source: dict[str, Any]) -> None:
        """Apply structural update with operators."""
        validate_operators(source)
        self._data = apply_operators(self._data, source)
        self._invalidate_resolution()

    def _update_from_file(self, source: PathLike) -> None:
        """Load and update from a file."""
        new_data, new_metadata = self._loader.load_file(source)
        validate_operators(new_data)
        self._data = apply_operators(self._data, new_data)
        self._metadata.merge(new_metadata)
        self._invalidate_resolution()

    def _update_from_override_string(self, override: str) -> None:
        """Parse and apply a single override string (e.g., 'key=value', '=key=value', '~key')."""
        overrides_dict = parse_overrides([override])
        self._apply_path_updates(overrides_dict)

    def resolve(
        self,
        id: str = "",
        instantiate: bool = True,
        eval_expr: bool = True,
        lazy: bool = True,
        default: Any = None,
    ) -> Any:
        """Resolve resolved references (@) and return parsed config.

        Automatically parses config on first call. Resolves @ resolved references (follows
        them to get instantiated/evaluated values), evaluates $ expressions, and
        instantiates _target_ components. Note: % raw references are expanded during
        preprocessing (before this stage).

        Args:
            id: Config path to resolve (empty string for entire config)
            instantiate: Whether to instantiate components with _target_
            eval_expr: Whether to evaluate $ expressions
            lazy: Whether to use cached resolution
            default: Default value if id not found (returns default.get_config() if Item)

        Returns:
            Resolved value (instantiated objects, evaluated expressions, etc.)

        Example:
            >>> config = Config.load({
            ...     "lr": 0.001,
            ...     "doubled": "$@lr * 2",
            ...     "optimizer": {
            ...         "_target_": "torch.optim.Adam",
            ...         "lr": "@lr"
            ...     }
            ... })
            >>> config.resolve("lr")
            0.001
            >>> config.resolve("doubled")
            0.002
            >>> optimizer = config.resolve("optimizer")
            >>> type(optimizer).__name__
            'Adam'
        """
        # Parse if needed
        if not self._is_parsed or not lazy:
            self._parse()

        # Resolve and return
        try:
            return self._resolver.resolve(id=id, instantiate=instantiate, eval_expr=eval_expr)
        except (KeyError, ConfigKeyError):
            if default is not None:
                # If default is an Item, return its config
                from .items import Item

                if isinstance(default, Item):
                    return default.get_config()
                return default
            raise

    def _parse(self, reset: bool = True) -> None:
        """Parse config tree and prepare for resolution.

        Internal method called automatically by resolve().

        Args:
            reset: Whether to reset the resolver before parsing (default: True)
        """
        # Reset resolver if requested
        if reset:
            self._resolver.reset()

        # Stage 1: Preprocess (% raw references, @:: relative resolved IDs)
        self._data = self._preprocessor.process(self._data, self._data, id="")

        # Stage 2: Parse config tree to create Items
        parser = Parser(globals=self._globals, metadata=self._metadata)
        items = parser.parse(self._data)

        # Stage 3: Add items to resolver
        self._resolver.add_items(items)

        self._is_parsed = True

    def _get_by_id(self, id: str) -> Any:
        """Get config value by ID path.

        Args:
            id: ID path (e.g., "model::lr")

        Returns:
            Config value at that path

        Raises:
            KeyError: If path not found
        """
        if id == "":
            return self._data

        config = self._data
        for k in split_id(id):
            if not isinstance(config, (dict, list)):
                raise ValueError(f"Config must be dict or list for key `{k}`, but got {type(config)}: {config}")
            try:
                config = look_up_option(k, config, print_all_options=False) if isinstance(config, dict) else config[int(k)]
            except ValueError as e:
                raise KeyError(f"Key not found: {k}") from e

        return config

    def _invalidate_resolution(self) -> None:
        """Invalidate cached resolution (called when config changes)."""
        self._is_parsed = False
        self._resolver.reset()

    def __getitem__(self, id: str) -> Any:
        """Get config value by ID (subscript access).

        Args:
            id: Configuration path

        Returns:
            Config value at that path

        Example:
            >>> config = Config.load({"model": {"lr": 0.001}})
            >>> config["model::lr"]
            0.001
        """
        return self._get_by_id(id)

    def __setitem__(self, id: str, value: Any) -> None:
        """Set config value by ID (subscript access).

        Args:
            id: Configuration path
            value: Value to set

        Example:
            >>> config = Config.load({})
            >>> config["model::lr"] = 0.001
        """
        self.set(id, value)

    def __contains__(self, id: str) -> bool:
        """Check if ID exists in config.

        Args:
            id: ID path to check

        Returns:
            True if exists, False otherwise
        """
        try:
            self._get_by_id(id)
            return True
        except (KeyError, IndexError, ValueError):
            return False

    def __repr__(self) -> str:
        """String representation of config."""
        return f"Config({self._data})"

    @staticmethod
    def export_config_file(config: dict[str, Any], filepath: PathLike, **kwargs: Any) -> None:
        """Export config to YAML file.

        Args:
            config: Config dict to export
            filepath: Target file path
            kwargs: Additional arguments for yaml.safe_dump
        """
        import yaml  # type: ignore[import-untyped]

        filepath_str = str(Path(filepath))
        with open(filepath_str, "w") as f:
            yaml.safe_dump(config, f, **kwargs)

__contains__(id)

Check if ID exists in config.

Parameters:

Name Type Description Default
id str

ID path to check

required

Returns:

Type Description
bool

True if exists, False otherwise

Source code in src/sparkwheel/config.py
def __contains__(self, id: str) -> bool:
    """Check if ID exists in config.

    Args:
        id: ID path to check

    Returns:
        True if exists, False otherwise
    """
    try:
        self._get_by_id(id)
        return True
    except (KeyError, IndexError, ValueError):
        return False

__getitem__(id)

Get config value by ID (subscript access).

Parameters:

Name Type Description Default
id str

Configuration path

required

Returns:

Type Description
Any

Config value at that path

Example

config = Config.load({"model": {"lr": 0.001}}) config["model::lr"] 0.001

Source code in src/sparkwheel/config.py
def __getitem__(self, id: str) -> Any:
    """Get config value by ID (subscript access).

    Args:
        id: Configuration path

    Returns:
        Config value at that path

    Example:
        >>> config = Config.load({"model": {"lr": 0.001}})
        >>> config["model::lr"]
        0.001
    """
    return self._get_by_id(id)

__init__(data=None, *, globals=None, schema=None, coerce=True, strict=True, allow_missing=False)

Initialize Config container.

Normally starts empty - use update() to load data.

Parameters:

Name Type Description Default
data dict[str, Any] | None

Initial data (internal/testing use only, not validated)

None
globals dict[str, Any] | None

Pre-imported packages for expression evaluation

None
schema type | None

Dataclass schema for continuous validation

None
coerce bool

Auto-convert compatible types

True
strict bool

Reject fields not in schema

True
allow_missing bool

Allow MISSING sentinel values

False

Examples:

>>> config = Config(schema=MySchema)
>>> config.update("config.yaml")
>>> # Chaining
>>> config = Config(schema=MySchema).update("config.yaml")
Source code in src/sparkwheel/config.py
def __init__(
    self,
    data: dict[str, Any] | None = None,  # Internal/testing use only
    *,  # Rest are keyword-only
    globals: dict[str, Any] | None = None,
    schema: type | None = None,
    coerce: bool = True,
    strict: bool = True,
    allow_missing: bool = False,
):
    """Initialize Config container.

    Normally starts empty - use update() to load data.

    Args:
        data: Initial data (internal/testing use only, not validated)
        globals: Pre-imported packages for expression evaluation
        schema: Dataclass schema for continuous validation
        coerce: Auto-convert compatible types
        strict: Reject fields not in schema
        allow_missing: Allow MISSING sentinel values

    Examples:
        >>> config = Config(schema=MySchema)
        >>> config.update("config.yaml")

        >>> # Chaining
        >>> config = Config(schema=MySchema).update("config.yaml")
    """
    self._data: dict[str, Any] = data or {}  # Start with provided data or empty
    self._metadata = MetadataRegistry()
    self._resolver = Resolver()
    self._is_parsed = False
    self._frozen = False  # Set via freeze() method later

    # Schema validation state
    self._schema: type | None = schema
    self._coerce: bool = coerce
    self._strict: bool = strict
    self._allow_missing: bool = allow_missing

    # Process globals (import string module paths)
    self._globals: dict[str, Any] = {}
    if isinstance(globals, dict):
        for k, v in globals.items():
            self._globals[k] = optional_import(v)[0] if isinstance(v, str) else v

    self._loader = Loader()
    self._preprocessor = Preprocessor(self._loader, self._globals)

__repr__()

String representation of config.

Source code in src/sparkwheel/config.py
def __repr__(self) -> str:
    """String representation of config."""
    return f"Config({self._data})"

__setitem__(id, value)

Set config value by ID (subscript access).

Parameters:

Name Type Description Default
id str

Configuration path

required
value Any

Value to set

required
Example

config = Config.load({}) config["model::lr"] = 0.001

Source code in src/sparkwheel/config.py
def __setitem__(self, id: str, value: Any) -> None:
    """Set config value by ID (subscript access).

    Args:
        id: Configuration path
        value: Value to set

    Example:
        >>> config = Config.load({})
        >>> config["model::lr"] = 0.001
    """
    self.set(id, value)

_apply_path_updates(source)

Apply nested path updates (e.g., model::lr=value, =model=replace, ~old::param=null).

Source code in src/sparkwheel/config.py
def _apply_path_updates(self, source: dict[str, Any]) -> None:
    """Apply nested path updates (e.g., model::lr=value, =model=replace, ~old::param=null)."""
    for key, value in source.items():
        if not isinstance(key, str):
            self.set(str(key), value)  # type: ignore[unreachable]
            continue

        if key.startswith(REPLACE_KEY):
            # Replace operator: =key (explicit override)
            actual_key = key[1:]
            self.set(actual_key, value)

        elif key.startswith(REMOVE_KEY):
            # Delete operator: ~key (idempotent)
            actual_key = key[1:]
            _validate_delete_operator(actual_key, value)

            if actual_key in self:
                self._delete_nested_key(actual_key)

        else:
            # Default: compose (merge dict or extend list)
            if key in self and isinstance(self[key], dict) and isinstance(value, dict):
                merged = apply_operators(self[key], value)
                self.set(key, merged)
            elif key in self and isinstance(self[key], list) and isinstance(value, list):
                self.set(key, self[key] + value)
            else:
                # Normal set (handles nested paths with ::)
                self.set(key, value)

_apply_structural_update(source)

Apply structural update with operators.

Source code in src/sparkwheel/config.py
def _apply_structural_update(self, source: dict[str, Any]) -> None:
    """Apply structural update with operators."""
    validate_operators(source)
    self._data = apply_operators(self._data, source)
    self._invalidate_resolution()

_delete_nested_key(key)

Delete a key, supporting nested paths with ::.

Source code in src/sparkwheel/config.py
def _delete_nested_key(self, key: str) -> None:
    """Delete a key, supporting nested paths with ::."""
    if ID_SEP_KEY in key:
        keys = split_id(key)
        parent_id = ID_SEP_KEY.join(keys[:-1])
        parent = self[parent_id] if parent_id else self._data
        if isinstance(parent, dict) and keys[-1] in parent:
            del parent[keys[-1]]
    else:
        # Top-level key
        if isinstance(self._data, dict) and key in self._data:
            del self._data[key]
    self._invalidate_resolution()

_get_by_id(id)

Get config value by ID path.

Parameters:

Name Type Description Default
id str

ID path (e.g., "model::lr")

required

Returns:

Type Description
Any

Config value at that path

Raises:

Type Description
KeyError

If path not found

Source code in src/sparkwheel/config.py
def _get_by_id(self, id: str) -> Any:
    """Get config value by ID path.

    Args:
        id: ID path (e.g., "model::lr")

    Returns:
        Config value at that path

    Raises:
        KeyError: If path not found
    """
    if id == "":
        return self._data

    config = self._data
    for k in split_id(id):
        if not isinstance(config, (dict, list)):
            raise ValueError(f"Config must be dict or list for key `{k}`, but got {type(config)}: {config}")
        try:
            config = look_up_option(k, config, print_all_options=False) if isinstance(config, dict) else config[int(k)]
        except ValueError as e:
            raise KeyError(f"Key not found: {k}") from e

    return config

_invalidate_resolution()

Invalidate cached resolution (called when config changes).

Source code in src/sparkwheel/config.py
def _invalidate_resolution(self) -> None:
    """Invalidate cached resolution (called when config changes)."""
    self._is_parsed = False
    self._resolver.reset()

_parse(reset=True)

Parse config tree and prepare for resolution.

Internal method called automatically by resolve().

Parameters:

Name Type Description Default
reset bool

Whether to reset the resolver before parsing (default: True)

True
Source code in src/sparkwheel/config.py
def _parse(self, reset: bool = True) -> None:
    """Parse config tree and prepare for resolution.

    Internal method called automatically by resolve().

    Args:
        reset: Whether to reset the resolver before parsing (default: True)
    """
    # Reset resolver if requested
    if reset:
        self._resolver.reset()

    # Stage 1: Preprocess (% raw references, @:: relative resolved IDs)
    self._data = self._preprocessor.process(self._data, self._data, id="")

    # Stage 2: Parse config tree to create Items
    parser = Parser(globals=self._globals, metadata=self._metadata)
    items = parser.parse(self._data)

    # Stage 3: Add items to resolver
    self._resolver.add_items(items)

    self._is_parsed = True

_update_from_config(source)

Update from another Config instance.

Source code in src/sparkwheel/config.py
def _update_from_config(self, source: "Config") -> None:
    """Update from another Config instance."""
    self._data = apply_operators(self._data, source._data)
    self._metadata.merge(source._metadata)
    self._invalidate_resolution()

_update_from_file(source)

Load and update from a file.

Source code in src/sparkwheel/config.py
def _update_from_file(self, source: PathLike) -> None:
    """Load and update from a file."""
    new_data, new_metadata = self._loader.load_file(source)
    validate_operators(new_data)
    self._data = apply_operators(self._data, new_data)
    self._metadata.merge(new_metadata)
    self._invalidate_resolution()

_update_from_override_string(override)

Parse and apply a single override string (e.g., 'key=value', '=key=value', '~key').

Source code in src/sparkwheel/config.py
def _update_from_override_string(self, override: str) -> None:
    """Parse and apply a single override string (e.g., 'key=value', '=key=value', '~key')."""
    overrides_dict = parse_overrides([override])
    self._apply_path_updates(overrides_dict)

_uses_nested_paths(source)

Check if dict uses :: path syntax.

Source code in src/sparkwheel/config.py
def _uses_nested_paths(self, source: dict[str, Any]) -> bool:
    """Check if dict uses :: path syntax."""
    return any(ID_SEP_KEY in str(k).lstrip(REPLACE_KEY).lstrip(REMOVE_KEY) for k in source.keys())

export_config_file(config, filepath, **kwargs) staticmethod

Export config to YAML file.

Parameters:

Name Type Description Default
config dict[str, Any]

Config dict to export

required
filepath PathLike

Target file path

required
kwargs Any

Additional arguments for yaml.safe_dump

{}
Source code in src/sparkwheel/config.py
@staticmethod
def export_config_file(config: dict[str, Any], filepath: PathLike, **kwargs: Any) -> None:
    """Export config to YAML file.

    Args:
        config: Config dict to export
        filepath: Target file path
        kwargs: Additional arguments for yaml.safe_dump
    """
    import yaml  # type: ignore[import-untyped]

    filepath_str = str(Path(filepath))
    with open(filepath_str, "w") as f:
        yaml.safe_dump(config, f, **kwargs)

freeze()

Freeze config to prevent further modifications.

After freezing: - set() raises FrozenConfigError - update() raises FrozenConfigError - resolve() still works (read-only) - get() still works (read-only)

Example

config = Config(schema=MySchema).update("config.yaml") config.freeze() config.set("model::lr", 0.001) # Raises FrozenConfigError

Source code in src/sparkwheel/config.py
def freeze(self) -> None:
    """Freeze config to prevent further modifications.

    After freezing:
    - set() raises FrozenConfigError
    - update() raises FrozenConfigError
    - resolve() still works (read-only)
    - get() still works (read-only)

    Example:
        >>> config = Config(schema=MySchema).update("config.yaml")
        >>> config.freeze()
        >>> config.set("model::lr", 0.001)  # Raises FrozenConfigError
    """
    self._frozen = True

get(id='', default=None)

Get raw config value (unresolved).

Parameters:

Name Type Description Default
id str

Configuration path (use :: for nesting, e.g., "model::lr") Empty string returns entire config

''
default Any

Default value if id not found

None

Returns:

Type Description
Any

Raw configuration value (resolved references not resolved, raw references not expanded)

Example

config = Config.load({"model": {"lr": 0.001, "ref": "@model::lr"}}) config.get("model::lr") 0.001 config.get("model::ref") "@model::lr" # Unresolved resolved reference

Source code in src/sparkwheel/config.py
def get(self, id: str = "", default: Any = None) -> Any:
    """Get raw config value (unresolved).

    Args:
        id: Configuration path (use :: for nesting, e.g., "model::lr")
            Empty string returns entire config
        default: Default value if id not found

    Returns:
        Raw configuration value (resolved references not resolved, raw references not expanded)

    Example:
        >>> config = Config.load({"model": {"lr": 0.001, "ref": "@model::lr"}})
        >>> config.get("model::lr")
        0.001
        >>> config.get("model::ref")
        "@model::lr"  # Unresolved resolved reference
    """
    try:
        return self._get_by_id(id)
    except (KeyError, IndexError, ValueError):
        return default

is_frozen()

Check if config is frozen.

Returns:

Type Description
bool

True if frozen, False otherwise

Source code in src/sparkwheel/config.py
def is_frozen(self) -> bool:
    """Check if config is frozen.

    Returns:
        True if frozen, False otherwise
    """
    return self._frozen

resolve(id='', instantiate=True, eval_expr=True, lazy=True, default=None)

Resolve resolved references (@) and return parsed config.

Automatically parses config on first call. Resolves @ resolved references (follows them to get instantiated/evaluated values), evaluates $ expressions, and instantiates target components. Note: % raw references are expanded during preprocessing (before this stage).

Parameters:

Name Type Description Default
id str

Config path to resolve (empty string for entire config)

''
instantiate bool

Whether to instantiate components with target

True
eval_expr bool

Whether to evaluate $ expressions

True
lazy bool

Whether to use cached resolution

True
default Any

Default value if id not found (returns default.get_config() if Item)

None

Returns:

Type Description
Any

Resolved value (instantiated objects, evaluated expressions, etc.)

Example

config = Config.load({ ... "lr": 0.001, ... "doubled": "$@lr * 2", ... "optimizer": { ... "target": "torch.optim.Adam", ... "lr": "@lr" ... } ... }) config.resolve("lr") 0.001 config.resolve("doubled") 0.002 optimizer = config.resolve("optimizer") type(optimizer).name 'Adam'

Source code in src/sparkwheel/config.py
def resolve(
    self,
    id: str = "",
    instantiate: bool = True,
    eval_expr: bool = True,
    lazy: bool = True,
    default: Any = None,
) -> Any:
    """Resolve resolved references (@) and return parsed config.

    Automatically parses config on first call. Resolves @ resolved references (follows
    them to get instantiated/evaluated values), evaluates $ expressions, and
    instantiates _target_ components. Note: % raw references are expanded during
    preprocessing (before this stage).

    Args:
        id: Config path to resolve (empty string for entire config)
        instantiate: Whether to instantiate components with _target_
        eval_expr: Whether to evaluate $ expressions
        lazy: Whether to use cached resolution
        default: Default value if id not found (returns default.get_config() if Item)

    Returns:
        Resolved value (instantiated objects, evaluated expressions, etc.)

    Example:
        >>> config = Config.load({
        ...     "lr": 0.001,
        ...     "doubled": "$@lr * 2",
        ...     "optimizer": {
        ...         "_target_": "torch.optim.Adam",
        ...         "lr": "@lr"
        ...     }
        ... })
        >>> config.resolve("lr")
        0.001
        >>> config.resolve("doubled")
        0.002
        >>> optimizer = config.resolve("optimizer")
        >>> type(optimizer).__name__
        'Adam'
    """
    # Parse if needed
    if not self._is_parsed or not lazy:
        self._parse()

    # Resolve and return
    try:
        return self._resolver.resolve(id=id, instantiate=instantiate, eval_expr=eval_expr)
    except (KeyError, ConfigKeyError):
        if default is not None:
            # If default is an Item, return its config
            from .items import Item

            if isinstance(default, Item):
                return default.get_config()
            return default
        raise

set(id, value)

Set config value, creating paths as needed.

Parameters:

Name Type Description Default
id str

Configuration path (use :: for nesting)

required
value Any

Value to set

required

Raises:

Type Description
FrozenConfigError

If config is frozen

Example

config = Config() config.set("model::lr", 0.001) config.get("model::lr") 0.001

Source code in src/sparkwheel/config.py
def set(self, id: str, value: Any) -> None:
    """Set config value, creating paths as needed.

    Args:
        id: Configuration path (use :: for nesting)
        value: Value to set

    Raises:
        FrozenConfigError: If config is frozen

    Example:
        >>> config = Config()
        >>> config.set("model::lr", 0.001)
        >>> config.get("model::lr")
        0.001
    """
    from .utils.exceptions import FrozenConfigError

    # Check frozen state
    if self._frozen:
        raise FrozenConfigError("Cannot modify frozen config", field_path=id)

    if id == "":
        self._data = value
        self._invalidate_resolution()
        return

    keys = split_id(id)

    # Ensure root is dict
    if not isinstance(self._data, dict):
        self._data = {}  # type: ignore[unreachable]

    # Create missing intermediate paths
    current = self._data
    for k in keys[:-1]:
        if k not in current:
            current[k] = {}
        elif not isinstance(current[k], dict):
            current[k] = {}
        current = current[k]

    # Set final value
    current[keys[-1]] = value
    self._invalidate_resolution()

unfreeze()

Unfreeze config to allow modifications.

Source code in src/sparkwheel/config.py
def unfreeze(self) -> None:
    """Unfreeze config to allow modifications."""
    self._frozen = False

update(source)

Update configuration with changes from another source.

Auto-detects strings as either file paths or CLI overrides: - Strings with '=' are parsed as overrides (e.g., "key=value", "=key=value", "~key") - Strings without '=' are treated as file paths - Dicts and Config instances work as before

Parameters:

Name Type Description Default
source PathLike | dict[str, Any] | Config | str

File path, override string, dict, or Config instance to update from

required

Returns:

Type Description
Config

self (for chaining)

Operators
  • key=value - Compose (default): merge dict or extend list
  • =key=value - Replace operator: completely replace value
  • ~key - Remove operator: delete key (idempotent)

Examples:

>>> # Update from file
>>> config.update("base.yaml")
>>> # Update from override string (auto-detected)
>>> config.update("model::lr=0.001")
>>> # Chain multiple updates (mixed files and overrides)
>>> config = (Config(schema=MySchema)
...           .update("base.yaml")
...           .update("exp.yaml")
...           .update("optimizer::lr=0.01")
...           .update("=model={'_target_': 'MyModel'}")
...           .update("~debug"))
>>> # Update from dict
>>> config.update({"model": {"dropout": 0.1}})
>>> # Update from another Config instance
>>> config1 = Config()
>>> config2 = Config().update({"model::lr": 0.001})
>>> config1.update(config2)
>>> # CLI integration pattern (just loop!)
>>> for item in cli_args:
...     config.update(item)
Source code in src/sparkwheel/config.py
def update(self, source: PathLike | dict[str, Any] | "Config" | str) -> "Config":
    """Update configuration with changes from another source.

    Auto-detects strings as either file paths or CLI overrides:
    - Strings with '=' are parsed as overrides (e.g., "key=value", "=key=value", "~key")
    - Strings without '=' are treated as file paths
    - Dicts and Config instances work as before

    Args:
        source: File path, override string, dict, or Config instance to update from

    Returns:
        self (for chaining)

    Operators:
        - key=value      - Compose (default): merge dict or extend list
        - =key=value     - Replace operator: completely replace value
        - ~key           - Remove operator: delete key (idempotent)

    Examples:
        >>> # Update from file
        >>> config.update("base.yaml")

        >>> # Update from override string (auto-detected)
        >>> config.update("model::lr=0.001")

        >>> # Chain multiple updates (mixed files and overrides)
        >>> config = (Config(schema=MySchema)
        ...           .update("base.yaml")
        ...           .update("exp.yaml")
        ...           .update("optimizer::lr=0.01")
        ...           .update("=model={'_target_': 'MyModel'}")
        ...           .update("~debug"))

        >>> # Update from dict
        >>> config.update({"model": {"dropout": 0.1}})

        >>> # Update from another Config instance
        >>> config1 = Config()
        >>> config2 = Config().update({"model::lr": 0.001})
        >>> config1.update(config2)

        >>> # CLI integration pattern (just loop!)
        >>> for item in cli_args:
        ...     config.update(item)
    """
    from .utils.exceptions import FrozenConfigError

    if self._frozen:
        raise FrozenConfigError("Cannot update frozen config")

    if isinstance(source, Config):
        self._update_from_config(source)
    elif isinstance(source, dict):
        if self._uses_nested_paths(source):
            self._apply_path_updates(source)
        else:
            self._apply_structural_update(source)
    elif isinstance(source, str) and ("=" in source or source.startswith("~")):
        # Auto-detect override string (key=value, =key=value, ~key)
        self._update_from_override_string(source)
    else:
        self._update_from_file(source)

    # Validate after update if schema exists
    if self._schema:
        from .schema import validate as validate_schema

        validate_schema(
            self._data,
            self._schema,
            metadata=self._metadata,
            allow_missing=self._allow_missing,
            strict=self._strict,
        )

    return self  # Enable chaining

validate(schema)

Validate configuration against a dataclass schema.

Parameters:

Name Type Description Default
schema type

Dataclass type defining the expected structure and types

required

Raises:

Type Description
ValidationError

If configuration doesn't match schema

TypeError

If schema is not a dataclass

Example

from dataclasses import dataclass @dataclass ... class ModelConfig: ... hidden_size: int ... dropout: float config = Config.load({"hidden_size": 512, "dropout": 0.1}) config.validate(ModelConfig) # Passes bad_config = Config.load({"hidden_size": "not an int"}) bad_config.validate(ModelConfig) # Raises ValidationError

Source code in src/sparkwheel/config.py
def validate(self, schema: type) -> None:
    """Validate configuration against a dataclass schema.

    Args:
        schema: Dataclass type defining the expected structure and types

    Raises:
        ValidationError: If configuration doesn't match schema
        TypeError: If schema is not a dataclass

    Example:
        >>> from dataclasses import dataclass
        >>> @dataclass
        ... class ModelConfig:
        ...     hidden_size: int
        ...     dropout: float
        >>> config = Config.load({"hidden_size": 512, "dropout": 0.1})
        >>> config.validate(ModelConfig)  # Passes
        >>> bad_config = Config.load({"hidden_size": "not an int"})
        >>> bad_config.validate(ModelConfig)  # Raises ValidationError
    """
    from .schema import validate as validate_schema

    validate_schema(self._data, schema, metadata=self._metadata)

ConfigKeyError

Bases: BaseError

Raised when a config key is not found.

Supports smart suggestions and available keys display.

Source code in src/sparkwheel/utils/exceptions.py
class ConfigKeyError(BaseError):
    """Raised when a config key is not found.

    Supports smart suggestions and available keys display.
    """

    def __init__(
        self,
        message: str,
        source_location: SourceLocation | None = None,
        suggestion: str | None = None,
        missing_key: str | None = None,
        available_keys: list[str] | None = None,
        config_context: dict[str, Any] | None = None,
    ) -> None:
        """Initialize ConfigKeyError with enhanced context.

        Args:
            message: Error message
            source_location: Location where error occurred
            suggestion: Manual suggestion (optional)
            missing_key: The key that wasn't found
            available_keys: List of available keys for suggestions
            config_context: The config dict where the key wasn't found (for displaying available keys)
        """
        self.missing_key = missing_key
        self.available_keys = available_keys or []
        self.config_context = config_context

        # Auto-generate suggestion if not provided
        if not suggestion and missing_key and available_keys:
            suggestion = self._generate_suggestion()

        super().__init__(message, source_location, suggestion)

    def _generate_suggestion(self) -> str | None:
        """Generate smart suggestion with typo detection and available keys."""
        from ..errors import format_available_keys, format_suggestions, get_suggestions

        parts = []

        # Try to find similar keys
        if self.missing_key and self.available_keys:
            suggestions = get_suggestions(self.missing_key, self.available_keys)
            if suggestions:
                parts.append(format_suggestions(suggestions))

        # Show available keys if we have config context and not too many keys
        if self.config_context and len(self.config_context) <= 10:
            available = format_available_keys(self.config_context)
            if available:
                if parts:
                    parts.append("")  # Blank line separator
                parts.append(available)

        return "\n".join(parts) if parts else None

__init__(message, source_location=None, suggestion=None, missing_key=None, available_keys=None, config_context=None)

Initialize ConfigKeyError with enhanced context.

Parameters:

Name Type Description Default
message str

Error message

required
source_location SourceLocation | None

Location where error occurred

None
suggestion str | None

Manual suggestion (optional)

None
missing_key str | None

The key that wasn't found

None
available_keys list[str] | None

List of available keys for suggestions

None
config_context dict[str, Any] | None

The config dict where the key wasn't found (for displaying available keys)

None
Source code in src/sparkwheel/utils/exceptions.py
def __init__(
    self,
    message: str,
    source_location: SourceLocation | None = None,
    suggestion: str | None = None,
    missing_key: str | None = None,
    available_keys: list[str] | None = None,
    config_context: dict[str, Any] | None = None,
) -> None:
    """Initialize ConfigKeyError with enhanced context.

    Args:
        message: Error message
        source_location: Location where error occurred
        suggestion: Manual suggestion (optional)
        missing_key: The key that wasn't found
        available_keys: List of available keys for suggestions
        config_context: The config dict where the key wasn't found (for displaying available keys)
    """
    self.missing_key = missing_key
    self.available_keys = available_keys or []
    self.config_context = config_context

    # Auto-generate suggestion if not provided
    if not suggestion and missing_key and available_keys:
        suggestion = self._generate_suggestion()

    super().__init__(message, source_location, suggestion)

_generate_suggestion()

Generate smart suggestion with typo detection and available keys.

Source code in src/sparkwheel/utils/exceptions.py
def _generate_suggestion(self) -> str | None:
    """Generate smart suggestion with typo detection and available keys."""
    from ..errors import format_available_keys, format_suggestions, get_suggestions

    parts = []

    # Try to find similar keys
    if self.missing_key and self.available_keys:
        suggestions = get_suggestions(self.missing_key, self.available_keys)
        if suggestions:
            parts.append(format_suggestions(suggestions))

    # Show available keys if we have config context and not too many keys
    if self.config_context and len(self.config_context) <= 10:
        available = format_available_keys(self.config_context)
        if available:
            if parts:
                parts.append("")  # Blank line separator
            parts.append(available)

    return "\n".join(parts) if parts else None

ConfigMergeError

Bases: BaseError

Raised when configuration merge operation fails.

This is typically raised when using operators (= or ~) incorrectly: - Using ~ on a non-existent key - Using ~ with invalid value (must be null, empty, or list) - Type mismatch during merge (e.g., trying to merge dict into list)

Source code in src/sparkwheel/utils/exceptions.py
class ConfigMergeError(BaseError):
    """Raised when configuration merge operation fails.

    This is typically raised when using operators (= or ~) incorrectly:
    - Using ~ on a non-existent key
    - Using ~ with invalid value (must be null, empty, or list)
    - Type mismatch during merge (e.g., trying to merge dict into list)
    """

    pass

EvaluationError

Bases: BaseError

Raised when evaluating an expression fails.

Source code in src/sparkwheel/utils/exceptions.py
class EvaluationError(BaseError):
    """Raised when evaluating an expression fails."""

    pass

Expression

Bases: Item

Executable expression that evaluates Python code.

Expressions start with $ and are evaluated using Python's eval(), or imported if they're import statements.

Example
from sparkwheel import Expression

config = "$len([1, 2, 3])"
expression = Expression(config, id="test", globals={"len": len})
print(expression.evaluate())  # 3

Parameters:

Name Type Description Default
config Any

Expression string starting with $

required
id str

Identifier for this config item, defaults to ""

''
globals dict[str, Any] | None

Additional global context for evaluation

None
See Also

Python eval documentation

Source code in src/sparkwheel/items.py
class Expression(Item):
    """Executable expression that evaluates Python code.

    Expressions start with `$` and are evaluated using Python's `eval()`,
    or imported if they're import statements.

    Example:
        ```python
        from sparkwheel import Expression

        config = "$len([1, 2, 3])"
        expression = Expression(config, id="test", globals={"len": len})
        print(expression.evaluate())  # 3
        ```

    Args:
        config: Expression string starting with `$`
        id: Identifier for this config item, defaults to ""
        globals: Additional global context for evaluation

    See Also:
        [Python eval documentation](https://docs.python.org/3/library/functions.html#eval)
    """

    prefix = EXPR_KEY
    run_eval = run_eval

    def __init__(
        self,
        config: Any,
        id: str = "",
        globals: dict[str, Any] | None = None,
        source_location: SourceLocation | None = None,
    ) -> None:
        super().__init__(config=config, id=id, source_location=source_location)
        self.globals = globals if globals is not None else {}

    def _parse_import_string(self, import_string: str) -> Any | None:
        """parse single import statement such as "from pathlib import Path" """
        node = first(ast.iter_child_nodes(ast.parse(import_string)))
        if not isinstance(node, (ast.Import, ast.ImportFrom)):
            return None
        if len(node.names) < 1:
            return None
        if len(node.names) > 1:
            warnings.warn(f"ignoring multiple import alias '{import_string}'.", stacklevel=2)
        name, asname = f"{node.names[0].name}", node.names[0].asname
        asname = name if asname is None else f"{asname}"
        if isinstance(node, ast.ImportFrom):
            self.globals[asname], _ = optional_import(f"{node.module}", name=f"{name}")
            return self.globals[asname]
        if isinstance(node, ast.Import):
            self.globals[asname], _ = optional_import(f"{name}")
            return self.globals[asname]
        return None  # type: ignore[unreachable]

    def evaluate(self, globals: dict[str, Any] | None = None, locals: dict[str, Any] | None = None) -> str | Any | None:
        """Evaluate the expression and return the result.

        Uses Python's `eval()` to execute the expression string.

        Args:
            globals: Additional global symbols for evaluation
            locals: Additional local symbols for evaluation

        Returns:
            Evaluation result, or None if not an expression

        Raises:
            RuntimeError: If evaluation fails
        """
        value = self.get_config()
        if not Expression.is_expression(value):
            return None
        optional_module = self._parse_import_string(value[len(self.prefix) :])
        if optional_module is not None:
            return optional_module
        if not self.run_eval:
            return f"{value[len(self.prefix) :]}"
        globals_ = dict(self.globals)
        if globals is not None:
            for k, v in globals.items():
                if k in globals_:
                    warnings.warn(f"the new global variable `{k}` conflicts with `self.globals`, override it.", stacklevel=2)
                globals_[k] = v
        if not run_debug:
            try:
                return eval(value[len(self.prefix) :], globals_, locals)
            except Exception as e:
                raise EvaluationError(
                    f"Failed to evaluate expression: '{value[len(self.prefix) :]}'",
                    source_location=self.source_location,
                ) from e
        warnings.warn(
            f"\n\npdb: value={value}\nSee also Debugger commands documentation: https://docs.python.org/3/library/pdb.html\n",
            stacklevel=2,
        )
        import pdb  # noqa: T100

        pdb.run(value[len(self.prefix) :], globals_, locals)
        return None

    @classmethod
    def is_expression(cls, config: dict[str, Any] | list[Any] | str) -> bool:
        """
        Check whether the config is an executable expression string.
        Currently, a string starts with ``"$"`` character is interpreted as an expression.

        Args:
            config: input config content to check.
        """
        return isinstance(config, str) and config.startswith(cls.prefix)

    @classmethod
    def is_import_statement(cls, config: dict[str, Any] | list[Any] | str) -> bool:
        """
        Check whether the config is an import statement (a special case of expression).

        Args:
            config: input config content to check.
        """
        if not cls.is_expression(config):
            return False
        if "import" not in config:
            return False
        return isinstance(first(ast.iter_child_nodes(ast.parse(f"{config[len(cls.prefix) :]}"))), (ast.Import, ast.ImportFrom))  # type: ignore[index]

_parse_import_string(import_string)

parse single import statement such as "from pathlib import Path"

Source code in src/sparkwheel/items.py
def _parse_import_string(self, import_string: str) -> Any | None:
    """parse single import statement such as "from pathlib import Path" """
    node = first(ast.iter_child_nodes(ast.parse(import_string)))
    if not isinstance(node, (ast.Import, ast.ImportFrom)):
        return None
    if len(node.names) < 1:
        return None
    if len(node.names) > 1:
        warnings.warn(f"ignoring multiple import alias '{import_string}'.", stacklevel=2)
    name, asname = f"{node.names[0].name}", node.names[0].asname
    asname = name if asname is None else f"{asname}"
    if isinstance(node, ast.ImportFrom):
        self.globals[asname], _ = optional_import(f"{node.module}", name=f"{name}")
        return self.globals[asname]
    if isinstance(node, ast.Import):
        self.globals[asname], _ = optional_import(f"{name}")
        return self.globals[asname]
    return None  # type: ignore[unreachable]

evaluate(globals=None, locals=None)

Evaluate the expression and return the result.

Uses Python's eval() to execute the expression string.

Parameters:

Name Type Description Default
globals dict[str, Any] | None

Additional global symbols for evaluation

None
locals dict[str, Any] | None

Additional local symbols for evaluation

None

Returns:

Type Description
str | Any | None

Evaluation result, or None if not an expression

Raises:

Type Description
RuntimeError

If evaluation fails

Source code in src/sparkwheel/items.py
def evaluate(self, globals: dict[str, Any] | None = None, locals: dict[str, Any] | None = None) -> str | Any | None:
    """Evaluate the expression and return the result.

    Uses Python's `eval()` to execute the expression string.

    Args:
        globals: Additional global symbols for evaluation
        locals: Additional local symbols for evaluation

    Returns:
        Evaluation result, or None if not an expression

    Raises:
        RuntimeError: If evaluation fails
    """
    value = self.get_config()
    if not Expression.is_expression(value):
        return None
    optional_module = self._parse_import_string(value[len(self.prefix) :])
    if optional_module is not None:
        return optional_module
    if not self.run_eval:
        return f"{value[len(self.prefix) :]}"
    globals_ = dict(self.globals)
    if globals is not None:
        for k, v in globals.items():
            if k in globals_:
                warnings.warn(f"the new global variable `{k}` conflicts with `self.globals`, override it.", stacklevel=2)
            globals_[k] = v
    if not run_debug:
        try:
            return eval(value[len(self.prefix) :], globals_, locals)
        except Exception as e:
            raise EvaluationError(
                f"Failed to evaluate expression: '{value[len(self.prefix) :]}'",
                source_location=self.source_location,
            ) from e
    warnings.warn(
        f"\n\npdb: value={value}\nSee also Debugger commands documentation: https://docs.python.org/3/library/pdb.html\n",
        stacklevel=2,
    )
    import pdb  # noqa: T100

    pdb.run(value[len(self.prefix) :], globals_, locals)
    return None

is_expression(config) classmethod

Check whether the config is an executable expression string. Currently, a string starts with "$" character is interpreted as an expression.

Parameters:

Name Type Description Default
config dict[str, Any] | list[Any] | str

input config content to check.

required
Source code in src/sparkwheel/items.py
@classmethod
def is_expression(cls, config: dict[str, Any] | list[Any] | str) -> bool:
    """
    Check whether the config is an executable expression string.
    Currently, a string starts with ``"$"`` character is interpreted as an expression.

    Args:
        config: input config content to check.
    """
    return isinstance(config, str) and config.startswith(cls.prefix)

is_import_statement(config) classmethod

Check whether the config is an import statement (a special case of expression).

Parameters:

Name Type Description Default
config dict[str, Any] | list[Any] | str

input config content to check.

required
Source code in src/sparkwheel/items.py
@classmethod
def is_import_statement(cls, config: dict[str, Any] | list[Any] | str) -> bool:
    """
    Check whether the config is an import statement (a special case of expression).

    Args:
        config: input config content to check.
    """
    if not cls.is_expression(config):
        return False
    if "import" not in config:
        return False
    return isinstance(first(ast.iter_child_nodes(ast.parse(f"{config[len(cls.prefix) :]}"))), (ast.Import, ast.ImportFrom))  # type: ignore[index]

FrozenConfigError

Bases: BaseError

Raised when attempting to modify a frozen config.

Attributes:

Name Type Description
message

Error description

field_path

Path that was attempted to modify

Source code in src/sparkwheel/utils/exceptions.py
class FrozenConfigError(BaseError):
    """Raised when attempting to modify a frozen config.

    Attributes:
        message: Error description
        field_path: Path that was attempted to modify
    """

    def __init__(self, message: str, field_path: str = ""):
        self.field_path = field_path
        full_message = message
        if field_path:
            full_message = f"Cannot modify frozen config at '{field_path}': {message}"
        super().__init__(full_message)

Instantiable

Bases: ABC

Base class for an instantiable object.

Source code in src/sparkwheel/items.py
class Instantiable(ABC):
    """
    Base class for an instantiable object.
    """

    @abstractmethod
    def is_disabled(self, *args: Any, **kwargs: Any) -> bool:
        """
        Return a boolean flag to indicate whether the object should be instantiated.
        """
        raise NotImplementedError(f"subclass {self.__class__.__name__} must implement this method.")

    @abstractmethod
    def instantiate(self, *args: Any, **kwargs: Any) -> object:
        """
        Instantiate the target component and return the instance.
        """
        raise NotImplementedError(f"subclass {self.__class__.__name__} must implement this method.")

instantiate(*args, **kwargs) abstractmethod

Instantiate the target component and return the instance.

Source code in src/sparkwheel/items.py
@abstractmethod
def instantiate(self, *args: Any, **kwargs: Any) -> object:
    """
    Instantiate the target component and return the instance.
    """
    raise NotImplementedError(f"subclass {self.__class__.__name__} must implement this method.")

is_disabled(*args, **kwargs) abstractmethod

Return a boolean flag to indicate whether the object should be instantiated.

Source code in src/sparkwheel/items.py
@abstractmethod
def is_disabled(self, *args: Any, **kwargs: Any) -> bool:
    """
    Return a boolean flag to indicate whether the object should be instantiated.
    """
    raise NotImplementedError(f"subclass {self.__class__.__name__} must implement this method.")

InstantiationError

Bases: BaseError

Raised when instantiating a component fails.

Source code in src/sparkwheel/utils/exceptions.py
class InstantiationError(BaseError):
    """Raised when instantiating a component fails."""

    pass

Item

Basic data structure to represent a configuration item.

A Item instance can optionally have a string id, so that other items can refer to it. It has a build-in config property to store the configuration object.

Parameters:

Name Type Description Default
config Any

content of a config item, can be objects of any types, a configuration resolver may interpret the content to generate a configuration object.

required
id str

name of the current config item, defaults to empty string.

''
source_location SourceLocation | None

optional location in source file where this config item was defined.

None
Source code in src/sparkwheel/items.py
class Item:
    """
    Basic data structure to represent a configuration item.

    A `Item` instance can optionally have a string id, so that other items can refer to it.
    It has a build-in `config` property to store the configuration object.

    Args:
        config: content of a config item, can be objects of any types,
            a configuration resolver may interpret the content to generate a configuration object.
        id: name of the current config item, defaults to empty string.
        source_location: optional location in source file where this config item was defined.
    """

    def __init__(self, config: Any, id: str = "", source_location: SourceLocation | None = None) -> None:
        self.config = config
        self.id = id
        self.source_location = source_location

    def get_id(self) -> str:
        """
        Get the ID name of current config item, useful to identify config items during parsing.
        """
        return self.id

    def update_config(self, config: Any) -> None:
        """
        Replace the content of `self.config` with new `config`.
        A typical usage is to modify the initial config content at runtime.

        Args:
            config: content of a `Item`.
        """
        self.config = config

    def get_config(self):
        """
        Get the config content of current config item.
        """
        return self.config

    def __repr__(self) -> str:
        return f"{type(self).__name__}: \n{pformat(self.config)}"

get_config()

Get the config content of current config item.

Source code in src/sparkwheel/items.py
def get_config(self):
    """
    Get the config content of current config item.
    """
    return self.config

get_id()

Get the ID name of current config item, useful to identify config items during parsing.

Source code in src/sparkwheel/items.py
def get_id(self) -> str:
    """
    Get the ID name of current config item, useful to identify config items during parsing.
    """
    return self.id

update_config(config)

Replace the content of self.config with new config. A typical usage is to modify the initial config content at runtime.

Parameters:

Name Type Description Default
config Any

content of a Item.

required
Source code in src/sparkwheel/items.py
def update_config(self, config: Any) -> None:
    """
    Replace the content of `self.config` with new `config`.
    A typical usage is to modify the initial config content at runtime.

    Args:
        config: content of a `Item`.
    """
    self.config = config

ModuleNotFoundError

Bases: BaseError

Raised when a target module/class/function cannot be located.

Source code in src/sparkwheel/utils/exceptions.py
class ModuleNotFoundError(BaseError):
    """Raised when a _target_ module/class/function cannot be located."""

    pass

Resolver

Resolve references between Items.

Manages Items and resolves resolved reference strings (starting with @) by substituting them with their corresponding resolved values (instantiated objects, evaluated expressions, etc.).

Example
from sparkwheel import Item, Component, Resolver

resolver = Resolver()

# Add items
resolver.add_item(Item(config=0.001, id="lr"))
resolver.add_item(Item(config={"lr": "@lr"}, id="config"))

# Resolve
result = resolver.resolve("config")
print(result)  # {"lr": 0.001}

Resolved references can use :: separator for nested access: - Dictionary keys: @config:🔑:subkey - List indices: @list::0::subitem

Source code in src/sparkwheel/resolver.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
class Resolver:
    """Resolve references between Items.

    Manages Items and resolves resolved reference strings (starting with @) by
    substituting them with their corresponding resolved values (instantiated objects,
    evaluated expressions, etc.).

    Example:
        ```python
        from sparkwheel import Item, Component, Resolver

        resolver = Resolver()

        # Add items
        resolver.add_item(Item(config=0.001, id="lr"))
        resolver.add_item(Item(config={"lr": "@lr"}, id="config"))

        # Resolve
        result = resolver.resolve("config")
        print(result)  # {"lr": 0.001}
        ```

    Resolved references can use :: separator for nested access:
    - Dictionary keys: @config::key::subkey
    - List indices: @list::0::subitem
    """

    _vars = "__local_refs"  # Variable name for resolved refs in expression evaluation
    sep = ID_SEP_KEY  # Separator for nested key access
    ref = RESOLVED_REF_KEY  # Resolved reference prefix (@)
    allow_missing_reference = allow_missing_reference
    max_resolution_depth = 100  # Prevent DoS from deeply nested references

    def __init__(self, items: list[Item] | None = None):
        """Initialize resolver with optional items.

        Args:
            items: Optional list of Items to add during initialization
        """
        self._items: dict[str, Item] = {}
        self._resolved: dict[str, Any] = {}

        if items:
            for item in items:
                self.add_item(item)

    def reset(self) -> None:
        """Clear all items and resolved content."""
        self._items = {}
        self._resolved = {}

    def is_resolved(self) -> bool:
        """Check if any items have been resolved."""
        return bool(self._resolved)

    def add_item(self, item: Item) -> None:
        """Add a Item to resolve.

        Args:
            item: Item to add
        """
        id = item.get_id()
        if id in self._items:
            warnings.warn(
                f"Duplicate config item ID '{id}' detected. "
                f"The new item will be ignored and the existing item will be kept. "
                f"This may indicate a configuration error.",
                UserWarning,
                stacklevel=2,
            )
            return
        self._items[id] = item

    def add_items(self, items: list[Item]) -> None:
        """Add multiple Items at once.

        Args:
            items: List of Items to add
        """
        for item in items:
            self.add_item(item)

    def get_item(self, id: str, resolve: bool = False, **kwargs: Any) -> Item | None:
        """Get Item by id, optionally resolved.

        Args:
            id: ID of the config item
            resolve: Whether to resolve the item (default: False)
            **kwargs: Additional arguments for resolution

        Returns:
            Item if found, None otherwise (or resolved value if resolve=True)
        """
        id = self.normalize_id(id)
        if resolve and id not in self._resolved:
            self._resolve_one_item(id=id, **kwargs)
        return self._items.get(id)

    def resolve(
        self,
        id: str = "",
        instantiate: bool = True,
        eval_expr: bool = True,
        default: Any = None,
    ) -> Any:
        """Resolve a config item and return the result.

        Resolves all references, instantiates components (if requested), and
        evaluates expressions (if requested). Results are cached for efficiency.

        Args:
            id: ID of item to resolve (empty string for root)
            instantiate: Whether to instantiate components with _target_
            eval_expr: Whether to evaluate expressions starting with $
            default: Default value if id not found

        Returns:
            Resolved value (instantiated object, evaluated result, or raw value)

        Raises:
            ConfigKeyError: If id not found and no default provided
            CircularReferenceError: If circular reference detected
        """
        return self._resolve_one_item(id=id, instantiate=instantiate, eval_expr=eval_expr, default=default)

    def _resolve_one_item(
        self,
        id: str,
        waiting_list: set[str] | None = None,
        _depth: int = 0,
        instantiate: bool = True,
        eval_expr: bool = True,
        default: Any = None,
    ) -> Any:
        """Internal recursive resolution implementation.

        Args:
            id: ID to resolve
            waiting_list: Set of IDs currently being resolved (for cycle detection)
            _depth: Current recursion depth (for DoS prevention)
            instantiate: Whether to instantiate components
            eval_expr: Whether to evaluate expressions
            default: Default value if not found

        Returns:
            Resolved value

        Raises:
            RecursionError: If max depth exceeded
            CircularReferenceError: If circular reference detected
            ConfigKeyError: If reference not found
        """
        # Prevent stack overflow attacks
        if _depth >= self.max_resolution_depth:
            raise RecursionError(
                f"Maximum reference resolution depth ({self.max_resolution_depth}) exceeded while resolving '{id}'. "
                f"This may indicate an overly complex configuration or a potential DoS attack."
            )

        id = self.normalize_id(id)

        # Return cached result if available
        if id in self._resolved:
            return self._resolved[id]

        # Look up the item
        try:
            item = look_up_option(id, self._items, print_all_options=False, default=default or "no_default")
        except ValueError as err:
            # Provide helpful error with suggestions
            source_location = None
            for config_item in self._items.values():
                if hasattr(config_item, "source_location") and config_item.source_location:
                    source_location = config_item.source_location
                    break

            available_keys = list(self._items.keys())
            config_context = None

            # For nested IDs, try to get parent context to show available keys
            if ID_SEP_KEY in id:
                parent_id = ID_SEP_KEY.join(id.split(ID_SEP_KEY)[:-1])
                try:
                    parent_item = self.get_item(parent_id)
                    if parent_item and isinstance(parent_item.get_config(), dict):
                        config_context = parent_item.get_config()
                except (ValueError, KeyError):
                    pass

            raise ConfigKeyError(
                f"Config ID '{id}' not found in the configuration",
                source_location=source_location,
                missing_key=id,
                available_keys=available_keys,
                config_context=config_context,
            ) from err

        # If default was returned, just return it
        if not isinstance(item, Item):
            return item

        item_config = item.get_config()

        # Initialize waiting list for circular reference detection
        if waiting_list is None:
            waiting_list = set()
        waiting_list.add(id)

        # First, resolve any import expressions (they need to run first)
        for t, v in self._items.items():
            if t not in self._resolved and isinstance(v, Expression) and v.is_import_statement(v.get_config()):
                self._resolved[t] = v.evaluate() if eval_expr else v

        # Find all references in this item's config
        refs = self.find_refs_in_config(config=item_config, id=id)

        # Resolve dependencies first
        for dep_id in refs.keys():
            # Check for circular references
            if dep_id in waiting_list:
                raise CircularReferenceError(
                    f"Circular reference detected: '{dep_id}' references back to '{id}'",
                    source_location=item.source_location if hasattr(item, "source_location") else None,
                )

            # Resolve dependency if not already resolved
            if dep_id not in self._resolved:
                try:
                    look_up_option(dep_id, self._items, print_all_options=False)
                except ValueError as err:
                    msg = f"the referring item `@{dep_id}` is not defined in the config content."
                    if not self.allow_missing_reference:
                        available_keys = list(self._items.keys())
                        raise ConfigKeyError(
                            f"Reference '@{dep_id}' not found in configuration",
                            source_location=item.source_location if hasattr(item, "source_location") else None,
                            missing_key=dep_id,
                            available_keys=available_keys,
                        ) from err
                    warnings.warn(msg, stacklevel=2)
                    continue

                # Recursively resolve dependency
                self._resolve_one_item(
                    id=dep_id,
                    waiting_list=waiting_list,
                    _depth=_depth + 1,
                    instantiate=instantiate,
                    eval_expr=eval_expr,
                )
                waiting_list.discard(dep_id)

        # All dependencies resolved, now resolve this item
        new_config = self.update_config_with_refs(config=item_config, id=id, refs=self._resolved)
        item.update_config(config=new_config)

        # Generate final resolved value based on item type
        if isinstance(item, Component):
            self._resolved[id] = item.instantiate() if instantiate else item
        elif isinstance(item, Expression):
            self._resolved[id] = item.evaluate(globals={f"{self._vars}": self._resolved}) if eval_expr else item
        else:
            self._resolved[id] = new_config

        return self._resolved[id]

    @classmethod
    def normalize_id(cls, id: str | int) -> str:
        """Normalize ID to string format.

        Args:
            id: ID to normalize

        Returns:
            String ID
        """
        return str(id)

    @classmethod
    def split_id(cls, id: str | int, last: bool = False) -> list[str]:
        """Split ID string by separator.

        Args:
            id: ID to split
            last: If True, only split rightmost part

        Returns:
            List of ID components
        """
        if not last:
            return cls.normalize_id(id).split(cls.sep)
        res = cls.normalize_id(id).rsplit(cls.sep, 1)
        return ["".join(res[:-1]), res[-1]]

    @classmethod
    def iter_subconfigs(cls, id: str, config: Any) -> Iterator[tuple[str, str, Any]]:
        """Iterate over sub-configs with IDs.

        Args:
            id: Current ID path
            config: Config to iterate (dict or list)

        Yields:
            Tuples of (key, sub_id, value)
        """
        for k, v in config.items() if isinstance(config, dict) else enumerate(config):
            sub_id = f"{id}{cls.sep}{k}" if id != "" else f"{k}"
            yield k, sub_id, v  # type: ignore[misc]

    @classmethod
    def match_refs_pattern(cls, value: str) -> dict[str, int]:
        """Find reference patterns in a string.

        Args:
            value: String to search for references

        Returns:
            Dict mapping reference IDs to occurrence counts
        """
        value = normalize_id(value)
        return scan_references(value)

    @classmethod
    def update_refs_pattern(cls, value: str, refs: dict[str, Any]) -> str:
        """Replace reference patterns with resolved values.

        Args:
            value: String containing references
            refs: Dict of resolved references

        Returns:
            String with references replaced
        """
        value = normalize_id(value)

        try:
            return replace_references(value, refs, cls._vars)
        except KeyError as e:
            # Extract reference ID from error message
            # The error message format is: "Reference '@ref_id' not found in resolved references"
            ref_id = str(e).split("'")[1].lstrip("@")
            msg = f"can not find expected ID '{ref_id}' in the references."
            if not cls.allow_missing_reference:
                raise KeyError(msg) from e
            warnings.warn(msg, stacklevel=2)
            return value

    @classmethod
    def find_refs_in_config(cls, config: Any, id: str, refs: dict[str, int] | None = None) -> dict[str, int]:
        """Recursively find all references in config.

        Args:
            config: Config to search
            id: Current ID path
            refs: Accumulated references dict

        Returns:
            Dict of reference IDs to counts
        """
        refs_ = refs or {}

        # Check string values for reference patterns
        if isinstance(config, str):
            for ref_id, count in cls.match_refs_pattern(value=config).items():
                refs_[ref_id] = refs_.get(ref_id, 0) + count

        # Recursively search nested structures
        if isinstance(config, (list, dict)):
            for _, sub_id, v in cls.iter_subconfigs(id, config):
                # Instantiable and expression items are also dependencies
                if (Component.is_instantiable(v) or Expression.is_expression(v)) and sub_id not in refs_:
                    refs_[sub_id] = 1
                refs_ = cls.find_refs_in_config(v, sub_id, refs_)

        return refs_

    @classmethod
    def update_config_with_refs(cls, config: Any, id: str, refs: dict[str, Any] | None = None) -> Any:
        """Update config by replacing references with resolved values.

        Args:
            config: Config to update
            id: Current ID path
            refs: Dict of resolved references

        Returns:
            Config with references replaced
        """
        refs_ = refs or {}

        # Replace references in strings
        if isinstance(config, str):
            return cls.update_refs_pattern(config, refs_)

        # Return non-container types as-is
        if not isinstance(config, (list, dict)):
            return config

        # Recursively update nested structures
        ret = type(config)()
        for idx, sub_id, v in cls.iter_subconfigs(id, config):
            if Component.is_instantiable(v) or Expression.is_expression(v):
                updated = refs_[sub_id]
                # Skip disabled components
                if Component.is_instantiable(v) and updated is None:
                    continue
            else:
                updated = cls.update_config_with_refs(v, sub_id, refs_)

            if isinstance(ret, dict):
                ret[idx] = updated
            else:
                ret.append(updated)

        return ret

__init__(items=None)

Initialize resolver with optional items.

Parameters:

Name Type Description Default
items list[Item] | None

Optional list of Items to add during initialization

None
Source code in src/sparkwheel/resolver.py
def __init__(self, items: list[Item] | None = None):
    """Initialize resolver with optional items.

    Args:
        items: Optional list of Items to add during initialization
    """
    self._items: dict[str, Item] = {}
    self._resolved: dict[str, Any] = {}

    if items:
        for item in items:
            self.add_item(item)

_resolve_one_item(id, waiting_list=None, _depth=0, instantiate=True, eval_expr=True, default=None)

Internal recursive resolution implementation.

Parameters:

Name Type Description Default
id str

ID to resolve

required
waiting_list set[str] | None

Set of IDs currently being resolved (for cycle detection)

None
_depth int

Current recursion depth (for DoS prevention)

0
instantiate bool

Whether to instantiate components

True
eval_expr bool

Whether to evaluate expressions

True
default Any

Default value if not found

None

Returns:

Type Description
Any

Resolved value

Raises:

Type Description
RecursionError

If max depth exceeded

CircularReferenceError

If circular reference detected

ConfigKeyError

If reference not found

Source code in src/sparkwheel/resolver.py
def _resolve_one_item(
    self,
    id: str,
    waiting_list: set[str] | None = None,
    _depth: int = 0,
    instantiate: bool = True,
    eval_expr: bool = True,
    default: Any = None,
) -> Any:
    """Internal recursive resolution implementation.

    Args:
        id: ID to resolve
        waiting_list: Set of IDs currently being resolved (for cycle detection)
        _depth: Current recursion depth (for DoS prevention)
        instantiate: Whether to instantiate components
        eval_expr: Whether to evaluate expressions
        default: Default value if not found

    Returns:
        Resolved value

    Raises:
        RecursionError: If max depth exceeded
        CircularReferenceError: If circular reference detected
        ConfigKeyError: If reference not found
    """
    # Prevent stack overflow attacks
    if _depth >= self.max_resolution_depth:
        raise RecursionError(
            f"Maximum reference resolution depth ({self.max_resolution_depth}) exceeded while resolving '{id}'. "
            f"This may indicate an overly complex configuration or a potential DoS attack."
        )

    id = self.normalize_id(id)

    # Return cached result if available
    if id in self._resolved:
        return self._resolved[id]

    # Look up the item
    try:
        item = look_up_option(id, self._items, print_all_options=False, default=default or "no_default")
    except ValueError as err:
        # Provide helpful error with suggestions
        source_location = None
        for config_item in self._items.values():
            if hasattr(config_item, "source_location") and config_item.source_location:
                source_location = config_item.source_location
                break

        available_keys = list(self._items.keys())
        config_context = None

        # For nested IDs, try to get parent context to show available keys
        if ID_SEP_KEY in id:
            parent_id = ID_SEP_KEY.join(id.split(ID_SEP_KEY)[:-1])
            try:
                parent_item = self.get_item(parent_id)
                if parent_item and isinstance(parent_item.get_config(), dict):
                    config_context = parent_item.get_config()
            except (ValueError, KeyError):
                pass

        raise ConfigKeyError(
            f"Config ID '{id}' not found in the configuration",
            source_location=source_location,
            missing_key=id,
            available_keys=available_keys,
            config_context=config_context,
        ) from err

    # If default was returned, just return it
    if not isinstance(item, Item):
        return item

    item_config = item.get_config()

    # Initialize waiting list for circular reference detection
    if waiting_list is None:
        waiting_list = set()
    waiting_list.add(id)

    # First, resolve any import expressions (they need to run first)
    for t, v in self._items.items():
        if t not in self._resolved and isinstance(v, Expression) and v.is_import_statement(v.get_config()):
            self._resolved[t] = v.evaluate() if eval_expr else v

    # Find all references in this item's config
    refs = self.find_refs_in_config(config=item_config, id=id)

    # Resolve dependencies first
    for dep_id in refs.keys():
        # Check for circular references
        if dep_id in waiting_list:
            raise CircularReferenceError(
                f"Circular reference detected: '{dep_id}' references back to '{id}'",
                source_location=item.source_location if hasattr(item, "source_location") else None,
            )

        # Resolve dependency if not already resolved
        if dep_id not in self._resolved:
            try:
                look_up_option(dep_id, self._items, print_all_options=False)
            except ValueError as err:
                msg = f"the referring item `@{dep_id}` is not defined in the config content."
                if not self.allow_missing_reference:
                    available_keys = list(self._items.keys())
                    raise ConfigKeyError(
                        f"Reference '@{dep_id}' not found in configuration",
                        source_location=item.source_location if hasattr(item, "source_location") else None,
                        missing_key=dep_id,
                        available_keys=available_keys,
                    ) from err
                warnings.warn(msg, stacklevel=2)
                continue

            # Recursively resolve dependency
            self._resolve_one_item(
                id=dep_id,
                waiting_list=waiting_list,
                _depth=_depth + 1,
                instantiate=instantiate,
                eval_expr=eval_expr,
            )
            waiting_list.discard(dep_id)

    # All dependencies resolved, now resolve this item
    new_config = self.update_config_with_refs(config=item_config, id=id, refs=self._resolved)
    item.update_config(config=new_config)

    # Generate final resolved value based on item type
    if isinstance(item, Component):
        self._resolved[id] = item.instantiate() if instantiate else item
    elif isinstance(item, Expression):
        self._resolved[id] = item.evaluate(globals={f"{self._vars}": self._resolved}) if eval_expr else item
    else:
        self._resolved[id] = new_config

    return self._resolved[id]

add_item(item)

Add a Item to resolve.

Parameters:

Name Type Description Default
item Item

Item to add

required
Source code in src/sparkwheel/resolver.py
def add_item(self, item: Item) -> None:
    """Add a Item to resolve.

    Args:
        item: Item to add
    """
    id = item.get_id()
    if id in self._items:
        warnings.warn(
            f"Duplicate config item ID '{id}' detected. "
            f"The new item will be ignored and the existing item will be kept. "
            f"This may indicate a configuration error.",
            UserWarning,
            stacklevel=2,
        )
        return
    self._items[id] = item

add_items(items)

Add multiple Items at once.

Parameters:

Name Type Description Default
items list[Item]

List of Items to add

required
Source code in src/sparkwheel/resolver.py
def add_items(self, items: list[Item]) -> None:
    """Add multiple Items at once.

    Args:
        items: List of Items to add
    """
    for item in items:
        self.add_item(item)

find_refs_in_config(config, id, refs=None) classmethod

Recursively find all references in config.

Parameters:

Name Type Description Default
config Any

Config to search

required
id str

Current ID path

required
refs dict[str, int] | None

Accumulated references dict

None

Returns:

Type Description
dict[str, int]

Dict of reference IDs to counts

Source code in src/sparkwheel/resolver.py
@classmethod
def find_refs_in_config(cls, config: Any, id: str, refs: dict[str, int] | None = None) -> dict[str, int]:
    """Recursively find all references in config.

    Args:
        config: Config to search
        id: Current ID path
        refs: Accumulated references dict

    Returns:
        Dict of reference IDs to counts
    """
    refs_ = refs or {}

    # Check string values for reference patterns
    if isinstance(config, str):
        for ref_id, count in cls.match_refs_pattern(value=config).items():
            refs_[ref_id] = refs_.get(ref_id, 0) + count

    # Recursively search nested structures
    if isinstance(config, (list, dict)):
        for _, sub_id, v in cls.iter_subconfigs(id, config):
            # Instantiable and expression items are also dependencies
            if (Component.is_instantiable(v) or Expression.is_expression(v)) and sub_id not in refs_:
                refs_[sub_id] = 1
            refs_ = cls.find_refs_in_config(v, sub_id, refs_)

    return refs_

get_item(id, resolve=False, **kwargs)

Get Item by id, optionally resolved.

Parameters:

Name Type Description Default
id str

ID of the config item

required
resolve bool

Whether to resolve the item (default: False)

False
**kwargs Any

Additional arguments for resolution

{}

Returns:

Type Description
Item | None

Item if found, None otherwise (or resolved value if resolve=True)

Source code in src/sparkwheel/resolver.py
def get_item(self, id: str, resolve: bool = False, **kwargs: Any) -> Item | None:
    """Get Item by id, optionally resolved.

    Args:
        id: ID of the config item
        resolve: Whether to resolve the item (default: False)
        **kwargs: Additional arguments for resolution

    Returns:
        Item if found, None otherwise (or resolved value if resolve=True)
    """
    id = self.normalize_id(id)
    if resolve and id not in self._resolved:
        self._resolve_one_item(id=id, **kwargs)
    return self._items.get(id)

is_resolved()

Check if any items have been resolved.

Source code in src/sparkwheel/resolver.py
def is_resolved(self) -> bool:
    """Check if any items have been resolved."""
    return bool(self._resolved)

iter_subconfigs(id, config) classmethod

Iterate over sub-configs with IDs.

Parameters:

Name Type Description Default
id str

Current ID path

required
config Any

Config to iterate (dict or list)

required

Yields:

Type Description
tuple[str, str, Any]

Tuples of (key, sub_id, value)

Source code in src/sparkwheel/resolver.py
@classmethod
def iter_subconfigs(cls, id: str, config: Any) -> Iterator[tuple[str, str, Any]]:
    """Iterate over sub-configs with IDs.

    Args:
        id: Current ID path
        config: Config to iterate (dict or list)

    Yields:
        Tuples of (key, sub_id, value)
    """
    for k, v in config.items() if isinstance(config, dict) else enumerate(config):
        sub_id = f"{id}{cls.sep}{k}" if id != "" else f"{k}"
        yield k, sub_id, v  # type: ignore[misc]

match_refs_pattern(value) classmethod

Find reference patterns in a string.

Parameters:

Name Type Description Default
value str

String to search for references

required

Returns:

Type Description
dict[str, int]

Dict mapping reference IDs to occurrence counts

Source code in src/sparkwheel/resolver.py
@classmethod
def match_refs_pattern(cls, value: str) -> dict[str, int]:
    """Find reference patterns in a string.

    Args:
        value: String to search for references

    Returns:
        Dict mapping reference IDs to occurrence counts
    """
    value = normalize_id(value)
    return scan_references(value)

normalize_id(id) classmethod

Normalize ID to string format.

Parameters:

Name Type Description Default
id str | int

ID to normalize

required

Returns:

Type Description
str

String ID

Source code in src/sparkwheel/resolver.py
@classmethod
def normalize_id(cls, id: str | int) -> str:
    """Normalize ID to string format.

    Args:
        id: ID to normalize

    Returns:
        String ID
    """
    return str(id)

reset()

Clear all items and resolved content.

Source code in src/sparkwheel/resolver.py
def reset(self) -> None:
    """Clear all items and resolved content."""
    self._items = {}
    self._resolved = {}

resolve(id='', instantiate=True, eval_expr=True, default=None)

Resolve a config item and return the result.

Resolves all references, instantiates components (if requested), and evaluates expressions (if requested). Results are cached for efficiency.

Parameters:

Name Type Description Default
id str

ID of item to resolve (empty string for root)

''
instantiate bool

Whether to instantiate components with target

True
eval_expr bool

Whether to evaluate expressions starting with $

True
default Any

Default value if id not found

None

Returns:

Type Description
Any

Resolved value (instantiated object, evaluated result, or raw value)

Raises:

Type Description
ConfigKeyError

If id not found and no default provided

CircularReferenceError

If circular reference detected

Source code in src/sparkwheel/resolver.py
def resolve(
    self,
    id: str = "",
    instantiate: bool = True,
    eval_expr: bool = True,
    default: Any = None,
) -> Any:
    """Resolve a config item and return the result.

    Resolves all references, instantiates components (if requested), and
    evaluates expressions (if requested). Results are cached for efficiency.

    Args:
        id: ID of item to resolve (empty string for root)
        instantiate: Whether to instantiate components with _target_
        eval_expr: Whether to evaluate expressions starting with $
        default: Default value if id not found

    Returns:
        Resolved value (instantiated object, evaluated result, or raw value)

    Raises:
        ConfigKeyError: If id not found and no default provided
        CircularReferenceError: If circular reference detected
    """
    return self._resolve_one_item(id=id, instantiate=instantiate, eval_expr=eval_expr, default=default)

split_id(id, last=False) classmethod

Split ID string by separator.

Parameters:

Name Type Description Default
id str | int

ID to split

required
last bool

If True, only split rightmost part

False

Returns:

Type Description
list[str]

List of ID components

Source code in src/sparkwheel/resolver.py
@classmethod
def split_id(cls, id: str | int, last: bool = False) -> list[str]:
    """Split ID string by separator.

    Args:
        id: ID to split
        last: If True, only split rightmost part

    Returns:
        List of ID components
    """
    if not last:
        return cls.normalize_id(id).split(cls.sep)
    res = cls.normalize_id(id).rsplit(cls.sep, 1)
    return ["".join(res[:-1]), res[-1]]

update_config_with_refs(config, id, refs=None) classmethod

Update config by replacing references with resolved values.

Parameters:

Name Type Description Default
config Any

Config to update

required
id str

Current ID path

required
refs dict[str, Any] | None

Dict of resolved references

None

Returns:

Type Description
Any

Config with references replaced

Source code in src/sparkwheel/resolver.py
@classmethod
def update_config_with_refs(cls, config: Any, id: str, refs: dict[str, Any] | None = None) -> Any:
    """Update config by replacing references with resolved values.

    Args:
        config: Config to update
        id: Current ID path
        refs: Dict of resolved references

    Returns:
        Config with references replaced
    """
    refs_ = refs or {}

    # Replace references in strings
    if isinstance(config, str):
        return cls.update_refs_pattern(config, refs_)

    # Return non-container types as-is
    if not isinstance(config, (list, dict)):
        return config

    # Recursively update nested structures
    ret = type(config)()
    for idx, sub_id, v in cls.iter_subconfigs(id, config):
        if Component.is_instantiable(v) or Expression.is_expression(v):
            updated = refs_[sub_id]
            # Skip disabled components
            if Component.is_instantiable(v) and updated is None:
                continue
        else:
            updated = cls.update_config_with_refs(v, sub_id, refs_)

        if isinstance(ret, dict):
            ret[idx] = updated
        else:
            ret.append(updated)

    return ret

update_refs_pattern(value, refs) classmethod

Replace reference patterns with resolved values.

Parameters:

Name Type Description Default
value str

String containing references

required
refs dict[str, Any]

Dict of resolved references

required

Returns:

Type Description
str

String with references replaced

Source code in src/sparkwheel/resolver.py
@classmethod
def update_refs_pattern(cls, value: str, refs: dict[str, Any]) -> str:
    """Replace reference patterns with resolved values.

    Args:
        value: String containing references
        refs: Dict of resolved references

    Returns:
        String with references replaced
    """
    value = normalize_id(value)

    try:
        return replace_references(value, refs, cls._vars)
    except KeyError as e:
        # Extract reference ID from error message
        # The error message format is: "Reference '@ref_id' not found in resolved references"
        ref_id = str(e).split("'")[1].lstrip("@")
        msg = f"can not find expected ID '{ref_id}' in the references."
        if not cls.allow_missing_reference:
            raise KeyError(msg) from e
        warnings.warn(msg, stacklevel=2)
        return value

SourceLocation dataclass

Tracks the source location of a config item.

Source code in src/sparkwheel/utils/exceptions.py
@dataclass
class SourceLocation:
    """Tracks the source location of a config item."""

    filepath: str
    line: int
    column: int = 0
    id: str = ""

    def __str__(self) -> str:
        return f"{self.filepath}:{self.line}"

ValidationError

Bases: BaseError

Raised when configuration validation fails.

Attributes:

Name Type Description
message

Error description

field_path

Dot-separated path to the invalid field (e.g., "model.optimizer.lr")

expected_type

The type that was expected

actual_value

The value that failed validation

source_location

Optional location in source file where error occurred

Source code in src/sparkwheel/schema.py
class ValidationError(BaseError):
    """Raised when configuration validation fails.

    Attributes:
        message: Error description
        field_path: Dot-separated path to the invalid field (e.g., "model.optimizer.lr")
        expected_type: The type that was expected
        actual_value: The value that failed validation
        source_location: Optional location in source file where error occurred
    """

    def __init__(
        self,
        message: str,
        field_path: str = "",
        expected_type: type | None = None,
        actual_value: Any = None,
        source_location: SourceLocation | None = None,
    ):
        """Initialize validation error.

        Args:
            message: Human-readable error message
            field_path: Dot-separated path to invalid field
            expected_type: Expected type for the field
            actual_value: The actual value that failed validation
            source_location: Source location where the invalid value was defined
        """
        self.field_path = field_path
        self.expected_type = expected_type
        self.actual_value = actual_value

        # Build detailed message
        full_message = message
        if field_path:
            full_message = f"Validation error at '{field_path}': {message}"
        if expected_type is not None:
            type_name = getattr(expected_type, "__name__", str(expected_type))
            full_message += f"\n  Expected type: {type_name}"
        if actual_value is not None:
            actual_type = type(actual_value).__name__
            full_message += f"\n  Actual type: {actual_type}"
            full_message += f"\n  Actual value: {actual_value!r}"

        super().__init__(full_message, source_location=source_location)

__init__(message, field_path='', expected_type=None, actual_value=None, source_location=None)

Initialize validation error.

Parameters:

Name Type Description Default
message str

Human-readable error message

required
field_path str

Dot-separated path to invalid field

''
expected_type type | None

Expected type for the field

None
actual_value Any

The actual value that failed validation

None
source_location SourceLocation | None

Source location where the invalid value was defined

None
Source code in src/sparkwheel/schema.py
def __init__(
    self,
    message: str,
    field_path: str = "",
    expected_type: type | None = None,
    actual_value: Any = None,
    source_location: SourceLocation | None = None,
):
    """Initialize validation error.

    Args:
        message: Human-readable error message
        field_path: Dot-separated path to invalid field
        expected_type: Expected type for the field
        actual_value: The actual value that failed validation
        source_location: Source location where the invalid value was defined
    """
    self.field_path = field_path
    self.expected_type = expected_type
    self.actual_value = actual_value

    # Build detailed message
    full_message = message
    if field_path:
        full_message = f"Validation error at '{field_path}': {message}"
    if expected_type is not None:
        type_name = getattr(expected_type, "__name__", str(expected_type))
        full_message += f"\n  Expected type: {type_name}"
    if actual_value is not None:
        actual_type = type(actual_value).__name__
        full_message += f"\n  Actual type: {actual_type}"
        full_message += f"\n  Actual value: {actual_value!r}"

    super().__init__(full_message, source_location=source_location)

apply_operators(base, override)

Apply configuration changes with composition-by-default semantics.

Default behavior: Compose (merge dicts, extend lists) Operators: =key: value - Replace operator: completely replace value (override default) ~key: null - Remove operator: delete key or list items (idempotent) key: value - Compose (default): merge dict or extend list

Composition-by-Default Philosophy
  • Dicts merge recursively by default (preserves existing keys)
  • Lists extend by default (append new items)
  • Only scalars and type mismatches replace
  • Use = to explicitly replace entire dicts or lists
  • Use ~ to delete keys (idempotent - no error if missing)

Parameters:

Name Type Description Default
base dict[str, Any]

Base configuration dict

required
override dict[str, Any]

Override configuration dict with optional =/~ operators

required

Returns:

Type Description
dict[str, Any]

Merged configuration dict

Raises:

Type Description
ConfigMergeError

If operators are used incorrectly

Examples:

>>> # Default: Dicts merge
>>> base = {"a": 1, "b": {"x": 1, "y": 2}}
>>> override = {"b": {"x": 10}}
>>> apply_operators(base, override)
{"a": 1, "b": {"x": 10, "y": 2}}
>>> # Default: Lists extend
>>> base = {"plugins": ["logger", "metrics"]}
>>> override = {"plugins": ["cache"]}
>>> apply_operators(base, override)
{"plugins": ["logger", "metrics", "cache"]}
>>> # Replace operator: explicit override
>>> base = {"model": {"lr": 0.001, "dropout": 0.1}}
>>> override = {"=model": {"lr": 0.01}}
>>> apply_operators(base, override)
{"model": {"lr": 0.01}}
>>> # Remove operator: delete key (idempotent)
>>> base = {"a": 1, "b": 2, "c": 3}
>>> override = {"b": 5, "~c": None}
>>> apply_operators(base, override)
{"a": 1, "b": 5}
Source code in src/sparkwheel/operators.py
def apply_operators(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Apply configuration changes with composition-by-default semantics.

    Default behavior: Compose (merge dicts, extend lists)
    Operators:
        =key: value   - Replace operator: completely replace value (override default)
        ~key: null    - Remove operator: delete key or list items (idempotent)
        key: value    - Compose (default): merge dict or extend list

    Composition-by-Default Philosophy:
        - Dicts merge recursively by default (preserves existing keys)
        - Lists extend by default (append new items)
        - Only scalars and type mismatches replace
        - Use = to explicitly replace entire dicts or lists
        - Use ~ to delete keys (idempotent - no error if missing)

    Args:
        base: Base configuration dict
        override: Override configuration dict with optional =/~ operators

    Returns:
        Merged configuration dict

    Raises:
        ConfigMergeError: If operators are used incorrectly

    Examples:
        >>> # Default: Dicts merge
        >>> base = {"a": 1, "b": {"x": 1, "y": 2}}
        >>> override = {"b": {"x": 10}}
        >>> apply_operators(base, override)
        {"a": 1, "b": {"x": 10, "y": 2}}

        >>> # Default: Lists extend
        >>> base = {"plugins": ["logger", "metrics"]}
        >>> override = {"plugins": ["cache"]}
        >>> apply_operators(base, override)
        {"plugins": ["logger", "metrics", "cache"]}

        >>> # Replace operator: explicit override
        >>> base = {"model": {"lr": 0.001, "dropout": 0.1}}
        >>> override = {"=model": {"lr": 0.01}}
        >>> apply_operators(base, override)
        {"model": {"lr": 0.01}}

        >>> # Remove operator: delete key (idempotent)
        >>> base = {"a": 1, "b": 2, "c": 3}
        >>> override = {"b": 5, "~c": None}
        >>> apply_operators(base, override)
        {"a": 1, "b": 5}
    """
    if not isinstance(base, dict) or not isinstance(override, dict):
        return deepcopy(override)  # type: ignore[unreachable]

    result = deepcopy(base)

    for key, value in override.items():
        if not isinstance(key, str):
            result[key] = deepcopy(value)  # type: ignore[unreachable]
            continue

        # Process replace operator (=key)
        if key.startswith(REPLACE_KEY):
            actual_key = key[1:]
            result[actual_key] = deepcopy(value)
            continue

        # Process remove operator (~key)
        if key.startswith(REMOVE_KEY):
            actual_key = key[1:]
            _validate_delete_operator(actual_key, value)

            # Idempotent: no error if key doesn't exist
            if actual_key not in result:
                continue  # Silently skip

            # Handle remove entire key (null or empty value)
            if value is None or value == "":
                del result[actual_key]
                continue

            # Handle remove specific items from list or dict (list value)
            if isinstance(value, list):
                base_val = result[actual_key]

                # Remove from list by indices
                if isinstance(base_val, list):
                    list_len = len(base_val)

                    # Validate all items are integers and normalize negative indices
                    normalized_indices = []
                    for idx in value:
                        if not isinstance(idx, int):
                            raise ConfigMergeError(
                                f"Cannot remove from list '{actual_key}': index must be integer, got {type(idx).__name__}",
                                suggestion=f"When removing from a list, provide integer indices.\n\n"
                                f"Example:\n"
                                f"  ~{actual_key}: [0, 2, 4]  # Remove items at indices 0, 2, 4\n"
                                f"  ~{actual_key}: [-1]       # Remove last item",
                            )

                        # Validate index is in bounds
                        if idx >= list_len or idx < -list_len:
                            raise ConfigMergeError(
                                f"Cannot remove from list '{actual_key}': index {idx} out of range (list has {list_len} items)",
                                suggestion=f"Valid indices are 0 to {list_len - 1}, or -{list_len} to -1.\n"
                                f"Use null to remove the entire list:\n"
                                f"  ~{actual_key}: null",
                            )

                        # Normalize negative indices to positive
                        normalized_idx = idx if idx >= 0 else list_len + idx
                        normalized_indices.append(normalized_idx)

                    # Sort indices in descending order and remove duplicates
                    sorted_indices = sorted(set(normalized_indices), reverse=True)

                    # Remove in descending order to avoid shifting issues
                    for idx in sorted_indices:
                        del base_val[idx]

                # Remove from dict by keys
                elif isinstance(base_val, dict):
                    for del_key in value:
                        if del_key not in base_val:
                            raise ConfigMergeError(
                                f"Cannot remove non-existent key '{del_key}' from '{actual_key}'",
                                suggestion=f"The key '{del_key}' does not exist in '{actual_key}'.\n"
                                f"Available keys: {list(base_val.keys())}",
                            )
                        del base_val[del_key]

                else:
                    raise ConfigMergeError(
                        f"Cannot remove items from '{actual_key}': expected list or dict, got {type(base_val).__name__}",
                        suggestion=f"Item removal with '~{actual_key}: [...]' only works for lists and dicts.\n"
                        f"To remove the entire key:\n"
                        f"  ~{actual_key}: null",
                    )

                continue

        # No operator - COMPOSITION-BY-DEFAULT behavior
        if key in result:
            base_val = result[key]

            # For dicts: MERGE (composition)
            if isinstance(base_val, dict) and isinstance(value, dict):
                result[key] = apply_operators(base_val, value)
                continue

            # For lists: EXTEND (composition)
            if isinstance(base_val, list) and isinstance(value, list):
                result[key] = base_val + value
                continue

            # For scalars: REPLACE
            # For type mismatches: REPLACE

        # Set/replace (for new keys or non-matching types)
        result[key] = deepcopy(value)

    return result

enable_colors(enabled=None)

Enable or disable color output.

Parameters:

Name Type Description Default
enabled bool | None

True to enable, False to disable, None for auto-detection

None

Returns:

Type Description
bool

Current color enable status

Examples:

>>> enable_colors(False)  # Disable colors
False
>>> enable_colors(True)   # Force enable colors
True
>>> enable_colors()       # Auto-detect
True  # (if terminal supports it)
Source code in src/sparkwheel/errors/formatters.py
def enable_colors(enabled: bool | None = None) -> bool:
    """Enable or disable color output.

    Args:
        enabled: True to enable, False to disable, None for auto-detection

    Returns:
        Current color enable status

    Examples:
        >>> enable_colors(False)  # Disable colors
        False
        >>> enable_colors(True)   # Force enable colors
        True
        >>> enable_colors()       # Auto-detect
        True  # (if terminal supports it)
    """
    global _COLORS_ENABLED

    if enabled is None:
        _COLORS_ENABLED = _supports_color()
    else:
        _COLORS_ENABLED = enabled

    return _COLORS_ENABLED

parse_overrides(args)

Parse CLI argument overrides with automatic type inference.

Supports only key=value syntax with operator prefixes. Types are automatically inferred using ast.literal_eval().

Parameters:

Name Type Description Default
args list[str]

List of argument strings to parse (e.g., from argparse)

required

Returns:

Type Description
dict[str, Any]

Dictionary of parsed key-value pairs with inferred types.

dict[str, Any]

Keys may have operator prefixes (=key for replace, ~key for delete).

Operators
  • key=value - Normal assignment (composes/merges)
  • =key=value - Replace operator (completely replaces key)
  • ~key - Delete operator (removes key)

Examples:

>>> # Basic overrides (compose/merge)
>>> parse_overrides(["model::lr=0.001", "debug=True"])
{"model::lr": 0.001, "debug": True}
>>> # With operators
>>> parse_overrides(["=model={'_target_': 'ResNet'}", "~old_param"])
{"=model": {'_target_': 'ResNet'}, "~old_param": None}
>>> # Nested paths with operators
>>> parse_overrides(["=optimizer::lr=0.01", "~model::old_param"])
{"=optimizer::lr": 0.01, "~model::old_param": None}
Note

The '=' character serves dual purpose: - In 'key=value' → assignment operator (CLI syntax) - In '=key=value' → replace operator prefix (config operator)

Source code in src/sparkwheel/config.py
def parse_overrides(args: list[str]) -> dict[str, Any]:
    """Parse CLI argument overrides with automatic type inference.

    Supports only key=value syntax with operator prefixes.
    Types are automatically inferred using ast.literal_eval().

    Args:
        args: List of argument strings to parse (e.g., from argparse)

    Returns:
        Dictionary of parsed key-value pairs with inferred types.
        Keys may have operator prefixes (=key for replace, ~key for delete).

    Operators:
        - key=value    - Normal assignment (composes/merges)
        - =key=value   - Replace operator (completely replaces key)
        - ~key         - Delete operator (removes key)

    Examples:
        >>> # Basic overrides (compose/merge)
        >>> parse_overrides(["model::lr=0.001", "debug=True"])
        {"model::lr": 0.001, "debug": True}

        >>> # With operators
        >>> parse_overrides(["=model={'_target_': 'ResNet'}", "~old_param"])
        {"=model": {'_target_': 'ResNet'}, "~old_param": None}

        >>> # Nested paths with operators
        >>> parse_overrides(["=optimizer::lr=0.01", "~model::old_param"])
        {"=optimizer::lr": 0.01, "~model::old_param": None}

    Note:
        The '=' character serves dual purpose:
        - In 'key=value' → assignment operator (CLI syntax)
        - In '=key=value' → replace operator prefix (config operator)
    """
    import ast

    overrides: dict[str, Any] = {}

    for arg in args:
        # Handle delete operator: ~key
        if arg.startswith("~"):
            key = arg  # Keep the ~ prefix
            overrides[key] = None
            continue

        # Handle replace operator: =key=value
        if arg.startswith("=") and "=" in arg[1:]:
            # Remove the = prefix, then split on first =
            rest = arg[1:]  # Remove leading =
            key, value = rest.split("=", 1)
            key = "=" + key  # Add back the = prefix to the key
            try:
                value = ast.literal_eval(value)
            except (ValueError, SyntaxError):
                pass  # Keep as string
            overrides[key] = value
            continue

        # Handle normal assignment: key=value
        if "=" in arg:
            key, value = arg.split("=", 1)
            try:
                value = ast.literal_eval(value)
            except (ValueError, SyntaxError):
                pass  # Keep as string
            overrides[key] = value
            continue

    return overrides

validate(config, schema, field_path='', metadata=None, allow_missing=False, strict=True)

Validate configuration against a dataclass schema.

Performs recursive type checking to ensure the configuration matches the structure and types defined in the dataclass schema.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dictionary to validate

required
schema type

Dataclass type defining the expected structure

required
field_path str

Internal parameter for tracking nested field paths

''
metadata Any

Optional metadata registry for source locations

None
allow_missing bool

If True, allow MISSING sentinel values for partial configs

False
strict bool

If True, reject unexpected fields. If False, ignore them.

True

Raises:

Type Description
ValidationError

If validation fails

TypeError

If schema is not a dataclass

Example
from dataclasses import dataclass
from sparkwheel import Config
from sparkwheel.schema import validate

@dataclass
class AppConfig:
    name: str
    port: int
    debug: bool = False

config = Config().update("app.yaml")
validate(config.get(), AppConfig)
Source code in src/sparkwheel/schema.py
def validate(
    config: dict[str, Any],
    schema: type,
    field_path: str = "",
    metadata: Any = None,
    allow_missing: bool = False,
    strict: bool = True,
) -> None:
    """Validate configuration against a dataclass schema.

    Performs recursive type checking to ensure the configuration matches
    the structure and types defined in the dataclass schema.

    Args:
        config: Configuration dictionary to validate
        schema: Dataclass type defining the expected structure
        field_path: Internal parameter for tracking nested field paths
        metadata: Optional metadata registry for source locations
        allow_missing: If True, allow MISSING sentinel values for partial configs
        strict: If True, reject unexpected fields. If False, ignore them.

    Raises:
        ValidationError: If validation fails
        TypeError: If schema is not a dataclass

    Example:
        ```python
        from dataclasses import dataclass
        from sparkwheel import Config
        from sparkwheel.schema import validate

        @dataclass
        class AppConfig:
            name: str
            port: int
            debug: bool = False

        config = Config().update("app.yaml")
        validate(config.get(), AppConfig)
        ```
    """
    if not dataclasses.is_dataclass(schema):
        raise TypeError(f"Schema must be a dataclass, got {type(schema).__name__}")

    if not isinstance(config, dict):
        source_loc = _get_source_location(metadata, field_path) if metadata else None  # type: ignore[unreachable]
        raise ValidationError(
            f"Expected dict for dataclass {schema.__name__}",
            field_path=field_path,
            expected_type=dict,
            actual_value=config,
            source_location=source_loc,
        )

    # Get all fields from the dataclass
    schema_fields = {f.name: f for f in dataclasses.fields(schema)}

    # Check for required fields
    for field_name, field_info in schema_fields.items():
        current_path = f"{field_path}.{field_name}" if field_path else field_name

        # Check if field is missing
        if field_name not in config:
            # Field has default or default_factory -> optional
            if field_info.default is not dataclasses.MISSING or field_info.default_factory is not dataclasses.MISSING:
                continue
            # No default -> required
            source_loc = _get_source_location(metadata, field_path) if metadata else None
            raise ValidationError(
                f"Missing required field '{field_name}'",
                field_path=current_path,
                expected_type=field_info.type,  # type: ignore[arg-type]
                source_location=source_loc,
            )

        # Validate the field value
        _validate_field(
            config[field_name],
            field_info.type,  # type: ignore[arg-type]
            current_path,
            metadata,
            allow_missing=allow_missing,
        )

    # Check for unexpected fields - only if strict mode
    if strict:
        unexpected_fields = set(config.keys()) - set(schema_fields.keys())
        # Filter out sparkwheel special keys
        special_keys = {"_target_", "_disabled_", "_requires_", "_mode_"}
        unexpected_fields = unexpected_fields - special_keys

        if unexpected_fields:
            first_unexpected = sorted(unexpected_fields)[0]
            current_path = f"{field_path}.{first_unexpected}" if field_path else first_unexpected
            source_loc = _get_source_location(metadata, current_path) if metadata else None
            raise ValidationError(
                f"Unexpected field '{first_unexpected}' not in schema {schema.__name__}",
                field_path=current_path,
                source_location=source_loc,
            )

    # Run custom validators
    _run_validators(config, schema, field_path, metadata)

validate_operators(config, parent_key='')

Validate operator usage in config tree.

With composition-by-default, validation is simpler: 1. Remove operators always work (idempotent delete) 2. Replace operators work on any type 3. No parent context requirements

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dict to validate

required
parent_key str

Parent key path (for error messages)

''

Raises:

Type Description
ConfigMergeError

If operator usage is invalid

Source code in src/sparkwheel/operators.py
def validate_operators(config: dict[str, Any], parent_key: str = "") -> None:
    """Validate operator usage in config tree.

    With composition-by-default, validation is simpler:
    1. Remove operators always work (idempotent delete)
    2. Replace operators work on any type
    3. No parent context requirements

    Args:
        config: Configuration dict to validate
        parent_key: Parent key path (for error messages)

    Raises:
        ConfigMergeError: If operator usage is invalid
    """
    if not isinstance(config, dict):
        return  # type: ignore[unreachable]

    for key, value in config.items():
        if not isinstance(key, str):
            continue  # type: ignore[unreachable]

        actual_key = key
        operator = None

        # Detect operator
        if key.startswith(REPLACE_KEY):
            actual_key = key[1:]
            operator = "replace"
        elif key.startswith(REMOVE_KEY):
            actual_key = key[1:]
            operator = "remove"

        full_key = f"{parent_key}::{actual_key}" if parent_key else actual_key

        # Validate remove operator
        if operator == "remove":
            _validate_delete_operator(actual_key, value)

        # Recurse into nested dicts
        if isinstance(value, dict) and operator != "remove":
            validate_operators(value, full_key)

validator(func)

Decorator to mark a method as a validator.

Validators run after type checking and can validate single fields or relationships between fields. Raise ValueError on failure.

Example

@dataclass class Config: lr: float start: int end: int

@validator
def check_lr(self):
    if not (0 < self.lr < 1):
        raise ValueError("lr must be between 0 and 1")

@validator
def check_range(self):
    if self.end <= self.start:
        raise ValueError("end must be > start")
Source code in src/sparkwheel/schema.py
def validator(func):
    """Decorator to mark a method as a validator.

    Validators run after type checking and can validate single fields
    or relationships between fields. Raise ValueError on failure.

    Example:
        @dataclass
        class Config:
            lr: float
            start: int
            end: int

            @validator
            def check_lr(self):
                if not (0 < self.lr < 1):
                    raise ValueError("lr must be between 0 and 1")

            @validator
            def check_range(self):
                if self.end <= self.start:
                    raise ValueError("end must be > start")
    """
    func.__is_validator__ = True
    return func