From 8752a8ac4258a3e7f0f983266e1fe1d3074afaf0 Mon Sep 17 00:00:00 2001 From: Taleb Zeghmi Date: Wed, 29 Nov 2023 14:32:06 -0800 Subject: [PATCH] parallel exit handler & conditional steps --- metaflow/plugins/aip/aip.py | 57 ++++++++++++++++-------- metaflow/plugins/aip/aip_exit_handler.py | 7 ++- 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/metaflow/plugins/aip/aip.py b/metaflow/plugins/aip/aip.py index d9047683f98..b8609ad834b 100644 --- a/metaflow/plugins/aip/aip.py +++ b/metaflow/plugins/aip/aip.py @@ -333,19 +333,32 @@ def _create_workflow_yaml( # rename exit-handler-1 to exit-handler exit_handler_template["name"] = "exit-handler" workflow["spec"]["onExit"] = "exit-handler" - exit_handler_template["dag"] = { - "tasks": [ - { - "name": "sqs-exit-handler", - "template": "sqs-exit-handler", - "dependencies": ["notify-email-exit-handler"], - }, - { - "name": "notify-email-exit-handler", - "template": "notify-email-exit-handler", - }, - ] - } + + if self.sqs_url_on_error: + exit_handler_template["dag"] = { + "tasks": [ + { + "name": "sqs-exit-handler", + "template": "sqs-exit-handler", + "when": "{{workflow.status}} != 'Succeeded'", + }, + ] + } + + if self.notify: + notify_task = { + "name": "notify-email-exit-handler", + "template": "notify-email-exit-handler", + } + + if self.notify_on_success: + # Always run, even on failure because METAFLOW_NOTIFY_ON_ERROR + # can be injected by the AIP webhook. + pass + else: + notify_task["when"] = "{{workflow.status}} != 'Succeeded'" + + exit_handler_template["dag"]["tasks"].append(notify_task) return workflow @@ -1292,18 +1305,26 @@ def call_build_kfp_dag(workflow_uid_op: ContainerOp): ) if self.notify or self.sqs_url_on_error: - op = self._create_notify_exit_handler_op( - flow_variables.package_commands, flow_parameters + op = ( + self._create_notify_exit_handler_op( + flow_variables.package_commands, flow_parameters + ) + if self.notify + else None ) # The following exit handler gets created and added as a ContainerOp # and also as a parallel task to the Argo template "exit-handler-1" # (the hardcoded kfp compiler name of the exit handler) # We replace, and rename, this parallel task dag with dag of steps in _create_workflow_yaml(). - self._create_sqs_exit_handler_op( - flow_variables.package_commands, flow_parameters + op2 = ( + self._create_sqs_exit_handler_op( + flow_variables.package_commands, flow_parameters + ) + if self.sqs_url_on_error + else None ) - with dsl.ExitHandler(op): + with dsl.ExitHandler(op if op else op2): s3_sensor_op: Optional[ContainerOp] = self.create_s3_sensor_op( flow_variables, ) diff --git a/metaflow/plugins/aip/aip_exit_handler.py b/metaflow/plugins/aip/aip_exit_handler.py index 9b517bcb3db..c395df7ffb9 100644 --- a/metaflow/plugins/aip/aip_exit_handler.py +++ b/metaflow/plugins/aip/aip_exit_handler.py @@ -12,8 +12,8 @@ @click.option("--run_id") @click.option("--env_variables_json") @click.option("--flow_parameters_json") -@click.option("--run_email_notify", is_flag=True) -@click.option("--run_sqs_on_error", is_flag=True) +@click.option("--run_email_notify", is_flag=True, default=False) +@click.option("--run_sqs_on_error", is_flag=True, default=False) def exit_handler( flow_name: str, status: str, @@ -48,7 +48,6 @@ def get_env(name, default=None) -> str: return env_variables.get(name, os.environ.get(name, default=default)) def email_notify(send_to): - import posixpath import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText @@ -137,11 +136,11 @@ def send_sqs_message(queue_url: str, message_body: str, *, role_arn: str = None) _logger.error(err) raise err + print(f"Flow completed with status={status}") if run_email_notify: notify_on_error = get_env("METAFLOW_NOTIFY_ON_ERROR") notify_on_success = get_env("METAFLOW_NOTIFY_ON_SUCCESS") - print(f"Flow completed with status={status}") if notify_on_error and status == "Failed": email_notify(notify_on_error) elif notify_on_success and status == "Succeeded":