poethepoet.task.base

  1import re
  2import sys
  3from os import environ
  4from pathlib import Path
  5from typing import (
  6    TYPE_CHECKING,
  7    Any,
  8    ClassVar,
  9    Dict,
 10    Iterator,
 11    List,
 12    Mapping,
 13    NamedTuple,
 14    Optional,
 15    Sequence,
 16    Tuple,
 17    Type,
 18    Union,
 19)
 20
 21from ..exceptions import ConfigValidationError, PoeException
 22from ..options import PoeOptions
 23
 24if TYPE_CHECKING:
 25    from ..config import ConfigPartition, PoeConfig
 26    from ..context import RunContext
 27    from ..env.manager import EnvVarsManager
 28    from ..ui import PoeUi
 29    from .args import PoeTaskArgs
 30
 31
 32_TASK_NAME_PATTERN = re.compile(r"^\w[\w\d\-\_\+\:]*$")
 33
 34
 35class MetaPoeTask(type):
 36    """
 37    This metaclass makes all descendents of PoeTask (task types) register themselves on
 38    declaration and validates that they include the expected class attributes.
 39    """
 40
 41    def __init__(cls, *args):
 42        super().__init__(*args)
 43        if cls.__name__ == "PoeTask":
 44            return
 45
 46        assert isinstance(getattr(cls, "__key__", None), str)
 47        assert issubclass(getattr(cls, "TaskOptions", None), PoeOptions)
 48        PoeTask._PoeTask__task_types[cls.__key__] = cls
 49
 50        # Give each TaskSpec a reference to its parent PoeTask
 51        if "TaskSpec" in cls.__dict__:
 52            cls.TaskSpec.task_type = cls
 53
 54
# The content of a task, i.e. the value stored in a task definition under the key of
# its task type: either a string, or a sequence of strings and/or tables.
TaskContent = Union[str, Sequence[Union[str, Mapping[str, Any]]]]

# A task definition as accepted by TaskSpecFactory: a string, a table, or a sequence
# of strings and/or tables.
TaskDef = Union[str, Mapping[str, Any], Sequence[Union[str, Mapping[str, Any]]]]
 58
 59
 60class TaskSpecFactory:
 61    __cache: Dict[str, "PoeTask.TaskSpec"]
 62    config: "PoeConfig"
 63
 64    def __init__(self, config: "PoeConfig"):
 65        self.__cache = {}
 66        self.config = config
 67
 68    def __contains__(self, other) -> bool:
 69        return other in self.__cache
 70
 71    def get(
 72        self,
 73        task_name: str,
 74        task_def: Optional[TaskDef] = None,
 75        task_type: Optional[str] = None,
 76        parent: Optional["PoeTask.TaskSpec"] = None,
 77    ) -> "PoeTask.TaskSpec":
 78        if task_def and parent:
 79            # This is probably a subtask and will be cached by the parent task_spec
 80            if not task_type:
 81                task_type = PoeTask.resolve_task_type(task_def, self.config)
 82                assert task_type
 83            return self.create(
 84                task_name, task_type, task_def, source=parent.source, parent=parent
 85            )
 86
 87        if task_name not in self.__cache:
 88            self.load(task_name)
 89
 90        return self.__cache[task_name]
 91
 92    def create(
 93        self,
 94        task_name: str,
 95        task_type: str,
 96        task_def: TaskDef,
 97        source: "ConfigPartition",
 98        parent: Optional["PoeTask.TaskSpec"] = None,
 99    ) -> "PoeTask.TaskSpec":
100        """
101        A parent task should be provided when this task is defined inline within another
102        task, for example as part of a sequence.
103        """
104        if not isinstance(task_def, dict):
105            task_def = {task_type: task_def}
106
107        return PoeTask.lookup_task_spec_cls(task_type)(
108            name=task_name,
109            task_def=task_def,
110            factory=self,
111            source=source,
112            parent=parent,
113        )
114
115    def load_all(self):
116        for task_name in self.config.task_names:
117            self.load(task_name)
118
119        return self
120
121    def load(self, task_name: str):
122        task_def, config_partition = self.config.lookup_task(task_name)
123
124        if task_def is None or config_partition is None:
125            raise PoeException(f"Cannot instantiate unknown task {task_name!r}")
126
127        task_type = PoeTask.resolve_task_type(task_def, self.config)
128        if not task_type:
129            raise ConfigValidationError(
130                "Task definition must be a string, a list, or a table including exactly"
131                " one task key\n"
132                f"Available task keys: {set(PoeTask.get_task_types())!r}",
133                task_name=task_name,
134                filename=(
135                    None if config_partition.is_primary else str(config_partition.path)
136                ),
137            )
138
139        self.__cache[task_name] = self.create(
140            task_name, task_type, task_def, source=config_partition
141        )
142
143    def __iter__(self):
144        return iter(self.__cache.values())
145
146
class TaskContext(NamedTuple):
    """
    The contextual config that a child task inherits from its parent task
    """

    config: "PoeConfig"
    cwd: str
    ui: "PoeUi"
    specs: "TaskSpecFactory"

    @classmethod
    def from_task(cls, parent_task: "PoeTask"):
        """
        Build the context for a child of parent_task. It matches the context of the
        parent, except that cwd is taken from the cwd option of the parent task,
        with the cwd of the parent's context as the default.
        """
        inherited = parent_task.ctx
        child_cwd = parent_task.spec.options.get("cwd", inherited.cwd)
        return cls(
            config=inherited.config,
            cwd=str(child_cwd),
            ui=inherited.ui,
            specs=inherited.specs,
        )
165
166
167class PoeTask(metaclass=MetaPoeTask):
168    __key__: ClassVar[str]
169    __content_type__: ClassVar[Type] = str
170
    class TaskOptions(PoeOptions):
        """
        The options declared on the base task type. Every option defaults to None.
        """

        # Declaration of the named arguments of the task (used to build PoeTaskArgs)
        args: Optional[Union[dict, list]] = None
        # Takes precedence over the capture_stdout argument given to the task
        capture_stdout: Optional[str] = None
        # Working directory for the task; relative paths are resolved against the
        # project directory
        cwd: Optional[str] = None
        # References to tasks that this task depends on
        deps: Optional[Sequence[str]] = None
        # Passed together with envfile to EnvVarsManager.apply_env_config
        env: Optional[dict] = None
        envfile: Optional[Union[str, list]] = None
        # Passed to the run context as the executor_config of the task
        executor: Optional[dict] = None
        help: Optional[str] = None
        # Mapping of env var name to a reference to the task whose result it uses
        uses: Optional[dict] = None

        def validate(self):
            """
            Validation rules that don't require any extra context go here.
            """
186
187    class TaskSpec:
188        name: str
189        content: TaskContent
190        options: "PoeTask.TaskOptions"
191        task_type: Type["PoeTask"]
192        source: "ConfigPartition"
193        parent: Optional["PoeTask.TaskSpec"] = None
194
195        _args: Optional["PoeTaskArgs"] = None
196
197        def __init__(
198            self,
199            name: str,
200            task_def: Dict[str, Any],
201            factory: TaskSpecFactory,
202            source: "ConfigPartition",
203            parent: Optional["PoeTask.TaskSpec"] = None,
204        ):
205            self.name = name
206            self.content = task_def[self.task_type.__key__]
207            self.options = self._parse_options(task_def)
208            self.source = source
209            self.parent = parent
210
211        def _parse_options(self, task_def: Dict[str, Any]):
212            try:
213                return next(
214                    self.task_type.TaskOptions.parse(
215                        task_def, extra_keys=(self.task_type.__key__,)
216                    )
217                )
218            except ConfigValidationError as error:
219                error.task_name = self.name
220                raise
221
222        def get_task_env(
223            self,
224            parent_env: "EnvVarsManager",
225            uses_values: Optional[Mapping[str, str]] = None,
226        ) -> "EnvVarsManager":
227            """
228            Resolve the EnvVarsManager for this task, relative to the given parent_env
229            """
230            task_envfile = self.options.get("envfile")
231            task_env = self.options.get("env")
232
233            result = parent_env.clone()
234
235            # Include env vars from outputs of upstream dependencies
236            if uses_values:
237                result.update(uses_values)
238
239            result.set("POE_CONF_DIR", str(self.source.config_dir))
240            result.apply_env_config(
241                task_envfile,
242                task_env,
243                config_dir=self.source.config_dir,
244                config_working_dir=self.source.cwd,
245            )
246
247            return result
248
249        @property
250        def args(self) -> Optional["PoeTaskArgs"]:
251            from .args import PoeTaskArgs
252
253            if not self._args and self.options.args:
254                self._args = PoeTaskArgs(self.options.args, self.name)
255
256            return self._args
257
258        def create_task(
259            self,
260            invocation: Tuple[str, ...],
261            ctx: TaskContext,
262            capture_stdout: Union[str, bool] = False,
263        ) -> "PoeTask":
264            return self.task_type(
265                spec=self,
266                invocation=invocation,
267                capture_stdout=capture_stdout,
268                ctx=ctx,
269            )
270
271        def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory):
272            try:
273                self._base_validations(config, task_specs)
274                self._task_validations(config, task_specs)
275            except ConfigValidationError as error:
276                error.task_name = self.name
277                raise
278
279        def _base_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
280            """
281            Perform validations on this TaskSpec that apply to all task types
282            """
283            if not (self.name[0].isalpha() or self.name[0] == "_"):
284                raise ConfigValidationError(
285                    "Task names must start with a letter or underscore."
286                )
287
288            if not self.parent and not _TASK_NAME_PATTERN.match(self.name):
289                raise ConfigValidationError(
290                    "Task names characters must be alphanumeric, colon, underscore or "
291                    "dash."
292                )
293
294            if not isinstance(self.content, self.task_type.__content_type__):
295                raise ConfigValidationError(
296                    f"Content for {self.task_type.__name__} must be a "
297                    f"{self.task_type.__content_type__.__name__}"
298                )
299
300            if self.options.deps:
301                for dep in self.options.deps:
302                    dep_task_name = dep.split(" ", 1)[0]
303                    if dep_task_name not in task_specs:
304                        raise ConfigValidationError(
305                            "'deps' option includes reference to unknown task: "
306                            f"{dep_task_name!r}"
307                        )
308
309                    if task_specs.get(dep_task_name).options.get("use_exec", False):
310                        raise ConfigValidationError(
311                            f"'deps' option includes reference to task with "
312                            f"'use_exec' set to true: {dep_task_name!r}"
313                        )
314
315            if self.options.uses:
316                from ..helpers import is_valid_env_var
317
318                for key, dep in self.options.uses.items():
319                    if not is_valid_env_var(key):
320                        raise ConfigValidationError(
321                            f"'uses' option includes invalid key: {key!r}"
322                        )
323
324                    dep_task_name = dep.split(" ", 1)[0]
325                    if dep_task_name not in task_specs:
326                        raise ConfigValidationError(
327                            "'uses' options includes reference to unknown task: "
328                            f"{dep_task_name!r}"
329                        )
330
331                    referenced_task = task_specs.get(dep_task_name)
332                    if referenced_task.options.get("use_exec", False):
333                        raise ConfigValidationError(
334                            f"'uses' option references task with 'use_exec' set to "
335                            f"true: {dep_task_name!r}"
336                        )
337                    if referenced_task.options.get("capture_stdout"):
338                        raise ConfigValidationError(
339                            f"'uses' option references task with 'capture_stdout' "
340                            f"option set: {dep_task_name!r}"
341                        )
342
        def _task_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
            """
            Perform validations on this TaskSpec that apply to a specific task type

            This base implementation performs no checks. It is called by validate
            after the base validations, with the same arguments.
            """
347
348    spec: TaskSpec
349    ctx: TaskContext
350    _parsed_args: Optional[Tuple[Dict[str, str], Tuple[str, ...]]] = None
351
352    __task_types: ClassVar[Dict[str, Type["PoeTask"]]] = {}
353    __upstream_invocations: Optional[
354        Dict[str, Union[List[Tuple[str, ...]], Dict[str, Tuple[str, ...]]]]
355    ] = None
356
357    def __init__(
358        self,
359        spec: TaskSpec,
360        invocation: Tuple[str, ...],
361        ctx: TaskContext,
362        capture_stdout: Union[str, bool] = False,
363    ):
364        self.spec = spec
365        self.invocation = invocation
366        self.ctx = ctx
367        self.capture_stdout = spec.options.capture_stdout or capture_stdout
368        self._is_windows = sys.platform == "win32"
369
    @property
    def name(self) -> str:
        """
        The name of this task, as given by its spec
        """
        return self.spec.name
373
374    @classmethod
375    def lookup_task_spec_cls(cls, task_key: str) -> Type[TaskSpec]:
376        return cls.__task_types[task_key].TaskSpec
377
378    @classmethod
379    def resolve_task_type(
380        cls,
381        task_def: TaskDef,
382        config: "PoeConfig",
383        array_item: Union[bool, str] = False,
384    ) -> Optional[str]:
385        if isinstance(task_def, str):
386            if array_item:
387                return (
388                    array_item
389                    if isinstance(array_item, str)
390                    else config.default_array_item_task_type
391                )
392            else:
393                return config.default_task_type
394
395        elif isinstance(task_def, dict):
396            task_type_keys = set(task_def.keys()).intersection(cls.__task_types)
397            if len(task_type_keys) == 1:
398                return next(iter(task_type_keys))
399
400        elif isinstance(task_def, list):
401            return config.default_array_task_type
402
403        return None
404
405    def _parse_named_args(
406        self, extra_args: Sequence[str], env: "EnvVarsManager"
407    ) -> Optional[Dict[str, str]]:
408        if task_args := self.spec.args:
409            return task_args.parse(extra_args, env, self.ctx.ui.program_name)
410
411        return None
412
413    def get_parsed_arguments(
414        self, env: "EnvVarsManager"
415    ) -> Tuple[Dict[str, str], Tuple[str, ...]]:
416        if self._parsed_args is None:
417            all_args = self.invocation[1:]
418
419            if task_args := self.spec.args:
420                try:
421                    split_index = all_args.index("--")
422                    option_args = all_args[:split_index]
423                    extra_args = all_args[split_index + 1 :]
424                except ValueError:
425                    option_args = all_args
426                    extra_args = tuple()
427
428                self._parsed_args = (
429                    task_args.parse(option_args, env, self.ctx.ui.program_name),
430                    extra_args,
431                )
432
433            else:
434                self._parsed_args = ({}, all_args)
435
436        return self._parsed_args
437
438    def run(
439        self,
440        context: "RunContext",
441        parent_env: Optional["EnvVarsManager"] = None,
442    ) -> int:
443        """
444        Run this task
445        """
446
447        if environ.get("POE_DEBUG"):
448            task_type_key = self.__key__  # type: ignore[attr-defined]
449            print(f" * Running     {task_type_key}:{self.name}")
450            print(f" . Invocation  {self.invocation!r}")
451
452        upstream_invocations = self._get_upstream_invocations(context)
453
454        if context.dry and upstream_invocations.get("uses", {}):
455            self._print_action(
456                (
457                    "unresolved dependency task results via uses option for task "
458                    f"{self.name!r}"
459                ),
460                dry=True,
461                unresolved=True,
462            )
463            return 0
464
465        task_env = self.spec.get_task_env(
466            parent_env or context.env,
467            context._get_dep_values(upstream_invocations["uses"]),
468        )
469
470        if environ.get("POE_DEBUG"):
471            named_arg_values, extra_args = self.get_parsed_arguments(task_env)
472            print(f" . Parsed args {named_arg_values!r}")
473            print(f" . Extra args  {extra_args!r}")
474
475        return self._handle_run(context, task_env)
476
    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        This method must be implemented by a subclass and return a single executor
        result.

        It is called by run with the run context and the env resolved for this task.
        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError
487
488    def _get_executor(
489        self,
490        context: "RunContext",
491        env: "EnvVarsManager",
492        *,
493        delegate_dry_run: bool = False,
494    ):
495        return context.get_executor(
496            self.invocation,
497            env,
498            working_dir=self.get_working_dir(env),
499            executor_config=self.spec.options.get("executor"),
500            capture_stdout=self.capture_stdout,
501            delegate_dry_run=delegate_dry_run,
502        )
503
504    def get_working_dir(
505        self,
506        env: "EnvVarsManager",
507    ) -> Path:
508        cwd_option = env.fill_template(self.spec.options.get("cwd", self.ctx.cwd))
509        working_dir = Path(cwd_option)
510
511        if not working_dir.is_absolute():
512            working_dir = self.ctx.config.project_dir / working_dir
513
514        return working_dir
515
516    def iter_upstream_tasks(
517        self, context: "RunContext"
518    ) -> Iterator[Tuple[str, "PoeTask"]]:
519        invocations = self._get_upstream_invocations(context)
520        for invocation in invocations["deps"]:
521            yield ("", self._instantiate_dep(invocation, capture_stdout=False))
522        for key, invocation in invocations["uses"].items():
523            yield (key, self._instantiate_dep(invocation, capture_stdout=True))
524
525    def _get_upstream_invocations(self, context: "RunContext"):
526        """
527        NB. this memoization assumes the context (and contained env vars) will be the
528        same in all instances for the lifetime of this object. Whilst this should be OK
529        for all current usecases is it strictly speaking something that this object
530        should not know enough to safely assume. So we probably want to revisit this.
531        """
532        import shlex
533
534        options = self.spec.options
535
536        if self.__upstream_invocations is None:
537            env = self.spec.get_task_env(context.env)
538            env.update(self.get_parsed_arguments(env)[0])
539
540            self.__upstream_invocations = {
541                "deps": [
542                    tuple(shlex.split(env.fill_template(task_ref)))
543                    for task_ref in options.get("deps", tuple())
544                ],
545                "uses": {
546                    key: tuple(shlex.split(env.fill_template(task_ref)))
547                    for key, task_ref in options.get("uses", {}).items()
548                },
549            }
550
551        return self.__upstream_invocations
552
553    def _instantiate_dep(
554        self, invocation: Tuple[str, ...], capture_stdout: bool
555    ) -> "PoeTask":
556        return self.ctx.specs.get(invocation[0]).create_task(
557            invocation=invocation,
558            ctx=TaskContext(
559                config=self.ctx.config,
560                cwd=str(self.ctx.config.project_dir),
561                specs=self.ctx.specs,
562                ui=self.ctx.ui,
563            ),
564            capture_stdout=capture_stdout,
565        )
566
567    def has_deps(self) -> bool:
568        return bool(
569            self.spec.options.get("deps", False) or self.spec.options.get("uses", False)
570        )
571
572    @classmethod
573    def is_task_type(
574        cls, task_def_key: str, content_type: Optional[Type] = None
575    ) -> bool:
576        """
577        Checks whether the given key identifies a known task type.
578        Optionally also check whether the given content_type matches the type of content
579        for this tasks type.
580        """
581        return task_def_key in cls.__task_types and (
582            content_type is None
583            or cls.__task_types[task_def_key].__content_type__ is content_type
584        )
585
586    @classmethod
587    def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]:
588        if content_type:
589            return tuple(
590                task_type
591                for task_type, task_cls in cls.__task_types.items()
592                if task_cls.__content_type__ is content_type
593            )
594        return tuple(task_type for task_type in cls.__task_types.keys())
595
596    def _print_action(self, action: str, dry: bool, unresolved: bool = False):
597        """
598        Print the action taken by a task just before executing it.
599        """
600        min_verbosity = -1 if dry else 0
601        arrow = "??" if unresolved else "<=" if self.capture_stdout else "=>"
602        self.ctx.ui.print_msg(
603            f"<hl>Poe {arrow}</hl> <action>{action}</action>", min_verbosity
604        )
605
    class Error(Exception):
        """
        Exception type scoped to PoeTask (accessed as PoeTask.Error)
        """

        pass
class MetaPoeTask(builtins.type):
36class MetaPoeTask(type):
37    """
38    This metaclass makes all descendents of PoeTask (task types) register themselves on
39    declaration and validates that they include the expected class attributes.
40    """
41
42    def __init__(cls, *args):
43        super().__init__(*args)
44        if cls.__name__ == "PoeTask":
45            return
46
47        assert isinstance(getattr(cls, "__key__", None), str)
48        assert issubclass(getattr(cls, "TaskOptions", None), PoeOptions)
49        PoeTask._PoeTask__task_types[cls.__key__] = cls
50
51        # Give each TaskSpec a reference to its parent PoeTask
52        if "TaskSpec" in cls.__dict__:
53            cls.TaskSpec.task_type = cls

This metaclass makes all descendants of PoeTask (task types) register themselves on declaration and validates that they include the expected class attributes.

MetaPoeTask(*args)
42    def __init__(cls, *args):
43        super().__init__(*args)
44        if cls.__name__ == "PoeTask":
45            return
46
47        assert isinstance(getattr(cls, "__key__", None), str)
48        assert issubclass(getattr(cls, "TaskOptions", None), PoeOptions)
49        PoeTask._PoeTask__task_types[cls.__key__] = cls
50
51        # Give each TaskSpec a reference to its parent PoeTask
52        if "TaskSpec" in cls.__dict__:
53            cls.TaskSpec.task_type = cls
Inherited Members
builtins.type
mro
class TaskSpecFactory:
 61class TaskSpecFactory:
 62    __cache: Dict[str, "PoeTask.TaskSpec"]
 63    config: "PoeConfig"
 64
 65    def __init__(self, config: "PoeConfig"):
 66        self.__cache = {}
 67        self.config = config
 68
 69    def __contains__(self, other) -> bool:
 70        return other in self.__cache
 71
 72    def get(
 73        self,
 74        task_name: str,
 75        task_def: Optional[TaskDef] = None,
 76        task_type: Optional[str] = None,
 77        parent: Optional["PoeTask.TaskSpec"] = None,
 78    ) -> "PoeTask.TaskSpec":
 79        if task_def and parent:
 80            # This is probably a subtask and will be cached by the parent task_spec
 81            if not task_type:
 82                task_type = PoeTask.resolve_task_type(task_def, self.config)
 83                assert task_type
 84            return self.create(
 85                task_name, task_type, task_def, source=parent.source, parent=parent
 86            )
 87
 88        if task_name not in self.__cache:
 89            self.load(task_name)
 90
 91        return self.__cache[task_name]
 92
 93    def create(
 94        self,
 95        task_name: str,
 96        task_type: str,
 97        task_def: TaskDef,
 98        source: "ConfigPartition",
 99        parent: Optional["PoeTask.TaskSpec"] = None,
100    ) -> "PoeTask.TaskSpec":
101        """
102        A parent task should be provided when this task is defined inline within another
103        task, for example as part of a sequence.
104        """
105        if not isinstance(task_def, dict):
106            task_def = {task_type: task_def}
107
108        return PoeTask.lookup_task_spec_cls(task_type)(
109            name=task_name,
110            task_def=task_def,
111            factory=self,
112            source=source,
113            parent=parent,
114        )
115
116    def load_all(self):
117        for task_name in self.config.task_names:
118            self.load(task_name)
119
120        return self
121
122    def load(self, task_name: str):
123        task_def, config_partition = self.config.lookup_task(task_name)
124
125        if task_def is None or config_partition is None:
126            raise PoeException(f"Cannot instantiate unknown task {task_name!r}")
127
128        task_type = PoeTask.resolve_task_type(task_def, self.config)
129        if not task_type:
130            raise ConfigValidationError(
131                "Task definition must be a string, a list, or a table including exactly"
132                " one task key\n"
133                f"Available task keys: {set(PoeTask.get_task_types())!r}",
134                task_name=task_name,
135                filename=(
136                    None if config_partition.is_primary else str(config_partition.path)
137                ),
138            )
139
140        self.__cache[task_name] = self.create(
141            task_name, task_type, task_def, source=config_partition
142        )
143
144    def __iter__(self):
145        return iter(self.__cache.values())
TaskSpecFactory(config: poethepoet.config.PoeConfig)
65    def __init__(self, config: "PoeConfig"):
66        self.__cache = {}
67        self.config = config
def get( self, task_name: str, task_def: Union[str, Mapping[str, Any], Sequence[Union[str, Mapping[str, Any]]], NoneType] = None, task_type: Optional[str] = None, parent: Optional[PoeTask.TaskSpec] = None) -> PoeTask.TaskSpec:
72    def get(
73        self,
74        task_name: str,
75        task_def: Optional[TaskDef] = None,
76        task_type: Optional[str] = None,
77        parent: Optional["PoeTask.TaskSpec"] = None,
78    ) -> "PoeTask.TaskSpec":
79        if task_def and parent:
80            # This is probably a subtask and will be cached by the parent task_spec
81            if not task_type:
82                task_type = PoeTask.resolve_task_type(task_def, self.config)
83                assert task_type
84            return self.create(
85                task_name, task_type, task_def, source=parent.source, parent=parent
86            )
87
88        if task_name not in self.__cache:
89            self.load(task_name)
90
91        return self.__cache[task_name]
def create( self, task_name: str, task_type: str, task_def: Union[str, Mapping[str, Any], Sequence[Union[str, Mapping[str, Any]]]], source: poethepoet.config.ConfigPartition, parent: Optional[PoeTask.TaskSpec] = None) -> PoeTask.TaskSpec:
 93    def create(
 94        self,
 95        task_name: str,
 96        task_type: str,
 97        task_def: TaskDef,
 98        source: "ConfigPartition",
 99        parent: Optional["PoeTask.TaskSpec"] = None,
100    ) -> "PoeTask.TaskSpec":
101        """
102        A parent task should be provided when this task is defined inline within another
103        task, for example as part of a sequence.
104        """
105        if not isinstance(task_def, dict):
106            task_def = {task_type: task_def}
107
108        return PoeTask.lookup_task_spec_cls(task_type)(
109            name=task_name,
110            task_def=task_def,
111            factory=self,
112            source=source,
113            parent=parent,
114        )

A parent task should be provided when this task is defined inline within another task, for example as part of a sequence.

def load_all(self):
116    def load_all(self):
117        for task_name in self.config.task_names:
118            self.load(task_name)
119
120        return self
def load(self, task_name: str):
122    def load(self, task_name: str):
123        task_def, config_partition = self.config.lookup_task(task_name)
124
125        if task_def is None or config_partition is None:
126            raise PoeException(f"Cannot instantiate unknown task {task_name!r}")
127
128        task_type = PoeTask.resolve_task_type(task_def, self.config)
129        if not task_type:
130            raise ConfigValidationError(
131                "Task definition must be a string, a list, or a table including exactly"
132                " one task key\n"
133                f"Available task keys: {set(PoeTask.get_task_types())!r}",
134                task_name=task_name,
135                filename=(
136                    None if config_partition.is_primary else str(config_partition.path)
137                ),
138            )
139
140        self.__cache[task_name] = self.create(
141            task_name, task_type, task_def, source=config_partition
142        )
class TaskContext(typing.NamedTuple):
148class TaskContext(NamedTuple):
149    """
150    Collection of contextual config inherited from a parent task to a child task
151    """
152
153    config: "PoeConfig"
154    cwd: str
155    ui: "PoeUi"
156    specs: "TaskSpecFactory"
157
158    @classmethod
159    def from_task(cls, parent_task: "PoeTask"):
160        return cls(
161            config=parent_task.ctx.config,
162            cwd=str(parent_task.spec.options.get("cwd", parent_task.ctx.cwd)),
163            specs=parent_task.ctx.specs,
164            ui=parent_task.ctx.ui,
165        )

Collection of contextual config inherited from a parent task to a child task

TaskContext( config: ForwardRef('PoeConfig'), cwd: str, ui: ForwardRef('PoeUi'), specs: ForwardRef('TaskSpecFactory'))

Create new instance of TaskContext(config, cwd, ui, specs)

Alias for field number 0

cwd: str

Alias for field number 1

Alias for field number 2

Alias for field number 3

@classmethod
def from_task(cls, parent_task: PoeTask):
158    @classmethod
159    def from_task(cls, parent_task: "PoeTask"):
160        return cls(
161            config=parent_task.ctx.config,
162            cwd=str(parent_task.spec.options.get("cwd", parent_task.ctx.cwd)),
163            specs=parent_task.ctx.specs,
164            ui=parent_task.ctx.ui,
165        )
Inherited Members
builtins.tuple
index
count
class PoeTask:
168class PoeTask(metaclass=MetaPoeTask):
169    __key__: ClassVar[str]
170    __content_type__: ClassVar[Type] = str
171
    class TaskOptions(PoeOptions):
        # Options declared on the base task type; every one defaults to None.
        # Definition of the task's named arguments (wrapped by PoeTaskArgs)
        args: Optional[Union[dict, list]] = None
        # When set, takes precedence over the capture_stdout constructor argument
        capture_stdout: Optional[str] = None
        # Working directory; relative values are resolved against the project dir
        cwd: Optional[str] = None
        # References to tasks that are invoked upstream of this one
        deps: Optional[Sequence[str]] = None
        # Env config and envfile(s) applied when resolving the task's environment
        env: Optional[dict] = None
        envfile: Optional[Union[str, list]] = None
        # Executor config handed to the run context when requesting an executor
        executor: Optional[dict] = None
        help: Optional[str] = None
        # Maps an env var name to a reference to the task that provides its value
        uses: Optional[dict] = None

        def validate(self):
            """
            Validation rules that don't require any extra context go here.
            """
187
188    class TaskSpec:
189        name: str
190        content: TaskContent
191        options: "PoeTask.TaskOptions"
192        task_type: Type["PoeTask"]
193        source: "ConfigPartition"
194        parent: Optional["PoeTask.TaskSpec"] = None
195
196        _args: Optional["PoeTaskArgs"] = None
197
198        def __init__(
199            self,
200            name: str,
201            task_def: Dict[str, Any],
202            factory: TaskSpecFactory,
203            source: "ConfigPartition",
204            parent: Optional["PoeTask.TaskSpec"] = None,
205        ):
206            self.name = name
207            self.content = task_def[self.task_type.__key__]
208            self.options = self._parse_options(task_def)
209            self.source = source
210            self.parent = parent
211
212        def _parse_options(self, task_def: Dict[str, Any]):
213            try:
214                return next(
215                    self.task_type.TaskOptions.parse(
216                        task_def, extra_keys=(self.task_type.__key__,)
217                    )
218                )
219            except ConfigValidationError as error:
220                error.task_name = self.name
221                raise
222
223        def get_task_env(
224            self,
225            parent_env: "EnvVarsManager",
226            uses_values: Optional[Mapping[str, str]] = None,
227        ) -> "EnvVarsManager":
228            """
229            Resolve the EnvVarsManager for this task, relative to the given parent_env
230            """
231            task_envfile = self.options.get("envfile")
232            task_env = self.options.get("env")
233
234            result = parent_env.clone()
235
236            # Include env vars from outputs of upstream dependencies
237            if uses_values:
238                result.update(uses_values)
239
240            result.set("POE_CONF_DIR", str(self.source.config_dir))
241            result.apply_env_config(
242                task_envfile,
243                task_env,
244                config_dir=self.source.config_dir,
245                config_working_dir=self.source.cwd,
246            )
247
248            return result
249
250        @property
251        def args(self) -> Optional["PoeTaskArgs"]:
252            from .args import PoeTaskArgs
253
254            if not self._args and self.options.args:
255                self._args = PoeTaskArgs(self.options.args, self.name)
256
257            return self._args
258
259        def create_task(
260            self,
261            invocation: Tuple[str, ...],
262            ctx: TaskContext,
263            capture_stdout: Union[str, bool] = False,
264        ) -> "PoeTask":
265            return self.task_type(
266                spec=self,
267                invocation=invocation,
268                capture_stdout=capture_stdout,
269                ctx=ctx,
270            )
271
272        def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory):
273            try:
274                self._base_validations(config, task_specs)
275                self._task_validations(config, task_specs)
276            except ConfigValidationError as error:
277                error.task_name = self.name
278                raise
279
280        def _base_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
281            """
282            Perform validations on this TaskSpec that apply to all task types
283            """
284            if not (self.name[0].isalpha() or self.name[0] == "_"):
285                raise ConfigValidationError(
286                    "Task names must start with a letter or underscore."
287                )
288
289            if not self.parent and not _TASK_NAME_PATTERN.match(self.name):
290                raise ConfigValidationError(
291                    "Task names characters must be alphanumeric, colon, underscore or "
292                    "dash."
293                )
294
295            if not isinstance(self.content, self.task_type.__content_type__):
296                raise ConfigValidationError(
297                    f"Content for {self.task_type.__name__} must be a "
298                    f"{self.task_type.__content_type__.__name__}"
299                )
300
301            if self.options.deps:
302                for dep in self.options.deps:
303                    dep_task_name = dep.split(" ", 1)[0]
304                    if dep_task_name not in task_specs:
305                        raise ConfigValidationError(
306                            "'deps' option includes reference to unknown task: "
307                            f"{dep_task_name!r}"
308                        )
309
310                    if task_specs.get(dep_task_name).options.get("use_exec", False):
311                        raise ConfigValidationError(
312                            f"'deps' option includes reference to task with "
313                            f"'use_exec' set to true: {dep_task_name!r}"
314                        )
315
316            if self.options.uses:
317                from ..helpers import is_valid_env_var
318
319                for key, dep in self.options.uses.items():
320                    if not is_valid_env_var(key):
321                        raise ConfigValidationError(
322                            f"'uses' option includes invalid key: {key!r}"
323                        )
324
325                    dep_task_name = dep.split(" ", 1)[0]
326                    if dep_task_name not in task_specs:
327                        raise ConfigValidationError(
328                            "'uses' options includes reference to unknown task: "
329                            f"{dep_task_name!r}"
330                        )
331
332                    referenced_task = task_specs.get(dep_task_name)
333                    if referenced_task.options.get("use_exec", False):
334                        raise ConfigValidationError(
335                            f"'uses' option references task with 'use_exec' set to "
336                            f"true: {dep_task_name!r}"
337                        )
338                    if referenced_task.options.get("capture_stdout"):
339                        raise ConfigValidationError(
340                            f"'uses' option references task with 'capture_stdout' "
341                            f"option set: {dep_task_name!r}"
342                        )
343
        def _task_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
            """
            Perform validations on this TaskSpec that apply to a specific task type

            This base implementation performs no checks.
            """
348
349    spec: TaskSpec
350    ctx: TaskContext
351    _parsed_args: Optional[Tuple[Dict[str, str], Tuple[str, ...]]] = None
352
353    __task_types: ClassVar[Dict[str, Type["PoeTask"]]] = {}
354    __upstream_invocations: Optional[
355        Dict[str, Union[List[Tuple[str, ...]], Dict[str, Tuple[str, ...]]]]
356    ] = None
357
358    def __init__(
359        self,
360        spec: TaskSpec,
361        invocation: Tuple[str, ...],
362        ctx: TaskContext,
363        capture_stdout: Union[str, bool] = False,
364    ):
365        self.spec = spec
366        self.invocation = invocation
367        self.ctx = ctx
368        self.capture_stdout = spec.options.capture_stdout or capture_stdout
369        self._is_windows = sys.platform == "win32"
370
    @property
    def name(self) -> str:
        """
        The name of this task, as recorded on its spec
        """
        return self.spec.name
374
375    @classmethod
376    def lookup_task_spec_cls(cls, task_key: str) -> Type[TaskSpec]:
377        return cls.__task_types[task_key].TaskSpec
378
379    @classmethod
380    def resolve_task_type(
381        cls,
382        task_def: TaskDef,
383        config: "PoeConfig",
384        array_item: Union[bool, str] = False,
385    ) -> Optional[str]:
386        if isinstance(task_def, str):
387            if array_item:
388                return (
389                    array_item
390                    if isinstance(array_item, str)
391                    else config.default_array_item_task_type
392                )
393            else:
394                return config.default_task_type
395
396        elif isinstance(task_def, dict):
397            task_type_keys = set(task_def.keys()).intersection(cls.__task_types)
398            if len(task_type_keys) == 1:
399                return next(iter(task_type_keys))
400
401        elif isinstance(task_def, list):
402            return config.default_array_task_type
403
404        return None
405
406    def _parse_named_args(
407        self, extra_args: Sequence[str], env: "EnvVarsManager"
408    ) -> Optional[Dict[str, str]]:
409        if task_args := self.spec.args:
410            return task_args.parse(extra_args, env, self.ctx.ui.program_name)
411
412        return None
413
414    def get_parsed_arguments(
415        self, env: "EnvVarsManager"
416    ) -> Tuple[Dict[str, str], Tuple[str, ...]]:
417        if self._parsed_args is None:
418            all_args = self.invocation[1:]
419
420            if task_args := self.spec.args:
421                try:
422                    split_index = all_args.index("--")
423                    option_args = all_args[:split_index]
424                    extra_args = all_args[split_index + 1 :]
425                except ValueError:
426                    option_args = all_args
427                    extra_args = tuple()
428
429                self._parsed_args = (
430                    task_args.parse(option_args, env, self.ctx.ui.program_name),
431                    extra_args,
432                )
433
434            else:
435                self._parsed_args = ({}, all_args)
436
437        return self._parsed_args
438
439    def run(
440        self,
441        context: "RunContext",
442        parent_env: Optional["EnvVarsManager"] = None,
443    ) -> int:
444        """
445        Run this task
446        """
447
448        if environ.get("POE_DEBUG"):
449            task_type_key = self.__key__  # type: ignore[attr-defined]
450            print(f" * Running     {task_type_key}:{self.name}")
451            print(f" . Invocation  {self.invocation!r}")
452
453        upstream_invocations = self._get_upstream_invocations(context)
454
455        if context.dry and upstream_invocations.get("uses", {}):
456            self._print_action(
457                (
458                    "unresolved dependency task results via uses option for task "
459                    f"{self.name!r}"
460                ),
461                dry=True,
462                unresolved=True,
463            )
464            return 0
465
466        task_env = self.spec.get_task_env(
467            parent_env or context.env,
468            context._get_dep_values(upstream_invocations["uses"]),
469        )
470
471        if environ.get("POE_DEBUG"):
472            named_arg_values, extra_args = self.get_parsed_arguments(task_env)
473            print(f" . Parsed args {named_arg_values!r}")
474            print(f" . Extra args  {extra_args!r}")
475
476        return self._handle_run(context, task_env)
477
    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        This method must be implemented by a subclass and return a single executor
        result.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError
488
489    def _get_executor(
490        self,
491        context: "RunContext",
492        env: "EnvVarsManager",
493        *,
494        delegate_dry_run: bool = False,
495    ):
496        return context.get_executor(
497            self.invocation,
498            env,
499            working_dir=self.get_working_dir(env),
500            executor_config=self.spec.options.get("executor"),
501            capture_stdout=self.capture_stdout,
502            delegate_dry_run=delegate_dry_run,
503        )
504
505    def get_working_dir(
506        self,
507        env: "EnvVarsManager",
508    ) -> Path:
509        cwd_option = env.fill_template(self.spec.options.get("cwd", self.ctx.cwd))
510        working_dir = Path(cwd_option)
511
512        if not working_dir.is_absolute():
513            working_dir = self.ctx.config.project_dir / working_dir
514
515        return working_dir
516
517    def iter_upstream_tasks(
518        self, context: "RunContext"
519    ) -> Iterator[Tuple[str, "PoeTask"]]:
520        invocations = self._get_upstream_invocations(context)
521        for invocation in invocations["deps"]:
522            yield ("", self._instantiate_dep(invocation, capture_stdout=False))
523        for key, invocation in invocations["uses"].items():
524            yield (key, self._instantiate_dep(invocation, capture_stdout=True))
525
526    def _get_upstream_invocations(self, context: "RunContext"):
527        """
528        NB. this memoization assumes the context (and contained env vars) will be the
529        same in all instances for the lifetime of this object. Whilst this should be OK
530        for all current usecases is it strictly speaking something that this object
531        should not know enough to safely assume. So we probably want to revisit this.
532        """
533        import shlex
534
535        options = self.spec.options
536
537        if self.__upstream_invocations is None:
538            env = self.spec.get_task_env(context.env)
539            env.update(self.get_parsed_arguments(env)[0])
540
541            self.__upstream_invocations = {
542                "deps": [
543                    tuple(shlex.split(env.fill_template(task_ref)))
544                    for task_ref in options.get("deps", tuple())
545                ],
546                "uses": {
547                    key: tuple(shlex.split(env.fill_template(task_ref)))
548                    for key, task_ref in options.get("uses", {}).items()
549                },
550            }
551
552        return self.__upstream_invocations
553
554    def _instantiate_dep(
555        self, invocation: Tuple[str, ...], capture_stdout: bool
556    ) -> "PoeTask":
557        return self.ctx.specs.get(invocation[0]).create_task(
558            invocation=invocation,
559            ctx=TaskContext(
560                config=self.ctx.config,
561                cwd=str(self.ctx.config.project_dir),
562                specs=self.ctx.specs,
563                ui=self.ctx.ui,
564            ),
565            capture_stdout=capture_stdout,
566        )
567
568    def has_deps(self) -> bool:
569        return bool(
570            self.spec.options.get("deps", False) or self.spec.options.get("uses", False)
571        )
572
573    @classmethod
574    def is_task_type(
575        cls, task_def_key: str, content_type: Optional[Type] = None
576    ) -> bool:
577        """
578        Checks whether the given key identifies a known task type.
579        Optionally also check whether the given content_type matches the type of content
580        for this tasks type.
581        """
582        return task_def_key in cls.__task_types and (
583            content_type is None
584            or cls.__task_types[task_def_key].__content_type__ is content_type
585        )
586
587    @classmethod
588    def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]:
589        if content_type:
590            return tuple(
591                task_type
592                for task_type, task_cls in cls.__task_types.items()
593                if task_cls.__content_type__ is content_type
594            )
595        return tuple(task_type for task_type in cls.__task_types.keys())
596
597    def _print_action(self, action: str, dry: bool, unresolved: bool = False):
598        """
599        Print the action taken by a task just before executing it.
600        """
601        min_verbosity = -1 if dry else 0
602        arrow = "??" if unresolved else "<=" if self.capture_stdout else "=>"
603        self.ctx.ui.print_msg(
604            f"<hl>Poe {arrow}</hl> <action>{action}</action>", min_verbosity
605        )
606
    # Exception type namespaced under PoeTask; it adds nothing to Exception
    class Error(Exception):
        pass
PoeTask( spec: PoeTask.TaskSpec, invocation: Tuple[str, ...], ctx: TaskContext, capture_stdout: Union[str, bool] = False)
358    def __init__(
359        self,
360        spec: TaskSpec,
361        invocation: Tuple[str, ...],
362        ctx: TaskContext,
363        capture_stdout: Union[str, bool] = False,
364    ):
365        self.spec = spec
366        self.invocation = invocation
367        self.ctx = ctx
368        self.capture_stdout = spec.options.capture_stdout or capture_stdout
369        self._is_windows = sys.platform == "win32"
invocation
capture_stdout
name
371    @property
372    def name(self):
373        return self.spec.name
@classmethod
def lookup_task_spec_cls(cls, task_key: str) -> Type[PoeTask.TaskSpec]:
375    @classmethod
376    def lookup_task_spec_cls(cls, task_key: str) -> Type[TaskSpec]:
377        return cls.__task_types[task_key].TaskSpec
@classmethod
def resolve_task_type( cls, task_def: Union[str, Mapping[str, Any], Sequence[Union[str, Mapping[str, Any]]]], config: poethepoet.config.PoeConfig, array_item: Union[bool, str] = False) -> Optional[str]:
379    @classmethod
380    def resolve_task_type(
381        cls,
382        task_def: TaskDef,
383        config: "PoeConfig",
384        array_item: Union[bool, str] = False,
385    ) -> Optional[str]:
386        if isinstance(task_def, str):
387            if array_item:
388                return (
389                    array_item
390                    if isinstance(array_item, str)
391                    else config.default_array_item_task_type
392                )
393            else:
394                return config.default_task_type
395
396        elif isinstance(task_def, dict):
397            task_type_keys = set(task_def.keys()).intersection(cls.__task_types)
398            if len(task_type_keys) == 1:
399                return next(iter(task_type_keys))
400
401        elif isinstance(task_def, list):
402            return config.default_array_task_type
403
404        return None
def get_parsed_arguments( self, env: poethepoet.env.manager.EnvVarsManager) -> Tuple[Dict[str, str], Tuple[str, ...]]:
414    def get_parsed_arguments(
415        self, env: "EnvVarsManager"
416    ) -> Tuple[Dict[str, str], Tuple[str, ...]]:
417        if self._parsed_args is None:
418            all_args = self.invocation[1:]
419
420            if task_args := self.spec.args:
421                try:
422                    split_index = all_args.index("--")
423                    option_args = all_args[:split_index]
424                    extra_args = all_args[split_index + 1 :]
425                except ValueError:
426                    option_args = all_args
427                    extra_args = tuple()
428
429                self._parsed_args = (
430                    task_args.parse(option_args, env, self.ctx.ui.program_name),
431                    extra_args,
432                )
433
434            else:
435                self._parsed_args = ({}, all_args)
436
437        return self._parsed_args
def run( self, context: poethepoet.context.RunContext, parent_env: Optional[poethepoet.env.manager.EnvVarsManager] = None) -> int:
439    def run(
440        self,
441        context: "RunContext",
442        parent_env: Optional["EnvVarsManager"] = None,
443    ) -> int:
444        """
445        Run this task
446        """
447
448        if environ.get("POE_DEBUG"):
449            task_type_key = self.__key__  # type: ignore[attr-defined]
450            print(f" * Running     {task_type_key}:{self.name}")
451            print(f" . Invocation  {self.invocation!r}")
452
453        upstream_invocations = self._get_upstream_invocations(context)
454
455        if context.dry and upstream_invocations.get("uses", {}):
456            self._print_action(
457                (
458                    "unresolved dependency task results via uses option for task "
459                    f"{self.name!r}"
460                ),
461                dry=True,
462                unresolved=True,
463            )
464            return 0
465
466        task_env = self.spec.get_task_env(
467            parent_env or context.env,
468            context._get_dep_values(upstream_invocations["uses"]),
469        )
470
471        if environ.get("POE_DEBUG"):
472            named_arg_values, extra_args = self.get_parsed_arguments(task_env)
473            print(f" . Parsed args {named_arg_values!r}")
474            print(f" . Extra args  {extra_args!r}")
475
476        return self._handle_run(context, task_env)

Run this task

def get_working_dir(self, env: poethepoet.env.manager.EnvVarsManager) -> pathlib.Path:
505    def get_working_dir(
506        self,
507        env: "EnvVarsManager",
508    ) -> Path:
509        cwd_option = env.fill_template(self.spec.options.get("cwd", self.ctx.cwd))
510        working_dir = Path(cwd_option)
511
512        if not working_dir.is_absolute():
513            working_dir = self.ctx.config.project_dir / working_dir
514
515        return working_dir
def iter_upstream_tasks( self, context: poethepoet.context.RunContext) -> Iterator[Tuple[str, PoeTask]]:
517    def iter_upstream_tasks(
518        self, context: "RunContext"
519    ) -> Iterator[Tuple[str, "PoeTask"]]:
520        invocations = self._get_upstream_invocations(context)
521        for invocation in invocations["deps"]:
522            yield ("", self._instantiate_dep(invocation, capture_stdout=False))
523        for key, invocation in invocations["uses"].items():
524            yield (key, self._instantiate_dep(invocation, capture_stdout=True))
def has_deps(self) -> bool:
568    def has_deps(self) -> bool:
569        return bool(
570            self.spec.options.get("deps", False) or self.spec.options.get("uses", False)
571        )
@classmethod
def is_task_type(cls, task_def_key: str, content_type: Optional[Type] = None) -> bool:
573    @classmethod
574    def is_task_type(
575        cls, task_def_key: str, content_type: Optional[Type] = None
576    ) -> bool:
577        """
578        Checks whether the given key identifies a known task type.
579        Optionally also check whether the given content_type matches the type of content
580        for this tasks type.
581        """
582        return task_def_key in cls.__task_types and (
583            content_type is None
584            or cls.__task_types[task_def_key].__content_type__ is content_type
585        )

Checks whether the given key identifies a known task type. Optionally also checks whether the given content_type matches the content type of that task type.

@classmethod
def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]:
587    @classmethod
588    def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]:
589        if content_type:
590            return tuple(
591                task_type
592                for task_type, task_cls in cls.__task_types.items()
593                if task_cls.__content_type__ is content_type
594            )
595        return tuple(task_type for task_type in cls.__task_types.keys())
class PoeTask.TaskOptions(poethepoet.options.PoeOptions):
172    class TaskOptions(PoeOptions):
173        args: Optional[Union[dict, list]] = None
174        capture_stdout: Optional[str] = None
175        cwd: Optional[str] = None
176        deps: Optional[Sequence[str]] = None
177        env: Optional[dict] = None
178        envfile: Optional[Union[str, list]] = None
179        executor: Optional[dict] = None
180        help: Optional[str] = None
181        uses: Optional[dict] = None
182
183        def validate(self):
184            """
185            Validation rules that don't require any extra context go here.
186            """

A special kind of config object that parses options ...

args: Union[dict, list, NoneType] = None
capture_stdout: Optional[str] = None
cwd: Optional[str] = None
deps: Optional[Sequence[str]] = None
env: Optional[dict] = None
envfile: Union[str, list, NoneType] = None
executor: Optional[dict] = None
help: Optional[str] = None
uses: Optional[dict] = None
def validate(self):
183        def validate(self):
184            """
185            Validation rules that don't require any extra context go here.
186            """

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
class PoeTask.TaskSpec:
188    class TaskSpec:
189        name: str
190        content: TaskContent
191        options: "PoeTask.TaskOptions"
192        task_type: Type["PoeTask"]
193        source: "ConfigPartition"
194        parent: Optional["PoeTask.TaskSpec"] = None
195
196        _args: Optional["PoeTaskArgs"] = None
197
198        def __init__(
199            self,
200            name: str,
201            task_def: Dict[str, Any],
202            factory: TaskSpecFactory,
203            source: "ConfigPartition",
204            parent: Optional["PoeTask.TaskSpec"] = None,
205        ):
206            self.name = name
207            self.content = task_def[self.task_type.__key__]
208            self.options = self._parse_options(task_def)
209            self.source = source
210            self.parent = parent
211
212        def _parse_options(self, task_def: Dict[str, Any]):
213            try:
214                return next(
215                    self.task_type.TaskOptions.parse(
216                        task_def, extra_keys=(self.task_type.__key__,)
217                    )
218                )
219            except ConfigValidationError as error:
220                error.task_name = self.name
221                raise
222
223        def get_task_env(
224            self,
225            parent_env: "EnvVarsManager",
226            uses_values: Optional[Mapping[str, str]] = None,
227        ) -> "EnvVarsManager":
228            """
229            Resolve the EnvVarsManager for this task, relative to the given parent_env
230            """
231            task_envfile = self.options.get("envfile")
232            task_env = self.options.get("env")
233
234            result = parent_env.clone()
235
236            # Include env vars from outputs of upstream dependencies
237            if uses_values:
238                result.update(uses_values)
239
240            result.set("POE_CONF_DIR", str(self.source.config_dir))
241            result.apply_env_config(
242                task_envfile,
243                task_env,
244                config_dir=self.source.config_dir,
245                config_working_dir=self.source.cwd,
246            )
247
248            return result
249
250        @property
251        def args(self) -> Optional["PoeTaskArgs"]:
252            from .args import PoeTaskArgs
253
254            if not self._args and self.options.args:
255                self._args = PoeTaskArgs(self.options.args, self.name)
256
257            return self._args
258
259        def create_task(
260            self,
261            invocation: Tuple[str, ...],
262            ctx: TaskContext,
263            capture_stdout: Union[str, bool] = False,
264        ) -> "PoeTask":
265            return self.task_type(
266                spec=self,
267                invocation=invocation,
268                capture_stdout=capture_stdout,
269                ctx=ctx,
270            )
271
272        def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory):
273            try:
274                self._base_validations(config, task_specs)
275                self._task_validations(config, task_specs)
276            except ConfigValidationError as error:
277                error.task_name = self.name
278                raise
279
280        def _base_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
281            """
282            Perform validations on this TaskSpec that apply to all task types
283            """
284            if not (self.name[0].isalpha() or self.name[0] == "_"):
285                raise ConfigValidationError(
286                    "Task names must start with a letter or underscore."
287                )
288
289            if not self.parent and not _TASK_NAME_PATTERN.match(self.name):
290                raise ConfigValidationError(
291                    "Task names characters must be alphanumeric, colon, underscore or "
292                    "dash."
293                )
294
295            if not isinstance(self.content, self.task_type.__content_type__):
296                raise ConfigValidationError(
297                    f"Content for {self.task_type.__name__} must be a "
298                    f"{self.task_type.__content_type__.__name__}"
299                )
300
301            if self.options.deps:
302                for dep in self.options.deps:
303                    dep_task_name = dep.split(" ", 1)[0]
304                    if dep_task_name not in task_specs:
305                        raise ConfigValidationError(
306                            "'deps' option includes reference to unknown task: "
307                            f"{dep_task_name!r}"
308                        )
309
310                    if task_specs.get(dep_task_name).options.get("use_exec", False):
311                        raise ConfigValidationError(
312                            f"'deps' option includes reference to task with "
313                            f"'use_exec' set to true: {dep_task_name!r}"
314                        )
315
316            if self.options.uses:
317                from ..helpers import is_valid_env_var
318
319                for key, dep in self.options.uses.items():
320                    if not is_valid_env_var(key):
321                        raise ConfigValidationError(
322                            f"'uses' option includes invalid key: {key!r}"
323                        )
324
325                    dep_task_name = dep.split(" ", 1)[0]
326                    if dep_task_name not in task_specs:
327                        raise ConfigValidationError(
328                            "'uses' options includes reference to unknown task: "
329                            f"{dep_task_name!r}"
330                        )
331
332                    referenced_task = task_specs.get(dep_task_name)
333                    if referenced_task.options.get("use_exec", False):
334                        raise ConfigValidationError(
335                            f"'uses' option references task with 'use_exec' set to "
336                            f"true: {dep_task_name!r}"
337                        )
338                    if referenced_task.options.get("capture_stdout"):
339                        raise ConfigValidationError(
340                            f"'uses' option references task with 'capture_stdout' "
341                            f"option set: {dep_task_name!r}"
342                        )
343
344        def _task_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
345            """
346            Perform validations on this TaskSpec that apply to a specific task type
347            """
PoeTask.TaskSpec( name: str, task_def: Dict[str, Any], factory: TaskSpecFactory, source: poethepoet.config.ConfigPartition, parent: Optional[PoeTask.TaskSpec] = None)
198        def __init__(
199            self,
200            name: str,
201            task_def: Dict[str, Any],
202            factory: TaskSpecFactory,
203            source: "ConfigPartition",
204            parent: Optional["PoeTask.TaskSpec"] = None,
205        ):
206            self.name = name
207            self.content = task_def[self.task_type.__key__]
208            self.options = self._parse_options(task_def)
209            self.source = source
210            self.parent = parent
name: str
content: Union[str, Sequence[Union[str, Mapping[str, Any]]]]
task_type: Type[PoeTask]
parent: Optional[PoeTask.TaskSpec] = None
def get_task_env( self, parent_env: poethepoet.env.manager.EnvVarsManager, uses_values: Optional[Mapping[str, str]] = None) -> poethepoet.env.manager.EnvVarsManager:
223        def get_task_env(
224            self,
225            parent_env: "EnvVarsManager",
226            uses_values: Optional[Mapping[str, str]] = None,
227        ) -> "EnvVarsManager":
228            """
229            Resolve the EnvVarsManager for this task, relative to the given parent_env
230            """
231            task_envfile = self.options.get("envfile")
232            task_env = self.options.get("env")
233
234            result = parent_env.clone()
235
236            # Include env vars from outputs of upstream dependencies
237            if uses_values:
238                result.update(uses_values)
239
240            result.set("POE_CONF_DIR", str(self.source.config_dir))
241            result.apply_env_config(
242                task_envfile,
243                task_env,
244                config_dir=self.source.config_dir,
245                config_working_dir=self.source.cwd,
246            )
247
248            return result

Resolve the EnvVarsManager for this task, relative to the given parent_env

args: Optional[poethepoet.task.args.PoeTaskArgs]
250        @property
251        def args(self) -> Optional["PoeTaskArgs"]:
252            from .args import PoeTaskArgs
253
254            if not self._args and self.options.args:
255                self._args = PoeTaskArgs(self.options.args, self.name)
256
257            return self._args
def create_task( self, invocation: Tuple[str, ...], ctx: TaskContext, capture_stdout: Union[str, bool] = False) -> PoeTask:
259        def create_task(
260            self,
261            invocation: Tuple[str, ...],
262            ctx: TaskContext,
263            capture_stdout: Union[str, bool] = False,
264        ) -> "PoeTask":
265            return self.task_type(
266                spec=self,
267                invocation=invocation,
268                capture_stdout=capture_stdout,
269                ctx=ctx,
270            )
def validate( self, config: poethepoet.config.PoeConfig, task_specs: TaskSpecFactory):
272        def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory):
273            try:
274                self._base_validations(config, task_specs)
275                self._task_validations(config, task_specs)
276            except ConfigValidationError as error:
277                error.task_name = self.name
278                raise
class PoeTask.Error(builtins.Exception):
607    class Error(Exception):
608        pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args