poethepoet.task

 1from .cmd import CmdTask
 2from .expr import ExprTask
 3from .ref import RefTask
 4from .script import ScriptTask
 5from .sequence import SequenceTask
 6from .shell import ShellTask
 7from .switch import SwitchTask
 8
 9__all__ = [
10    "CmdTask",
11    "ExprTask",
12    "RefTask",
13    "ScriptTask",
14    "SequenceTask",
15    "ShellTask",
16    "SwitchTask",
17]
class CmdTask(poethepoet.task.base.PoeTask):
class CmdTask(PoeTask):
    """
    A task consisting of a reference to a shell command
    """

    __key__ = "cmd"

    class TaskOptions(PoeTask.TaskOptions):
        use_exec: bool = False

        def validate(self):
            """
            Check option combinations that can be verified without extra context.
            """
            super().validate()
            exec_and_capture = self.use_exec and self.capture_stdout
            if exec_and_capture:
                message = (
                    "'use_exec' and 'capture_stdout' options cannot be both"
                    " provided on the same task."
                )
                raise ConfigValidationError(message)

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "CmdTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Validations specific to cmd tasks: the content must not be blank.
            """
            if self.content.strip():
                return
            raise ConfigValidationError("Task has no content")

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Resolve the command line for this task and pass it to the executor.
        """
        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        # Resolved command tokens first, then any extra args.
        resolved_tokens = self._resolve_commandline(context, env)
        cmd = tuple(resolved_tokens) + tuple(extra_args)

        self._print_action(shlex.join(cmd), context.dry)

        executor = self._get_executor(context, env)
        use_exec = self.spec.options.get("use_exec", False)
        return executor.execute(cmd, use_exec=use_exec)

    def _resolve_commandline(self, context: "RunContext", env: "EnvVarsManager"):
        """
        Parse the task content into a flat list of command tokens.

        Tokens flagged as glob patterns are expanded relative to the task's working
        directory. Raises PoeException if the content cannot be parsed, is empty, or
        contains more than one command line.
        """
        from ..helpers.command import parse_poe_cmd, resolve_command_tokens
        from ..helpers.command.ast_core import ParseError

        try:
            command_lines = parse_poe_cmd(self.spec.content).command_lines
        except ParseError as error:
            raise PoeException(
                f"Couldn't parse command line for task {self.name!r}: {error.args[0]}"
            ) from error

        if not command_lines:
            raise PoeException(
                f"Invalid cmd task {self.name!r} does not include any command lines"
            )

        # lines terminated by a line break or comment are implicitly joined, so only
        # an explicit ";" before the final line counts as a separate command line
        for line in command_lines[:-1]:
            if line._terminator == ";":
                raise PoeException(
                    f"Invalid cmd task {self.name!r} includes multiple command lines"
                )

        working_dir = self.get_working_dir(env)

        tokens = []
        for token, is_glob in resolve_command_tokens(command_lines, env):
            if not is_glob:
                tokens.append(token)
                continue
            # Resolve glob pattern from the working directory
            for path in working_dir.glob(token):
                tokens.append(str(path))

        return tokens

A task consisting of a reference to a shell command

class CmdTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
22    class TaskOptions(PoeTask.TaskOptions):
23        use_exec: bool = False
24
25        def validate(self):
26            """
27            Validation rules that don't require any extra context go here.
28            """
29            super().validate()
30            if self.use_exec and self.capture_stdout:
31                raise ConfigValidationError(
32                    "'use_exec' and 'capture_stdout'"
33                    " options cannot be both provided on the same task."
34                )

A special kind of config object that parses options ...

use_exec: bool = False
def validate(self):
25        def validate(self):
26            """
27            Validation rules that don't require any extra context go here.
28            """
29            super().validate()
30            if self.use_exec and self.capture_stdout:
31                raise ConfigValidationError(
32                    "'use_exec' and 'capture_stdout'"
33                    " options cannot be both provided on the same task."
34                )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class CmdTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
36    class TaskSpec(PoeTask.TaskSpec):
37        content: str
38        options: "CmdTask.TaskOptions"
39
40        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
41            """
42            Perform validations on this TaskSpec that apply to a specific task type
43            """
44            if not self.content.strip():
45                raise ConfigValidationError("Task has no content")
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'CmdTask'>
class ExprTask(poethepoet.task.base.PoeTask):
 25class ExprTask(PoeTask):
 26    """
 27    A task consisting of a python expression
 28    """
 29
 30    content: str
 31
 32    __key__ = "expr"
 33
 34    class TaskOptions(PoeTask.TaskOptions):
 35        imports: Sequence[str] = tuple()
 36        assert_: Union[bool, int] = False
 37        use_exec: bool = False
 38
 39        def validate(self):
 40            super().validate()
 41            if self.use_exec and self.capture_stdout:
 42                raise ConfigValidationError(
 43                    "'use_exec' and 'capture_stdout'"
 44                    " options cannot be both provided on the same task."
 45                )
 46
 47    class TaskSpec(PoeTask.TaskSpec):
 48        content: str
 49        options: "ExprTask.TaskOptions"
 50
 51        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
 52            """
 53            Perform validations on this TaskSpec that apply to a specific task type
 54            """
 55            try:
 56                # ruff: noqa: E501
 57                self.task_type._substitute_env_vars(self.content.strip(), {})  # type: ignore[attr-defined]
 58            except (ValueError, ExpressionParseError) as error:
 59                raise ConfigValidationError(f"Invalid expression: {error}")
 60
 61    spec: TaskSpec
 62
 63    def _handle_run(
 64        self,
 65        context: "RunContext",
 66        env: "EnvVarsManager",
 67    ) -> int:
 68        from ..helpers.python import format_class
 69
 70        named_arg_values, extra_args = self.get_parsed_arguments(env)
 71        env.update(named_arg_values)
 72
 73        imports = self.spec.options.imports
 74
 75        expr, env_values = self.parse_content(named_arg_values, env, imports)
 76        argv = [
 77            self.spec.name,
 78            *(env.fill_template(token) for token in self.invocation[1:]),
 79        ]
 80
 81        script = [
 82            f"import sys;" f"sys.path.append('src');" f"sys.argv = {argv!r};",
 83            (f"import {', '.join(imports)}; " if imports else ""),
 84            f"{format_class(named_arg_values)}",
 85            f"{format_class(env_values, classname='__env')}",
 86            f"result = ({expr});",
 87            "print(result);",
 88        ]
 89
 90        falsy_return_code = int(self.spec.options.get("assert"))
 91        if falsy_return_code:
 92            script.append(f"exit(0 if result else {falsy_return_code});")
 93
 94        # Exactly which python executable to use is usually resolved by the executor
 95        # It's important that the script contains no line breaks to avoid issues on
 96        # windows
 97        cmd = ("python", "-c", "".join(script))
 98
 99        self._print_action(self.spec.content.strip(), context.dry)
100        return self._get_executor(context, env).execute(
101            cmd, use_exec=self.spec.options.use_exec
102        )
103
104    def parse_content(
105        self,
106        args: Optional[Dict[str, Any]],
107        env: "EnvVarsManager",
108        imports=Iterable[str],
109    ) -> Tuple[str, Dict[str, str]]:
110        """
111        Returns the expression to evaluate and the subset of env vars that it references
112
113        Templated referenced to env vars are resolve before parsing the expression.
114
115        Will raise an exception if the content contains invalid syntax or references
116        python variables that are not in scope.
117        """
118
119        from ..helpers.python import resolve_expression
120
121        expression, accessed_vars = self._substitute_env_vars(
122            self.spec.content.strip(), env.to_dict()
123        )
124
125        expression = resolve_expression(
126            source=expression,
127            arguments=set(args or tuple()),
128            allowed_vars={"sys", "__env", *imports},
129        )
130        # Strip out any new lines because they can be problematic on windows
131        expression = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", expression)
132        expression = re.sub(r"(\r\n|\r|\n)", " ", expression)
133
134        return expression, accessed_vars
135
136    @classmethod
137    def _substitute_env_vars(cls, content: str, env: Mapping[str, str]):
138        """
139        Substitute ${template} references to env vars with a reference to a python class
140        attribute like __env.var, and collect the accessed env vars so we can construct
141        that class with the required attributes later.
142        """
143
144        from ..env.template import SpyDict, apply_envvars_to_template
145
146        # Spy on access to the env, so that instead of replacing template ${keys} with
147        # the corresponding value, replace them with a python name and keep track of
148        # referenced env vars.
149        accessed_vars: Dict[str, str] = {}
150
151        def getitem_spy(obj: SpyDict, key: str, value: str):
152            accessed_vars[key] = value
153            return f"__env.{key}"
154
155        expression = apply_envvars_to_template(
156            content=content,
157            env=SpyDict(env, getitem_spy=getitem_spy),
158            require_braces=True,
159        )
160
161        return expression, accessed_vars

A task consisting of a python expression

content: str
def parse_content( self, args: Optional[Dict[str, Any]], env: poethepoet.env.manager.EnvVarsManager, imports=typing.Iterable[str]) -> Tuple[str, Dict[str, str]]:
104    def parse_content(
105        self,
106        args: Optional[Dict[str, Any]],
107        env: "EnvVarsManager",
108        imports=Iterable[str],
109    ) -> Tuple[str, Dict[str, str]]:
110        """
111        Returns the expression to evaluate and the subset of env vars that it references
112
113        Templated referenced to env vars are resolve before parsing the expression.
114
115        Will raise an exception if the content contains invalid syntax or references
116        python variables that are not in scope.
117        """
118
119        from ..helpers.python import resolve_expression
120
121        expression, accessed_vars = self._substitute_env_vars(
122            self.spec.content.strip(), env.to_dict()
123        )
124
125        expression = resolve_expression(
126            source=expression,
127            arguments=set(args or tuple()),
128            allowed_vars={"sys", "__env", *imports},
129        )
130        # Strip out any new lines because they can be problematic on windows
131        expression = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", expression)
132        expression = re.sub(r"(\r\n|\r|\n)", " ", expression)
133
134        return expression, accessed_vars

Returns the expression to evaluate and the subset of env vars that it references

Templated references to env vars are resolved before parsing the expression.

Will raise an exception if the content contains invalid syntax or references python variables that are not in scope.

class ExprTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
34    class TaskOptions(PoeTask.TaskOptions):
35        imports: Sequence[str] = tuple()
36        assert_: Union[bool, int] = False
37        use_exec: bool = False
38
39        def validate(self):
40            super().validate()
41            if self.use_exec and self.capture_stdout:
42                raise ConfigValidationError(
43                    "'use_exec' and 'capture_stdout'"
44                    " options cannot be both provided on the same task."
45                )

A special kind of config object that parses options ...

imports: Sequence[str] = ()
assert_: Union[bool, int] = False
use_exec: bool = False
def validate(self):
39        def validate(self):
40            super().validate()
41            if self.use_exec and self.capture_stdout:
42                raise ConfigValidationError(
43                    "'use_exec' and 'capture_stdout'"
44                    " options cannot be both provided on the same task."
45                )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class ExprTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
47    class TaskSpec(PoeTask.TaskSpec):
48        content: str
49        options: "ExprTask.TaskOptions"
50
51        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
52            """
53            Perform validations on this TaskSpec that apply to a specific task type
54            """
55            try:
56                # ruff: noqa: E501
57                self.task_type._substitute_env_vars(self.content.strip(), {})  # type: ignore[attr-defined]
58            except (ValueError, ExpressionParseError) as error:
59                raise ConfigValidationError(f"Invalid expression: {error}")
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'ExprTask'>
class RefTask(poethepoet.task.base.PoeTask):
 14class RefTask(PoeTask):
 15    """
 16    A task consisting of a reference to another task
 17    """
 18
 19    __key__ = "ref"
 20
 21    class TaskOptions(PoeTask.TaskOptions):
 22        def validate(self):
 23            """
 24            Validation rules that don't require any extra context go here.
 25            """
 26            if self.executor:
 27                raise ConfigValidationError(
 28                    "Option 'executor' cannot be set on a ref task"
 29                )
 30            if self.capture_stdout:
 31                raise ConfigValidationError(
 32                    "Option 'capture_stdout' cannot be set on a ref task"
 33                )
 34
 35    class TaskSpec(PoeTask.TaskSpec):
 36        content: str
 37        options: "RefTask.TaskOptions"
 38
 39        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
 40            """
 41            Perform validations on this TaskSpec that apply to a specific task type
 42            """
 43
 44            import shlex
 45
 46            task_name_ref = shlex.split(self.content)[0]
 47
 48            if task_name_ref not in task_specs:
 49                raise ConfigValidationError(
 50                    f"Includes reference to unknown task {task_name_ref!r}"
 51                )
 52
 53            if task_specs.get(task_name_ref).options.get("use_exec", False):
 54                raise ConfigValidationError(
 55                    f"Illegal reference to task with "
 56                    f"'use_exec' set to true: {task_name_ref!r}"
 57                )
 58
 59    spec: TaskSpec
 60
 61    def _handle_run(
 62        self,
 63        context: "RunContext",
 64        env: "EnvVarsManager",
 65    ) -> int:
 66        """
 67        Lookup and delegate to the referenced task
 68        """
 69        import shlex
 70
 71        named_arg_values, extra_args = self.get_parsed_arguments(env)
 72        env.update(named_arg_values)
 73
 74        ref_invocation = (
 75            *(
 76                env.fill_template(token)
 77                for token in shlex.split(env.fill_template(self.spec.content.strip()))
 78            ),
 79            *extra_args,
 80        )
 81
 82        task = self.ctx.specs.get(ref_invocation[0]).create_task(
 83            invocation=ref_invocation, ctx=TaskContext.from_task(self)
 84        )
 85
 86        if task.has_deps():
 87            return self._run_task_graph(task, context, env)
 88
 89        return task.run(context=context, parent_env=env)
 90
 91    def _run_task_graph(
 92        self,
 93        task: "PoeTask",
 94        context: "RunContext",
 95        env: "EnvVarsManager",
 96    ) -> int:
 97        from ..exceptions import ExecutionError
 98        from .graph import TaskExecutionGraph
 99
100        graph = TaskExecutionGraph(task, context)
101        plan = graph.get_execution_plan()
102        for stage in plan:
103            for stage_task in stage:
104                if stage_task == task:
105                    # The final sink task gets special treatment
106                    return task.run(context=context, parent_env=env)
107
108                task_result = stage_task.run(context=context)
109                if task_result:
110                    raise ExecutionError(
111                        f"Task graph aborted after failed task {stage_task.name!r}"
112                    )
113        return 0

A task consisting of a reference to another task

class RefTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
21    class TaskOptions(PoeTask.TaskOptions):
22        def validate(self):
23            """
24            Validation rules that don't require any extra context go here.
25            """
26            if self.executor:
27                raise ConfigValidationError(
28                    "Option 'executor' cannot be set on a ref task"
29                )
30            if self.capture_stdout:
31                raise ConfigValidationError(
32                    "Option 'capture_stdout' cannot be set on a ref task"
33                )

A special kind of config object that parses options ...

def validate(self):
22        def validate(self):
23            """
24            Validation rules that don't require any extra context go here.
25            """
26            if self.executor:
27                raise ConfigValidationError(
28                    "Option 'executor' cannot be set on a ref task"
29                )
30            if self.capture_stdout:
31                raise ConfigValidationError(
32                    "Option 'capture_stdout' cannot be set on a ref task"
33                )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class RefTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
35    class TaskSpec(PoeTask.TaskSpec):
36        content: str
37        options: "RefTask.TaskOptions"
38
39        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
40            """
41            Perform validations on this TaskSpec that apply to a specific task type
42            """
43
44            import shlex
45
46            task_name_ref = shlex.split(self.content)[0]
47
48            if task_name_ref not in task_specs:
49                raise ConfigValidationError(
50                    f"Includes reference to unknown task {task_name_ref!r}"
51                )
52
53            if task_specs.get(task_name_ref).options.get("use_exec", False):
54                raise ConfigValidationError(
55                    f"Illegal reference to task with "
56                    f"'use_exec' set to true: {task_name_ref!r}"
57                )
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'RefTask'>
class ScriptTask(poethepoet.task.base.PoeTask):
 16class ScriptTask(PoeTask):
 17    """
 18    A task consisting of a reference to a python script
 19    """
 20
 21    content: str
 22
 23    __key__ = "script"
 24
 25    class TaskOptions(PoeTask.TaskOptions):
 26        use_exec: bool = False
 27        print_result: bool = False
 28
 29        def validate(self):
 30            super().validate()
 31            if self.use_exec and self.capture_stdout:
 32                raise ConfigValidationError(
 33                    "'use_exec' and 'capture_stdout'"
 34                    " options cannot be both provided on the same task."
 35                )
 36
 37    class TaskSpec(PoeTask.TaskSpec):
 38        content: str
 39        options: "ScriptTask.TaskOptions"
 40
 41        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
 42            """
 43            Perform validations on this TaskSpec that apply to a specific task type
 44            """
 45            from ..helpers.python import parse_and_validate
 46
 47            try:
 48                target_module, target_ref = self.content.split(":", 1)
 49                if not target_ref.isidentifier():
 50                    parse_and_validate(target_ref, call_only=True)
 51            except (ValueError, ExpressionParseError):
 52                raise ConfigValidationError(
 53                    f"Invalid callable reference {self.content!r}\n"
 54                    "(expected something like `module:callable` or `module:callable()`)"
 55                )
 56
 57    spec: TaskSpec
 58
 59    def _handle_run(
 60        self,
 61        context: "RunContext",
 62        env: "EnvVarsManager",
 63    ) -> int:
 64        from ..helpers.python import format_class
 65
 66        named_arg_values, extra_args = self.get_parsed_arguments(env)
 67        env.update(named_arg_values)
 68
 69        # TODO: do something about extra_args, error?
 70
 71        target_module, function_call = self.parse_content(named_arg_values)
 72        function_ref = function_call.function_ref
 73
 74        argv = [
 75            self.name,
 76            *(env.fill_template(token) for token in self.invocation[1:]),
 77        ]
 78
 79        # TODO: check whether the project really does use src layout, and don't do
 80        #       sys.path.append('src') if it doesn't
 81
 82        has_dry_run_ref = "_dry_run" in function_call.referenced_globals
 83        dry_run = self.ctx.ui["dry_run"]
 84
 85        script = [
 86            "import asyncio,os,sys;",
 87            "from inspect import iscoroutinefunction as _c;",
 88            "from os import environ;",
 89            "from importlib import import_module as _i;",
 90            f"_dry_run = {'True' if dry_run else 'False'};" if has_dry_run_ref else "",
 91            f"sys.argv = {argv!r}; sys.path.append('src');",
 92            f"{format_class(named_arg_values)}",
 93            f"_m = _i('{target_module}');",
 94            f"_r = asyncio.run(_m.{function_call.expression}) if _c(_m.{function_ref})",
 95            f" else _m.{function_call.expression};",
 96        ]
 97
 98        if self.spec.options.get("print_result"):
 99            script.append("_r is not None and print(_r);")
100
101        # Exactly which python executable to use is usually resolved by the executor
102        # It's important that the script contains no line breaks to avoid issues on
103        # windows
104        cmd = ("python", "-c", "".join(script))
105
106        self._print_action(shlex.join(argv), context.dry)
107        return self._get_executor(
108            context, env, delegate_dry_run=has_dry_run_ref
109        ).execute(cmd, use_exec=self.spec.options.get("use_exec", False))
110
111    def parse_content(
112        self, args: Optional[Dict[str, Any]]
113    ) -> Tuple[str, "FunctionCall"]:
114        """
115        Returns the module to load, and the function call to execute.
116
117        Will raise an exception if the function call contains invalid syntax or
118        references variables that are not in scope.
119        """
120
121        from ..helpers.python import FunctionCall
122
123        try:
124            target_module, target_ref = self.spec.content.strip().split(":", 1)
125        except ValueError:
126            raise ExpressionParseError(
127                f"Invalid task content: {self.spec.content.strip()!r}"
128            )
129
130        if target_ref.isidentifier():
131            if args:
132                function_call = FunctionCall(f"{target_ref}(**({args}))", target_ref)
133            else:
134                function_call = FunctionCall(f"{target_ref}()", target_ref)
135        else:
136            function_call = FunctionCall.parse(
137                source=target_ref,
138                arguments=set(args or tuple()),
139                allowed_vars={"sys", "os", "environ", "_dry_run"},
140            )
141
142        return target_module, function_call

A task consisting of a reference to a python script

content: str
def parse_content( self, args: Optional[Dict[str, Any]]) -> Tuple[str, poethepoet.helpers.python.FunctionCall]:
111    def parse_content(
112        self, args: Optional[Dict[str, Any]]
113    ) -> Tuple[str, "FunctionCall"]:
114        """
115        Returns the module to load, and the function call to execute.
116
117        Will raise an exception if the function call contains invalid syntax or
118        references variables that are not in scope.
119        """
120
121        from ..helpers.python import FunctionCall
122
123        try:
124            target_module, target_ref = self.spec.content.strip().split(":", 1)
125        except ValueError:
126            raise ExpressionParseError(
127                f"Invalid task content: {self.spec.content.strip()!r}"
128            )
129
130        if target_ref.isidentifier():
131            if args:
132                function_call = FunctionCall(f"{target_ref}(**({args}))", target_ref)
133            else:
134                function_call = FunctionCall(f"{target_ref}()", target_ref)
135        else:
136            function_call = FunctionCall.parse(
137                source=target_ref,
138                arguments=set(args or tuple()),
139                allowed_vars={"sys", "os", "environ", "_dry_run"},
140            )
141
142        return target_module, function_call

Returns the module to load, and the function call to execute.

Will raise an exception if the function call contains invalid syntax or references variables that are not in scope.

class ScriptTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
25    class TaskOptions(PoeTask.TaskOptions):
26        use_exec: bool = False
27        print_result: bool = False
28
29        def validate(self):
30            super().validate()
31            if self.use_exec and self.capture_stdout:
32                raise ConfigValidationError(
33                    "'use_exec' and 'capture_stdout'"
34                    " options cannot be both provided on the same task."
35                )

A special kind of config object that parses options ...

use_exec: bool = False
print_result: bool = False
def validate(self):
29        def validate(self):
30            super().validate()
31            if self.use_exec and self.capture_stdout:
32                raise ConfigValidationError(
33                    "'use_exec' and 'capture_stdout'"
34                    " options cannot be both provided on the same task."
35                )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class ScriptTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
37    class TaskSpec(PoeTask.TaskSpec):
38        content: str
39        options: "ScriptTask.TaskOptions"
40
41        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
42            """
43            Perform validations on this TaskSpec that apply to a specific task type
44            """
45            from ..helpers.python import parse_and_validate
46
47            try:
48                target_module, target_ref = self.content.split(":", 1)
49                if not target_ref.isidentifier():
50                    parse_and_validate(target_ref, call_only=True)
51            except (ValueError, ExpressionParseError):
52                raise ConfigValidationError(
53                    f"Invalid callable reference {self.content!r}\n"
54                    "(expected something like `module:callable` or `module:callable()`)"
55                )
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'ScriptTask'>
class SequenceTask(poethepoet.task.base.PoeTask):
 26class SequenceTask(PoeTask):
 27    """
 28    A task consisting of a sequence of other tasks
 29    """
 30
 31    content: List[Union[str, Dict[str, Any]]]
 32
 33    __key__ = "sequence"
 34    __content_type__: ClassVar[Type] = list
 35
 36    class TaskOptions(PoeTask.TaskOptions):
 37        ignore_fail: Literal[True, False, "return_zero", "return_non_zero"] = False
 38        default_item_type: Optional[str] = None
 39
 40        def validate(self):
 41            """
 42            Validation rules that don't require any extra context go here.
 43            """
 44            super().validate()
 45            if self.default_item_type is not None and not PoeTask.is_task_type(
 46                self.default_item_type, content_type=str
 47            ):
 48                raise ConfigValidationError(
 49                    "Unsupported value for option `default_item_type`,\n"
 50                    f"Expected one of {PoeTask.get_task_types(content_type=str)}"
 51                )
 52
 53    class TaskSpec(PoeTask.TaskSpec):
 54        content: list
 55        options: "SequenceTask.TaskOptions"
 56        subtasks: Sequence[PoeTask.TaskSpec]
 57
 58        def __init__(
 59            self,
 60            name: str,
 61            task_def: Dict[str, Any],
 62            factory: "TaskSpecFactory",
 63            source: "ConfigPartition",
 64            parent: Optional["PoeTask.TaskSpec"] = None,
 65        ):
 66            super().__init__(name, task_def, factory, source, parent)
 67
 68            self.subtasks = []
 69            for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
 70                if not isinstance(sub_task_def, (str, dict, list)):
 71                    raise ConfigValidationError(
 72                        f"Item #{index} in sequence task should be a value of "
 73                        "type: str | dict | list",
 74                        task_name=self.name,
 75                    )
 76
 77                subtask_name = (
 78                    sub_task_def
 79                    if (
 80                        isinstance(sub_task_def, str)
 81                        and (sub_task_def[0].isalpha() or sub_task_def[0] == "_")
 82                    )
 83                    else SequenceTask._subtask_name(name, index)
 84                )
 85                task_type_key = self.task_type.resolve_task_type(
 86                    sub_task_def,
 87                    factory.config,
 88                    array_item=task_def.get("default_item_type", True),
 89                )
 90
 91                try:
 92                    self.subtasks.append(
 93                        factory.get(
 94                            subtask_name, sub_task_def, task_type_key, parent=self
 95                        )
 96                    )
 97                except PoeException:
 98                    raise ConfigValidationError(
 99                        f"Failed to interpret subtask #{index} in sequence",
100                        task_name=self.name,
101                    )
102
103        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
104            """
105            Perform validations on this TaskSpec that apply to a specific task type
106            """
107            for index, subtask in enumerate(self.subtasks):
108                if subtask.args:
109                    raise ConfigValidationError(
110                        "Unsupported option 'args' for task declared inside sequence"
111                    )
112
113                subtask.validate(config, task_specs)
114
115    spec: TaskSpec
116
117    def __init__(
118        self,
119        spec: TaskSpec,
120        invocation: Tuple[str, ...],
121        ctx: TaskContext,
122        capture_stdout: bool = False,
123    ):
124        assert capture_stdout in (False, None)
125        super().__init__(spec, invocation, ctx)
126        self.subtasks = [
127            task_spec.create_task(
128                invocation=(self._subtask_name(task_spec.name, index),),
129                ctx=TaskContext.from_task(self),
130            )
131            for index, task_spec in enumerate(spec.subtasks)
132        ]
133
134    def _handle_run(
135        self,
136        context: "RunContext",
137        env: "EnvVarsManager",
138    ) -> int:
139        named_arg_values, extra_args = self.get_parsed_arguments(env)
140        env.update(named_arg_values)
141
142        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
143            raise PoeException(f"Sequence task {self.name!r} does not accept arguments")
144
145        if len(self.subtasks) > 1:
146            # Indicate on the global context that there are multiple stages
147            context.multistage = True
148
149        ignore_fail = self.spec.options.ignore_fail
150        non_zero_subtasks: List[str] = list()
151        for subtask in self.subtasks:
152            try:
153                task_result = subtask.run(context=context, parent_env=env)
154            except ExecutionError as error:
155                if ignore_fail:
156                    print("Warning:", error.msg)
157                    non_zero_subtasks.append(subtask.name)
158                else:
159                    raise
160
161            if task_result:
162                if not ignore_fail:
163                    raise ExecutionError(
164                        f"Sequence aborted after failed subtask {subtask.name!r}"
165                    )
166                non_zero_subtasks.append(subtask.name)
167
168        if non_zero_subtasks and ignore_fail == "return_non_zero":
169            plural = "s" if len(non_zero_subtasks) > 1 else ""
170            raise ExecutionError(
171                f"Subtask{plural} {', '.join(repr(st) for st in non_zero_subtasks)} "
172                "returned non-zero exit status"
173            )
174        return 0
175
176    @classmethod
177    def _subtask_name(cls, task_name: str, index: int):
178        return f"{task_name}[{index}]"

A task consisting of a sequence of other tasks

SequenceTask( spec: SequenceTask.TaskSpec, invocation: Tuple[str, ...], ctx: poethepoet.task.base.TaskContext, capture_stdout: bool = False)
117    def __init__(
118        self,
119        spec: TaskSpec,
120        invocation: Tuple[str, ...],
121        ctx: TaskContext,
122        capture_stdout: bool = False,
123    ):
124        assert capture_stdout in (False, None)
125        super().__init__(spec, invocation, ctx)
126        self.subtasks = [
127            task_spec.create_task(
128                invocation=(self._subtask_name(task_spec.name, index),),
129                ctx=TaskContext.from_task(self),
130            )
131            for index, task_spec in enumerate(spec.subtasks)
132        ]
content: List[Union[str, Dict[str, Any]]]
subtasks
class SequenceTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
36    class TaskOptions(PoeTask.TaskOptions):
37        ignore_fail: Literal[True, False, "return_zero", "return_non_zero"] = False
38        default_item_type: Optional[str] = None
39
40        def validate(self):
41            """
42            Validation rules that don't require any extra context go here.
43            """
44            super().validate()
45            if self.default_item_type is not None and not PoeTask.is_task_type(
46                self.default_item_type, content_type=str
47            ):
48                raise ConfigValidationError(
49                    "Unsupported value for option `default_item_type`,\n"
50                    f"Expected one of {PoeTask.get_task_types(content_type=str)}"
51                )

A special kind of config object that parses options ...

ignore_fail: Literal[True, False, 'return_zero', 'return_non_zero'] = False
default_item_type: Optional[str] = None
def validate(self):
40        def validate(self):
41            """
42            Validation rules that don't require any extra context go here.
43            """
44            super().validate()
45            if self.default_item_type is not None and not PoeTask.is_task_type(
46                self.default_item_type, content_type=str
47            ):
48                raise ConfigValidationError(
49                    "Unsupported value for option `default_item_type`,\n"
50                    f"Expected one of {PoeTask.get_task_types(content_type=str)}"
51                )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class SequenceTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
 53    class TaskSpec(PoeTask.TaskSpec):
 54        content: list
 55        options: "SequenceTask.TaskOptions"
 56        subtasks: Sequence[PoeTask.TaskSpec]
 57
 58        def __init__(
 59            self,
 60            name: str,
 61            task_def: Dict[str, Any],
 62            factory: "TaskSpecFactory",
 63            source: "ConfigPartition",
 64            parent: Optional["PoeTask.TaskSpec"] = None,
 65        ):
 66            super().__init__(name, task_def, factory, source, parent)
 67
 68            self.subtasks = []
 69            for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
 70                if not isinstance(sub_task_def, (str, dict, list)):
 71                    raise ConfigValidationError(
 72                        f"Item #{index} in sequence task should be a value of "
 73                        "type: str | dict | list",
 74                        task_name=self.name,
 75                    )
 76
 77                subtask_name = (
 78                    sub_task_def
 79                    if (
 80                        isinstance(sub_task_def, str)
 81                        and (sub_task_def[0].isalpha() or sub_task_def[0] == "_")
 82                    )
 83                    else SequenceTask._subtask_name(name, index)
 84                )
 85                task_type_key = self.task_type.resolve_task_type(
 86                    sub_task_def,
 87                    factory.config,
 88                    array_item=task_def.get("default_item_type", True),
 89                )
 90
 91                try:
 92                    self.subtasks.append(
 93                        factory.get(
 94                            subtask_name, sub_task_def, task_type_key, parent=self
 95                        )
 96                    )
 97                except PoeException:
 98                    raise ConfigValidationError(
 99                        f"Failed to interpret subtask #{index} in sequence",
100                        task_name=self.name,
101                    )
102
103        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
104            """
105            Perform validations on this TaskSpec that apply to a specific task type
106            """
107            for index, subtask in enumerate(self.subtasks):
108                if subtask.args:
109                    raise ConfigValidationError(
110                        "Unsupported option 'args' for task declared inside sequence"
111                    )
112
113                subtask.validate(config, task_specs)
SequenceTask.TaskSpec( name: str, task_def: Dict[str, Any], factory: poethepoet.task.base.TaskSpecFactory, source: poethepoet.config.ConfigPartition, parent: Optional[poethepoet.task.base.PoeTask.TaskSpec] = None)
 58        def __init__(
 59            self,
 60            name: str,
 61            task_def: Dict[str, Any],
 62            factory: "TaskSpecFactory",
 63            source: "ConfigPartition",
 64            parent: Optional["PoeTask.TaskSpec"] = None,
 65        ):
 66            super().__init__(name, task_def, factory, source, parent)
 67
 68            self.subtasks = []
 69            for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
 70                if not isinstance(sub_task_def, (str, dict, list)):
 71                    raise ConfigValidationError(
 72                        f"Item #{index} in sequence task should be a value of "
 73                        "type: str | dict | list",
 74                        task_name=self.name,
 75                    )
 76
 77                subtask_name = (
 78                    sub_task_def
 79                    if (
 80                        isinstance(sub_task_def, str)
 81                        and (sub_task_def[0].isalpha() or sub_task_def[0] == "_")
 82                    )
 83                    else SequenceTask._subtask_name(name, index)
 84                )
 85                task_type_key = self.task_type.resolve_task_type(
 86                    sub_task_def,
 87                    factory.config,
 88                    array_item=task_def.get("default_item_type", True),
 89                )
 90
 91                try:
 92                    self.subtasks.append(
 93                        factory.get(
 94                            subtask_name, sub_task_def, task_type_key, parent=self
 95                        )
 96                    )
 97                except PoeException:
 98                    raise ConfigValidationError(
 99                        f"Failed to interpret subtask #{index} in sequence",
100                        task_name=self.name,
101                    )
content: list
task_type: Type[poethepoet.task.base.PoeTask] = <class 'SequenceTask'>
class ShellTask(poethepoet.task.base.PoeTask):
 20class ShellTask(PoeTask):
 21    """
 22    A task consisting of a reference to a shell command
 23    """
 24
 25    content: str
 26
 27    __key__ = "shell"
 28
 29    class TaskOptions(PoeTask.TaskOptions):
 30        interpreter: Optional[Union[str, list]] = None
 31
 32        def validate(self):
 33            super().validate()
 34
 35            from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS
 36
 37            if (
 38                isinstance(self.interpreter, str)
 39                and self.interpreter not in VALID_INTERPRETERS
 40            ):
 41                raise ConfigValidationError(
 42                    "Invalid value for option 'interpreter',\n"
 43                    f"Expected one of {VALID_INTERPRETERS}"
 44                )
 45
 46            if isinstance(self.interpreter, list):
 47                if len(self.interpreter) == 0:
 48                    raise ConfigValidationError(
 49                        "Invalid value for option 'interpreter',\n"
 50                        "Expected at least one item in list."
 51                    )
 52                for item in self.interpreter:
 53                    if item not in VALID_INTERPRETERS:
 54                        raise ConfigValidationError(
 55                            f"Invalid item {item!r} in option 'interpreter',\n"
 56                            f"Expected one of {VALID_INTERPRETERS!r}"
 57                        )
 58
 59    class TaskSpec(PoeTask.TaskSpec):
 60        content: str
 61        options: "ShellTask.TaskOptions"
 62
 63    spec: TaskSpec
 64
 65    def _handle_run(
 66        self,
 67        context: "RunContext",
 68        env: "EnvVarsManager",
 69    ) -> int:
 70        named_arg_values, extra_args = self.get_parsed_arguments(env)
 71        env.update(named_arg_values)
 72
 73        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
 74            raise PoeException(
 75                f"Shell task {self.spec.name!r} does not accept arguments"
 76            )
 77
 78        interpreter_cmd = self.resolve_interpreter_cmd()
 79        if not interpreter_cmd:
 80            config_value = self._get_interpreter_config()
 81            message = (
 82                f"Couldn't locate interpreter executable for {config_value!r} to run "
 83                "shell task. "
 84            )
 85            if self._is_windows and set(config_value).issubset({"posix", "bash"}):
 86                message += "Installing Git Bash or using WSL should fix this."
 87            else:
 88                message += "Some dependencies may be missing from your system."
 89            raise PoeException(message)
 90
 91        content = _unindent_code(self.spec.content).rstrip()
 92
 93        self._print_action(content, context.dry)
 94
 95        return self._get_executor(context, env).execute(
 96            interpreter_cmd, input=content.encode()
 97        )
 98
 99    def _get_interpreter_config(self) -> Tuple[str, ...]:
100        result: Union[str, Tuple[str, ...]] = self.spec.options.get(
101            "interpreter", self.ctx.config.shell_interpreter
102        )
103        if isinstance(result, str):
104            return (result,)
105        return tuple(result)
106
107    def resolve_interpreter_cmd(self) -> Optional[List[str]]:
108        """
109        Return a formatted command for the first specified interpreter that can be
110        located.
111        """
112        for item in self._get_interpreter_config():
113            executable = self._locate_interpreter(item)
114            if executable is None:
115                continue
116
117            if item in ("pwsh", "powershell"):
118                return [executable, "-NoLogo", "-Command", "-"]
119
120            return [executable]
121
122        return None
123
124    def _locate_interpreter(self, interpreter: str) -> Optional[str]:
125        from shutil import which
126
127        result = None
128        prog_files = environ.get("PROGRAMFILES", "C:\\Program Files")
129
130        # Try use $SHELL from the environment as a hint
131        shell_var = environ.get("SHELL", "")
132        if shell_var.endswith(f"/{interpreter}") and which(shell_var) == shell_var:
133            result = shell_var
134
135        elif interpreter == "posix":
136            # look for any known posix shell
137            result = (
138                self._locate_interpreter("sh")
139                or self._locate_interpreter("bash")
140                or self._locate_interpreter("zsh")
141            )
142
143        elif interpreter == "sh":
144            result = which("sh") or which("/bin/sh")
145
146            # Specifically look for git sh on windows
147            if result is None and self._is_windows:
148                result = which(f"{prog_files}\\Git\\bin\\sh.exe")
149
150        elif interpreter == "bash":
151            if self._is_windows:
152                # Specifically look for git bash on windows as the preferred option
153                # Don't trust bash from the path because it might be a useless decoy
154                result = (
155                    which(f"{prog_files}\\Git\\bin\\bash.exe")
156                    or which("/bin/bash")
157                    or which("bash")
158                )
159            else:
160                result = which("bash") or which("/bin/bash")
161
162        elif interpreter == "zsh":
163            result = which("zsh") or which("/bin/zsh")
164
165        elif interpreter == "fish":
166            result = which("fish") or which("/bin/fish")
167
168        elif interpreter in ("pwsh", "powershell"):
169            # Look for the pwsh executable and verify the version matches
170            result = (
171                which("pwsh")
172                or which(f"{prog_files}\\PowerShell\\7\\pwsh.exe")
173                or which(f"{prog_files}\\PowerShell\\6\\pwsh.exe")
174            )
175
176            if result is None and interpreter == "powershell" and self._is_windows:
177                # Look for older versions of powershell
178                result = which("powershell") or which(
179                    environ.get("WINDIR", "C:\\Windows")
180                    + "\\System32\\WindowsPowerShell\\v1.0\\powershell.EXE"
181                )
182
183        elif interpreter == "python":
184            # Exactly which python executable to use is usually resolved by the executor
185            result = "python"
186
187        return result

A task whose content is a shell script that is run by a configured interpreter

content: str
def resolve_interpreter_cmd(self) -> Optional[List[str]]:
107    def resolve_interpreter_cmd(self) -> Optional[List[str]]:
108        """
109        Return a formatted command for the first specified interpreter that can be
110        located.
111        """
112        for item in self._get_interpreter_config():
113            executable = self._locate_interpreter(item)
114            if executable is None:
115                continue
116
117            if item in ("pwsh", "powershell"):
118                return [executable, "-NoLogo", "-Command", "-"]
119
120            return [executable]
121
122        return None

Return a formatted command for the first specified interpreter that can be located.

class ShellTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
29    class TaskOptions(PoeTask.TaskOptions):
30        interpreter: Optional[Union[str, list]] = None
31
32        def validate(self):
33            super().validate()
34
35            from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS
36
37            if (
38                isinstance(self.interpreter, str)
39                and self.interpreter not in VALID_INTERPRETERS
40            ):
41                raise ConfigValidationError(
42                    "Invalid value for option 'interpreter',\n"
43                    f"Expected one of {VALID_INTERPRETERS}"
44                )
45
46            if isinstance(self.interpreter, list):
47                if len(self.interpreter) == 0:
48                    raise ConfigValidationError(
49                        "Invalid value for option 'interpreter',\n"
50                        "Expected at least one item in list."
51                    )
52                for item in self.interpreter:
53                    if item not in VALID_INTERPRETERS:
54                        raise ConfigValidationError(
55                            f"Invalid item {item!r} in option 'interpreter',\n"
56                            f"Expected one of {VALID_INTERPRETERS!r}"
57                        )

A special kind of config object that parses options ...

interpreter: Union[str, list, NoneType] = None
def validate(self):
32        def validate(self):
33            super().validate()
34
35            from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS
36
37            if (
38                isinstance(self.interpreter, str)
39                and self.interpreter not in VALID_INTERPRETERS
40            ):
41                raise ConfigValidationError(
42                    "Invalid value for option 'interpreter',\n"
43                    f"Expected one of {VALID_INTERPRETERS}"
44                )
45
46            if isinstance(self.interpreter, list):
47                if len(self.interpreter) == 0:
48                    raise ConfigValidationError(
49                        "Invalid value for option 'interpreter',\n"
50                        "Expected at least one item in list."
51                    )
52                for item in self.interpreter:
53                    if item not in VALID_INTERPRETERS:
54                        raise ConfigValidationError(
55                            f"Invalid item {item!r} in option 'interpreter',\n"
56                            f"Expected one of {VALID_INTERPRETERS!r}"
57                        )

Validation rules that don't require any extra context go here.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
normalize
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
class ShellTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
59    class TaskSpec(PoeTask.TaskSpec):  # spec type for shell tasks; adds no behaviour of its own
60        content: str  # the script text; ShellTask._handle_run unindents it and passes it to the interpreter
61        options: "ShellTask.TaskOptions"  # parsed task options, including `interpreter`
content: str
task_type: Type[poethepoet.task.base.PoeTask] = <class 'ShellTask'>
class SwitchTask(poethepoet.task.base.PoeTask):
 29class SwitchTask(PoeTask):
 30    """
 31    A task that runs one of several `case` subtasks depending on the output of a
 32    `switch` subtask.
 33    """
 34
 35    __key__ = "switch"
 36    __content_type__: ClassVar[Type] = list
 37
 38    class TaskOptions(PoeTask.TaskOptions):
 39        control: Union[str, dict]
 40        default: Literal["pass", "fail"] = "fail"
 41
 42        @classmethod
 43        def normalize(
 44            cls,
 45            config: Any,
 46            strict: bool = True,
 47        ):
 48            """
 49            Perform validations that require access to to the raw config.
 50            """
 51            if strict and isinstance(config, dict):
 52                # Subtasks may not declare certain options
 53                for subtask_def in config.get("switch", tuple()):
 54                    for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
 55                        if banned_option in subtask_def:
 56                            if "case" not in subtask_def:
 57                                raise ConfigValidationError(
 58                                    "Default case includes incompatible option "
 59                                    f"{banned_option!r}"
 60                                )
 61                            raise ConfigValidationError(
 62                                f"Case {subtask_def.get('case')!r} includes "
 63                                f"incompatible option {banned_option!r}"
 64                            )
 65
 66            return super().normalize(config, strict)
 67
 68    class TaskSpec(PoeTask.TaskSpec):
 69        control_task_spec: PoeTask.TaskSpec
 70        case_task_specs: Tuple[Tuple[Tuple[Any, ...], PoeTask.TaskSpec], ...]
 71        options: "SwitchTask.TaskOptions"
 72
 73        def __init__(
 74            self,
 75            name: str,
 76            task_def: Dict[str, Any],
 77            factory: "TaskSpecFactory",
 78            source: "ConfigPartition",
 79            parent: Optional["PoeTask.TaskSpec"] = None,
 80        ):
 81            super().__init__(name, task_def, factory, source, parent)
 82
 83            switch_args = task_def.get("args")
 84            control_task_def = task_def["control"]
 85
 86            if switch_args:
 87                if isinstance(control_task_def, str):
 88                    control_task_def = {
 89                        factory.config.default_task_type: control_task_def
 90                    }
 91                control_task_def = dict(control_task_def, args=switch_args)
 92
 93            self.control_task_spec = factory.get(
 94                task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
 95            )
 96
 97            case_task_specs = []
 98            for switch_item in task_def["switch"]:
 99                case_task_def = dict(switch_item, args=switch_args)
100                case = case_task_def.pop("case", DEFAULT_CASE)
101                case_tuple = tuple(case) if isinstance(case, list) else (case,)
102                case_task_index = ",".join(str(value) for value in case_tuple)
103                case_task_specs.append(
104                    (
105                        case_tuple,
106                        factory.get(
107                            task_name=f"{name}[{case_task_index}]",
108                            task_def=case_task_def,
109                            parent=self,
110                        ),
111                    )
112                )
113
114            self.case_task_specs = tuple(case_task_specs)
115
116        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
117            from collections import defaultdict
118
119            allowed_control_task_types = ("expr", "cmd", "script")
120            if (
121                self.control_task_spec.task_type.__key__
122                not in allowed_control_task_types
123            ):
124                raise ConfigValidationError(
125                    f"Control task must have a type that is one of "
126                    f"{allowed_control_task_types!r}"
127                )
128
129            cases: MutableMapping[Any, int] = defaultdict(int)
130            for case_keys, case_task_spec in self.case_task_specs:
131                for case_key in case_keys:
132                    cases[case_key] += 1
133
134            # Ensure case keys don't overlap (and only one default case)
135            for case, count in cases.items():
136                if count > 1:
137                    if case is DEFAULT_CASE:
138                        raise ConfigValidationError(
139                            "Switch array includes more than one default case"
140                        )
141                    raise ConfigValidationError(
142                        f"Switch array includes more than one case for {case!r}"
143                    )
144
145            if self.options.default != "fail" and DEFAULT_CASE in cases:
146                raise ConfigValidationError(
147                    "switch tasks should not declare both a default case and the "
148                    "'default' option"
149                )
150
151            # Validate subtask specs
152            self.control_task_spec.validate(config, task_specs)
153            for _, case_task_spec in self.case_task_specs:
154                case_task_spec.validate(config, task_specs)
155
156    spec: TaskSpec
157    control_task: PoeTask
158    switch_tasks: Dict[str, PoeTask]
159
160    def __init__(
161        self,
162        spec: TaskSpec,
163        invocation: Tuple[str, ...],
164        ctx: TaskContext,
165        capture_stdout: bool = False,
166    ):
167        super().__init__(spec, invocation, ctx, capture_stdout)
168
169        control_task_name = f"{spec.name}[__control__]"
170        control_invocation: Tuple[str, ...] = (control_task_name,)
171        options = self.spec.options
172        if options.get("args"):
173            control_invocation = (*control_invocation, *invocation[1:])
174
175        self.control_task = self.spec.control_task_spec.create_task(
176            invocation=control_invocation,
177            ctx=TaskContext.from_task(self),
178            capture_stdout=True,
179        )
180
181        self.switch_tasks = {}
182        for case_keys, case_spec in spec.case_task_specs:
183            task_invocation: Tuple[str, ...] = (f"{spec.name}[{','.join(case_keys)}]",)
184            if options.get("args"):
185                task_invocation = (*task_invocation, *invocation[1:])
186
187            case_task = case_spec.create_task(
188                invocation=task_invocation,
189                ctx=TaskContext.from_task(self),
190                capture_stdout=self.capture_stdout,
191            )
192            for case_key in case_keys:
193                self.switch_tasks[case_key] = case_task
194
195    def _handle_run(
196        self,
197        context: "RunContext",
198        env: "EnvVarsManager",
199    ) -> int:
200        named_arg_values, extra_args = self.get_parsed_arguments(env)
201        env.update(named_arg_values)
202
203        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
204            raise PoeException(f"Switch task {self.name!r} does not accept arguments")
205
206        # Indicate on the global context that there are multiple stages to this task
207        context.multistage = True
208
209        task_result = self.control_task.run(context=context, parent_env=env)
210        if task_result:
211            raise ExecutionError(
212                f"Switch task {self.name!r} aborted after failed control task"
213            )
214
215        if context.dry:
216            self._print_action(
217                "unresolved case for switch task", dry=True, unresolved=True
218            )
219            return 0
220
221        control_task_output = context.get_task_output(self.control_task.invocation)
222        case_task = self.switch_tasks.get(
223            control_task_output, self.switch_tasks.get(DEFAULT_CASE)
224        )
225
226        if case_task is None:
227            if self.spec.options.default == "pass":
228                return 0
229            raise ExecutionError(
230                f"Control value {control_task_output!r} did not match any cases in "
231                f"switch task {self.name!r}."
232            )
233
234        result = case_task.run(context=context, parent_env=env)
235
236        if self.capture_stdout is True:
237            # The executor saved output for the case task, but we need it to be
238            # registered for this switch task as well
239            context.save_task_output(
240                self.invocation, context.get_task_output(case_task.invocation).encode()
241            )
242
243        return result

A task that runs one of several case subtasks depending on the output of a control subtask.

SwitchTask( spec: SwitchTask.TaskSpec, invocation: Tuple[str, ...], ctx: poethepoet.task.base.TaskContext, capture_stdout: bool = False)
160    def __init__(
161        self,
162        spec: TaskSpec,
163        invocation: Tuple[str, ...],
164        ctx: TaskContext,
165        capture_stdout: bool = False,
166    ):
167        super().__init__(spec, invocation, ctx, capture_stdout)
168
169        control_task_name = f"{spec.name}[__control__]"
170        control_invocation: Tuple[str, ...] = (control_task_name,)
171        options = self.spec.options
172        if options.get("args"):
173            control_invocation = (*control_invocation, *invocation[1:])
174
175        self.control_task = self.spec.control_task_spec.create_task(
176            invocation=control_invocation,
177            ctx=TaskContext.from_task(self),
178            capture_stdout=True,
179        )
180
181        self.switch_tasks = {}
182        for case_keys, case_spec in spec.case_task_specs:
183            task_invocation: Tuple[str, ...] = (f"{spec.name}[{','.join(case_keys)}]",)
184            if options.get("args"):
185                task_invocation = (*task_invocation, *invocation[1:])
186
187            case_task = case_spec.create_task(
188                invocation=task_invocation,
189                ctx=TaskContext.from_task(self),
190                capture_stdout=self.capture_stdout,
191            )
192            for case_key in case_keys:
193                self.switch_tasks[case_key] = case_task
switch_tasks: Dict[str, poethepoet.task.base.PoeTask]
class SwitchTask.TaskOptions(poethepoet.task.base.PoeTask.TaskOptions):
38    class TaskOptions(PoeTask.TaskOptions):
39        control: Union[str, dict]
40        default: Literal["pass", "fail"] = "fail"
41
42        @classmethod
43        def normalize(
44            cls,
45            config: Any,
46            strict: bool = True,
47        ):
48            """
49            Perform validations that require access to to the raw config.
50            """
51            if strict and isinstance(config, dict):
52                # Subtasks may not declare certain options
53                for subtask_def in config.get("switch", tuple()):
54                    for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
55                        if banned_option in subtask_def:
56                            if "case" not in subtask_def:
57                                raise ConfigValidationError(
58                                    "Default case includes incompatible option "
59                                    f"{banned_option!r}"
60                                )
61                            raise ConfigValidationError(
62                                f"Case {subtask_def.get('case')!r} includes "
63                                f"incompatible option {banned_option!r}"
64                            )
65
66            return super().normalize(config, strict)

A special kind of config object that parses options ...

control: Union[str, dict]
default: Literal['pass', 'fail'] = 'fail'
@classmethod
def normalize(cls, config: Any, strict: bool = True):
42        @classmethod
43        def normalize(
44            cls,
45            config: Any,
46            strict: bool = True,
47        ):
48            """
49            Perform validations that require access to to the raw config.
50            """
51            if strict and isinstance(config, dict):
52                # Subtasks may not declare certain options
53                for subtask_def in config.get("switch", tuple()):
54                    for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
55                        if banned_option in subtask_def:
56                            if "case" not in subtask_def:
57                                raise ConfigValidationError(
58                                    "Default case includes incompatible option "
59                                    f"{banned_option!r}"
60                                )
61                            raise ConfigValidationError(
62                                f"Case {subtask_def.get('case')!r} includes "
63                                f"incompatible option {banned_option!r}"
64                            )
65
66            return super().normalize(config, strict)

Perform validations that require access to the raw config.

Inherited Members
poethepoet.options.PoeOptions
PoeOptions
parse
get
update
type_of
get_annotation
get_fields
poethepoet.task.base.PoeTask.TaskOptions
args
capture_stdout
cwd
deps
env
envfile
executor
help
uses
validate
class SwitchTask.TaskSpec(poethepoet.task.base.PoeTask.TaskSpec):
 68    class TaskSpec(PoeTask.TaskSpec):
 69        control_task_spec: PoeTask.TaskSpec
 70        case_task_specs: Tuple[Tuple[Tuple[Any, ...], PoeTask.TaskSpec], ...]
 71        options: "SwitchTask.TaskOptions"
 72
 73        def __init__(
 74            self,
 75            name: str,
 76            task_def: Dict[str, Any],
 77            factory: "TaskSpecFactory",
 78            source: "ConfigPartition",
 79            parent: Optional["PoeTask.TaskSpec"] = None,
 80        ):
 81            super().__init__(name, task_def, factory, source, parent)
 82
 83            switch_args = task_def.get("args")
 84            control_task_def = task_def["control"]
 85
 86            if switch_args:
 87                if isinstance(control_task_def, str):
 88                    control_task_def = {
 89                        factory.config.default_task_type: control_task_def
 90                    }
 91                control_task_def = dict(control_task_def, args=switch_args)
 92
 93            self.control_task_spec = factory.get(
 94                task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
 95            )
 96
 97            case_task_specs = []
 98            for switch_item in task_def["switch"]:
 99                case_task_def = dict(switch_item, args=switch_args)
100                case = case_task_def.pop("case", DEFAULT_CASE)
101                case_tuple = tuple(case) if isinstance(case, list) else (case,)
102                case_task_index = ",".join(str(value) for value in case_tuple)
103                case_task_specs.append(
104                    (
105                        case_tuple,
106                        factory.get(
107                            task_name=f"{name}[{case_task_index}]",
108                            task_def=case_task_def,
109                            parent=self,
110                        ),
111                    )
112                )
113
114            self.case_task_specs = tuple(case_task_specs)
115
116        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
117            from collections import defaultdict
118
119            allowed_control_task_types = ("expr", "cmd", "script")
120            if (
121                self.control_task_spec.task_type.__key__
122                not in allowed_control_task_types
123            ):
124                raise ConfigValidationError(
125                    f"Control task must have a type that is one of "
126                    f"{allowed_control_task_types!r}"
127                )
128
129            cases: MutableMapping[Any, int] = defaultdict(int)
130            for case_keys, case_task_spec in self.case_task_specs:
131                for case_key in case_keys:
132                    cases[case_key] += 1
133
134            # Ensure case keys don't overlap (and only one default case)
135            for case, count in cases.items():
136                if count > 1:
137                    if case is DEFAULT_CASE:
138                        raise ConfigValidationError(
139                            "Switch array includes more than one default case"
140                        )
141                    raise ConfigValidationError(
142                        f"Switch array includes more than one case for {case!r}"
143                    )
144
145            if self.options.default != "fail" and DEFAULT_CASE in cases:
146                raise ConfigValidationError(
147                    "switch tasks should not declare both a default case and the "
148                    "'default' option"
149                )
150
151            # Validate subtask specs
152            self.control_task_spec.validate(config, task_specs)
153            for _, case_task_spec in self.case_task_specs:
154                case_task_spec.validate(config, task_specs)
SwitchTask.TaskSpec( name: str, task_def: Dict[str, Any], factory: poethepoet.task.base.TaskSpecFactory, source: poethepoet.config.ConfigPartition, parent: Optional[poethepoet.task.base.PoeTask.TaskSpec] = None)
 73        def __init__(
 74            self,
 75            name: str,
 76            task_def: Dict[str, Any],
 77            factory: "TaskSpecFactory",
 78            source: "ConfigPartition",
 79            parent: Optional["PoeTask.TaskSpec"] = None,
 80        ):
 81            super().__init__(name, task_def, factory, source, parent)
 82
 83            switch_args = task_def.get("args")
 84            control_task_def = task_def["control"]
 85
 86            if switch_args:
 87                if isinstance(control_task_def, str):
 88                    control_task_def = {
 89                        factory.config.default_task_type: control_task_def
 90                    }
 91                control_task_def = dict(control_task_def, args=switch_args)
 92
 93            self.control_task_spec = factory.get(
 94                task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
 95            )
 96
 97            case_task_specs = []
 98            for switch_item in task_def["switch"]:
 99                case_task_def = dict(switch_item, args=switch_args)
100                case = case_task_def.pop("case", DEFAULT_CASE)
101                case_tuple = tuple(case) if isinstance(case, list) else (case,)
102                case_task_index = ",".join(str(value) for value in case_tuple)
103                case_task_specs.append(
104                    (
105                        case_tuple,
106                        factory.get(
107                            task_name=f"{name}[{case_task_index}]",
108                            task_def=case_task_def,
109                            parent=self,
110                        ),
111                    )
112                )
113
114            self.case_task_specs = tuple(case_task_specs)
case_task_specs: Tuple[Tuple[Tuple[Any, ...], poethepoet.task.base.PoeTask.TaskSpec], ...]
task_type: Type[poethepoet.task.base.PoeTask] = <class 'SwitchTask'>