Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-restart jobs on hang #177

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion sisyphus/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def get_job_used_resources(self, current_process):
This function should only be used by the worker.

:param psutil.Process current_process:
:param engine_selector:
"""
d = {}

Expand Down
10 changes: 8 additions & 2 deletions sisyphus/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Optional
import gzip
import logging
import os
Expand All @@ -12,6 +13,7 @@
import time
from threading import Thread, Condition

import sisyphus
import sisyphus.global_settings as gs
import sisyphus.job_path
import sisyphus.toolkit
Expand Down Expand Up @@ -57,7 +59,7 @@ def __init__(self, job, task, task_id):
self.task = task
self.task_id = task_id
self.start_time = None
super().__init__(daemon=True)
super().__init__(daemon=True, name="LoggingThread")
self.out_of_memory = False
self._cond = Condition()
self.__stop = False
Expand Down Expand Up @@ -223,7 +225,7 @@ def worker_helper(args):
path.cached = False

# find task
task = None
task: Optional[sisyphus.Task] = None
for task_check in job._sis_tasks():
if task_check.name() == args.task_name:
task = task_check
Expand All @@ -240,6 +242,10 @@ def worker_helper(args):
logging_thread = LoggingThread(job, task, task_id)
logging_thread.start()

# TODO init hang detection thread here...
# based on task.running(task_id), which causes
# "Job marked as running but logging file has not been updated" warnings when False

sisyphus.job_path.Path.cacheing_enabled = True
resume_job = False
gs.active_engine.init_worker(task)
Expand Down
Loading