Skip to content

resolver

Resolve references between Items.

Resolver

Resolve references between Items.

Manages Items and resolves resolved reference strings (starting with @) by substituting them with their corresponding resolved values (instantiated objects, evaluated expressions, etc.).

Example
from sparkwheel import Item, Component, Resolver

resolver = Resolver()

# Add items
resolver.add_item(Item(config=0.001, id="lr"))
resolver.add_item(Item(config={"lr": "@lr"}, id="config"))

# Resolve
result = resolver.resolve("config")
print(result)  # {"lr": 0.001}

Resolved references can use the :: separator for nested access: - Dictionary keys: @config::key::subkey - List indices: @list::0::subitem

Source code in src/sparkwheel/resolver.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
class Resolver:
    """Resolve references between Items.

    Manages Items and resolves resolved reference strings (starting with @) by
    substituting them with their corresponding resolved values (instantiated objects,
    evaluated expressions, etc.).

    Example:
        ```python
        from sparkwheel import Item, Component, Resolver

        resolver = Resolver()

        # Add items
        resolver.add_item(Item(config=0.001, id="lr"))
        resolver.add_item(Item(config={"lr": "@lr"}, id="config"))

        # Resolve
        result = resolver.resolve("config")
        print(result)  # {"lr": 0.001}
        ```

    Resolved references can use :: separator for nested access:
    - Dictionary keys: @config::key::subkey
    - List indices: @list::0::subitem
    """

    _vars = "__local_refs"  # Variable name for resolved refs in expression evaluation
    sep = ID_SEP_KEY  # Separator for nested key access
    ref = RESOLVED_REF_KEY  # Resolved reference prefix (@)
    allow_missing_reference = allow_missing_reference
    max_resolution_depth = 100  # Prevent DoS from deeply nested references

    def __init__(self, items: list[Item] | None = None):
        """Initialize resolver with optional items.

        Args:
            items: Optional list of Items to add during initialization
        """
        # Registered items and cached resolution results, both keyed by item ID.
        self._items: dict[str, Item] = {}
        self._resolved: dict[str, Any] = {}

        if items:
            for item in items:
                self.add_item(item)

    def reset(self) -> None:
        """Clear all items and resolved content."""
        self._items = {}
        self._resolved = {}

    def is_resolved(self) -> bool:
        """Check if any items have been resolved."""
        return bool(self._resolved)

    def add_item(self, item: Item) -> None:
        """Add an Item to resolve.

        If an item with the same ID is already registered, a ``UserWarning`` is
        emitted and the new item is ignored (the first registration wins).

        Args:
            item: Item to add
        """
        id = item.get_id()
        if id in self._items:
            warnings.warn(
                f"Duplicate config item ID '{id}' detected. "
                f"The new item will be ignored and the existing item will be kept. "
                f"This may indicate a configuration error.",
                UserWarning,
                stacklevel=2,
            )
            return
        self._items[id] = item

    def add_items(self, items: list[Item]) -> None:
        """Add multiple Items at once.

        Args:
            items: List of Items to add
        """
        for item in items:
            self.add_item(item)

    def get_item(self, id: str, resolve: bool = False, **kwargs: Any) -> Item | None:
        """Get Item by id, optionally resolving it first.

        Args:
            id: ID of the config item
            resolve: Whether to resolve the item before returning it (default: False).
                Resolution is skipped when a result for this ID is already cached.
            **kwargs: Additional arguments forwarded to the resolution routine

        Returns:
            The registered Item, or None if no item has this ID. Note that the Item
            itself is returned even when ``resolve=True``; the resolved value is
            only stored in the internal cache.
        """
        id = self.normalize_id(id)
        if resolve and id not in self._resolved:
            self._resolve_one_item(id=id, **kwargs)
        return self._items.get(id)

    def resolve(
        self,
        id: str = "",
        instantiate: bool = True,
        eval_expr: bool = True,
        default: Any = None,
    ) -> Any:
        """Resolve a config item and return the result.

        Resolves all references, instantiates components (if requested), and
        evaluates expressions (if requested). Results are cached for efficiency.

        Args:
            id: ID of item to resolve (empty string for root)
            instantiate: Whether to instantiate components with _target_
            eval_expr: Whether to evaluate expressions starting with $
            default: Default value if id not found (``None`` means "no default")

        Returns:
            Resolved value (instantiated object, evaluated result, or raw value)

        Raises:
            ConfigKeyError: If id not found and no default provided
            CircularReferenceError: If circular reference detected
        """
        return self._resolve_one_item(id=id, instantiate=instantiate, eval_expr=eval_expr, default=default)

    def _resolve_one_item(
        self,
        id: str,
        waiting_list: set[str] | None = None,
        _depth: int = 0,
        instantiate: bool = True,
        eval_expr: bool = True,
        default: Any = None,
    ) -> Any:
        """Internal recursive resolution implementation.

        Args:
            id: ID to resolve
            waiting_list: Set of IDs currently being resolved (for cycle detection)
            _depth: Current recursion depth (for DoS prevention)
            instantiate: Whether to instantiate components
            eval_expr: Whether to evaluate expressions
            default: Default value if not found (``None`` means "no default")

        Returns:
            Resolved value

        Raises:
            RecursionError: If max depth exceeded
            CircularReferenceError: If circular reference detected
            ConfigKeyError: If reference not found
        """
        # Prevent stack overflow attacks
        if _depth >= self.max_resolution_depth:
            raise RecursionError(
                f"Maximum reference resolution depth ({self.max_resolution_depth}) exceeded while resolving '{id}'. "
                f"This may indicate an overly complex configuration or a potential DoS attack."
            )

        id = self.normalize_id(id)

        # Return cached result if available
        if id in self._resolved:
            return self._resolved[id]

        # "no_default" is the value handed to the lookup when the caller supplied no
        # default. Only ``None`` counts as "not supplied": falsy defaults such as 0,
        # "" or False are forwarded unchanged so they are returned for a missing ID
        # instead of being silently discarded.
        fallback = "no_default" if default is None else default

        # Look up the item
        try:
            item = look_up_option(id, self._items, print_all_options=False, default=fallback)
        except ValueError as err:
            # Provide helpful error with suggestions
            source_location = None
            for config_item in self._items.values():
                if hasattr(config_item, "source_location") and config_item.source_location:
                    source_location = config_item.source_location
                    break

            available_keys = list(self._items.keys())
            config_context = None

            # For nested IDs, try to get parent context to show available keys
            if ID_SEP_KEY in id:
                parent_id = ID_SEP_KEY.join(id.split(ID_SEP_KEY)[:-1])
                try:
                    parent_item = self.get_item(parent_id)
                    if parent_item and isinstance(parent_item.get_config(), dict):
                        config_context = parent_item.get_config()
                except (ValueError, KeyError):
                    pass

            raise ConfigKeyError(
                f"Config ID '{id}' not found in the configuration",
                source_location=source_location,
                missing_key=id,
                available_keys=available_keys,
                config_context=config_context,
            ) from err

        # If default was returned, just return it
        if not isinstance(item, Item):
            return item

        item_config = item.get_config()

        # Initialize waiting list for circular reference detection
        if waiting_list is None:
            waiting_list = set()
        waiting_list.add(id)

        # First, resolve any import expressions (they need to run first)
        for t, v in self._items.items():
            if t not in self._resolved and isinstance(v, Expression) and v.is_import_statement(v.get_config()):
                self._resolved[t] = v.evaluate() if eval_expr else v

        # Find all references in this item's config
        refs = self.find_refs_in_config(config=item_config, id=id)

        # Resolve dependencies first
        for dep_id in refs.keys():
            # Check for circular references
            if dep_id in waiting_list:
                raise CircularReferenceError(
                    f"Circular reference detected: '{dep_id}' references back to '{id}'",
                    source_location=item.source_location if hasattr(item, "source_location") else None,
                )

            # Resolve dependency if not already resolved
            if dep_id not in self._resolved:
                try:
                    look_up_option(dep_id, self._items, print_all_options=False)
                except ValueError as err:
                    msg = f"the referring item `@{dep_id}` is not defined in the config content."
                    if not self.allow_missing_reference:
                        available_keys = list(self._items.keys())
                        raise ConfigKeyError(
                            f"Reference '@{dep_id}' not found in configuration",
                            source_location=item.source_location if hasattr(item, "source_location") else None,
                            missing_key=dep_id,
                            available_keys=available_keys,
                        ) from err
                    warnings.warn(msg, stacklevel=2)
                    continue

                # Recursively resolve dependency
                self._resolve_one_item(
                    id=dep_id,
                    waiting_list=waiting_list,
                    _depth=_depth + 1,
                    instantiate=instantiate,
                    eval_expr=eval_expr,
                )
                # The dependency is done, so it may legitimately be referenced again.
                waiting_list.discard(dep_id)

        # All dependencies resolved, now resolve this item
        new_config = self.update_config_with_refs(config=item_config, id=id, refs=self._resolved)
        item.update_config(config=new_config)

        # Generate final resolved value based on item type
        if isinstance(item, Component):
            self._resolved[id] = item.instantiate() if instantiate else item
        elif isinstance(item, Expression):
            self._resolved[id] = item.evaluate(globals={f"{self._vars}": self._resolved}) if eval_expr else item
        else:
            self._resolved[id] = new_config

        return self._resolved[id]

    @classmethod
    def normalize_id(cls, id: str | int) -> str:
        """Normalize ID to string format.

        Args:
            id: ID to normalize

        Returns:
            String ID
        """
        return str(id)

    @classmethod
    def split_id(cls, id: str | int, last: bool = False) -> list[str]:
        """Split ID string by separator.

        Args:
            id: ID to split
            last: If True, only split rightmost part

        Returns:
            List of ID components. With ``last=True`` the list always has two
            elements; the first is an empty string when the ID has no separator.
        """
        if not last:
            return cls.normalize_id(id).split(cls.sep)
        res = cls.normalize_id(id).rsplit(cls.sep, 1)
        return ["".join(res[:-1]), res[-1]]

    @classmethod
    def iter_subconfigs(cls, id: str, config: Any) -> Iterator[tuple[str, str, Any]]:
        """Iterate over sub-configs with IDs.

        Args:
            id: Current ID path
            config: Config to iterate (dict or list)

        Yields:
            Tuples of (key, sub_id, value); for lists the key is the integer index
        """
        for k, v in config.items() if isinstance(config, dict) else enumerate(config):
            # The root ID is the empty string, so its children carry no prefix.
            sub_id = f"{id}{cls.sep}{k}" if id != "" else f"{k}"
            yield k, sub_id, v  # type: ignore[misc]

    @classmethod
    def match_refs_pattern(cls, value: str) -> dict[str, int]:
        """Find reference patterns in a string.

        Args:
            value: String to search for references

        Returns:
            Dict mapping reference IDs to occurrence counts
        """
        value = normalize_id(value)
        return scan_references(value)

    @classmethod
    def update_refs_pattern(cls, value: str, refs: dict[str, Any]) -> str:
        """Replace reference patterns with resolved values.

        Args:
            value: String containing references
            refs: Dict of resolved references

        Returns:
            String with references replaced; the unchanged string if a reference
            is missing and missing references are allowed

        Raises:
            KeyError: If a reference is missing and missing references are not allowed
        """
        value = normalize_id(value)

        try:
            return replace_references(value, refs, cls._vars)
        except KeyError as e:
            # Extract reference ID from error message
            # The error message format is: "Reference '@ref_id' not found in resolved references"
            ref_id = str(e).split("'")[1].lstrip("@")
            msg = f"can not find expected ID '{ref_id}' in the references."
            if not cls.allow_missing_reference:
                raise KeyError(msg) from e
            warnings.warn(msg, stacklevel=2)
            return value

    @classmethod
    def find_refs_in_config(cls, config: Any, id: str, refs: dict[str, int] | None = None) -> dict[str, int]:
        """Recursively find all references in config.

        Args:
            config: Config to search
            id: Current ID path
            refs: Accumulated references dict

        Returns:
            Dict of reference IDs to counts
        """
        refs_ = refs or {}

        # Check string values for reference patterns
        if isinstance(config, str):
            for ref_id, count in cls.match_refs_pattern(value=config).items():
                refs_[ref_id] = refs_.get(ref_id, 0) + count

        # Recursively search nested structures
        if isinstance(config, (list, dict)):
            for _, sub_id, v in cls.iter_subconfigs(id, config):
                # Instantiable and expression items are also dependencies
                if (Component.is_instantiable(v) or Expression.is_expression(v)) and sub_id not in refs_:
                    refs_[sub_id] = 1
                refs_ = cls.find_refs_in_config(v, sub_id, refs_)

        return refs_

    @classmethod
    def update_config_with_refs(cls, config: Any, id: str, refs: dict[str, Any] | None = None) -> Any:
        """Update config by replacing references with resolved values.

        Args:
            config: Config to update
            id: Current ID path
            refs: Dict of resolved references

        Returns:
            Config with references replaced
        """
        refs_ = refs or {}

        # Replace references in strings
        if isinstance(config, str):
            return cls.update_refs_pattern(config, refs_)

        # Return non-container types as-is
        if not isinstance(config, (list, dict)):
            return config

        # Recursively update nested structures
        ret = type(config)()
        for idx, sub_id, v in cls.iter_subconfigs(id, config):
            if Component.is_instantiable(v) or Expression.is_expression(v):
                # Nested components/expressions are looked up by their own ID in refs.
                updated = refs_[sub_id]
                # Skip disabled components
                if Component.is_instantiable(v) and updated is None:
                    continue
            else:
                updated = cls.update_config_with_refs(v, sub_id, refs_)

            if isinstance(ret, dict):
                ret[idx] = updated
            else:
                ret.append(updated)

        return ret

__init__(items=None)

Initialize resolver with optional items.

Parameters:

Name Type Description Default
items list[Item] | None

Optional list of Items to add during initialization

None
Source code in src/sparkwheel/resolver.py
def __init__(self, items: list[Item] | None = None):
    """Create a resolver, optionally pre-populated with items.

    Args:
        items: Items to register immediately; ``None`` or an empty list registers nothing
    """
    # Registered items and cached resolution results, both keyed by item ID.
    self._items: dict[str, Item] = {}
    self._resolved: dict[str, Any] = {}

    for entry in items or ():
        self.add_item(entry)

_resolve_one_item(id, waiting_list=None, _depth=0, instantiate=True, eval_expr=True, default=None)

Internal recursive resolution implementation.

Parameters:

Name Type Description Default
id str

ID to resolve

required
waiting_list set[str] | None

Set of IDs currently being resolved (for cycle detection)

None
_depth int

Current recursion depth (for DoS prevention)

0
instantiate bool

Whether to instantiate components

True
eval_expr bool

Whether to evaluate expressions

True
default Any

Default value if not found

None

Returns:

Type Description
Any

Resolved value

Raises:

Type Description
RecursionError

If max depth exceeded

CircularReferenceError

If circular reference detected

ConfigKeyError

If reference not found

Source code in src/sparkwheel/resolver.py
def _resolve_one_item(
    self,
    id: str,
    waiting_list: set[str] | None = None,
    _depth: int = 0,
    instantiate: bool = True,
    eval_expr: bool = True,
    default: Any = None,
) -> Any:
    """Internal recursive resolution implementation.

    Args:
        id: ID to resolve
        waiting_list: Set of IDs currently being resolved (for cycle detection)
        _depth: Current recursion depth (for DoS prevention)
        instantiate: Whether to instantiate components
        eval_expr: Whether to evaluate expressions
        default: Default value if not found (``None`` means "no default")

    Returns:
        Resolved value

    Raises:
        RecursionError: If max depth exceeded
        CircularReferenceError: If circular reference detected
        ConfigKeyError: If reference not found
    """
    # Prevent stack overflow attacks
    if _depth >= self.max_resolution_depth:
        raise RecursionError(
            f"Maximum reference resolution depth ({self.max_resolution_depth}) exceeded while resolving '{id}'. "
            f"This may indicate an overly complex configuration or a potential DoS attack."
        )

    id = self.normalize_id(id)

    # Return cached result if available
    if id in self._resolved:
        return self._resolved[id]

    # "no_default" is the value handed to the lookup when the caller supplied no
    # default. Only ``None`` counts as "not supplied": falsy defaults such as 0,
    # "" or False are forwarded unchanged so they are returned for a missing ID
    # instead of being silently discarded.
    fallback = "no_default" if default is None else default

    # Look up the item
    try:
        item = look_up_option(id, self._items, print_all_options=False, default=fallback)
    except ValueError as err:
        # Provide helpful error with suggestions
        source_location = None
        for config_item in self._items.values():
            if hasattr(config_item, "source_location") and config_item.source_location:
                source_location = config_item.source_location
                break

        available_keys = list(self._items.keys())
        config_context = None

        # For nested IDs, try to get parent context to show available keys
        if ID_SEP_KEY in id:
            parent_id = ID_SEP_KEY.join(id.split(ID_SEP_KEY)[:-1])
            try:
                parent_item = self.get_item(parent_id)
                if parent_item and isinstance(parent_item.get_config(), dict):
                    config_context = parent_item.get_config()
            except (ValueError, KeyError):
                pass

        raise ConfigKeyError(
            f"Config ID '{id}' not found in the configuration",
            source_location=source_location,
            missing_key=id,
            available_keys=available_keys,
            config_context=config_context,
        ) from err

    # If default was returned, just return it
    if not isinstance(item, Item):
        return item

    item_config = item.get_config()

    # Initialize waiting list for circular reference detection
    if waiting_list is None:
        waiting_list = set()
    waiting_list.add(id)

    # First, resolve any import expressions (they need to run first)
    for t, v in self._items.items():
        if t not in self._resolved and isinstance(v, Expression) and v.is_import_statement(v.get_config()):
            self._resolved[t] = v.evaluate() if eval_expr else v

    # Find all references in this item's config
    refs = self.find_refs_in_config(config=item_config, id=id)

    # Resolve dependencies first
    for dep_id in refs.keys():
        # Check for circular references
        if dep_id in waiting_list:
            raise CircularReferenceError(
                f"Circular reference detected: '{dep_id}' references back to '{id}'",
                source_location=item.source_location if hasattr(item, "source_location") else None,
            )

        # Resolve dependency if not already resolved
        if dep_id not in self._resolved:
            try:
                look_up_option(dep_id, self._items, print_all_options=False)
            except ValueError as err:
                msg = f"the referring item `@{dep_id}` is not defined in the config content."
                if not self.allow_missing_reference:
                    available_keys = list(self._items.keys())
                    raise ConfigKeyError(
                        f"Reference '@{dep_id}' not found in configuration",
                        source_location=item.source_location if hasattr(item, "source_location") else None,
                        missing_key=dep_id,
                        available_keys=available_keys,
                    ) from err
                warnings.warn(msg, stacklevel=2)
                continue

            # Recursively resolve dependency
            self._resolve_one_item(
                id=dep_id,
                waiting_list=waiting_list,
                _depth=_depth + 1,
                instantiate=instantiate,
                eval_expr=eval_expr,
            )
            # The dependency is done, so it may legitimately be referenced again.
            waiting_list.discard(dep_id)

    # All dependencies resolved, now resolve this item
    new_config = self.update_config_with_refs(config=item_config, id=id, refs=self._resolved)
    item.update_config(config=new_config)

    # Generate final resolved value based on item type
    if isinstance(item, Component):
        self._resolved[id] = item.instantiate() if instantiate else item
    elif isinstance(item, Expression):
        self._resolved[id] = item.evaluate(globals={f"{self._vars}": self._resolved}) if eval_expr else item
    else:
        self._resolved[id] = new_config

    return self._resolved[id]

add_item(item)

Add an Item to resolve.

Parameters:

Name Type Description Default
item Item

Item to add

required
Source code in src/sparkwheel/resolver.py
def add_item(self, item: Item) -> None:
    """Register an Item under its ID; the first registration wins on conflict.

    Args:
        item: Item to register. If its ID is already taken, a ``UserWarning``
            is emitted and this item is ignored.
    """
    id = item.get_id()

    # Common case: the ID is free, so store the item and finish.
    if id not in self._items:
        self._items[id] = item
        return

    warnings.warn(
        f"Duplicate config item ID '{id}' detected. "
        f"The new item will be ignored and the existing item will be kept. "
        f"This may indicate a configuration error.",
        UserWarning,
        stacklevel=2,
    )

add_items(items)

Add multiple Items at once.

Parameters:

Name Type Description Default
items list[Item]

List of Items to add

required
Source code in src/sparkwheel/resolver.py
def add_items(self, items: list[Item]) -> None:
    """Register several Items, one after another, via ``add_item``.

    Args:
        items: Items to register, processed in list order
    """
    for entry in items:
        self.add_item(entry)

find_refs_in_config(config, id, refs=None) classmethod

Recursively find all references in config.

Parameters:

Name Type Description Default
config Any

Config to search

required
id str

Current ID path

required
refs dict[str, int] | None

Accumulated references dict

None

Returns:

Type Description
dict[str, int]

Dict of reference IDs to counts

Source code in src/sparkwheel/resolver.py
@classmethod
def find_refs_in_config(cls, config: Any, id: str, refs: dict[str, int] | None = None) -> dict[str, int]:
    """Recursively find all references in config.

    Args:
        config: Config to search
        id: Current ID path
        refs: Accumulated references dict

    Returns:
        Dict of reference IDs to counts
    """
    refs_ = refs or {}

    # Check string values for reference patterns
    if isinstance(config, str):
        for ref_id, count in cls.match_refs_pattern(value=config).items():
            refs_[ref_id] = refs_.get(ref_id, 0) + count

    # Recursively search nested structures
    if isinstance(config, (list, dict)):
        for _, sub_id, v in cls.iter_subconfigs(id, config):
            # Instantiable and expression items are also dependencies
            if (Component.is_instantiable(v) or Expression.is_expression(v)) and sub_id not in refs_:
                refs_[sub_id] = 1
            refs_ = cls.find_refs_in_config(v, sub_id, refs_)

    return refs_

get_item(id, resolve=False, **kwargs)

Get Item by id, optionally resolved.

Parameters:

Name Type Description Default
id str

ID of the config item

required
resolve bool

Whether to resolve the item (default: False)

False
**kwargs Any

Additional arguments for resolution

{}

Returns:

Type Description
Item | None

Item if found, None otherwise. With resolve=True the item is resolved first, but the Item itself (not the resolved value) is still returned.

Source code in src/sparkwheel/resolver.py
def get_item(self, id: str, resolve: bool = False, **kwargs: Any) -> Item | None:
    """Look up a registered Item by ID, optionally resolving it beforehand.

    Args:
        id: ID of the config item
        resolve: When True, resolve the item first unless a result for this ID
            is already cached (default: False)
        **kwargs: Extra arguments forwarded to the resolution routine

    Returns:
        The registered Item, or None when no item has this ID. The Item itself
        is returned even when ``resolve=True``; the resolved value is only cached.
    """
    key = self.normalize_id(id)

    needs_resolution = resolve and key not in self._resolved
    if needs_resolution:
        self._resolve_one_item(id=key, **kwargs)

    return self._items.get(key)

is_resolved()

Check if any items have been resolved.

Source code in src/sparkwheel/resolver.py
def is_resolved(self) -> bool:
    """Report whether at least one resolution result has been cached."""
    return True if self._resolved else False

iter_subconfigs(id, config) classmethod

Iterate over sub-configs with IDs.

Parameters:

Name Type Description Default
id str

Current ID path

required
config Any

Config to iterate (dict or list)

required

Yields:

Type Description
tuple[str, str, Any]

Tuples of (key, sub_id, value)

Source code in src/sparkwheel/resolver.py
@classmethod
def iter_subconfigs(cls, id: str, config: Any) -> Iterator[tuple[str, str, Any]]:
    """Walk the direct children of a container config, yielding their IDs.

    Args:
        id: ID path of ``config``; the empty string denotes the root
        config: Container to walk (dict entries, otherwise enumerated items)

    Yields:
        Tuples of (key, sub_id, value); for non-dict containers the key is the index
    """
    entries = config.items() if isinstance(config, dict) else enumerate(config)
    # Children of the root carry no prefix; all others are joined with the separator.
    prefix = f"{id}{cls.sep}" if id != "" else ""
    for key, value in entries:
        yield key, f"{prefix}{key}", value  # type: ignore[misc]

match_refs_pattern(value) classmethod

Find reference patterns in a string.

Parameters:

Name Type Description Default
value str

String to search for references

required

Returns:

Type Description
dict[str, int]

Dict mapping reference IDs to occurrence counts

Source code in src/sparkwheel/resolver.py
@classmethod
def match_refs_pattern(cls, value: str) -> dict[str, int]:
    """Scan a string for reference patterns.

    Args:
        value: String to scan; it is normalized before scanning

    Returns:
        Dict mapping reference IDs to occurrence counts
    """
    return scan_references(normalize_id(value))

normalize_id(id) classmethod

Normalize ID to string format.

Parameters:

Name Type Description Default
id str | int

ID to normalize

required

Returns:

Type Description
str

String ID

Source code in src/sparkwheel/resolver.py
@classmethod
def normalize_id(cls, id: str | int) -> str:
    """Normalize ID to string format.

    Args:
        id: ID to normalize

    Returns:
        String ID
    """
    return str(id)

reset()

Clear all items and resolved content.

Source code in src/sparkwheel/resolver.py
def reset(self) -> None:
    """Drop every registered item and every cached resolution result."""
    # Rebind both stores to fresh, independent dicts.
    self._items, self._resolved = {}, {}

resolve(id='', instantiate=True, eval_expr=True, default=None)

Resolve a config item and return the result.

Resolves all references, instantiates components (if requested), and evaluates expressions (if requested). Results are cached for efficiency.

Parameters:

Name Type Description Default
id str

ID of item to resolve (empty string for root)

''
instantiate bool

Whether to instantiate components with _target_

True
eval_expr bool

Whether to evaluate expressions starting with $

True
default Any

Default value if id not found

None

Returns:

Type Description
Any

Resolved value (instantiated object, evaluated result, or raw value)

Raises:

Type Description
ConfigKeyError

If id not found and no default provided

CircularReferenceError

If circular reference detected

Source code in src/sparkwheel/resolver.py
def resolve(
    self,
    id: str = "",
    instantiate: bool = True,
    eval_expr: bool = True,
    default: Any = None,
) -> Any:
    """Resolve one config item and return its value.

    This is the public entry point; it forwards all arguments to the internal
    recursive resolution routine.

    Args:
        id: ID of item to resolve (empty string for root)
        instantiate: Whether to instantiate components with _target_
        eval_expr: Whether to evaluate expressions starting with $
        default: Default value if id not found

    Returns:
        Resolved value (instantiated object, evaluated result, or raw value)

    Raises:
        ConfigKeyError: If id not found and no default provided
        CircularReferenceError: If circular reference detected
    """
    options = {"instantiate": instantiate, "eval_expr": eval_expr, "default": default}
    return self._resolve_one_item(id=id, **options)

split_id(id, last=False) classmethod

Split ID string by separator.

Parameters:

Name Type Description Default
id str | int

ID to split

required
last bool

If True, only split rightmost part

False

Returns:

Type Description
list[str]

List of ID components

Source code in src/sparkwheel/resolver.py
@classmethod
def split_id(cls, id: str | int, last: bool = False) -> list[str]:
    """Split ID string by separator.

    Args:
        id: ID to split
        last: If True, only split rightmost part

    Returns:
        List of ID components
    """
    if not last:
        return cls.normalize_id(id).split(cls.sep)
    res = cls.normalize_id(id).rsplit(cls.sep, 1)
    return ["".join(res[:-1]), res[-1]]

update_config_with_refs(config, id, refs=None) classmethod

Update config by replacing references with resolved values.

Parameters:

Name Type Description Default
config Any

Config to update

required
id str

Current ID path

required
refs dict[str, Any] | None

Dict of resolved references

None

Returns:

Type Description
Any

Config with references replaced

Source code in src/sparkwheel/resolver.py
@classmethod
def update_config_with_refs(cls, config: Any, id: str, refs: dict[str, Any] | None = None) -> Any:
    """Update config by replacing references with resolved values.

    Args:
        config: Config to update
        id: Current ID path
        refs: Dict of resolved references

    Returns:
        Config with references replaced
    """
    refs_ = refs or {}

    # Replace references in strings
    if isinstance(config, str):
        return cls.update_refs_pattern(config, refs_)

    # Return non-container types as-is
    if not isinstance(config, (list, dict)):
        return config

    # Recursively update nested structures
    ret = type(config)()
    for idx, sub_id, v in cls.iter_subconfigs(id, config):
        if Component.is_instantiable(v) or Expression.is_expression(v):
            updated = refs_[sub_id]
            # Skip disabled components
            if Component.is_instantiable(v) and updated is None:
                continue
        else:
            updated = cls.update_config_with_refs(v, sub_id, refs_)

        if isinstance(ret, dict):
            ret[idx] = updated
        else:
            ret.append(updated)

    return ret

update_refs_pattern(value, refs) classmethod

Replace reference patterns with resolved values.

Parameters:

Name Type Description Default
value str

String containing references

required
refs dict[str, Any]

Dict of resolved references

required

Returns:

Type Description
str

String with references replaced

Source code in src/sparkwheel/resolver.py
@classmethod
def update_refs_pattern(cls, value: str, refs: dict[str, Any]) -> str:
    """Substitute the reference patterns in a string with their resolved values.

    Args:
        value: String containing references
        refs: Dict of resolved references

    Returns:
        String with references replaced, or the normalized input unchanged when a
        reference is missing and missing references are allowed

    Raises:
        KeyError: If a reference is missing and missing references are not allowed
    """
    value = normalize_id(value)

    try:
        return replace_references(value, refs, cls._vars)
    except KeyError as err:
        # The ID is recovered from the error text, which is expected to look like:
        # "Reference '@ref_id' not found in resolved references"
        quoted = str(err).split("'")[1]
        ref_id = quoted.lstrip("@")
        msg = f"can not find expected ID '{ref_id}' in the references."

        # Lenient mode: warn and hand the string back as-is.
        if cls.allow_missing_reference:
            warnings.warn(msg, stacklevel=2)
            return value
        raise KeyError(msg) from err