Skip to content

Commit

Permalink
Merge pull request #273 from zillow/tz/AIP-7511-exit-handler
Browse files Browse the repository at this point in the history
AIP-7511 exit_handler DAG - Part 1
  • Loading branch information
talebzeghmi authored Nov 30, 2023
2 parents db524bb + 97bee89 commit e75b38f
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 50 deletions.
151 changes: 132 additions & 19 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,72 @@ def _create_workflow_yaml(

KubeflowPipelines._add_archive_section_to_cards_artifacts(workflow)

if "onExit" in workflow["spec"]:
# replace entrypoint content with the exit handler content
"""
# What it looks like beforehand...
entrypoint: helloflow
templates:
- name: exit-handler-1
dag:
tasks:
- name: end
template: end
dependencies: [start]
- {name: start, template: start}
- name: helloflow
dag:
tasks:
- {name: exit-handler-1, template: exit-handler-1}
- {name: sqs-exit-handler, template: sqs-exit-handler}
"""
# find the exit-handler-1 template
exit_handler_template: dict = [
template
for template in workflow["spec"]["templates"]
if template["name"] == "exit-handler-1"
][0]

# find the entrypoint template
entrypoint_template: dict = [
template
for template in workflow["spec"]["templates"]
if template["name"] == workflow["spec"]["entrypoint"]
][0]

# replace the entrypoint template with the exit handler template
entrypoint_template["dag"] = exit_handler_template["dag"]

# rename exit-handler-1 to exit-handler
exit_handler_template["name"] = "exit-handler"
workflow["spec"]["onExit"] = "exit-handler"

# initialize
exit_handler_template["dag"] = {"tasks": []}
if self.sqs_url_on_error:
exit_handler_template["dag"]["tasks"].append(
{
"name": "sqs-exit-handler",
"template": "sqs-exit-handler",
"when": "{{workflow.status}} != 'Succeeded'",
}
)

if self.notify:
notify_task = {
"name": "notify-email-exit-handler",
"template": "notify-email-exit-handler",
}

if self.notify_on_success:
# Always run, even on failure because METAFLOW_NOTIFY_ON_ERROR
# can be injected by the AIP webhook.
pass
else:
notify_task["when"] = "{{workflow.status}} != 'Succeeded'"

exit_handler_template["dag"]["tasks"].append(notify_task)

return workflow

@staticmethod
Expand Down Expand Up @@ -1239,11 +1305,26 @@ def call_build_kfp_dag(workflow_uid_op: ContainerOp):
)

if self.notify or self.sqs_url_on_error:
with dsl.ExitHandler(
self._create_exit_handler_op(
op = (
self._create_notify_exit_handler_op(
flow_variables.package_commands, flow_parameters
)
):
if self.notify
else None
)

# The following exit handler gets created and added as a ContainerOp
# and also as a parallel task to the Argo template "exit-handler-1"
# (the hardcoded kfp compiler name of the exit handler)
# We replace and rename this parallel-task dag with the dag of exit-handler steps in _create_workflow_yaml().
op2 = (
self._create_sqs_exit_handler_op(
flow_variables.package_commands, flow_parameters
)
if self.sqs_url_on_error
else None
)
with dsl.ExitHandler(op if op else op2):
s3_sensor_op: Optional[ContainerOp] = self.create_s3_sensor_op(
flow_variables,
)
Expand Down Expand Up @@ -1551,12 +1632,36 @@ def _create_s3_sensor_op(
)
return s3_sensor_op

def _create_exit_handler_op(
def _create_sqs_exit_handler_op(
    self,
    package_commands: str,
    flow_parameters: Dict,
) -> ContainerOp:
    """Build the ContainerOp for the "sqs-exit-handler" exit-handler step.

    Delegates to ``_get_aip_exit_handler_op`` with the
    ``--run_sqs_on_error`` flag, which (per ``aip_exit_handler.py``) sends
    the flow parameters to the configured SQS queue when the workflow
    status is Failed.

    Args:
        package_commands: shell command string that installs/unpacks the
            Metaflow code package before the handler runs.
        flow_parameters: the flow's parameter dict, forwarded to the
            handler as its SQS message payload on failure.

    Returns:
        ContainerOp: the configured exit-handler step.
    """
    # Forward only the config keys that are actually set; from_conf() is
    # called twice per key (filter + value) — cheap, and keeps unset keys
    # out of the handler's environment entirely.
    env_variables: dict = {
        key: from_conf(key)
        for key in [
            "ARGO_RUN_URL_PREFIX",
        ]
        if from_conf(key)
    }

    # Optional IAM role the handler assumes before sending to SQS
    # (see METAFLOW_SQS_ROLE_ARN_ON_ERROR handling in aip_exit_handler.py).
    if self.sqs_role_arn_on_error:
        env_variables["METAFLOW_SQS_ROLE_ARN_ON_ERROR"] = self.sqs_role_arn_on_error

    return self._get_aip_exit_handler_op(
        flow_parameters,
        env_variables,
        package_commands,
        name="sqs-exit-handler",
        flag="--run_sqs_on_error",
    )

def _create_notify_exit_handler_op(
self,
package_commands: str,
flow_parameters: Dict,
) -> ContainerOp:
notify_variables: dict = {
env_variables: dict = {
key: from_conf(key)
for key in [
"METAFLOW_NOTIFY_EMAIL_FROM",
Expand All @@ -1569,19 +1674,27 @@ def _create_exit_handler_op(
}

if self.notify_on_error:
notify_variables["METAFLOW_NOTIFY_ON_ERROR"] = self.notify_on_error
env_variables["METAFLOW_NOTIFY_ON_ERROR"] = self.notify_on_error

if self.notify_on_success:
notify_variables["METAFLOW_NOTIFY_ON_SUCCESS"] = self.notify_on_success

if self.sqs_url_on_error:
notify_variables["METAFLOW_SQS_URL_ON_ERROR"] = self.sqs_url_on_error

if self.sqs_role_arn_on_error:
notify_variables[
"METAFLOW_SQS_ROLE_ARN_ON_ERROR"
] = self.sqs_role_arn_on_error
env_variables["METAFLOW_NOTIFY_ON_SUCCESS"] = self.notify_on_success

return self._get_aip_exit_handler_op(
flow_parameters,
env_variables,
package_commands,
name="notify-email-exit-handler",
flag="--run_email_notify",
)

def _get_aip_exit_handler_op(
self,
flow_parameters: Dict,
env_variables: Dict,
package_commands: str,
name: str,
flag: str = "",
) -> ContainerOp:
# when there are no flow parameters argo complains
# that {{workflow.parameters}} failed to resolve
# see https://github.com/argoproj/argo-workflows/issues/6036
Expand All @@ -1594,19 +1707,19 @@ def _create_exit_handler_op(
" && python -m metaflow.plugins.aip.aip_exit_handler"
f" --flow_name {self.name}"
" --run_id {{workflow.name}}"
f" --notify_variables_json {json.dumps(json.dumps(notify_variables))}"
f" --env_variables_json {json.dumps(json.dumps(env_variables))}"
f" --flow_parameters_json {flow_parameters_json if flow_parameters else '{}'}"
" --status {{workflow.status}}"
f" {flag}"
),
]

return (
dsl.ContainerOp(
name="exit_handler",
name=name,
image=self.base_image,
command=exit_handler_command,
)
.set_display_name("exit_handler")
.set_display_name(name)
.set_retry(
EXIT_HANDLER_RETRY_COUNT,
policy="Always",
Expand Down
69 changes: 38 additions & 31 deletions metaflow/plugins/aip/aip_exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
@click.option("--flow_name")
@click.option("--status")
@click.option("--run_id")
@click.option("--notify_variables_json")
@click.option("--env_variables_json")
@click.option("--flow_parameters_json")
@click.option("--run_email_notify", is_flag=True, default=False)
@click.option("--run_sqs_on_error", is_flag=True, default=False)
def exit_handler(
flow_name: str,
status: str,
run_id: str,
notify_variables_json: str,
env_variables_json: str,
flow_parameters_json: str,
run_email_notify: bool = False,
run_sqs_on_error: bool = False,
):
"""
The environment variables that this depends on:
Expand All @@ -38,13 +42,12 @@ def exit_handler(
import boto3
from botocore.session import Session

notify_variables: Dict[str, str] = json.loads(notify_variables_json)
env_variables: Dict[str, str] = json.loads(env_variables_json)

def get_env(name, default=None) -> str:
return notify_variables.get(name, os.environ.get(name, default=default))
return env_variables.get(name, os.environ.get(name, default=default))

def email_notify(send_to):
import posixpath
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
Expand Down Expand Up @@ -133,34 +136,38 @@ def send_sqs_message(queue_url: str, message_body: str, *, role_arn: str = None)
_logger.error(err)
raise err

notify_on_error = get_env("METAFLOW_NOTIFY_ON_ERROR")
notify_on_success = get_env("METAFLOW_NOTIFY_ON_SUCCESS")

print(f"Flow completed with status={status}")
if notify_on_error and status == "Failed":
email_notify(notify_on_error)
elif notify_on_success and status == "Succeeded":
email_notify(notify_on_success)
else:
print("No notification is necessary!")

# Send message to SQS if 'METAFLOW_SQS_URL_ON_ERROR' is set
metaflow_sqs_url_on_error = get_env("METAFLOW_SQS_URL_ON_ERROR")

if metaflow_sqs_url_on_error:
if status == "Failed":
message_body = flow_parameters_json
metaflow_sqs_role_arn_on_error = get_env("METAFLOW_SQS_ROLE_ARN_ON_ERROR")
send_sqs_message(
metaflow_sqs_url_on_error,
message_body,
role_arn=metaflow_sqs_role_arn_on_error,
)
print(f"message was sent to: {metaflow_sqs_url_on_error} successfully")
if run_email_notify:
notify_on_error = get_env("METAFLOW_NOTIFY_ON_ERROR")
notify_on_success = get_env("METAFLOW_NOTIFY_ON_SUCCESS")

if notify_on_error and status == "Failed":
email_notify(notify_on_error)
elif notify_on_success and status == "Succeeded":
email_notify(notify_on_success)
else:
print("No notification is necessary!")

if run_sqs_on_error:
# Send message to SQS if 'METAFLOW_SQS_URL_ON_ERROR' is set
metaflow_sqs_url_on_error = get_env("METAFLOW_SQS_URL_ON_ERROR")

if metaflow_sqs_url_on_error:
if status == "Failed":
message_body = flow_parameters_json
metaflow_sqs_role_arn_on_error = get_env(
"METAFLOW_SQS_ROLE_ARN_ON_ERROR"
)
send_sqs_message(
metaflow_sqs_url_on_error,
message_body,
role_arn=metaflow_sqs_role_arn_on_error,
)
print(f"message was sent to: {metaflow_sqs_url_on_error} successfully")
else:
print("Workflow succeeded, thus no SQS message is sent to SQS!")
else:
print("Workflow succeeded, thus no SQS message is sent to SQS!")
else:
print("SQS is not configured!")
print("SQS is not configured!")


if __name__ == "__main__":
Expand Down

0 comments on commit e75b38f

Please sign in to comment.