poethepoet.task.base
import re
import sys
from os import environ
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Dict,
    Iterator,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

from ..exceptions import ConfigValidationError, PoeException
from ..options import PoeOptions

if TYPE_CHECKING:
    from ..config import ConfigPartition, PoeConfig
    from ..context import RunContext
    from ..env.manager import EnvVarsManager
    from ..ui import PoeUi
    from .args import PoeTaskArgs


# Pattern that the name of a task without a parent must match, see
# PoeTask.TaskSpec._base_validations. Note that "+" is accepted by the pattern even
# though the corresponding error message does not mention it.
_TASK_NAME_PATTERN = re.compile(r"^\w[\w\d\-\_\+\:]*$")


class MetaPoeTask(type):
    """
    This metaclass makes all descendants of PoeTask (task types) register themselves on
    declaration and validates that they include the expected class attributes.
    """

    def __init__(cls, *args):
        super().__init__(*args)
        # The PoeTask base class is not itself a task type, so it is not registered
        if cls.__name__ == "PoeTask":
            return

        # NB. these checks are assert statements, so they are skipped when python
        # runs with assertions disabled (-O)
        assert isinstance(getattr(cls, "__key__", None), str)
        assert issubclass(getattr(cls, "TaskOptions", None), PoeOptions)
        # __task_types is name-mangled inside PoeTask, hence the explicit prefix
        PoeTask._PoeTask__task_types[cls.__key__] = cls

        # Give each TaskSpec a reference to its parent PoeTask
        if "TaskSpec" in cls.__dict__:
            cls.TaskSpec.task_type = cls


# The content of a task, i.e. the value found under the task type key of a task
# definition
TaskContent = Union[str, Sequence[Union[str, Mapping[str, Any]]]]

# A task definition as accepted by PoeTask.resolve_task_type: a string, a table, or
# a list of strings and/or tables
TaskDef = Union[str, Mapping[str, Any], Sequence[Union[str, Mapping[str, Any]]]]


class TaskSpecFactory:
    """
    Creates TaskSpec instances from task definitions, and caches by task name the
    specs of tasks that were loaded from the config.
    """

    __cache: Dict[str, "PoeTask.TaskSpec"]
    config: "PoeConfig"

    def __init__(self, config: "PoeConfig"):
        self.__cache = {}
        self.config = config

    def __contains__(self, other) -> bool:
        """
        Check whether a spec is cached under the given name. Tasks that have not been
        loaded yet are not reported, even if they exist in the config.
        """
        return other in self.__cache

    def get(
        self,
        task_name: str,
        task_def: Optional[TaskDef] = None,
        task_type: Optional[str] = None,
        parent: Optional["PoeTask.TaskSpec"] = None,
    ) -> "PoeTask.TaskSpec":
        """
        Get the TaskSpec for the given task.

        If both a task_def and a parent are given then a new spec is created from the
        task_def (resolving the task_type if it was not provided) and it is not cached
        by this factory. Otherwise the spec is looked up by name, and loaded from the
        config first if it is not in the cache yet.
        """
        if task_def and parent:
            # This is probably a subtask and will be cached by the parent task_spec
            if not task_type:
                task_type = PoeTask.resolve_task_type(task_def, self.config)
                assert task_type
            return self.create(
                task_name, task_type, task_def, source=parent.source, parent=parent
            )

        if task_name not in self.__cache:
            self.load(task_name)

        return self.__cache[task_name]

    def create(
        self,
        task_name: str,
        task_type: str,
        task_def: TaskDef,
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ) -> "PoeTask.TaskSpec":
        """
        A parent task should be provided when this task is defined inline within another
        task, for example as part of a sequence.
        """
        # Normalize a bare content value into a table keyed by the task type
        if not isinstance(task_def, dict):
            task_def = {task_type: task_def}

        return PoeTask.lookup_task_spec_cls(task_type)(
            name=task_name,
            task_def=task_def,
            factory=self,
            source=source,
            parent=parent,
        )

    def load_all(self):
        """
        Load the spec of every task named in the config, and return this factory.
        """
        for task_name in self.config.task_names:
            self.load(task_name)

        return self

    def load(self, task_name: str):
        """
        Look up the named task in the config, create its TaskSpec and cache it,
        replacing any spec previously cached under that name.

        Raises PoeException if the config has no such task, or ConfigValidationError
        if the task type cannot be resolved from the task definition.
        """
        task_def, config_partition = self.config.lookup_task(task_name)

        if task_def is None or config_partition is None:
            raise PoeException(f"Cannot instantiate unknown task {task_name!r}")

        task_type = PoeTask.resolve_task_type(task_def, self.config)
        if not task_type:
            raise ConfigValidationError(
                "Task definition must be a string, a list, or a table including exactly"
                " one task key\n"
                f"Available task keys: {set(PoeTask.get_task_types())!r}",
                task_name=task_name,
                # The filename is only reported for tasks from a non-primary partition
                filename=(
                    None if config_partition.is_primary else str(config_partition.path)
                ),
            )

        self.__cache[task_name] = self.create(
            task_name, task_type, task_def, source=config_partition
        )

    def __iter__(self):
        """
        Iterate over the cached TaskSpec instances.
        """
        return iter(self.__cache.values())


class TaskContext(NamedTuple):
    """
    Collection of contextual config inherited from a parent task to a child task
    """

    config: "PoeConfig"
    cwd: str
    ui: "PoeUi"
    specs: "TaskSpecFactory"

    @classmethod
    def from_task(cls, parent_task: "PoeTask"):
        """
        Create the context for a child of the given task. The cwd is taken from the
        cwd option of the parent task if it has one, or else from the parent's own
        context; everything else is copied from the parent's context.
        """
        return cls(
            config=parent_task.ctx.config,
            cwd=str(parent_task.spec.options.get("cwd", parent_task.ctx.cwd)),
            specs=parent_task.ctx.specs,
            ui=parent_task.ctx.ui,
        )


class PoeTask(metaclass=MetaPoeTask):
    """
    Base class of all task types. Subclasses are registered by MetaPoeTask under
    their __key__, and are expected to implement _handle_run.
    """

    # The key that identifies this task type, checked by MetaPoeTask for subclasses
    __key__: ClassVar[str]
    # The type that the content of a task of this type must be an instance of
    __content_type__: ClassVar[Type] = str

    class TaskOptions(PoeOptions):
        # Options declared on the base task type; all default to None
        args: Optional[Union[dict, list]] = None
        capture_stdout: Optional[str] = None
        cwd: Optional[str] = None
        deps: Optional[Sequence[str]] = None
        env: Optional[dict] = None
        envfile: Optional[Union[str, list]] = None
        executor: Optional[dict] = None
        help: Optional[str] = None
        uses: Optional[dict] = None

        def validate(self):
            """
            Validation rules that don't require any extra context go here.
            """

    class TaskSpec:
        name: str
        content: TaskContent
        options: "PoeTask.TaskOptions"
        # Assigned by MetaPoeTask when a task type declares its own TaskSpec
        task_type: Type["PoeTask"]
        source: "ConfigPartition"
        parent: Optional["PoeTask.TaskSpec"] = None

        # Cache for the args property
        _args: Optional["PoeTaskArgs"] = None

        def __init__(
            self,
            name: str,
            task_def: Dict[str, Any],
            factory: TaskSpecFactory,
            source: "ConfigPartition",
            parent: Optional["PoeTask.TaskSpec"] = None,
        ):
            """
            Build the spec from a task definition table, which must include the
            content of the task under the key of the task type. The factory argument
            is not used here.
            """
            self.name = name
            self.content = task_def[self.task_type.__key__]
            self.options = self._parse_options(task_def)
            self.source = source
            self.parent = parent

        def _parse_options(self, task_def: Dict[str, Any]):
            """
            Parse the options of this task from the task definition, using the first
            result produced by TaskOptions.parse. Any ConfigValidationError raised is
            tagged with the name of this task before being re-raised.
            """
            try:
                return next(
                    self.task_type.TaskOptions.parse(
                        task_def, extra_keys=(self.task_type.__key__,)
                    )
                )
            except ConfigValidationError as error:
                error.task_name = self.name
                raise

        def get_task_env(
            self,
            parent_env: "EnvVarsManager",
            uses_values: Optional[Mapping[str, str]] = None,
        ) -> "EnvVarsManager":
            """
            Resolve the EnvVarsManager for this task, relative to the given parent_env
            """
            task_envfile = self.options.get("envfile")
            task_env = self.options.get("env")

            # Work on a clone so that the parent_env object is not modified
            result = parent_env.clone()

            # Include env vars from outputs of upstream dependencies
            if uses_values:
                result.update(uses_values)

            result.set("POE_CONF_DIR", str(self.source.config_dir))
            result.apply_env_config(
                task_envfile,
                task_env,
                config_dir=self.source.config_dir,
                config_working_dir=self.source.cwd,
            )

            return result

        @property
        def args(self) -> Optional["PoeTaskArgs"]:
            """
            The PoeTaskArgs for this task, created on first access from the args
            option, or None if the task has no args option.
            """
            # Imported at call time rather than at module level
            # (presumably to avoid a circular import - TODO confirm)
            from .args import PoeTaskArgs

            if not self._args and self.options.args:
                self._args = PoeTaskArgs(self.options.args, self.name)

            return self._args

        def create_task(
            self,
            invocation: Tuple[str, ...],
            ctx: TaskContext,
            capture_stdout: Union[str, bool] = False,
        ) -> "PoeTask":
            """
            Instantiate the task type of this spec for the given invocation and context.
            """
            return self.task_type(
                spec=self,
                invocation=invocation,
                capture_stdout=capture_stdout,
                ctx=ctx,
            )

        def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory):
            """
            Run the base validations followed by the task type specific validations.
            Any ConfigValidationError raised is tagged with the name of this task
            before being re-raised.
            """
            try:
                self._base_validations(config, task_specs)
                self._task_validations(config, task_specs)
            except ConfigValidationError as error:
                error.task_name = self.name
                raise

        def _base_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
            """
            Perform validations on this TaskSpec that apply to all task types
            """
            # NB. indexing assumes a non-empty name; an empty name raises IndexError
            if not (self.name[0].isalpha() or self.name[0] == "_"):
                raise ConfigValidationError(
                    "Task names must start with a letter or underscore."
                )

            # The name pattern is only enforced for tasks without a parent
            if not self.parent and not _TASK_NAME_PATTERN.match(self.name):
                raise ConfigValidationError(
                    "Task names characters must be alphanumeric, colon, underscore or "
                    "dash."
                )

            if not isinstance(self.content, self.task_type.__content_type__):
                raise ConfigValidationError(
                    f"Content for {self.task_type.__name__} must be a "
                    f"{self.task_type.__content_type__.__name__}"
                )

            if self.options.deps:
                for dep in self.options.deps:
                    # A reference is a task name optionally followed by arguments
                    dep_task_name = dep.split(" ", 1)[0]
                    # NB. membership only reflects specs already cached by the factory
                    if dep_task_name not in task_specs:
                        raise ConfigValidationError(
                            "'deps' option includes reference to unknown task: "
                            f"{dep_task_name!r}"
                        )

                    if task_specs.get(dep_task_name).options.get("use_exec", False):
                        raise ConfigValidationError(
                            f"'deps' option includes reference to task with "
                            f"'use_exec' set to true: {dep_task_name!r}"
                        )

            if self.options.uses:
                from ..helpers import is_valid_env_var

                for key, dep in self.options.uses.items():
                    if not is_valid_env_var(key):
                        raise ConfigValidationError(
                            f"'uses' option includes invalid key: {key!r}"
                        )

                    dep_task_name = dep.split(" ", 1)[0]
                    if dep_task_name not in task_specs:
                        raise ConfigValidationError(
                            "'uses' options includes reference to unknown task: "
                            f"{dep_task_name!r}"
                        )

                    referenced_task = task_specs.get(dep_task_name)
                    if referenced_task.options.get("use_exec", False):
                        raise ConfigValidationError(
                            f"'uses' option references task with 'use_exec' set to "
                            f"true: {dep_task_name!r}"
                        )
                    if referenced_task.options.get("capture_stdout"):
                        raise ConfigValidationError(
                            f"'uses' option references task with 'capture_stdout' "
                            f"option set: {dep_task_name!r}"
                        )

        def _task_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
            """
            Perform validations on this TaskSpec that apply to a specific task type
            """

    spec: TaskSpec
    ctx: TaskContext
    # Cache for get_parsed_arguments: (named argument values, extra arguments)
    _parsed_args: Optional[Tuple[Dict[str, str], Tuple[str, ...]]] = None

    # Registry of task types by key, populated by MetaPoeTask
    __task_types: ClassVar[Dict[str, Type["PoeTask"]]] = {}
    # Cache for _get_upstream_invocations
    __upstream_invocations: Optional[
        Dict[str, Union[List[Tuple[str, ...]], Dict[str, Tuple[str, ...]]]]
    ] = None

    def __init__(
        self,
        spec: TaskSpec,
        invocation: Tuple[str, ...],
        ctx: TaskContext,
        capture_stdout: Union[str, bool] = False,
    ):
        self.spec = spec
        self.invocation = invocation
        self.ctx = ctx
        # The capture_stdout option of the task takes precedence over the argument
        self.capture_stdout = spec.options.capture_stdout or capture_stdout
        self._is_windows = sys.platform == "win32"

    @property
    def name(self):
        """
        The name of this task, as declared by its spec.
        """
        return self.spec.name

    @classmethod
    def lookup_task_spec_cls(cls, task_key: str) -> Type[TaskSpec]:
        """
        Get the TaskSpec class of the task type registered under the given key.
        Raises KeyError if no task type is registered under that key.
        """
        return cls.__task_types[task_key].TaskSpec

    @classmethod
    def resolve_task_type(
        cls,
        task_def: TaskDef,
        config: "PoeConfig",
        array_item: Union[bool, str] = False,
    ) -> Optional[str]:
        """
        Determine the key of the task type for the given task definition.

        A string resolves to a default task type from the config, or to array_item
        itself if that is a non-empty string. A dict resolves to the one registered
        task type key that it includes, if there is exactly one. A list resolves to
        the default array task type from the config. In any other case the result is
        None.
        """
        if isinstance(task_def, str):
            if array_item:
                return (
                    array_item
                    if isinstance(array_item, str)
                    else config.default_array_item_task_type
                )
            else:
                return config.default_task_type

        elif isinstance(task_def, dict):
            task_type_keys = set(task_def.keys()).intersection(cls.__task_types)
            if len(task_type_keys) == 1:
                return next(iter(task_type_keys))

        elif isinstance(task_def, list):
            return config.default_array_task_type

        return None

    def _parse_named_args(
        self, extra_args: Sequence[str], env: "EnvVarsManager"
    ) -> Optional[Dict[str, str]]:
        """
        Parse the given arguments with the args of the task spec, or return None if
        the task does not declare any args.
        """
        if task_args := self.spec.args:
            return task_args.parse(extra_args, env, self.ctx.ui.program_name)

        return None

    def get_parsed_arguments(
        self, env: "EnvVarsManager"
    ) -> Tuple[Dict[str, str], Tuple[str, ...]]:
        """
        Get the named argument values and the extra arguments from the invocation of
        this task. The result is computed on the first call only, so the env given to
        later calls has no effect.
        """
        if self._parsed_args is None:
            # The first item of the invocation is the name of the task
            all_args = self.invocation[1:]

            if task_args := self.spec.args:
                # Arguments after the first "--" are not parsed as named arguments
                try:
                    split_index = all_args.index("--")
                    option_args = all_args[:split_index]
                    extra_args = all_args[split_index + 1 :]
                except ValueError:
                    option_args = all_args
                    extra_args = tuple()

                self._parsed_args = (
                    task_args.parse(option_args, env, self.ctx.ui.program_name),
                    extra_args,
                )

            else:
                # Without declared args everything is passed on as extra arguments
                self._parsed_args = ({}, all_args)

        return self._parsed_args

    def run(
        self,
        context: "RunContext",
        parent_env: Optional["EnvVarsManager"] = None,
    ) -> int:
        """
        Run this task

        Returns the result of _handle_run, or 0 without running anything in case of a
        dry run of a task that depends on the output of other tasks via the uses
        option.
        """

        if environ.get("POE_DEBUG"):
            task_type_key = self.__key__  # type: ignore[attr-defined]
            print(f" * Running {task_type_key}:{self.name}")
            print(f" . Invocation {self.invocation!r}")

        upstream_invocations = self._get_upstream_invocations(context)

        if context.dry and upstream_invocations.get("uses", {}):
            self._print_action(
                (
                    "unresolved dependency task results via uses option for task "
                    f"{self.name!r}"
                ),
                dry=True,
                unresolved=True,
            )
            return 0

        task_env = self.spec.get_task_env(
            parent_env or context.env,
            context._get_dep_values(upstream_invocations["uses"]),
        )

        if environ.get("POE_DEBUG"):
            named_arg_values, extra_args = self.get_parsed_arguments(task_env)
            print(f" . Parsed args {named_arg_values!r}")
            print(f" . Extra args {extra_args!r}")

        return self._handle_run(context, task_env)

    def _handle_run(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
    ) -> int:
        """
        This method must be implemented by a subclass and return a single executor
        result.
        """
        raise NotImplementedError

    def _get_executor(
        self,
        context: "RunContext",
        env: "EnvVarsManager",
        *,
        delegate_dry_run: bool = False,
    ):
        """
        Get an executor from the context, configured with the invocation, working
        directory, executor option and capture_stdout setting of this task.
        """
        return context.get_executor(
            self.invocation,
            env,
            working_dir=self.get_working_dir(env),
            executor_config=self.spec.options.get("executor"),
            capture_stdout=self.capture_stdout,
            delegate_dry_run=delegate_dry_run,
        )

    def get_working_dir(
        self,
        env: "EnvVarsManager",
    ) -> Path:
        """
        Resolve the working directory for this task from its cwd option, or else from
        the cwd of its context. The value is passed through env.fill_template, and a
        relative path is interpreted relative to the project directory.
        """
        cwd_option = env.fill_template(self.spec.options.get("cwd", self.ctx.cwd))
        working_dir = Path(cwd_option)

        if not working_dir.is_absolute():
            working_dir = self.ctx.config.project_dir / working_dir

        return working_dir

    def iter_upstream_tasks(
        self, context: "RunContext"
    ) -> Iterator[Tuple[str, "PoeTask"]]:
        """
        Iterate over the tasks referenced by the deps and uses options of this task.
        Each task is paired with its key from the uses option, or with an empty string
        if it comes from the deps option. Only tasks from the uses option are
        instantiated with capture_stdout enabled.
        """
        invocations = self._get_upstream_invocations(context)
        for invocation in invocations["deps"]:
            yield ("", self._instantiate_dep(invocation, capture_stdout=False))
        for key, invocation in invocations["uses"].items():
            yield (key, self._instantiate_dep(invocation, capture_stdout=True))

    def _get_upstream_invocations(self, context: "RunContext"):
        """
        NB. this memoization assumes the context (and contained env vars) will be the
        same in all instances for the lifetime of this object. Whilst this should be OK
        for all current usecases it is strictly speaking something that this object
        should not know enough to safely assume. So we probably want to revisit this.
        """
        import shlex

        options = self.spec.options

        if self.__upstream_invocations is None:
            # Task references may use templates that reference env vars or the named
            # arguments of this task
            env = self.spec.get_task_env(context.env)
            env.update(self.get_parsed_arguments(env)[0])

            self.__upstream_invocations = {
                "deps": [
                    tuple(shlex.split(env.fill_template(task_ref)))
                    for task_ref in options.get("deps", tuple())
                ],
                "uses": {
                    key: tuple(shlex.split(env.fill_template(task_ref)))
                    for key, task_ref in options.get("uses", {}).items()
                },
            }

        return self.__upstream_invocations

    def _instantiate_dep(
        self, invocation: Tuple[str, ...], capture_stdout: bool
    ) -> "PoeTask":
        """
        Create the task named by the first item of the given invocation. The new task
        shares the config, specs and ui of this task, with the project directory as
        the cwd of its context.
        """
        return self.ctx.specs.get(invocation[0]).create_task(
            invocation=invocation,
            ctx=TaskContext(
                config=self.ctx.config,
                cwd=str(self.ctx.config.project_dir),
                specs=self.ctx.specs,
                ui=self.ctx.ui,
            ),
            capture_stdout=capture_stdout,
        )

    def has_deps(self) -> bool:
        """
        Check whether this task has a non-empty deps or uses option.
        """
        return bool(
            self.spec.options.get("deps", False) or self.spec.options.get("uses", False)
        )

    @classmethod
    def is_task_type(
        cls, task_def_key: str, content_type: Optional[Type] = None
    ) -> bool:
        """
        Checks whether the given key identifies a known task type.
        Optionally also check whether the given content_type matches the type of content
        for this task type.
        """
        return task_def_key in cls.__task_types and (
            content_type is None
            or cls.__task_types[task_def_key].__content_type__ is content_type
        )

    @classmethod
    def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]:
        """
        Get the keys of all registered task types, or if a content_type is given then
        only of those task types that have exactly that content type.
        """
        if content_type:
            return tuple(
                task_type
                for task_type, task_cls in cls.__task_types.items()
                if task_cls.__content_type__ is content_type
            )
        return tuple(task_type for task_type in cls.__task_types.keys())

    def _print_action(self, action: str, dry: bool, unresolved: bool = False):
        """
        Print the action taken by a task just before executing it.
        """
        # The message is printed with a lower minimum verbosity for a dry run
        min_verbosity = -1 if dry else 0
        arrow = "??" if unresolved else "<=" if self.capture_stdout else "=>"
        self.ctx.ui.print_msg(
            f"<hl>Poe {arrow}</hl> <action>{action}</action>", min_verbosity
        )

    class Error(Exception):
        pass
class MetaPoeTask(type):
    """
    Metaclass of PoeTask. Every task type derived from PoeTask is checked for the
    expected class attributes and added to the task type registry as soon as it is
    declared.
    """

    def __init__(cls, *args):
        super().__init__(*args)

        # Only proceed for actual task types, not for the PoeTask base class itself
        if cls.__name__ != "PoeTask":
            task_key = getattr(cls, "__key__", None)
            assert isinstance(task_key, str)
            options_cls = getattr(cls, "TaskOptions", None)
            assert issubclass(options_cls, PoeOptions)

            # The registry is a name-mangled attribute of PoeTask
            registry = PoeTask._PoeTask__task_types
            registry[task_key] = cls

            # A TaskSpec declared directly on this task type gets a reference to it
            if "TaskSpec" in vars(cls):
                cls.TaskSpec.task_type = cls
This metaclass makes all descendants of PoeTask (task types) register themselves on declaration and validates that they include the expected class attributes.
42 def __init__(cls, *args): 43 super().__init__(*args) 44 if cls.__name__ == "PoeTask": 45 return 46 47 assert isinstance(getattr(cls, "__key__", None), str) 48 assert issubclass(getattr(cls, "TaskOptions", None), PoeOptions) 49 PoeTask._PoeTask__task_types[cls.__key__] = cls 50 51 # Give each TaskSpec a reference to its parent PoeTask 52 if "TaskSpec" in cls.__dict__: 53 cls.TaskSpec.task_type = cls
Inherited Members
- builtins.type
- mro
class TaskSpecFactory:
    """
    Builds TaskSpec objects from task definitions, and keeps the specs of the tasks
    that were loaded from the config in a cache keyed by task name.
    """

    __cache: Dict[str, "PoeTask.TaskSpec"]
    config: "PoeConfig"

    def __init__(self, config: "PoeConfig"):
        self.config = config
        self.__cache = {}

    def __contains__(self, other) -> bool:
        # Only specs that were already loaded are reported
        return other in self.__cache

    def get(
        self,
        task_name: str,
        task_def: Optional[TaskDef] = None,
        task_type: Optional[str] = None,
        parent: Optional["PoeTask.TaskSpec"] = None,
    ) -> "PoeTask.TaskSpec":
        """
        Return the spec for a task: a new uncached spec if both a task_def and a
        parent are given, or else the cached spec for task_name (loaded on demand).
        """
        if not task_def or not parent:
            if task_name not in self.__cache:
                self.load(task_name)
            return self.__cache[task_name]

        # This is probably a subtask and will be cached by the parent task_spec
        resolved_type = task_type or PoeTask.resolve_task_type(task_def, self.config)
        assert resolved_type
        return self.create(
            task_name, resolved_type, task_def, source=parent.source, parent=parent
        )

    def create(
        self,
        task_name: str,
        task_type: str,
        task_def: TaskDef,
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ) -> "PoeTask.TaskSpec":
        """
        Instantiate the TaskSpec class of the given task type. The parent should be
        given for a task that is defined inline within another task, for example as
        part of a sequence.
        """
        # Anything but a table is taken to be the content of the task
        normalized_def = (
            task_def if isinstance(task_def, dict) else {task_type: task_def}
        )
        spec_cls = PoeTask.lookup_task_spec_cls(task_type)
        return spec_cls(
            name=task_name,
            task_def=normalized_def,
            factory=self,
            source=source,
            parent=parent,
        )

    def load_all(self):
        """
        Load every task named in the config, then return this factory.
        """
        for configured_name in self.config.task_names:
            self.load(configured_name)
        return self

    def load(self, task_name: str):
        """
        Create and cache the spec for a task from the config.
        """
        task_def, config_partition = self.config.lookup_task(task_name)
        if task_def is None or config_partition is None:
            raise PoeException(f"Cannot instantiate unknown task {task_name!r}")

        task_type = PoeTask.resolve_task_type(task_def, self.config)
        if task_type:
            self.__cache[task_name] = self.create(
                task_name, task_type, task_def, source=config_partition
            )
            return

        message = (
            "Task definition must be a string, a list, or a table including exactly"
            " one task key\n"
            f"Available task keys: {set(PoeTask.get_task_types())!r}"
        )
        # Only name the config file if it is not the primary one
        if config_partition.is_primary:
            filename = None
        else:
            filename = str(config_partition.path)
        raise ConfigValidationError(message, task_name=task_name, filename=filename)

    def __iter__(self):
        return iter(self.__cache.values())
72 def get( 73 self, 74 task_name: str, 75 task_def: Optional[TaskDef] = None, 76 task_type: Optional[str] = None, 77 parent: Optional["PoeTask.TaskSpec"] = None, 78 ) -> "PoeTask.TaskSpec": 79 if task_def and parent: 80 # This is probably a subtask and will be cached by the parent task_spec 81 if not task_type: 82 task_type = PoeTask.resolve_task_type(task_def, self.config) 83 assert task_type 84 return self.create( 85 task_name, task_type, task_def, source=parent.source, parent=parent 86 ) 87 88 if task_name not in self.__cache: 89 self.load(task_name) 90 91 return self.__cache[task_name]
def create(
    self,
    task_name: str,
    task_type: str,
    task_def: TaskDef,
    source: "ConfigPartition",
    parent: Optional["PoeTask.TaskSpec"] = None,
) -> "PoeTask.TaskSpec":
    """
    Instantiate the TaskSpec class of the given task type. The parent should be given
    for a task that is defined inline within another task, for example as part of a
    sequence.
    """
    # Anything but a table is taken to be the content of the task
    normalized_def = task_def if isinstance(task_def, dict) else {task_type: task_def}
    spec_cls = PoeTask.lookup_task_spec_cls(task_type)
    return spec_cls(
        name=task_name,
        task_def=normalized_def,
        factory=self,
        source=source,
        parent=parent,
    )
A parent task should be provided when this task is defined inline within another task, for example as part of a sequence.
def load(self, task_name: str):
    """
    Create the spec for a task from the config and store it in the cache.
    """
    task_def, config_partition = self.config.lookup_task(task_name)
    if task_def is None or config_partition is None:
        raise PoeException(f"Cannot instantiate unknown task {task_name!r}")

    task_type = PoeTask.resolve_task_type(task_def, self.config)
    if task_type:
        self.__cache[task_name] = self.create(
            task_name, task_type, task_def, source=config_partition
        )
        return

    message = (
        "Task definition must be a string, a list, or a table including exactly"
        " one task key\n"
        f"Available task keys: {set(PoeTask.get_task_types())!r}"
    )
    # Only name the config file if it is not the primary one
    if config_partition.is_primary:
        filename = None
    else:
        filename = str(config_partition.path)
    raise ConfigValidationError(message, task_name=task_name, filename=filename)
class TaskContext(NamedTuple):
    """
    The contextual config that a child task inherits from its parent task
    """

    config: "PoeConfig"
    cwd: str
    ui: "PoeUi"
    specs: "TaskSpecFactory"

    @classmethod
    def from_task(cls, parent_task: "PoeTask"):
        """
        Derive the context for a child of the given task
        """
        parent_ctx = parent_task.ctx
        # The cwd option of the parent task takes precedence over its inherited cwd
        child_cwd = parent_task.spec.options.get("cwd", parent_ctx.cwd)
        return cls(parent_ctx.config, str(child_cwd), parent_ctx.ui, parent_ctx.specs)
Collection of contextual config inherited from a parent task to a child task
Create new instance of TaskContext(config, cwd, ui, specs)
Inherited Members
- builtins.tuple
- index
- count
168class PoeTask(metaclass=MetaPoeTask): 169 __key__: ClassVar[str] 170 __content_type__: ClassVar[Type] = str 171 172 class TaskOptions(PoeOptions): 173 args: Optional[Union[dict, list]] = None 174 capture_stdout: Optional[str] = None 175 cwd: Optional[str] = None 176 deps: Optional[Sequence[str]] = None 177 env: Optional[dict] = None 178 envfile: Optional[Union[str, list]] = None 179 executor: Optional[dict] = None 180 help: Optional[str] = None 181 uses: Optional[dict] = None 182 183 def validate(self): 184 """ 185 Validation rules that don't require any extra context go here. 186 """ 187 188 class TaskSpec: 189 name: str 190 content: TaskContent 191 options: "PoeTask.TaskOptions" 192 task_type: Type["PoeTask"] 193 source: "ConfigPartition" 194 parent: Optional["PoeTask.TaskSpec"] = None 195 196 _args: Optional["PoeTaskArgs"] = None 197 198 def __init__( 199 self, 200 name: str, 201 task_def: Dict[str, Any], 202 factory: TaskSpecFactory, 203 source: "ConfigPartition", 204 parent: Optional["PoeTask.TaskSpec"] = None, 205 ): 206 self.name = name 207 self.content = task_def[self.task_type.__key__] 208 self.options = self._parse_options(task_def) 209 self.source = source 210 self.parent = parent 211 212 def _parse_options(self, task_def: Dict[str, Any]): 213 try: 214 return next( 215 self.task_type.TaskOptions.parse( 216 task_def, extra_keys=(self.task_type.__key__,) 217 ) 218 ) 219 except ConfigValidationError as error: 220 error.task_name = self.name 221 raise 222 223 def get_task_env( 224 self, 225 parent_env: "EnvVarsManager", 226 uses_values: Optional[Mapping[str, str]] = None, 227 ) -> "EnvVarsManager": 228 """ 229 Resolve the EnvVarsManager for this task, relative to the given parent_env 230 """ 231 task_envfile = self.options.get("envfile") 232 task_env = self.options.get("env") 233 234 result = parent_env.clone() 235 236 # Include env vars from outputs of upstream dependencies 237 if uses_values: 238 result.update(uses_values) 239 240 
result.set("POE_CONF_DIR", str(self.source.config_dir)) 241 result.apply_env_config( 242 task_envfile, 243 task_env, 244 config_dir=self.source.config_dir, 245 config_working_dir=self.source.cwd, 246 ) 247 248 return result 249 250 @property 251 def args(self) -> Optional["PoeTaskArgs"]: 252 from .args import PoeTaskArgs 253 254 if not self._args and self.options.args: 255 self._args = PoeTaskArgs(self.options.args, self.name) 256 257 return self._args 258 259 def create_task( 260 self, 261 invocation: Tuple[str, ...], 262 ctx: TaskContext, 263 capture_stdout: Union[str, bool] = False, 264 ) -> "PoeTask": 265 return self.task_type( 266 spec=self, 267 invocation=invocation, 268 capture_stdout=capture_stdout, 269 ctx=ctx, 270 ) 271 272 def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory): 273 try: 274 self._base_validations(config, task_specs) 275 self._task_validations(config, task_specs) 276 except ConfigValidationError as error: 277 error.task_name = self.name 278 raise 279 280 def _base_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory): 281 """ 282 Perform validations on this TaskSpec that apply to all task types 283 """ 284 if not (self.name[0].isalpha() or self.name[0] == "_"): 285 raise ConfigValidationError( 286 "Task names must start with a letter or underscore." 287 ) 288 289 if not self.parent and not _TASK_NAME_PATTERN.match(self.name): 290 raise ConfigValidationError( 291 "Task names characters must be alphanumeric, colon, underscore or " 292 "dash." 
293 ) 294 295 if not isinstance(self.content, self.task_type.__content_type__): 296 raise ConfigValidationError( 297 f"Content for {self.task_type.__name__} must be a " 298 f"{self.task_type.__content_type__.__name__}" 299 ) 300 301 if self.options.deps: 302 for dep in self.options.deps: 303 dep_task_name = dep.split(" ", 1)[0] 304 if dep_task_name not in task_specs: 305 raise ConfigValidationError( 306 "'deps' option includes reference to unknown task: " 307 f"{dep_task_name!r}" 308 ) 309 310 if task_specs.get(dep_task_name).options.get("use_exec", False): 311 raise ConfigValidationError( 312 f"'deps' option includes reference to task with " 313 f"'use_exec' set to true: {dep_task_name!r}" 314 ) 315 316 if self.options.uses: 317 from ..helpers import is_valid_env_var 318 319 for key, dep in self.options.uses.items(): 320 if not is_valid_env_var(key): 321 raise ConfigValidationError( 322 f"'uses' option includes invalid key: {key!r}" 323 ) 324 325 dep_task_name = dep.split(" ", 1)[0] 326 if dep_task_name not in task_specs: 327 raise ConfigValidationError( 328 "'uses' options includes reference to unknown task: " 329 f"{dep_task_name!r}" 330 ) 331 332 referenced_task = task_specs.get(dep_task_name) 333 if referenced_task.options.get("use_exec", False): 334 raise ConfigValidationError( 335 f"'uses' option references task with 'use_exec' set to " 336 f"true: {dep_task_name!r}" 337 ) 338 if referenced_task.options.get("capture_stdout"): 339 raise ConfigValidationError( 340 f"'uses' option references task with 'capture_stdout' " 341 f"option set: {dep_task_name!r}" 342 ) 343 344 def _task_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory): 345 """ 346 Perform validations on this TaskSpec that apply to a specific task type 347 """ 348 349 spec: TaskSpec 350 ctx: TaskContext 351 _parsed_args: Optional[Tuple[Dict[str, str], Tuple[str, ...]]] = None 352 353 __task_types: ClassVar[Dict[str, Type["PoeTask"]]] = {} 354 __upstream_invocations: Optional[ 355 
Dict[str, Union[List[Tuple[str, ...]], Dict[str, Tuple[str, ...]]]] 356 ] = None 357 358 def __init__( 359 self, 360 spec: TaskSpec, 361 invocation: Tuple[str, ...], 362 ctx: TaskContext, 363 capture_stdout: Union[str, bool] = False, 364 ): 365 self.spec = spec 366 self.invocation = invocation 367 self.ctx = ctx 368 self.capture_stdout = spec.options.capture_stdout or capture_stdout 369 self._is_windows = sys.platform == "win32" 370 371 @property 372 def name(self): 373 return self.spec.name 374 375 @classmethod 376 def lookup_task_spec_cls(cls, task_key: str) -> Type[TaskSpec]: 377 return cls.__task_types[task_key].TaskSpec 378 379 @classmethod 380 def resolve_task_type( 381 cls, 382 task_def: TaskDef, 383 config: "PoeConfig", 384 array_item: Union[bool, str] = False, 385 ) -> Optional[str]: 386 if isinstance(task_def, str): 387 if array_item: 388 return ( 389 array_item 390 if isinstance(array_item, str) 391 else config.default_array_item_task_type 392 ) 393 else: 394 return config.default_task_type 395 396 elif isinstance(task_def, dict): 397 task_type_keys = set(task_def.keys()).intersection(cls.__task_types) 398 if len(task_type_keys) == 1: 399 return next(iter(task_type_keys)) 400 401 elif isinstance(task_def, list): 402 return config.default_array_task_type 403 404 return None 405 406 def _parse_named_args( 407 self, extra_args: Sequence[str], env: "EnvVarsManager" 408 ) -> Optional[Dict[str, str]]: 409 if task_args := self.spec.args: 410 return task_args.parse(extra_args, env, self.ctx.ui.program_name) 411 412 return None 413 414 def get_parsed_arguments( 415 self, env: "EnvVarsManager" 416 ) -> Tuple[Dict[str, str], Tuple[str, ...]]: 417 if self._parsed_args is None: 418 all_args = self.invocation[1:] 419 420 if task_args := self.spec.args: 421 try: 422 split_index = all_args.index("--") 423 option_args = all_args[:split_index] 424 extra_args = all_args[split_index + 1 :] 425 except ValueError: 426 option_args = all_args 427 extra_args = tuple() 428 429 
self._parsed_args = ( 430 task_args.parse(option_args, env, self.ctx.ui.program_name), 431 extra_args, 432 ) 433 434 else: 435 self._parsed_args = ({}, all_args) 436 437 return self._parsed_args 438 439 def run( 440 self, 441 context: "RunContext", 442 parent_env: Optional["EnvVarsManager"] = None, 443 ) -> int: 444 """ 445 Run this task 446 """ 447 448 if environ.get("POE_DEBUG"): 449 task_type_key = self.__key__ # type: ignore[attr-defined] 450 print(f" * Running {task_type_key}:{self.name}") 451 print(f" . Invocation {self.invocation!r}") 452 453 upstream_invocations = self._get_upstream_invocations(context) 454 455 if context.dry and upstream_invocations.get("uses", {}): 456 self._print_action( 457 ( 458 "unresolved dependency task results via uses option for task " 459 f"{self.name!r}" 460 ), 461 dry=True, 462 unresolved=True, 463 ) 464 return 0 465 466 task_env = self.spec.get_task_env( 467 parent_env or context.env, 468 context._get_dep_values(upstream_invocations["uses"]), 469 ) 470 471 if environ.get("POE_DEBUG"): 472 named_arg_values, extra_args = self.get_parsed_arguments(task_env) 473 print(f" . Parsed args {named_arg_values!r}") 474 print(f" . Extra args {extra_args!r}") 475 476 return self._handle_run(context, task_env) 477 478 def _handle_run( 479 self, 480 context: "RunContext", 481 env: "EnvVarsManager", 482 ) -> int: 483 """ 484 This method must be implemented by a subclass and return a single executor 485 result. 
486 """ 487 raise NotImplementedError 488 489 def _get_executor( 490 self, 491 context: "RunContext", 492 env: "EnvVarsManager", 493 *, 494 delegate_dry_run: bool = False, 495 ): 496 return context.get_executor( 497 self.invocation, 498 env, 499 working_dir=self.get_working_dir(env), 500 executor_config=self.spec.options.get("executor"), 501 capture_stdout=self.capture_stdout, 502 delegate_dry_run=delegate_dry_run, 503 ) 504 505 def get_working_dir( 506 self, 507 env: "EnvVarsManager", 508 ) -> Path: 509 cwd_option = env.fill_template(self.spec.options.get("cwd", self.ctx.cwd)) 510 working_dir = Path(cwd_option) 511 512 if not working_dir.is_absolute(): 513 working_dir = self.ctx.config.project_dir / working_dir 514 515 return working_dir 516 517 def iter_upstream_tasks( 518 self, context: "RunContext" 519 ) -> Iterator[Tuple[str, "PoeTask"]]: 520 invocations = self._get_upstream_invocations(context) 521 for invocation in invocations["deps"]: 522 yield ("", self._instantiate_dep(invocation, capture_stdout=False)) 523 for key, invocation in invocations["uses"].items(): 524 yield (key, self._instantiate_dep(invocation, capture_stdout=True)) 525 526 def _get_upstream_invocations(self, context: "RunContext"): 527 """ 528 NB. this memoization assumes the context (and contained env vars) will be the 529 same in all instances for the lifetime of this object. Whilst this should be OK 530 for all current usecases is it strictly speaking something that this object 531 should not know enough to safely assume. So we probably want to revisit this. 
532 """ 533 import shlex 534 535 options = self.spec.options 536 537 if self.__upstream_invocations is None: 538 env = self.spec.get_task_env(context.env) 539 env.update(self.get_parsed_arguments(env)[0]) 540 541 self.__upstream_invocations = { 542 "deps": [ 543 tuple(shlex.split(env.fill_template(task_ref))) 544 for task_ref in options.get("deps", tuple()) 545 ], 546 "uses": { 547 key: tuple(shlex.split(env.fill_template(task_ref))) 548 for key, task_ref in options.get("uses", {}).items() 549 }, 550 } 551 552 return self.__upstream_invocations 553 554 def _instantiate_dep( 555 self, invocation: Tuple[str, ...], capture_stdout: bool 556 ) -> "PoeTask": 557 return self.ctx.specs.get(invocation[0]).create_task( 558 invocation=invocation, 559 ctx=TaskContext( 560 config=self.ctx.config, 561 cwd=str(self.ctx.config.project_dir), 562 specs=self.ctx.specs, 563 ui=self.ctx.ui, 564 ), 565 capture_stdout=capture_stdout, 566 ) 567 568 def has_deps(self) -> bool: 569 return bool( 570 self.spec.options.get("deps", False) or self.spec.options.get("uses", False) 571 ) 572 573 @classmethod 574 def is_task_type( 575 cls, task_def_key: str, content_type: Optional[Type] = None 576 ) -> bool: 577 """ 578 Checks whether the given key identifies a known task type. 579 Optionally also check whether the given content_type matches the type of content 580 for this tasks type. 
581 """ 582 return task_def_key in cls.__task_types and ( 583 content_type is None 584 or cls.__task_types[task_def_key].__content_type__ is content_type 585 ) 586 587 @classmethod 588 def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]: 589 if content_type: 590 return tuple( 591 task_type 592 for task_type, task_cls in cls.__task_types.items() 593 if task_cls.__content_type__ is content_type 594 ) 595 return tuple(task_type for task_type in cls.__task_types.keys()) 596 597 def _print_action(self, action: str, dry: bool, unresolved: bool = False): 598 """ 599 Print the action taken by a task just before executing it. 600 """ 601 min_verbosity = -1 if dry else 0 602 arrow = "??" if unresolved else "<=" if self.capture_stdout else "=>" 603 self.ctx.ui.print_msg( 604 f"<hl>Poe {arrow}</hl> <action>{action}</action>", min_verbosity 605 ) 606 607 class Error(Exception): 608 pass
def __init__(
    self,
    spec: TaskSpec,
    invocation: Tuple[str, ...],
    ctx: TaskContext,
    capture_stdout: Union[str, bool] = False,
):
    """
    Bind a new task instance to its spec, its invocation and its context.

    The capture_stdout option configured on the spec is used when it is truthy,
    otherwise the capture_stdout argument given here applies.
    """
    self.spec, self.invocation, self.ctx = spec, invocation, ctx

    # A truthy option on the spec takes precedence over the caller's argument
    configured_capture = spec.options.capture_stdout
    self.capture_stdout = configured_capture or capture_stdout

    self._is_windows = sys.platform == "win32"
@classmethod
def resolve_task_type(
    cls,
    task_def: TaskDef,
    config: "PoeConfig",
    array_item: Union[bool, str] = False,
) -> Optional[str]:
    """
    Work out which task type key a task definition implies.

    - A string resolves to the config's default task type, unless array_item is
      truthy, in which case array_item itself is used if it is a string and the
      config's default array item task type otherwise.
    - A list resolves to the config's default array task type.
    - A dict resolves to the single key it shares with the registered task
      types, if there is exactly one such key.

    Returns None whenever the type cannot be determined.
    """
    if isinstance(task_def, str):
        if not array_item:
            return config.default_task_type
        if isinstance(array_item, str):
            return array_item
        return config.default_array_item_task_type

    if isinstance(task_def, list):
        return config.default_array_task_type

    if isinstance(task_def, dict):
        matching_keys = set(task_def.keys()).intersection(cls.__task_types)
        # Zero or several matching keys means the definition is ambiguous
        if len(matching_keys) == 1:
            (only_key,) = matching_keys
            return only_key

    return None
def get_parsed_arguments(
    self, env: "EnvVarsManager"
) -> Tuple[Dict[str, str], Tuple[str, ...]]:
    """
    Split the invocation into parsed named arguments and extra arguments.

    Everything after the task name is considered. If the task spec declares
    args then whatever precedes the first "--" is given to the args parser and
    whatever follows it is returned untouched as extra arguments. Without
    declared args, all arguments are extra arguments.

    The result is computed once and then reused for every later call,
    whichever env is passed.
    """
    if self._parsed_args is not None:
        return self._parsed_args

    cli_args = self.invocation[1:]
    arg_parser = self.spec.args

    if not arg_parser:
        self._parsed_args = ({}, cli_args)
        return self._parsed_args

    if "--" in cli_args:
        # Only the first "--" acts as the separator
        separator_at = cli_args.index("--")
        named_part = cli_args[:separator_at]
        passthrough = cli_args[separator_at + 1 :]
    else:
        named_part = cli_args
        passthrough = tuple()

    self._parsed_args = (
        arg_parser.parse(named_part, env, self.ctx.ui.program_name),
        passthrough,
    )
    return self._parsed_args
def run(
    self,
    context: "RunContext",
    parent_env: Optional["EnvVarsManager"] = None,
) -> int:
    """
    Run this task

    The task env is derived from parent_env if given, otherwise from the
    context's env, together with the values the context provides for this
    task's 'uses' dependencies. The result of _handle_run is returned.

    In a dry run, a task with 'uses' dependencies is only reported as
    unresolved and 0 is returned without running anything.
    """
    if environ.get("POE_DEBUG"):
        task_type_key = self.__key__  # type: ignore[attr-defined]
        print(f" * Running {task_type_key}:{self.name}")
        print(f" . Invocation {self.invocation!r}")

    upstream = self._get_upstream_invocations(context)

    if context.dry and upstream.get("uses", {}):
        message = (
            "unresolved dependency task results via uses option for task "
            f"{self.name!r}"
        )
        self._print_action(message, dry=True, unresolved=True)
        return 0

    base_env = parent_env or context.env
    uses_values = context._get_dep_values(upstream["uses"])
    task_env = self.spec.get_task_env(base_env, uses_values)

    if environ.get("POE_DEBUG"):
        named_arg_values, extra_args = self.get_parsed_arguments(task_env)
        print(f" . Parsed args {named_arg_values!r}")
        print(f" . Extra args {extra_args!r}")

    return self._handle_run(context, task_env)
Run this task
def get_working_dir(
    self,
    env: "EnvVarsManager",
) -> Path:
    """
    Work out the directory this task should run in.

    The 'cwd' option is used, falling back to the cwd of the task context, with
    template references filled in from the given env. A path that is not
    absolute is taken to be relative to the project directory.
    """
    cwd_template = self.spec.options.get("cwd", self.ctx.cwd)
    resolved = Path(env.fill_template(cwd_template))

    if resolved.is_absolute():
        return resolved

    return self.ctx.config.project_dir / resolved
def iter_upstream_tasks(
    self, context: "RunContext"
) -> Iterator[Tuple[str, "PoeTask"]]:
    """
    Iterate over the tasks that this task depends on.

    Yields (key, task) pairs: first one per 'deps' invocation, with an empty
    key and stdout capturing disabled, then one per 'uses' invocation, with the
    key it is registered under and stdout capturing enabled.
    """
    upstream = self._get_upstream_invocations(context)

    yield from (
        ("", self._instantiate_dep(dep_invocation, capture_stdout=False))
        for dep_invocation in upstream["deps"]
    )
    yield from (
        (uses_key, self._instantiate_dep(uses_invocation, capture_stdout=True))
        for uses_key, uses_invocation in upstream["uses"].items()
    )
@classmethod
def is_task_type(
    cls, task_def_key: str, content_type: Optional[Type] = None
) -> bool:
    """
    Tell whether the given key identifies a registered task type.

    If a content_type is given then the registered task type must additionally
    declare exactly that type as its content type.
    """
    if task_def_key not in cls.__task_types:
        return False

    if content_type is None:
        return True

    return cls.__task_types[task_def_key].__content_type__ is content_type
Checks whether the given key identifies a known task type. Optionally also checks whether the given content_type matches the type of content for this task type.
@classmethod
def get_task_types(cls, content_type: Optional[Type] = None) -> Tuple[str, ...]:
    """
    List the keys of the registered task types.

    If a content_type is given then only the task types declaring exactly that
    type as their content type are included.
    """
    registry = cls.__task_types

    if not content_type:
        return tuple(registry.keys())

    return tuple(
        type_key
        for type_key, registered_cls in registry.items()
        if registered_cls.__content_type__ is content_type
    )
class TaskOptions(PoeOptions):
    # Options that may be set on a task definition. Every option is optional
    # and defaults to None. The notes below only describe how the code in this
    # module reads each option.

    # Declaration of the task's named arguments; when set, TaskSpec.args wraps
    # it in a PoeTaskArgs
    args: Optional[Union[dict, list]] = None
    # When truthy this overrides the capture_stdout argument given to the task
    # constructor; tasks with this option set are rejected as 'uses' targets
    capture_stdout: Optional[str] = None
    # Working directory for the task; templated from the task env and taken
    # relative to the project directory unless absolute
    cwd: Optional[str] = None
    # References to tasks to run before this one; each reference is templated
    # from the task env and split shell-style into an invocation
    deps: Optional[Sequence[str]] = None
    # Passed to EnvVarsManager.apply_env_config when building the task env
    env: Optional[dict] = None
    # Passed to EnvVarsManager.apply_env_config when building the task env
    envfile: Optional[Union[str, list]] = None
    # Passed as executor_config when requesting an executor from the context
    executor: Optional[dict] = None
    # NOTE(review): presumably the task's help text; not read by the code
    # visible in this section
    help: Optional[str] = None
    # Maps keys, which must be valid env var names, to references to tasks
    # whose results the context supplies as values for the task env
    uses: Optional[dict] = None

    def validate(self):
        """
        Validation rules that don't require any extra context go here.
        """
A special kind of config object that parses options ...
def validate(self):
    """
    Hook for validation rules that need no context beyond the options.

    This implementation performs no checks.
    """
    return None
Validation rules that don't require any extra context go here.
Inherited Members
- poethepoet.options.PoeOptions
- PoeOptions
- parse
- normalize
- get
- update
- type_of
- get_annotation
- get_fields
class TaskSpec:
    """
    The validated definition of a single task: its name, its content, its
    parsed options, the config partition it came from and, for a subtask, the
    spec of its parent.

    The task_type attribute refers to the task class that this spec class
    belongs to.
    """

    name: str
    content: TaskContent
    options: "PoeTask.TaskOptions"
    task_type: Type["PoeTask"]
    source: "ConfigPartition"
    parent: Optional["PoeTask.TaskSpec"] = None

    # Cache for the args property
    _args: Optional["PoeTaskArgs"] = None

    def __init__(
        self,
        name: str,
        task_def: Dict[str, Any],
        factory: TaskSpecFactory,
        source: "ConfigPartition",
        parent: Optional["PoeTask.TaskSpec"] = None,
    ):
        """
        Create a spec from a task definition.

        The content is read from task_def under the key of the task type, and
        the options are parsed from task_def. The factory argument is not used
        here.

        Raises KeyError if task_def has no entry for the task type key, and
        ConfigValidationError if the options cannot be parsed.
        """
        self.name = name
        self.content = task_def[self.task_type.__key__]
        self.options = self._parse_options(task_def)
        self.source = source
        self.parent = parent

    def _parse_options(self, task_def: Dict[str, Any]):
        """
        Parse task_def into the TaskOptions of the task type, treating the task
        type key as a permitted extra key.

        A ConfigValidationError raised by the parser is tagged with the name of
        this task and re-raised.
        """
        try:
            return next(
                self.task_type.TaskOptions.parse(
                    task_def, extra_keys=(self.task_type.__key__,)
                )
            )
        except ConfigValidationError as error:
            error.task_name = self.name
            raise

    def get_task_env(
        self,
        parent_env: "EnvVarsManager",
        uses_values: Optional[Mapping[str, str]] = None,
    ) -> "EnvVarsManager":
        """
        Resolve the EnvVarsManager for this task, relative to the given parent_env

        The parent_env is cloned, not modified. The clone receives the given
        uses_values (if any), then POE_CONF_DIR, then the task's own envfile
        and env options.
        """
        task_envfile = self.options.get("envfile")
        task_env = self.options.get("env")

        result = parent_env.clone()

        # Include env vars from outputs of upstream dependencies
        if uses_values:
            result.update(uses_values)

        result.set("POE_CONF_DIR", str(self.source.config_dir))
        result.apply_env_config(
            task_envfile,
            task_env,
            config_dir=self.source.config_dir,
            config_working_dir=self.source.cwd,
        )

        return result

    @property
    def args(self) -> Optional["PoeTaskArgs"]:
        """
        The PoeTaskArgs built from the args option, created on first access and
        then reused, or None if the task has no args option.
        """
        # Imported here rather than at module level (the module only imports
        # PoeTaskArgs for type checking)
        from .args import PoeTaskArgs

        if not self._args and self.options.args:
            self._args = PoeTaskArgs(self.options.args, self.name)

        return self._args

    def create_task(
        self,
        invocation: Tuple[str, ...],
        ctx: TaskContext,
        capture_stdout: Union[str, bool] = False,
    ) -> "PoeTask":
        """
        Instantiate the task type of this spec for the given invocation and
        context.
        """
        return self.task_type(
            spec=self,
            invocation=invocation,
            capture_stdout=capture_stdout,
            ctx=ctx,
        )

    def validate(self, config: "PoeConfig", task_specs: TaskSpecFactory):
        """
        Run the validations common to all task types followed by those of this
        task type.

        A ConfigValidationError raised by either is tagged with the name of
        this task and re-raised.
        """
        try:
            self._base_validations(config, task_specs)
            self._task_validations(config, task_specs)
        except ConfigValidationError as error:
            error.task_name = self.name
            raise

    def _base_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
        """
        Perform validations on this TaskSpec that apply to all task types

        Raises ConfigValidationError if the task name is empty or otherwise
        malformed, if the content is not of the type the task type expects, or
        if the deps or uses options reference a task that is unknown or not
        usable as a dependency.
        """
        # Check for an empty name first so that it is reported as a config
        # error instead of failing with an IndexError on self.name[0]
        if not self.name or not (self.name[0].isalpha() or self.name[0] == "_"):
            raise ConfigValidationError(
                "Task names must start with a letter or underscore."
            )

        # The name pattern is only enforced for tasks without a parent
        if not self.parent and not _TASK_NAME_PATTERN.match(self.name):
            raise ConfigValidationError(
                "Task names characters must be alphanumeric, colon, underscore or "
                "dash."
            )

        if not isinstance(self.content, self.task_type.__content_type__):
            raise ConfigValidationError(
                f"Content for {self.task_type.__name__} must be a "
                f"{self.task_type.__content_type__.__name__}"
            )

        if self.options.deps:
            for dep in self.options.deps:
                # A reference may include arguments after the task name
                dep_task_name = dep.split(" ", 1)[0]
                if dep_task_name not in task_specs:
                    raise ConfigValidationError(
                        "'deps' option includes reference to unknown task: "
                        f"{dep_task_name!r}"
                    )

                if task_specs.get(dep_task_name).options.get("use_exec", False):
                    raise ConfigValidationError(
                        f"'deps' option includes reference to task with "
                        f"'use_exec' set to true: {dep_task_name!r}"
                    )

        if self.options.uses:
            from ..helpers import is_valid_env_var

            for key, dep in self.options.uses.items():
                if not is_valid_env_var(key):
                    raise ConfigValidationError(
                        f"'uses' option includes invalid key: {key!r}"
                    )

                # A reference may include arguments after the task name
                dep_task_name = dep.split(" ", 1)[0]
                if dep_task_name not in task_specs:
                    raise ConfigValidationError(
                        "'uses' options includes reference to unknown task: "
                        f"{dep_task_name!r}"
                    )

                referenced_task = task_specs.get(dep_task_name)
                if referenced_task.options.get("use_exec", False):
                    raise ConfigValidationError(
                        f"'uses' option references task with 'use_exec' set to "
                        f"true: {dep_task_name!r}"
                    )
                if referenced_task.options.get("capture_stdout"):
                    raise ConfigValidationError(
                        f"'uses' option references task with 'capture_stdout' "
                        f"option set: {dep_task_name!r}"
                    )

    def _task_validations(self, config: "PoeConfig", task_specs: TaskSpecFactory):
        """
        Perform validations on this TaskSpec that apply to a specific task type
        """
def __init__(
    self,
    name: str,
    task_def: Dict[str, Any],
    factory: TaskSpecFactory,
    source: "ConfigPartition",
    parent: Optional["PoeTask.TaskSpec"] = None,
):
    """
    Create a spec from a task definition.

    The content is read from task_def under the key of the task type, and the
    options are parsed from task_def. The factory argument is not used here.
    """
    # The name must be set first because option parsing errors are tagged
    # with it
    self.name = name

    content_key = self.task_type.__key__
    self.content = task_def[content_key]

    parsed_options = self._parse_options(task_def)
    self.options = parsed_options

    self.source, self.parent = source, parent
def get_task_env(
    self,
    parent_env: "EnvVarsManager",
    uses_values: Optional[Mapping[str, str]] = None,
) -> "EnvVarsManager":
    """
    Build the EnvVarsManager for this task on top of the given parent_env

    The parent_env is cloned, not modified. The clone receives the given
    uses_values (if any), then POE_CONF_DIR, then the task's own envfile and
    env options.
    """
    options = self.options
    envfile_option = options.get("envfile")
    env_option = options.get("env")

    task_env = parent_env.clone()

    # Values produced by upstream dependencies go in before the task's own
    # env config is applied
    if uses_values:
        task_env.update(uses_values)

    source = self.source
    task_env.set("POE_CONF_DIR", str(source.config_dir))
    task_env.apply_env_config(
        envfile_option,
        env_option,
        config_dir=source.config_dir,
        config_working_dir=source.cwd,
    )

    return task_env
Resolve the EnvVarsManager for this task, relative to the given parent_env
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args