1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
"""Define a controller class for operating on a `config_ninja.settings.ObjectSpec`."""
from __future__ import annotations
import logging
import typing
import jinja2
from config_ninja import settings, systemd
from config_ninja.backend import FormatT, dumps, loads
from config_ninja.settings import ObjectSpec
try:
from typing import TypeAlias # type: ignore[attr-defined,unused-ignore]
except ImportError: # pragma: no cover
from typing_extensions import TypeAlias # type: ignore[assignment,attr-defined,unused-ignore]
__all__ = ['BackendController', 'ErrorHandler']
logger = logging.getLogger(__name__)
ActionType: TypeAlias = typing.Callable[[str], typing.Any]
ErrorHandler: TypeAlias = typing.Callable[[typing.Dict[typing.Any, typing.Any]], typing.ContextManager[None]]
class BackendController:
"""Define logic for operating a configuration `ObjectSpec`."""
key: str
"""The key of the configuration object in the settings file."""
logger: logging.Logger
"""Each `BackendController` has its own logger (named `"config_ninja.controller:{key}"`)."""
spec: ObjectSpec
"""Full specification for the configuration object."""
def __init__(self, spec: ObjectSpec, key: str) -> None:
"""Ensure the parent directory of the destination path exists; initialize a logger."""
self.logger = logging.getLogger(f'{__name__}:{key}')
self.key = key
self.spec = spec
spec.dest.path.parent.mkdir(parents=True, exist_ok=True)
def __str__(self) -> str:
"""Represent the controller as its backend populating its destination."""
return f'{self.spec.source.backend} ({self.spec.source.format}) -> {self.spec.dest}'
def _do(self, action: ActionType, data: dict[str, typing.Any]) -> None:
if self.spec.dest.is_template:
assert isinstance(self.spec.dest.format, jinja2.Template) # noqa: S101 # 👈 for static analysis
action(self.spec.dest.format.render(data))
else:
fmt: FormatT = self.spec.dest.format # type: ignore[assignment]
action(dumps(fmt, data))
@classmethod
def from_settings(
cls, conf_settings: settings.Config, key: str, handle_key_errors: ErrorHandler
) -> BackendController:
"""Create a `BackendController` instance from the given settings."""
cfg_obj = conf_settings.settings.OBJECTS[key]
with handle_key_errors(cfg_obj): # type: ignore[arg-type]
spec = ObjectSpec.from_primitives(cfg_obj, conf_settings.engine)
return cls(spec, key)
def get(self, do_print: typing.Callable[[str], typing.Any]) -> None:
"""Retrieve and print the value of the configuration object.
Any `config_ninja.settings.poe.Hook`s will be skipped; they are only executed by the `BackendController.write()`
and `BackendController.awrite()` methods.
"""
data = loads(self.spec.source.format, self.spec.source.backend.get())
self._do(do_print, data)
for hook in self.spec.hooks:
self.logger.debug('would execute: %s', hook)
async def aget(self, do_print: typing.Callable[[str], typing.Any]) -> None:
"""Poll to retrieve the latest configuration object, and print on each update.
Any `config_ninja.settings.poe.Hook`s will be skipped; they are only executed by the `BackendController.write()`
and `BackendController.awrite()` methods.
"""
if systemd.AVAILABLE: # pragma: no cover
systemd.notify()
async for content in self.spec.source.backend.poll():
data = loads(self.spec.source.format, content)
self._do(do_print, data)
for hook in self.spec.hooks:
self.logger.debug('would execute: %s', hook)
def write(self) -> None:
"""Retrieve the latest value of the configuration object, and write to file.
If the `config_ninja.settings.ObjectSpec` provides any `config_ninja.settings.poe.Hook`s, they will be executed
after writing the configuration object to the destination file.
"""
data = loads(self.spec.source.format, self.spec.source.backend.get())
self._do(self.spec.dest.path.write_text, data)
for hook in self.spec.hooks:
hook()
async def awrite(self) -> None:
"""Poll to retrieve the latest configuration object, and write to file on each update.
If the `config_ninja.settings.ObjectSpec` provides any `config_ninja.settings.poe.Hook`s, they will be executed
after writing the configuration object to the destination file.
"""
if systemd.AVAILABLE: # pragma: no cover
systemd.notify()
async for content in self.spec.source.backend.poll():
data = loads(self.spec.source.format, content)
self._do(self.spec.dest.path.write_text, data)
for hook in self.spec.hooks:
hook()
logger.debug('successfully imported %s', __name__)
|