Skip to content

Commit

Permalink
parallel exit handler & conditional steps
Browse files Browse the repository at this point in the history
  • Loading branch information
talebzeghmi committed Nov 29, 2023
1 parent 8414100 commit 8752a8a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 22 deletions.
57 changes: 39 additions & 18 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,19 +333,32 @@ def _create_workflow_yaml(
# rename exit-handler-1 to exit-handler
exit_handler_template["name"] = "exit-handler"
workflow["spec"]["onExit"] = "exit-handler"
exit_handler_template["dag"] = {
"tasks": [
{
"name": "sqs-exit-handler",
"template": "sqs-exit-handler",
"dependencies": ["notify-email-exit-handler"],
},
{
"name": "notify-email-exit-handler",
"template": "notify-email-exit-handler",
},
]
}

if self.sqs_url_on_error:
exit_handler_template["dag"] = {
"tasks": [
{
"name": "sqs-exit-handler",
"template": "sqs-exit-handler",
"when": "{{workflow.status}} != 'Succeeded'",
},
]
}

if self.notify:
notify_task = {
"name": "notify-email-exit-handler",
"template": "notify-email-exit-handler",
}

if self.notify_on_success:
# Always run, even on failure because METAFLOW_NOTIFY_ON_ERROR
# can be injected by the AIP webhook.
pass
else:
notify_task["when"] = "{{workflow.status}} != 'Succeeded'"

exit_handler_template["dag"]["tasks"].append(notify_task)

return workflow

Expand Down Expand Up @@ -1292,18 +1305,26 @@ def call_build_kfp_dag(workflow_uid_op: ContainerOp):
)

if self.notify or self.sqs_url_on_error:
op = self._create_notify_exit_handler_op(
flow_variables.package_commands, flow_parameters
op = (
self._create_notify_exit_handler_op(
flow_variables.package_commands, flow_parameters
)
if self.notify
else None
)

# The following exit handler gets created and added as a ContainerOp
# and also as a parallel task to the Argo template "exit-handler-1"
# (the hardcoded kfp compiler name of the exit handler)
# We replace, and rename, this parallel task dag with dag of steps in _create_workflow_yaml().
self._create_sqs_exit_handler_op(
flow_variables.package_commands, flow_parameters
op2 = (
self._create_sqs_exit_handler_op(
flow_variables.package_commands, flow_parameters
)
if self.sqs_url_on_error
else None
)
with dsl.ExitHandler(op):
with dsl.ExitHandler(op if op else op2):
s3_sensor_op: Optional[ContainerOp] = self.create_s3_sensor_op(
flow_variables,
)
Expand Down
7 changes: 3 additions & 4 deletions metaflow/plugins/aip/aip_exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
@click.option("--run_id")
@click.option("--env_variables_json")
@click.option("--flow_parameters_json")
@click.option("--run_email_notify", is_flag=True)
@click.option("--run_sqs_on_error", is_flag=True)
@click.option("--run_email_notify", is_flag=True, default=False)
@click.option("--run_sqs_on_error", is_flag=True, default=False)
def exit_handler(
flow_name: str,
status: str,
Expand Down Expand Up @@ -48,7 +48,6 @@ def get_env(name, default=None) -> str:
return env_variables.get(name, os.environ.get(name, default=default))

def email_notify(send_to):
import posixpath
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
Expand Down Expand Up @@ -137,11 +136,11 @@ def send_sqs_message(queue_url: str, message_body: str, *, role_arn: str = None)
_logger.error(err)
raise err

print(f"Flow completed with status={status}")
if run_email_notify:
notify_on_error = get_env("METAFLOW_NOTIFY_ON_ERROR")
notify_on_success = get_env("METAFLOW_NOTIFY_ON_SUCCESS")

print(f"Flow completed with status={status}")
if notify_on_error and status == "Failed":
email_notify(notify_on_error)
elif notify_on_success and status == "Succeeded":
Expand Down

0 comments on commit 8752a8a

Please sign in to comment.