jinja2.async_utils

 1import inspect
 2import typing as t
 3from functools import WRAPPER_ASSIGNMENTS
 4from functools import wraps
 5
 6from .utils import _PassArg
 7from .utils import pass_eval_context
 8
 9if t.TYPE_CHECKING:
10    import typing_extensions as te
11
# Generic placeholder for the value / item type used by the helpers below.
V = t.TypeVar("V")
13
14
def async_variant(normal_func):  # type: ignore
    """Return a decorator that pairs ``normal_func`` with an async twin.

    The decorator receives the async implementation and returns a wrapper
    that, on every call, inspects its first positional argument to decide
    which implementation to run: the async one when the ``is_async`` flag
    is true, ``normal_func`` otherwise.

    Where the flag is read from depends on ``_PassArg.from_obj(normal_func)``:

    * ``_PassArg.environment``: the first argument itself provides
      ``is_async``.
    * anything else: the first argument's ``environment`` attribute provides
      ``is_async``.

    When ``from_obj`` returns ``None`` the wrapper is additionally decorated
    with ``pass_eval_context``. In that case the first positional argument
    is used only for the ``is_async`` lookup and is dropped before the call
    is forwarded.

    The wrapper is marked with ``jinja_async_variant = True``.
    """

    def decorator(async_func):  # type: ignore
        pass_arg = _PassArg.from_obj(normal_func)
        need_eval_context = pass_arg is None

        if pass_arg is _PassArg.environment:

            def is_async(args: t.Any) -> bool:
                return t.cast(bool, args[0].is_async)

        else:

            def is_async(args: t.Any) -> bool:
                return t.cast(bool, args[0].environment.is_async)

        # Take the doc and annotations from the sync function, but the
        # name from the async function. Pallets-Sphinx-Themes
        # build_function_directive expects __wrapped__ to point to the
        # sync function.
        async_func_attrs = ("__module__", "__name__", "__qualname__")
        normal_func_attrs = tuple(set(WRAPPER_ASSIGNMENTS).difference(async_func_attrs))

        @wraps(normal_func, assigned=normal_func_attrs)
        @wraps(async_func, assigned=async_func_attrs, updated=())
        def wrapper(*args, **kwargs):  # type: ignore
            # Read the flag before the leading argument is possibly stripped.
            b = is_async(args)

            if need_eval_context:
                # The leading argument was only needed for the flag lookup.
                args = args[1:]

            if b:
                return async_func(*args, **kwargs)

            return normal_func(*args, **kwargs)

        if need_eval_context:
            wrapper = pass_eval_context(wrapper)

        wrapper.jinja_async_variant = True  # type: ignore[attr-defined]
        return wrapper

    return decorator
57
58
# Exact builtin types that are never awaitable. Checking membership here
# first lets the common case skip the slower ``inspect.isawaitable`` call.
_common_primitives = {int, float, bool, str, list, dict, tuple, type(None)}


async def auto_await(value: t.Union[t.Awaitable["V"], "V"]) -> "V":
    """Return ``value``, awaiting it first when it is awaitable.

    Values whose exact type is one of ``_common_primitives`` are returned
    immediately. Any other value is awaited if ``inspect.isawaitable``
    reports it as awaitable, and returned unchanged otherwise.
    """
    # Avoid a costly call to isawaitable. The check is on the exact type,
    # so subclasses of the primitives still take the slow path below.
    if type(value) in _common_primitives:
        return t.cast("V", value)

    if inspect.isawaitable(value):
        return await t.cast("t.Awaitable[V]", value)

    return value
71
72
class _IteratorToAsyncIterator(t.Generic[V]):
    """Expose a synchronous iterator through the async iterator protocol.

    Each ``__anext__`` call advances the wrapped iterator with a plain
    ``next()``; nothing is awaited while the item is produced.
    """

    def __init__(self, iterator: "t.Iterator[V]"):
        # The wrapped synchronous iterator; consumed one item per __anext__.
        self._iterator = iterator

    def __aiter__(self) -> "te.Self":
        """Return ``self``; the instance is its own async iterator."""
        return self

    async def __anext__(self) -> V:
        """Return the next item, or raise ``StopAsyncIteration`` when done."""
        try:
            return next(self._iterator)
        except StopIteration as e:
            # ``async for`` stops on StopAsyncIteration, so translate the
            # sync exhaustion signal while keeping its value and cause.
            raise StopAsyncIteration(e.value) from e
85
86
def auto_aiter(
    iterable: "t.Union[t.AsyncIterable[V], t.Iterable[V]]",
) -> "t.AsyncIterator[V]":
    """Return an async iterator over ``iterable``.

    An object that has ``__aiter__`` is asked for its own async iterator.
    Anything else is passed to ``iter()`` and the resulting synchronous
    iterator is wrapped in ``_IteratorToAsyncIterator``.
    """
    if hasattr(iterable, "__aiter__"):
        return iterable.__aiter__()
    else:
        return _IteratorToAsyncIterator(iter(iterable))
94
95
async def auto_to_list(
    value: "t.Union[t.AsyncIterable[V], t.Iterable[V]]",
) -> t.List["V"]:
    """Collect every item of a sync or async iterable into a list.

    ``value`` is first turned into an async iterator with ``auto_aiter``
    and then drained with an ``async for`` comprehension.
    """
    return [x async for x in auto_aiter(value)]
def async_variant(normal_func):  # type: ignore
    """Return a decorator that pairs ``normal_func`` with an async twin.

    The decorator receives the async implementation and returns a wrapper
    that, on every call, inspects its first positional argument to decide
    which implementation to run: the async one when the ``is_async`` flag
    is true, ``normal_func`` otherwise.

    Where the flag is read from depends on ``_PassArg.from_obj(normal_func)``:

    * ``_PassArg.environment``: the first argument itself provides
      ``is_async``.
    * anything else: the first argument's ``environment`` attribute provides
      ``is_async``.

    When ``from_obj`` returns ``None`` the wrapper is additionally decorated
    with ``pass_eval_context``. In that case the first positional argument
    is used only for the ``is_async`` lookup and is dropped before the call
    is forwarded.

    The wrapper is marked with ``jinja_async_variant = True``.
    """

    def decorator(async_func):  # type: ignore
        pass_arg = _PassArg.from_obj(normal_func)
        need_eval_context = pass_arg is None

        if pass_arg is _PassArg.environment:

            def is_async(args: t.Any) -> bool:
                return t.cast(bool, args[0].is_async)

        else:

            def is_async(args: t.Any) -> bool:
                return t.cast(bool, args[0].environment.is_async)

        # Take the doc and annotations from the sync function, but the
        # name from the async function. Pallets-Sphinx-Themes
        # build_function_directive expects __wrapped__ to point to the
        # sync function.
        async_func_attrs = ("__module__", "__name__", "__qualname__")
        normal_func_attrs = tuple(set(WRAPPER_ASSIGNMENTS).difference(async_func_attrs))

        @wraps(normal_func, assigned=normal_func_attrs)
        @wraps(async_func, assigned=async_func_attrs, updated=())
        def wrapper(*args, **kwargs):  # type: ignore
            # Read the flag before the leading argument is possibly stripped.
            b = is_async(args)

            if need_eval_context:
                # The leading argument was only needed for the flag lookup.
                args = args[1:]

            if b:
                return async_func(*args, **kwargs)

            return normal_func(*args, **kwargs)

        if need_eval_context:
            wrapper = pass_eval_context(wrapper)

        wrapper.jinja_async_variant = True  # type: ignore[attr-defined]
        return wrapper

    return decorator
async def auto_await(value: t.Union[t.Awaitable["V"], "V"]) -> "V":
    """Return ``value``, awaiting it first when it is awaitable.

    Values whose exact type is one of ``_common_primitives`` are returned
    immediately. Any other value is awaited if ``inspect.isawaitable``
    reports it as awaitable, and returned unchanged otherwise.
    """
    # Avoid a costly call to isawaitable. The check is on the exact type,
    # so subclasses of the primitives still take the slow path below.
    if type(value) in _common_primitives:
        return t.cast("V", value)

    if inspect.isawaitable(value):
        return await t.cast("t.Awaitable[V]", value)

    return value
def auto_aiter(
    iterable: "t.Union[t.AsyncIterable[V], t.Iterable[V]]",
) -> "t.AsyncIterator[V]":
    """Return an async iterator over ``iterable``.

    An object that has ``__aiter__`` is asked for its own async iterator.
    Anything else is passed to ``iter()`` and the resulting synchronous
    iterator is wrapped in ``_IteratorToAsyncIterator``.
    """
    if hasattr(iterable, "__aiter__"):
        return iterable.__aiter__()
    else:
        return _IteratorToAsyncIterator(iter(iterable))
async def auto_to_list(
    value: "t.Union[t.AsyncIterable[V], t.Iterable[V]]",
) -> t.List["V"]:
    """Collect every item of a sync or async iterable into a list.

    ``value`` is first turned into an async iterator with ``auto_aiter``
    and then drained with an ``async for`` comprehension.
    """
    return [x async for x in auto_aiter(value)]