poethepoet.task
1from .cmd import CmdTask 2from .expr import ExprTask 3from .ref import RefTask 4from .script import ScriptTask 5from .sequence import SequenceTask 6from .shell import ShellTask 7from .switch import SwitchTask 8 9__all__ = [ 10 "CmdTask", 11 "ExprTask", 12 "RefTask", 13 "ScriptTask", 14 "SequenceTask", 15 "ShellTask", 16 "SwitchTask", 17]
class CmdTask(PoeTask):
    """
    A task consisting of a reference to a shell command
    """

    __key__ = "cmd"

    class TaskOptions(PoeTask.TaskOptions):
        # When true the command is handed to the executor with use_exec=True
        use_exec: bool = False

        def validate(self):
            """
            Context-free validation of the options declared for a cmd task.

            Raises ConfigValidationError when both 'use_exec' and 'capture_stdout'
            are set.
            """
            super().validate()
            if not self.use_exec:
                return
            if self.capture_stdout:
                message = (
                    "'use_exec' and 'capture_stdout'"
                    " options cannot be both provided on the same task."
                )
                raise ConfigValidationError(message)

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "CmdTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Validations specific to cmd tasks: the content must not be blank.
            """
            stripped_content = self.content.strip()
            if stripped_content:
                return
            raise ConfigValidationError("Task has no content")

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Resolve the command line for this task and pass it to the executor.

        Named argument values are merged into the env before the command is
        resolved; any extra arguments are appended to the resolved tokens.
        """
        arg_values, passthrough_args = self.get_parsed_arguments(env)
        env.update(arg_values)

        resolved_tokens = self._resolve_commandline(context, env)
        command = tuple(resolved_tokens) + tuple(passthrough_args)

        self._print_action(shlex.join(command), context.dry)

        executor = self._get_executor(context, env)
        return executor.execute(
            command, use_exec=self.spec.options.get("use_exec", False)
        )

    def _resolve_commandline(self, context: "RunContext", env: "EnvVarsManager"):
        """
        Parse the task content and return the list of command tokens to execute.

        Raises PoeException if the content cannot be parsed, contains no command
        lines, or contains more than one ';' separated command line.
        """
        from ..helpers.command import parse_poe_cmd, resolve_command_tokens
        from ..helpers.command.ast_core import ParseError

        try:
            parsed_lines = parse_poe_cmd(self.spec.content).command_lines
        except ParseError as err:
            raise PoeException(
                f"Couldn't parse command line for task {self.name!r}: {err.args[0]}"
            ) from err

        if not parsed_lines:
            raise PoeException(
                f"Invalid cmd task {self.name!r} does not include any command lines"
            )

        # lines terminated by a line break or comment are implicitly joined, so
        # only an explicit ';' before the final line means multiple commands
        for parsed_line in parsed_lines[:-1]:
            if parsed_line._terminator == ";":
                raise PoeException(
                    f"Invalid cmd task {self.name!r} includes multiple command lines"
                )

        glob_root = self.get_working_dir(env)

        tokens = []
        for token, is_glob in resolve_command_tokens(parsed_lines, env):
            if not is_glob:
                tokens.append(token)
                continue
            # Resolve glob pattern from the working directory
            tokens.extend(str(path) for path in glob_root.glob(token))

        return tokens
A task consisting of a reference to a shell command
class TaskOptions(PoeTask.TaskOptions):
    # When true the command is handed to the executor with use_exec=True
    use_exec: bool = False

    def validate(self):
        """
        Context-free validation of the options declared for a cmd task.

        Raises ConfigValidationError when both 'use_exec' and 'capture_stdout'
        are set.
        """
        super().validate()
        if not self.use_exec:
            return
        if self.capture_stdout:
            message = (
                "'use_exec' and 'capture_stdout'"
                " options cannot be both provided on the same task."
            )
            raise ConfigValidationError(message)
A special kind of config object that parses options ...
def validate(self):
    """
    Context-free validation of the options declared for a cmd task.

    Raises ConfigValidationError when both 'use_exec' and 'capture_stdout'
    are set.
    """
    super().validate()
    if not self.use_exec:
        return
    if self.capture_stdout:
        message = (
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )
        raise ConfigValidationError(message)
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "CmdTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Validations specific to cmd tasks: the content must not be blank.
        """
        stripped_content = self.content.strip()
        if stripped_content:
            return
        raise ConfigValidationError("Task has no content")
Inherited Members
class ExprTask(PoeTask):
    """
    A task consisting of a python expression
    """

    content: str

    __key__ = "expr"

    class TaskOptions(PoeTask.TaskOptions):
        # Names to import in the generated script before the expression runs
        imports: Sequence[str] = tuple()
        # When truthy, the script exits with int(value) if the result is falsy
        assert_: Union[bool, int] = False
        use_exec: bool = False

        def validate(self):
            """
            Validation rules that don't require any extra context go here.

            Raises ConfigValidationError when both 'use_exec' and 'capture_stdout'
            are set.
            """
            super().validate()
            if self.use_exec and self.capture_stdout:
                raise ConfigValidationError(
                    "'use_exec' and 'capture_stdout'"
                    " options cannot be both provided on the same task."
                )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "ExprTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Perform validations on this TaskSpec that apply to a specific task type
            """
            try:
                # ruff: noqa: E501
                self.task_type._substitute_env_vars(self.content.strip(), {})  # type: ignore[attr-defined]
            except (ValueError, ExpressionParseError) as error:
                raise ConfigValidationError(f"Invalid expression: {error}")

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Build a one-line python script that evaluates the expression and prints the
        result, then hand it to the executor as `python -c <script>`.
        """
        from ..helpers.python import format_class

        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        imports = self.spec.options.imports

        expr, env_values = self.parse_content(named_arg_values, env, imports)
        argv = [
            self.spec.name,
            *(env.fill_template(token) for token in self.invocation[1:]),
        ]

        script = [
            f"import sys;" f"sys.path.append('src');" f"sys.argv = {argv!r};",
            (f"import {', '.join(imports)}; " if imports else ""),
            f"{format_class(named_arg_values)}",
            f"{format_class(env_values, classname='__env')}",
            f"result = ({expr});",
            "print(result);",
        ]

        falsy_return_code = int(self.spec.options.get("assert"))
        if falsy_return_code:
            script.append(f"exit(0 if result else {falsy_return_code});")

        # Exactly which python executable to use is usually resolved by the executor
        # It's important that the script contains no line breaks to avoid issues on
        # windows
        cmd = ("python", "-c", "".join(script))

        self._print_action(self.spec.content.strip(), context.dry)
        return self._get_executor(context, env).execute(
            cmd, use_exec=self.spec.options.use_exec
        )

    def parse_content(
        self,
        args: Optional[Dict[str, Any]],
        env: "EnvVarsManager",
        imports: Iterable[str] = (),
    ) -> Tuple[str, Dict[str, str]]:
        """
        Returns the expression to evaluate and the subset of env vars that it references

        Templated references to env vars are resolved before parsing the expression.

        `imports` lists extra names the expression may reference in addition to
        `sys` and `__env`; it defaults to no extra names.

        Will raise an exception if the content contains invalid syntax or references
        python variables that are not in scope.
        """

        from ..helpers.python import resolve_expression

        expression, accessed_vars = self._substitute_env_vars(
            self.spec.content.strip(), env.to_dict()
        )

        expression = resolve_expression(
            source=expression,
            arguments=set(args or tuple()),
            allowed_vars={"sys", "__env", *imports},
        )
        # Strip out any new lines because they can be problematic on windows
        expression = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", expression)
        expression = re.sub(r"(\r\n|\r|\n)", " ", expression)

        return expression, accessed_vars

    @classmethod
    def _substitute_env_vars(cls, content: str, env: Mapping[str, str]):
        """
        Substitute ${template} references to env vars with a reference to a python class
        attribute like __env.var, and collect the accessed env vars so we can construct
        that class with the required attributes later.

        Returns a tuple of the rewritten expression and the dict of accessed vars.
        """

        from ..env.template import SpyDict, apply_envvars_to_template

        # Spy on access to the env, so that instead of replacing template ${keys} with
        # the corresponding value, replace them with a python name and keep track of
        # referenced env vars.
        accessed_vars: Dict[str, str] = {}

        def getitem_spy(obj: SpyDict, key: str, value: str):
            accessed_vars[key] = value
            return f"__env.{key}"

        expression = apply_envvars_to_template(
            content=content,
            env=SpyDict(env, getitem_spy=getitem_spy),
            require_braces=True,
        )

        return expression, accessed_vars
A task consisting of a python expression
def parse_content(
    self,
    args: Optional[Dict[str, Any]],
    env: "EnvVarsManager",
    imports: Iterable[str] = (),
) -> Tuple[str, Dict[str, str]]:
    """
    Returns the expression to evaluate and the subset of env vars that it references

    Templated references to env vars are resolved before parsing the expression.

    `imports` lists extra names the expression may reference in addition to
    `sys` and `__env`; it defaults to no extra names.

    Will raise an exception if the content contains invalid syntax or references
    python variables that are not in scope.
    """

    from ..helpers.python import resolve_expression

    expression, accessed_vars = self._substitute_env_vars(
        self.spec.content.strip(), env.to_dict()
    )

    expression = resolve_expression(
        source=expression,
        arguments=set(args or tuple()),
        allowed_vars={"sys", "__env", *imports},
    )
    # Strip out any new lines because they can be problematic on windows
    expression = re.sub(r"((\r\n|\r|\n) | (\r\n|\r|\n))", " ", expression)
    expression = re.sub(r"(\r\n|\r|\n)", " ", expression)

    return expression, accessed_vars
Returns the expression to evaluate and the subset of env vars that it references
Templated references to env vars are resolved before parsing the expression.
Will raise an exception if the content contains invalid syntax or references python variables that are not in scope.
class TaskOptions(PoeTask.TaskOptions):
    # Names to import in the generated script before the expression runs
    imports: Sequence[str] = tuple()
    assert_: Union[bool, int] = False
    use_exec: bool = False

    def validate(self):
        """
        Context-free validation of the options declared for an expr task.

        Raises ConfigValidationError when both 'use_exec' and 'capture_stdout'
        are set.
        """
        super().validate()
        if not self.use_exec:
            return
        if self.capture_stdout:
            message = (
                "'use_exec' and 'capture_stdout'"
                " options cannot be both provided on the same task."
            )
            raise ConfigValidationError(message)
A special kind of config object that parses options ...
def validate(self):
    """
    Context-free validation of the options declared for an expr task.

    Raises ConfigValidationError when both 'use_exec' and 'capture_stdout'
    are set.
    """
    super().validate()
    if not self.use_exec:
        return
    if self.capture_stdout:
        message = (
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )
        raise ConfigValidationError(message)
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "ExprTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Validations specific to expr tasks.

        Runs env var template substitution over the content with an empty env and
        reports any failure as a ConfigValidationError.
        """
        substitute = self.task_type._substitute_env_vars  # type: ignore[attr-defined]
        expression_source = self.content.strip()
        try:
            # ruff: noqa: E501
            substitute(expression_source, {})
        except (ValueError, ExpressionParseError) as error:
            raise ConfigValidationError(f"Invalid expression: {error}")
Inherited Members
class RefTask(PoeTask):
    """
    A task consisting of a reference to another task
    """

    __key__ = "ref"

    class TaskOptions(PoeTask.TaskOptions):
        def validate(self):
            """
            Validation rules that don't require any extra context go here.

            Raises ConfigValidationError if 'executor' or 'capture_stdout' is set,
            since neither option is accepted on a ref task.
            """
            # Run the shared option validations too, like every other task type
            super().validate()
            if self.executor:
                raise ConfigValidationError(
                    "Option 'executor' cannot be set on a ref task"
                )
            if self.capture_stdout:
                raise ConfigValidationError(
                    "Option 'capture_stdout' cannot be set on a ref task"
                )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "RefTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Perform validations on this TaskSpec that apply to a specific task type

            The first shell-style token of the content must name a known task, and
            that task must not have 'use_exec' enabled.
            """

            import shlex

            try:
                ref_tokens = shlex.split(self.content)
            except ValueError as error:
                # e.g. unbalanced quotes in the reference
                raise ConfigValidationError(
                    f"Invalid ref task content: {error}"
                ) from error

            if not ref_tokens:
                # Blank content would otherwise fail with a bare IndexError
                raise ConfigValidationError("Task has no content")

            task_name_ref = ref_tokens[0]

            if task_name_ref not in task_specs:
                raise ConfigValidationError(
                    f"Includes reference to unknown task {task_name_ref!r}"
                )

            if task_specs.get(task_name_ref).options.get("use_exec", False):
                raise ConfigValidationError(
                    f"Illegal reference to task with "
                    f"'use_exec' set to true: {task_name_ref!r}"
                )

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Lookup and delegate to the referenced task
        """
        import shlex

        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        # Templates are filled once on the whole content and again on each token
        ref_invocation = (
            *(
                env.fill_template(token)
                for token in shlex.split(env.fill_template(self.spec.content.strip()))
            ),
            *extra_args,
        )

        task = self.ctx.specs.get(ref_invocation[0]).create_task(
            invocation=ref_invocation, ctx=TaskContext.from_task(self)
        )

        if task.has_deps():
            return self._run_task_graph(task, context, env)

        return task.run(context=context, parent_env=env)

    def _run_task_graph(
        self,
        task: "PoeTask",
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Run the execution plan of the referenced task, stage by stage.

        Raises ExecutionError as soon as any task other than the referenced one
        returns a truthy result.
        """
        from ..exceptions import ExecutionError
        from .graph import TaskExecutionGraph

        graph = TaskExecutionGraph(task, context)
        plan = graph.get_execution_plan()
        for stage in plan:
            for stage_task in stage:
                if stage_task == task:
                    # The final sink task gets special treatment
                    return task.run(context=context, parent_env=env)

                task_result = stage_task.run(context=context)
                if task_result:
                    raise ExecutionError(
                        f"Task graph aborted after failed task {stage_task.name!r}"
                    )
        return 0
A task consisting of a reference to another task
class TaskOptions(PoeTask.TaskOptions):
    def validate(self):
        """
        Validation rules that don't require any extra context go here.

        Raises ConfigValidationError if 'executor' or 'capture_stdout' is set,
        since neither option is accepted on a ref task.
        """
        # Run the shared option validations too, like every other task type
        super().validate()
        if self.executor:
            raise ConfigValidationError(
                "Option 'executor' cannot be set on a ref task"
            )
        if self.capture_stdout:
            raise ConfigValidationError(
                "Option 'capture_stdout' cannot be set on a ref task"
            )
A special kind of config object that parses options ...
def validate(self):
    """
    Validation rules that don't require any extra context go here.

    Raises ConfigValidationError if 'executor' or 'capture_stdout' is set,
    since neither option is accepted on a ref task.
    """
    # Run the shared option validations too, like every other task type
    super().validate()
    if self.executor:
        raise ConfigValidationError(
            "Option 'executor' cannot be set on a ref task"
        )
    if self.capture_stdout:
        raise ConfigValidationError(
            "Option 'capture_stdout' cannot be set on a ref task"
        )
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "RefTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        The first shell-style token of the content must name a known task, and
        that task must not have 'use_exec' enabled.
        """

        import shlex

        try:
            ref_tokens = shlex.split(self.content)
        except ValueError as error:
            # e.g. unbalanced quotes in the reference
            raise ConfigValidationError(
                f"Invalid ref task content: {error}"
            ) from error

        if not ref_tokens:
            # Blank content would otherwise fail with a bare IndexError
            raise ConfigValidationError("Task has no content")

        task_name_ref = ref_tokens[0]

        if task_name_ref not in task_specs:
            raise ConfigValidationError(
                f"Includes reference to unknown task {task_name_ref!r}"
            )

        if task_specs.get(task_name_ref).options.get("use_exec", False):
            raise ConfigValidationError(
                f"Illegal reference to task with "
                f"'use_exec' set to true: {task_name_ref!r}"
            )
Inherited Members
class ScriptTask(PoeTask):
    """
    A task consisting of a reference to a python script
    """

    content: str

    __key__ = "script"

    class TaskOptions(PoeTask.TaskOptions):
        use_exec: bool = False
        # When true the generated script prints the call's result if not None
        print_result: bool = False

        def validate(self):
            """
            Validation rules that don't require any extra context go here.

            Raises ConfigValidationError when both 'use_exec' and 'capture_stdout'
            are set.
            """
            super().validate()
            if self.use_exec and self.capture_stdout:
                raise ConfigValidationError(
                    "'use_exec' and 'capture_stdout'"
                    " options cannot be both provided on the same task."
                )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "ScriptTask.TaskOptions"

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Perform validations on this TaskSpec that apply to a specific task type

            The content must look like `module:callable` or `module:callable(...)`.
            """
            from ..helpers.python import parse_and_validate

            # Strip first, as parse_content does at run time, so surrounding
            # whitespace is not rejected here but accepted there
            content = self.content.strip()
            try:
                _target_module, target_ref = content.split(":", 1)
                if not target_ref.isidentifier():
                    parse_and_validate(target_ref, call_only=True)
            except (ValueError, ExpressionParseError) as error:
                raise ConfigValidationError(
                    f"Invalid callable reference {self.content!r}\n"
                    "(expected something like `module:callable` or `module:callable()`)"
                ) from error

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Build a one-line python script that imports the target module and calls
        the target function, then hand it to the executor as `python -c <script>`.
        """
        from ..helpers.python import format_class

        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        # TODO: do something about extra_args, error?

        target_module, function_call = self.parse_content(named_arg_values)
        function_ref = function_call.function_ref

        argv = [
            self.name,
            *(env.fill_template(token) for token in self.invocation[1:]),
        ]

        # TODO: check whether the project really does use src layout, and don't do
        #       sys.path.append('src') if it doesn't

        has_dry_run_ref = "_dry_run" in function_call.referenced_globals
        dry_run = self.ctx.ui["dry_run"]

        # The call is wrapped in asyncio.run when the target is a coroutine function
        script = [
            "import asyncio,os,sys;",
            "from inspect import iscoroutinefunction as _c;",
            "from os import environ;",
            "from importlib import import_module as _i;",
            f"_dry_run = {'True' if dry_run else 'False'};" if has_dry_run_ref else "",
            f"sys.argv = {argv!r}; sys.path.append('src');",
            f"{format_class(named_arg_values)}",
            f"_m = _i('{target_module}');",
            f"_r = asyncio.run(_m.{function_call.expression}) if _c(_m.{function_ref})",
            f" else _m.{function_call.expression};",
        ]

        if self.spec.options.get("print_result"):
            script.append("_r is not None and print(_r);")

        # Exactly which python executable to use is usually resolved by the executor
        # It's important that the script contains no line breaks to avoid issues on
        # windows
        cmd = ("python", "-c", "".join(script))

        self._print_action(shlex.join(argv), context.dry)
        return self._get_executor(
            context, env, delegate_dry_run=has_dry_run_ref
        ).execute(cmd, use_exec=self.spec.options.get("use_exec", False))

    def parse_content(
        self, args: Optional[Dict[str, Any]]
    ) -> Tuple[str, "FunctionCall"]:
        """
        Returns the module to load, and the function call to execute.

        Will raise an exception if the function call contains invalid syntax or
        references variables that are not in scope.
        """

        from ..helpers.python import FunctionCall

        try:
            target_module, target_ref = self.spec.content.strip().split(":", 1)
        except ValueError:
            raise ExpressionParseError(
                f"Invalid task content: {self.spec.content.strip()!r}"
            )

        if target_ref.isidentifier():
            # A bare function name: synthesize the call, passing any args as kwargs
            if args:
                function_call = FunctionCall(f"{target_ref}(**({args}))", target_ref)
            else:
                function_call = FunctionCall(f"{target_ref}()", target_ref)
        else:
            function_call = FunctionCall.parse(
                source=target_ref,
                arguments=set(args or tuple()),
                allowed_vars={"sys", "os", "environ", "_dry_run"},
            )

        return target_module, function_call
A task consisting of a reference to a python script
def parse_content(
    self, args: Optional[Dict[str, Any]]
) -> Tuple[str, "FunctionCall"]:
    """
    Split the task content into the module to import and the call to make.

    Content of the form `module:name` becomes a synthesized call to `name`
    (with args passed as keyword arguments when there are any); any other
    target is parsed as a function call expression.

    Raises ExpressionParseError if the content has no ':' separator.
    """

    from ..helpers.python import FunctionCall

    try:
        module_name, call_target = self.spec.content.strip().split(":", 1)
    except ValueError:
        raise ExpressionParseError(
            f"Invalid task content: {self.spec.content.strip()!r}"
        )

    if not call_target.isidentifier():
        parsed_call = FunctionCall.parse(
            source=call_target,
            arguments=set(args or tuple()),
            allowed_vars={"sys", "os", "environ", "_dry_run"},
        )
        return module_name, parsed_call

    call_source = f"{call_target}(**({args}))" if args else f"{call_target}()"
    return module_name, FunctionCall(call_source, call_target)
Returns the module to load, and the function call to execute.
Will raise an exception if the function call contains invalid syntax or references variables that are not in scope.
class TaskOptions(PoeTask.TaskOptions):
    use_exec: bool = False
    print_result: bool = False

    def validate(self):
        """
        Context-free validation of the options declared for a script task.

        Raises ConfigValidationError when both 'use_exec' and 'capture_stdout'
        are set.
        """
        super().validate()
        if not self.use_exec:
            return
        if self.capture_stdout:
            message = (
                "'use_exec' and 'capture_stdout'"
                " options cannot be both provided on the same task."
            )
            raise ConfigValidationError(message)
A special kind of config object that parses options ...
def validate(self):
    """
    Context-free validation of the options declared for a script task.

    Raises ConfigValidationError when both 'use_exec' and 'capture_stdout'
    are set.
    """
    super().validate()
    if not self.use_exec:
        return
    if self.capture_stdout:
        message = (
            "'use_exec' and 'capture_stdout'"
            " options cannot be both provided on the same task."
        )
        raise ConfigValidationError(message)
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: str
    options: "ScriptTask.TaskOptions"

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        The content must look like `module:callable` or `module:callable(...)`.
        """
        from ..helpers.python import parse_and_validate

        # Strip surrounding whitespace before splitting so that e.g. a trailing
        # newline after a bare callable name is not reported as invalid
        content = self.content.strip()
        try:
            _target_module, target_ref = content.split(":", 1)
            if not target_ref.isidentifier():
                parse_and_validate(target_ref, call_only=True)
        except (ValueError, ExpressionParseError) as error:
            raise ConfigValidationError(
                f"Invalid callable reference {self.content!r}\n"
                "(expected something like `module:callable` or `module:callable()`)"
            ) from error
Inherited Members
class SequenceTask(PoeTask):
    """
    A task consisting of a sequence of other tasks
    """

    content: List[Union[str, Dict[str, Any]]]

    __key__ = "sequence"
    __content_type__: ClassVar[Type] = list

    class TaskOptions(PoeTask.TaskOptions):
        # Falsy: abort on the first failing subtask. Truthy: keep going; the value
        # "return_non_zero" additionally makes the sequence fail at the end.
        ignore_fail: Literal[True, False, "return_zero", "return_non_zero"] = False
        default_item_type: Optional[str] = None

        def validate(self):
            """
            Validation rules that don't require any extra context go here.

            Raises ConfigValidationError if `default_item_type` is set to something
            that is not a task type with string content.
            """
            super().validate()
            if self.default_item_type is not None and not PoeTask.is_task_type(
                self.default_item_type, content_type=str
            ):
                raise ConfigValidationError(
                    "Unsupported value for option `default_item_type`,\n"
                    f"Expected one of {PoeTask.get_task_types(content_type=str)}"
                )

    class TaskSpec(PoeTask.TaskSpec):
        content: list
        options: "SequenceTask.TaskOptions"
        subtasks: Sequence[PoeTask.TaskSpec]

        def __init__(
            self,
            name: str,
            task_def: Dict[str, Any],
            factory: "TaskSpecFactory",
            source: "ConfigPartition",
            parent: Optional["PoeTask.TaskSpec"] = None,
        ):
            """
            Build the spec for the sequence and a child spec for every item in it.

            Raises ConfigValidationError if an item is not a str, dict or list, or
            if the factory fails to interpret an item.
            """
            super().__init__(name, task_def, factory, source, parent)

            self.subtasks = []
            for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
                if not isinstance(sub_task_def, (str, dict, list)):
                    raise ConfigValidationError(
                        f"Item #{index} in sequence task should be a value of "
                        "type: str | dict | list",
                        task_name=self.name,
                    )

                # A string item starting with a letter or underscore is used as the
                # subtask name. Slicing rather than indexing keeps an empty string
                # from raising IndexError; it gets a generated name instead.
                is_name_like = isinstance(sub_task_def, str) and (
                    sub_task_def[:1].isalpha() or sub_task_def[:1] == "_"
                )
                subtask_name = (
                    sub_task_def
                    if is_name_like
                    else SequenceTask._subtask_name(name, index)
                )
                task_type_key = self.task_type.resolve_task_type(
                    sub_task_def,
                    factory.config,
                    array_item=task_def.get("default_item_type", True),
                )

                try:
                    self.subtasks.append(
                        factory.get(
                            subtask_name, sub_task_def, task_type_key, parent=self
                        )
                    )
                except PoeException as error:
                    raise ConfigValidationError(
                        f"Failed to interpret subtask #{index} in sequence",
                        task_name=self.name,
                    ) from error

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Perform validations on this TaskSpec that apply to a specific task type

            Subtasks may not declare 'args'; each subtask is also validated itself.
            """
            for subtask in self.subtasks:
                if subtask.args:
                    raise ConfigValidationError(
                        "Unsupported option 'args' for task declared inside sequence"
                    )

                subtask.validate(config, task_specs)

    spec: TaskSpec

    def __init__(
        self,
        spec: TaskSpec,
        invocation: Tuple[str, ...],
        ctx: TaskContext,
        capture_stdout: bool = False,
    ):
        """
        Create the sequence task along with a task instance for each subtask spec.
        """
        assert capture_stdout in (False, None)
        super().__init__(spec, invocation, ctx)
        self.subtasks = [
            task_spec.create_task(
                invocation=(self._subtask_name(task_spec.name, index),),
                ctx=TaskContext.from_task(self),
            )
            for index, task_spec in enumerate(spec.subtasks)
        ]

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Run every subtask in order.

        Returns 0 on completion. Raises ExecutionError when a subtask fails and
        `ignore_fail` is falsy, or at the end when `ignore_fail` is
        "return_non_zero" and at least one subtask failed. Raises PoeException
        if positional arguments are given without any declared args.
        """
        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
            raise PoeException(f"Sequence task {self.name!r} does not accept arguments")

        if len(self.subtasks) > 1:
            # Indicate on the global context that there are multiple stages
            context.multistage = True

        ignore_fail = self.spec.options.ignore_fail
        non_zero_subtasks: List[str] = list()
        for subtask in self.subtasks:
            try:
                task_result = subtask.run(context=context, parent_env=env)
            except ExecutionError as error:
                if not ignore_fail:
                    raise
                print("Warning:", error.msg)
                non_zero_subtasks.append(subtask.name)
                # No result was produced for this subtask, so skip the result check
                # below; otherwise it would read an unbound or stale task_result
                continue

            if task_result:
                if not ignore_fail:
                    raise ExecutionError(
                        f"Sequence aborted after failed subtask {subtask.name!r}"
                    )
                non_zero_subtasks.append(subtask.name)

        if non_zero_subtasks and ignore_fail == "return_non_zero":
            plural = "s" if len(non_zero_subtasks) > 1 else ""
            raise ExecutionError(
                f"Subtask{plural} {', '.join(repr(st) for st in non_zero_subtasks)} "
                "returned non-zero exit status"
            )
        return 0

    @classmethod
    def _subtask_name(cls, task_name: str, index: int):
        """Return the generated name for the subtask at the given index."""
        return f"{task_name}[{index}]"
A task consisting of a sequence of other tasks
def __init__(
    self,
    spec: TaskSpec,
    invocation: Tuple[str, ...],
    ctx: TaskContext,
    capture_stdout: bool = False,
):
    """
    Create the sequence task along with a task instance for each subtask spec.
    """
    assert capture_stdout in (False, None)
    super().__init__(spec, invocation, ctx)

    created_tasks = []
    for position, subtask_spec in enumerate(spec.subtasks):
        subtask_invocation = (self._subtask_name(subtask_spec.name, position),)
        created_tasks.append(
            subtask_spec.create_task(
                invocation=subtask_invocation,
                ctx=TaskContext.from_task(self),
            )
        )
    self.subtasks = created_tasks
class TaskOptions(PoeTask.TaskOptions):
    ignore_fail: Literal[True, False, "return_zero", "return_non_zero"] = False
    default_item_type: Optional[str] = None

    def validate(self):
        """
        Context-free validation of the options declared for a sequence task.

        Raises ConfigValidationError if `default_item_type` is set to something
        that is not a task type with string content.
        """
        super().validate()
        if self.default_item_type is None:
            return
        if PoeTask.is_task_type(self.default_item_type, content_type=str):
            return
        raise ConfigValidationError(
            "Unsupported value for option `default_item_type`,\n"
            f"Expected one of {PoeTask.get_task_types(content_type=str)}"
        )
A special kind of config object that parses options ...
def validate(self):
    """
    Context-free validation of the options declared for a sequence task.

    Raises ConfigValidationError if `default_item_type` is set to something
    that is not a task type with string content.
    """
    super().validate()
    if self.default_item_type is None:
        return
    if PoeTask.is_task_type(self.default_item_type, content_type=str):
        return
    raise ConfigValidationError(
        "Unsupported value for option `default_item_type`,\n"
        f"Expected one of {PoeTask.get_task_types(content_type=str)}"
    )
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec(PoeTask.TaskSpec):
    content: list
    options: "SequenceTask.TaskOptions"
    subtasks: Sequence[PoeTask.TaskSpec]

    def __init__(
        self,
        name: str,
        task_def: Dict[str, Any],
        factory: "TaskSpecFactory",
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ):
        """
        Build the spec for the sequence and a child spec for every item in it.

        Raises ConfigValidationError if an item is not a str, dict or list, or
        if the factory fails to interpret an item.
        """
        super().__init__(name, task_def, factory, source, parent)

        self.subtasks = []
        for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
            if not isinstance(sub_task_def, (str, dict, list)):
                raise ConfigValidationError(
                    f"Item #{index} in sequence task should be a value of "
                    "type: str | dict | list",
                    task_name=self.name,
                )

            # A string item starting with a letter or underscore is used as the
            # subtask name. Slicing rather than indexing keeps an empty string
            # from raising IndexError; it gets a generated name instead.
            is_name_like = isinstance(sub_task_def, str) and (
                sub_task_def[:1].isalpha() or sub_task_def[:1] == "_"
            )
            subtask_name = (
                sub_task_def
                if is_name_like
                else SequenceTask._subtask_name(name, index)
            )
            task_type_key = self.task_type.resolve_task_type(
                sub_task_def,
                factory.config,
                array_item=task_def.get("default_item_type", True),
            )

            try:
                self.subtasks.append(
                    factory.get(
                        subtask_name, sub_task_def, task_type_key, parent=self
                    )
                )
            except PoeException as error:
                raise ConfigValidationError(
                    f"Failed to interpret subtask #{index} in sequence",
                    task_name=self.name,
                ) from error

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to a specific task type

        Subtasks may not declare 'args'; each subtask is also validated itself.
        """
        for subtask in self.subtasks:
            if subtask.args:
                raise ConfigValidationError(
                    "Unsupported option 'args' for task declared inside sequence"
                )

            subtask.validate(config, task_specs)
def __init__(
    self,
    name: str,
    task_def: Dict[str, Any],
    factory: "TaskSpecFactory",
    source: "ConfigPartition",
    parent: Optional["PoeTask.TaskSpec"] = None,
):
    """
    Build the spec for the sequence and a child spec for every item in it.

    Raises ConfigValidationError if an item is not a str, dict or list, or
    if the factory fails to interpret an item.
    """
    super().__init__(name, task_def, factory, source, parent)

    self.subtasks = []
    for index, sub_task_def in enumerate(task_def[SequenceTask.__key__]):
        if not isinstance(sub_task_def, (str, dict, list)):
            raise ConfigValidationError(
                f"Item #{index} in sequence task should be a value of "
                "type: str | dict | list",
                task_name=self.name,
            )

        # A string item starting with a letter or underscore is used as the
        # subtask name. Slicing rather than indexing keeps an empty string
        # from raising IndexError; it gets a generated name instead.
        is_name_like = isinstance(sub_task_def, str) and (
            sub_task_def[:1].isalpha() or sub_task_def[:1] == "_"
        )
        subtask_name = (
            sub_task_def
            if is_name_like
            else SequenceTask._subtask_name(name, index)
        )
        task_type_key = self.task_type.resolve_task_type(
            sub_task_def,
            factory.config,
            array_item=task_def.get("default_item_type", True),
        )

        try:
            self.subtasks.append(
                factory.get(
                    subtask_name, sub_task_def, task_type_key, parent=self
                )
            )
        except PoeException as error:
            raise ConfigValidationError(
                f"Failed to interpret subtask #{index} in sequence",
                task_name=self.name,
            ) from error
Inherited Members
class ShellTask(PoeTask):
    """
    A task whose content is a script that is handed to a shell interpreter
    """

    content: str

    __key__ = "shell"

    class TaskOptions(PoeTask.TaskOptions):
        interpreter: Optional[Union[str, list]] = None

        def validate(self):
            """
            Check that the interpreter option only names known interpreters.
            """
            super().validate()

            from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS

            chosen = self.interpreter

            if isinstance(chosen, str) and chosen not in VALID_INTERPRETERS:
                raise ConfigValidationError(
                    "Invalid value for option 'interpreter',\n"
                    f"Expected one of {VALID_INTERPRETERS}"
                )

            if not isinstance(chosen, list):
                return

            if not chosen:
                raise ConfigValidationError(
                    "Invalid value for option 'interpreter',\n"
                    "Expected at least one item in list."
                )

            for candidate in chosen:
                if candidate not in VALID_INTERPRETERS:
                    raise ConfigValidationError(
                        f"Invalid item {candidate!r} in option 'interpreter',\n"
                        f"Expected one of {VALID_INTERPRETERS!r}"
                    )

    class TaskSpec(PoeTask.TaskSpec):
        content: str
        options: "ShellTask.TaskOptions"

    spec: TaskSpec

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Locate an interpreter and execute the task content with it.

        Raises PoeException if arguments are passed to a task that declares
        none, or if no configured interpreter can be located.
        """
        named_arg_values, _ = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
            raise PoeException(
                f"Shell task {self.spec.name!r} does not accept arguments"
            )

        interpreter_cmd = self.resolve_interpreter_cmd()
        if not interpreter_cmd:
            config_value = self._get_interpreter_config()
            if self._is_windows and set(config_value).issubset({"posix", "bash"}):
                hint = "Installing Git Bash or using WSL should fix this."
            else:
                hint = "Some dependencies may be missing from your system."
            raise PoeException(
                f"Couldn't locate interpreter executable for {config_value!r} to run "
                "shell task. " + hint
            )

        content = _unindent_code(self.spec.content).rstrip()

        self._print_action(content, context.dry)

        executor = self._get_executor(context, env)
        return executor.execute(interpreter_cmd, input=content.encode())

    def _get_interpreter_config(self) -> Tuple[str, ...]:
        """
        Return the configured interpreter names as a tuple, preferring the
        task level option over the project level setting.
        """
        configured: Union[str, Tuple[str, ...]] = self.spec.options.get(
            "interpreter", self.ctx.config.shell_interpreter
        )
        return (configured,) if isinstance(configured, str) else tuple(configured)

    def resolve_interpreter_cmd(self) -> Optional[List[str]]:
        """
        Return a formatted command for the first specified interpreter that can be
        located.
        """
        for name in self._get_interpreter_config():
            executable = self._locate_interpreter(name)
            if executable is not None:
                if name in ("pwsh", "powershell"):
                    return [executable, "-NoLogo", "-Command", "-"]
                return [executable]

        return None

    def _locate_interpreter(self, interpreter: str) -> Optional[str]:
        """
        Return an executable for the named interpreter, or None if none is found.
        """
        from shutil import which

        prog_files = environ.get("PROGRAMFILES", "C:\\Program Files")

        # Try use $SHELL from the environment as a hint
        shell_var = environ.get("SHELL", "")
        if shell_var.endswith(f"/{interpreter}") and which(shell_var) == shell_var:
            return shell_var

        if interpreter == "posix":
            # look for any known posix shell
            return (
                self._locate_interpreter("sh")
                or self._locate_interpreter("bash")
                or self._locate_interpreter("zsh")
            )

        if interpreter == "sh":
            found = which("sh") or which("/bin/sh")
            # Specifically look for git sh on windows
            if found is None and self._is_windows:
                found = which(f"{prog_files}\\Git\\bin\\sh.exe")
            return found

        if interpreter == "bash":
            if self._is_windows:
                # Specifically look for git bash on windows as the preferred option
                # Don't trust bash from the path because it might be a useless decoy
                return (
                    which(f"{prog_files}\\Git\\bin\\bash.exe")
                    or which("/bin/bash")
                    or which("bash")
                )
            return which("bash") or which("/bin/bash")

        if interpreter == "zsh":
            return which("zsh") or which("/bin/zsh")

        if interpreter == "fish":
            return which("fish") or which("/bin/fish")

        if interpreter in ("pwsh", "powershell"):
            # Look for the pwsh executable first
            found = (
                which("pwsh")
                or which(f"{prog_files}\\PowerShell\\7\\pwsh.exe")
                or which(f"{prog_files}\\PowerShell\\6\\pwsh.exe")
            )
            if found is None and interpreter == "powershell" and self._is_windows:
                # Look for older versions of powershell
                found = which("powershell") or which(
                    environ.get("WINDIR", "C:\\Windows")
                    + "\\System32\\WindowsPowerShell\\v1.0\\powershell.EXE"
                )
            return found

        if interpreter == "python":
            # Exactly which python executable to use is usually resolved by the executor
            return "python"

        return None
A task consisting of a reference to a shell command
def resolve_interpreter_cmd(self) -> Optional[List[str]]:
    """
    Return a formatted command for the first specified interpreter that can be
    located, or None if none of them can be located.
    """
    for name in self._get_interpreter_config():
        executable = self._locate_interpreter(name)
        if executable is not None:
            # PowerShell variants need extra flags; "-" is the -Command value
            if name in ("pwsh", "powershell"):
                return [executable, "-NoLogo", "-Command", "-"]
            return [executable]

    return None
Return a formatted command for the first specified interpreter that can be located.
class TaskOptions(PoeTask.TaskOptions):
    interpreter: Optional[Union[str, list]] = None

    def validate(self):
        """
        Check that the interpreter option only names known interpreters.
        """
        super().validate()

        from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS

        chosen = self.interpreter

        if isinstance(chosen, str) and chosen not in VALID_INTERPRETERS:
            raise ConfigValidationError(
                "Invalid value for option 'interpreter',\n"
                f"Expected one of {VALID_INTERPRETERS}"
            )

        if not isinstance(chosen, list):
            return

        if not chosen:
            raise ConfigValidationError(
                "Invalid value for option 'interpreter',\n"
                "Expected at least one item in list."
            )

        for candidate in chosen:
            if candidate not in VALID_INTERPRETERS:
                raise ConfigValidationError(
                    f"Invalid item {candidate!r} in option 'interpreter',\n"
                    f"Expected one of {VALID_INTERPRETERS!r}"
                )
A special kind of config object that parses options ...
def validate(self):
    """
    Check that the interpreter option only names known interpreters.

    Raises ConfigValidationError for an unknown interpreter name, an empty
    list, or a list containing an unknown item.
    """
    super().validate()

    from ..config import KNOWN_SHELL_INTERPRETERS as VALID_INTERPRETERS

    chosen = self.interpreter

    if isinstance(chosen, str) and chosen not in VALID_INTERPRETERS:
        raise ConfigValidationError(
            "Invalid value for option 'interpreter',\n"
            f"Expected one of {VALID_INTERPRETERS}"
        )

    # Nothing further to check unless a list of interpreters was given
    if not isinstance(chosen, list):
        return

    if not chosen:
        raise ConfigValidationError(
            "Invalid value for option 'interpreter',\n"
            "Expected at least one item in list."
        )

    for candidate in chosen:
        if candidate not in VALID_INTERPRETERS:
            raise ConfigValidationError(
                f"Invalid item {candidate!r} in option 'interpreter',\n"
                f"Expected one of {VALID_INTERPRETERS!r}"
            )
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
Inherited Members
class SwitchTask(PoeTask):
    """
    A task that runs one of several `case` subtasks depending on the output of a
    `switch` subtask.
    """

    __key__ = "switch"
    __content_type__: ClassVar[Type] = list

    class TaskOptions(PoeTask.TaskOptions):
        control: Union[str, dict]
        default: Literal["pass", "fail"] = "fail"

        @classmethod
        def normalize(
            cls,
            config: Any,
            strict: bool = True,
        ):
            """
            Perform validations that require access to the raw config.

            Raises ConfigValidationError if strict is set and any item of the
            switch array declares an option from SUBTASK_OPTIONS_BLOCKLIST.
            """
            if strict and isinstance(config, dict):
                # Subtasks may not declare certain options
                for subtask_def in config.get("switch", tuple()):
                    for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
                        if banned_option in subtask_def:
                            if "case" not in subtask_def:
                                raise ConfigValidationError(
                                    "Default case includes incompatible option "
                                    f"{banned_option!r}"
                                )
                            raise ConfigValidationError(
                                f"Case {subtask_def.get('case')!r} includes "
                                f"incompatible option {banned_option!r}"
                            )

            return super().normalize(config, strict)

    class TaskSpec(PoeTask.TaskSpec):
        control_task_spec: PoeTask.TaskSpec
        case_task_specs: Tuple[Tuple[Tuple[Any, ...], PoeTask.TaskSpec], ...]
        options: "SwitchTask.TaskOptions"

        def __init__(
            self,
            name: str,
            task_def: Dict[str, Any],
            factory: "TaskSpecFactory",
            source: "ConfigPartition",
            parent: Optional["PoeTask.TaskSpec"] = None,
        ):
            """
            Build the spec for the switch task, its control task, and each case.
            """
            super().__init__(name, task_def, factory, source, parent)

            switch_args = task_def.get("args")
            control_task_def = task_def["control"]

            if switch_args:
                # Expand a string control task to dict form so that the args of
                # the switch task can be attached to it
                if isinstance(control_task_def, str):
                    control_task_def = {
                        factory.config.default_task_type: control_task_def
                    }
                control_task_def = dict(control_task_def, args=switch_args)

            self.control_task_spec = factory.get(
                task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
            )

            case_task_specs = []
            for switch_item in task_def["switch"]:
                case_task_def = dict(switch_item, args=switch_args)
                # An item without a case key is the default case
                case = case_task_def.pop("case", DEFAULT_CASE)
                case_tuple = tuple(case) if isinstance(case, list) else (case,)
                case_task_index = ",".join(str(value) for value in case_tuple)
                case_task_specs.append(
                    (
                        case_tuple,
                        factory.get(
                            task_name=f"{name}[{case_task_index}]",
                            task_def=case_task_def,
                            parent=self,
                        ),
                    )
                )

            self.case_task_specs = tuple(case_task_specs)

        def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
            """
            Perform validations on this TaskSpec that apply to switch tasks.

            Raises ConfigValidationError if the control task has a disallowed
            type, if a case key occurs more than once, or if both a default case
            and a non default 'default' option are declared.
            """
            from collections import defaultdict

            allowed_control_task_types = ("expr", "cmd", "script")
            if (
                self.control_task_spec.task_type.__key__
                not in allowed_control_task_types
            ):
                raise ConfigValidationError(
                    f"Control task must have a type that is one of "
                    f"{allowed_control_task_types!r}"
                )

            cases: MutableMapping[Any, int] = defaultdict(int)
            for case_keys, _ in self.case_task_specs:
                for case_key in case_keys:
                    cases[case_key] += 1

            # Ensure case keys don't overlap (and only one default case)
            for case, count in cases.items():
                if count > 1:
                    if case is DEFAULT_CASE:
                        raise ConfigValidationError(
                            "Switch array includes more than one default case"
                        )
                    raise ConfigValidationError(
                        f"Switch array includes more than one case for {case!r}"
                    )

            if self.options.default != "fail" and DEFAULT_CASE in cases:
                raise ConfigValidationError(
                    "switch tasks should not declare both a default case and the "
                    "'default' option"
                )

            # Validate subtask specs
            self.control_task_spec.validate(config, task_specs)
            for _, case_task_spec in self.case_task_specs:
                case_task_spec.validate(config, task_specs)

    spec: TaskSpec
    control_task: PoeTask
    switch_tasks: Dict[str, PoeTask]

    def __init__(
        self,
        spec: TaskSpec,
        invocation: Tuple[str, ...],
        ctx: TaskContext,
        capture_stdout: bool = False,
    ):
        """
        Create the control task and one task per case from their specs.

        When the switch task declares args, the arguments from its invocation
        are forwarded to the control task and to every case task.
        """
        super().__init__(spec, invocation, ctx, capture_stdout)

        control_task_name = f"{spec.name}[__control__]"
        control_invocation: Tuple[str, ...] = (control_task_name,)
        options = self.spec.options
        if options.get("args"):
            control_invocation = (*control_invocation, *invocation[1:])

        # The control task output is always captured because it selects the case
        self.control_task = self.spec.control_task_spec.create_task(
            invocation=control_invocation,
            ctx=TaskContext.from_task(self),
            capture_stdout=True,
        )

        self.switch_tasks = {}
        for case_keys, case_spec in spec.case_task_specs:
            # Case keys are not guaranteed to be strings, so stringify them the
            # same way TaskSpec does when it names the case task specs
            case_task_index = ",".join(str(value) for value in case_keys)
            task_invocation: Tuple[str, ...] = (f"{spec.name}[{case_task_index}]",)
            if options.get("args"):
                task_invocation = (*task_invocation, *invocation[1:])

            case_task = case_spec.create_task(
                invocation=task_invocation,
                ctx=TaskContext.from_task(self),
                capture_stdout=self.capture_stdout,
            )
            # Every key of a multi value case maps to the same task
            for case_key in case_keys:
                self.switch_tasks[case_key] = case_task

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        Run the control task, then the case task matching its output.

        Returns the result of the case task, or 0 for a dry run or when no case
        matches and the 'default' option is "pass".

        Raises PoeException if arguments are passed to a task that declares
        none, and ExecutionError if the control task fails or no case matches.
        """
        named_arg_values, extra_args = self.get_parsed_arguments(env)
        env.update(named_arg_values)

        if not named_arg_values and any(arg.strip() for arg in self.invocation[1:]):
            raise PoeException(f"Switch task {self.name!r} does not accept arguments")

        # Indicate on the global context that there are multiple stages to this task
        context.multistage = True

        task_result = self.control_task.run(context=context, parent_env=env)
        if task_result:
            raise ExecutionError(
                f"Switch task {self.name!r} aborted after failed control task"
            )

        if context.dry:
            self._print_action(
                "unresolved case for switch task", dry=True, unresolved=True
            )
            return 0

        control_task_output = context.get_task_output(self.control_task.invocation)
        # Fall back to the default case if the output matches no case key
        case_task = self.switch_tasks.get(
            control_task_output, self.switch_tasks.get(DEFAULT_CASE)
        )

        if case_task is None:
            if self.spec.options.default == "pass":
                return 0
            raise ExecutionError(
                f"Control value {control_task_output!r} did not match any cases in "
                f"switch task {self.name!r}."
            )

        result = case_task.run(context=context, parent_env=env)

        if self.capture_stdout is True:
            # The executor saved output for the case task, but we need it to be
            # registered for this switch task as well
            context.save_task_output(
                self.invocation, context.get_task_output(case_task.invocation).encode()
            )

        return result
A task that runs one of several `case` subtasks depending on the output of a `switch` subtask.
def __init__(
    self,
    spec: TaskSpec,
    invocation: Tuple[str, ...],
    ctx: TaskContext,
    capture_stdout: bool = False,
):
    """
    Create the control task and one task per case from their specs.

    When the switch task declares args, the arguments from its invocation
    are forwarded to the control task and to every case task.
    """
    super().__init__(spec, invocation, ctx, capture_stdout)

    control_task_name = f"{spec.name}[__control__]"
    control_invocation: Tuple[str, ...] = (control_task_name,)
    options = self.spec.options
    if options.get("args"):
        control_invocation = (*control_invocation, *invocation[1:])

    # The control task output is always captured because it selects the case
    self.control_task = self.spec.control_task_spec.create_task(
        invocation=control_invocation,
        ctx=TaskContext.from_task(self),
        capture_stdout=True,
    )

    self.switch_tasks = {}
    for case_keys, case_spec in spec.case_task_specs:
        # Case keys are not guaranteed to be strings, so stringify each one
        # before joining; joining them directly raises TypeError on a
        # non-string key.
        case_task_index = ",".join(str(value) for value in case_keys)
        task_invocation: Tuple[str, ...] = (f"{spec.name}[{case_task_index}]",)
        if options.get("args"):
            task_invocation = (*task_invocation, *invocation[1:])

        case_task = case_spec.create_task(
            invocation=task_invocation,
            ctx=TaskContext.from_task(self),
            capture_stdout=self.capture_stdout,
        )
        # Every key of a multi value case maps to the same task
        for case_key in case_keys:
            self.switch_tasks[case_key] = case_task
class TaskOptions(PoeTask.TaskOptions):
    control: Union[str, dict]
    default: Literal["pass", "fail"] = "fail"

    @classmethod
    def normalize(cls, config: Any, strict: bool = True):
        """
        Perform validations that require access to the raw config.
        """
        if not (strict and isinstance(config, dict)):
            return super().normalize(config, strict)

        # Subtasks may not declare certain options
        for subtask_def in config.get("switch", tuple()):
            for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
                if banned_option not in subtask_def:
                    continue
                if "case" in subtask_def:
                    raise ConfigValidationError(
                        f"Case {subtask_def.get('case')!r} includes "
                        f"incompatible option {banned_option!r}"
                    )
                raise ConfigValidationError(
                    "Default case includes incompatible option "
                    f"{banned_option!r}"
                )

        return super().normalize(config, strict)
A special kind of config object that parses options ...
@classmethod
def normalize(cls, config: Any, strict: bool = True):
    """
    Perform validations that require access to the raw config.

    Raises ConfigValidationError if strict is set and any item of the switch
    array declares an option from SUBTASK_OPTIONS_BLOCKLIST.
    """
    if not (strict and isinstance(config, dict)):
        return super().normalize(config, strict)

    # Subtasks may not declare certain options
    for subtask_def in config.get("switch", tuple()):
        for banned_option in SUBTASK_OPTIONS_BLOCKLIST:
            if banned_option not in subtask_def:
                continue
            # An item without a case key is the default case
            if "case" in subtask_def:
                raise ConfigValidationError(
                    f"Case {subtask_def.get('case')!r} includes "
                    f"incompatible option {banned_option!r}"
                )
            raise ConfigValidationError(
                "Default case includes incompatible option "
                f"{banned_option!r}"
            )

    return super().normalize(config, strict)
Perform validations that require access to the raw config.
class TaskSpec(PoeTask.TaskSpec):
    control_task_spec: PoeTask.TaskSpec
    case_task_specs: Tuple[Tuple[Tuple[Any, ...], PoeTask.TaskSpec], ...]
    options: "SwitchTask.TaskOptions"

    def __init__(
        self,
        name: str,
        task_def: Dict[str, Any],
        factory: "TaskSpecFactory",
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ):
        """
        Build the spec for the switch task, its control task, and each case.
        """
        super().__init__(name, task_def, factory, source, parent)

        switch_args = task_def.get("args")
        control_task_def = task_def["control"]

        if switch_args:
            # Expand a string control task to dict form so that the args of
            # the switch task can be attached to it
            if isinstance(control_task_def, str):
                control_task_def = {factory.config.default_task_type: control_task_def}
            control_task_def = dict(control_task_def, args=switch_args)

        self.control_task_spec = factory.get(
            task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
        )

        collected = []
        for switch_item in task_def["switch"]:
            case_task_def = dict(switch_item, args=switch_args)
            # An item without a case key is the default case
            case = case_task_def.pop("case", DEFAULT_CASE)
            case_tuple = tuple(case) if isinstance(case, list) else (case,)
            label = ",".join(map(str, case_tuple))
            case_spec = factory.get(
                task_name=f"{name}[{label}]",
                task_def=case_task_def,
                parent=self,
            )
            collected.append((case_tuple, case_spec))

        self.case_task_specs = tuple(collected)

    def _task_validations(self, config: "PoeConfig", task_specs: "TaskSpecFactory"):
        """
        Perform validations on this TaskSpec that apply to switch tasks.
        """
        from collections import defaultdict

        allowed_control_task_types = ("expr", "cmd", "script")
        control_type_key = self.control_task_spec.task_type.__key__
        if control_type_key not in allowed_control_task_types:
            raise ConfigValidationError(
                f"Control task must have a type that is one of "
                f"{allowed_control_task_types!r}"
            )

        # Count how many times each case key is declared
        cases: MutableMapping[Any, int] = defaultdict(int)
        for case_keys, _ in self.case_task_specs:
            for case_key in case_keys:
                cases[case_key] += 1

        # Ensure case keys don't overlap (and only one default case)
        for case, count in cases.items():
            if count <= 1:
                continue
            if case is DEFAULT_CASE:
                raise ConfigValidationError(
                    "Switch array includes more than one default case"
                )
            raise ConfigValidationError(
                f"Switch array includes more than one case for {case!r}"
            )

        if self.options.default != "fail" and DEFAULT_CASE in cases:
            raise ConfigValidationError(
                "switch tasks should not declare both a default case and the "
                "'default' option"
            )

        # Validate subtask specs
        self.control_task_spec.validate(config, task_specs)
        for _, case_task_spec in self.case_task_specs:
            case_task_spec.validate(config, task_specs)
def __init__(
    self,
    name: str,
    task_def: Dict[str, Any],
    factory: "TaskSpecFactory",
    source: "ConfigPartition",
    parent: Optional["PoeTask.TaskSpec"] = None,
):
    """
    Build the spec for the switch task, its control task, and each case.

    The args of the switch task, if any, are attached to the control task
    definition; every case task definition receives the same args value.
    """
    super().__init__(name, task_def, factory, source, parent)

    switch_args = task_def.get("args")
    control_task_def = task_def["control"]

    if switch_args:
        # Expand a string control task to dict form so that the args of the
        # switch task can be attached to it
        if isinstance(control_task_def, str):
            control_task_def = {factory.config.default_task_type: control_task_def}
        control_task_def = dict(control_task_def, args=switch_args)

    self.control_task_spec = factory.get(
        task_name=f"{name}[__control__]", task_def=control_task_def, parent=self
    )

    collected = []
    for switch_item in task_def["switch"]:
        case_task_def = dict(switch_item, args=switch_args)
        # An item without a case key is the default case
        case = case_task_def.pop("case", DEFAULT_CASE)
        case_tuple = tuple(case) if isinstance(case, list) else (case,)
        label = ",".join(map(str, case_tuple))
        case_spec = factory.get(
            task_name=f"{name}[{label}]",
            task_def=case_task_def,
            parent=self,
        )
        collected.append((case_tuple, case_spec))

    self.case_task_specs = tuple(collected)