Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding timeout functionality to action nodes for ineffectual work functions #62

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 3 additions & 56 deletions beams/behavior_tree/ActionNode.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,18 @@
import atexit
import logging
import os
import time
from multiprocessing import Event, Queue, Value
from typing import Callable
from multiprocessing import Event

import py_trees

from beams.behavior_tree.ActionWorker import wrapped_action_work # noqa: F401
from beams.behavior_tree.ActionWorker import ActionWorker
from beams.behavior_tree.VolatileStatus import VolatileStatus
from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop,
Evaluatable)
from beams.typing_helper import ActionNodeWorkLoop, Evaluatable

logger = logging.getLogger(__name__)


def wrapped_action_work(loop_period_sec: float = 0.1):
    """
    Build a decorator that turns a one-shot work function into a worker loop.

    Parameters
    ----------
    loop_period_sec : float
        Seconds slept after every call of the decorated work function.

    Returns
    -------
    A decorator; applying it to a work function yields ``work_wrapper``,
    the loop that a worker process is expected to run.
    """
    def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop:
        def work_wrapper(
            do_work: Value,
            name: str,
            work_gate: Event,
            volatile_status: VolatileStatus,
            completion_condition: Evaluatable,
            log_queue: Queue,
            log_configurer: Callable,
        ) -> None:
            """
            Drive ``func`` repeatedly and publish its status.

            Communication with the caller goes through the shared objects
            handed in as arguments:
            - ``volatile_status`` receives every status update
            - ``log_queue`` is registered through ``log_configurer``

            The outer loop lives for as long as ``do_work.value`` is truthy;
            each pass waits on ``work_gate`` before starting a round of work.
            """
            log_configurer(log_queue)
            while do_work.value:
                logger.debug(f"WAITING FOR INIT from node: {name}")
                # Block until the gate is opened, then re-arm it for next time.
                work_gate.wait()
                work_gate.clear()

                volatile_status.set_value(py_trees.common.Status.RUNNING)
                while True:
                    if completion_condition():
                        break
                    logger.debug(f"CALLING CAGET FROM from node ({name})")
                    try:
                        reported = func(completion_condition)
                    except Exception as ex:
                        # A raising work function ends this round of work.
                        volatile_status.set_value(py_trees.common.Status.FAILURE)
                        logger.error(f"Work function failed, setting node ({name}) "
                                     f"as FAILED. ({ex})")
                        break

                    volatile_status.set_value(reported)
                    logger.debug(f"Setting node ({name}): {volatile_status.get_value()}")
                    time.sleep(loop_period_sec)

                # The final status is decided by the completion condition alone.
                outcome = (
                    py_trees.common.Status.SUCCESS
                    if completion_condition()
                    else py_trees.common.Status.FAILURE
                )
                volatile_status.set_value(outcome)

                logger.debug(f"Worker for node ({name}) completed.")
        return work_wrapper
    return action_worker_work_function_generator


class ActionNode(py_trees.behaviour.Behaviour):
def __init__(
self,
Expand Down
73 changes: 69 additions & 4 deletions beams/behavior_tree/ActionWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,79 @@
* LOGGER_QUEUE: instance of the logging queue
* worker_logging_configurer: utility function to register log queue with handler
"""
from multiprocessing import Event
from typing import Any, Callable, Optional
import logging
import time
from multiprocessing import Event, Queue, Value
from typing import Callable, Optional

import py_trees
from epics.multiproc import CAProcess

from beams.behavior_tree.VolatileStatus import VolatileStatus
from beams.logging import LOGGER_QUEUE, worker_logging_configurer
from beams.sequencer.helpers.Timer import Timer
from beams.sequencer.helpers.Worker import Worker
from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop,
Evaluatable)

logger = logging.getLogger(__name__)


def wrapped_action_work(loop_period_sec: float = 0.1, work_function_timeout_period_sec: float = 2):
    """
    Build a decorator that turns a one-shot work function into a worker loop
    with a timeout on ineffectual work.

    Parameters
    ----------
    loop_period_sec : float
        Seconds slept after every call of the decorated work function.
    work_function_timeout_period_sec : float
        Period handed to the ``Timer`` that bounds one round of work.

    NOTE(review): the timer is consulted only between calls of the work
    function. It therefore bounds work that keeps returning without
    satisfying the completion condition; a work function that blocks and
    never returns is not interrupted, because nothing here sends it a kill
    signal.
    """
    def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop:
        def work_wrapper(
            do_work: Value,
            name: str,
            work_gate: Event,
            volatile_status: VolatileStatus,
            completion_condition: Evaluatable,
            log_queue: Queue,
            log_configurer: Callable,
        ) -> None:
            """
            Drive ``func`` repeatedly and publish its status.

            Communication with the caller goes through the shared objects
            handed in as arguments:
            - ``volatile_status`` receives every status update
            - ``log_queue`` is registered through ``log_configurer``

            The outer loop lives for as long as ``do_work.value`` is truthy;
            each pass waits on ``work_gate`` before starting a round of work.
            """
            log_configurer(log_queue)
            work_loop_timeout_timer = Timer(
                name=name,
                timer_period_seconds=work_function_timeout_period_sec,
                auto_start=False,
            )
            while do_work.value:
                logger.debug(f"WAITING FOR INIT from node: {name}")
                # Block until the gate is opened, then re-arm it for next time.
                work_gate.wait()
                work_gate.clear()

                volatile_status.set_value(py_trees.common.Status.RUNNING)
                # The timeout window is measured from the start of this round.
                work_loop_timeout_timer.start_timer()
                while not (completion_condition() or work_loop_timeout_timer.is_elapsed()):
                    logger.debug(f"CALLING CAGET FROM from node ({name})")
                    try:
                        reported = func(completion_condition)
                    except Exception as ex:
                        # A raising work function ends this round of work.
                        volatile_status.set_value(py_trees.common.Status.FAILURE)
                        logger.error(f"Work function failed, setting node ({name}) "
                                     f"as FAILED. ({ex})")
                        break

                    volatile_status.set_value(reported)
                    logger.debug(f"Setting node ({name}): {volatile_status.get_value()}")
                    time.sleep(loop_period_sec)

                # Leaving the loop does not say why: re-check the condition to
                # tell success apart from a timeout or a raised exception.
                if not completion_condition():
                    logger.debug(f"Worker for node ({name}) failed.")
                    volatile_status.set_value(py_trees.common.Status.FAILURE)
                    continue

                logger.debug(f"Worker for node ({name}) completed.")
                volatile_status.set_value(py_trees.common.Status.SUCCESS)

        return work_wrapper
    return action_worker_work_function_generator


class ActionWorker(Worker):
Expand All @@ -27,8 +92,8 @@ def __init__(
proc_name: str,
volatile_status: VolatileStatus,
work_gate: Event,
work_func: Callable[[Any], None],
comp_cond: Callable[[Any], bool],
work_func: Callable[..., None],
comp_cond: Callable[..., bool],
stop_func: Optional[Callable[[None], None]] = None
):
super().__init__(
Expand Down
38 changes: 38 additions & 0 deletions beams/sequencer/helpers/Timer.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pep8 says module names should be lowercase

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you don't have to change it because we need to do a mass update later to bring the package into compliance

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import time


class Timer:
    """
    Polling timer: start it, then ask whether its period has elapsed.

    All timestamps come from ``time.monotonic()`` so that the measured
    interval is unaffected by system clock adjustments and so that an
    auto-started timer and a manually started one use the same clock.

    Parameters
    ----------
    name : str
        Label used in error messages.
    timer_period_seconds : float
        Length of the period checked by ``is_elapsed``.
    auto_start : bool
        When true the timer starts at construction; otherwise
        ``start_timer`` must be called before the timer is queried.
    is_periodic : bool
        When true, a call to ``is_elapsed`` that returns True also restarts
        the timer (see ``is_elapsed``).

    NOTE(review): the class name collides with ``threading.Timer`` from the
    standard library, and PEP 8 asks for lowercase module names; both were
    raised in review and left for a later package-wide cleanup.
    """

    def __init__(self,
                 name: str,
                 timer_period_seconds: float,
                 auto_start: bool = False,
                 is_periodic: bool = False):
        self.name = name
        self.timer_period_seconds = timer_period_seconds
        self.is_periodic = is_periodic
        self.auto_start = auto_start
        # -1 marks "never started"; check_valid_timer relies on it.
        self.timer_start_time = time.monotonic() if auto_start else -1

    def start_timer(self) -> None:
        """Start (or restart) the timer from the current instant."""
        self.timer_start_time = time.monotonic()

    def check_valid_timer(self) -> bool:
        """
        Return True when the timer has been started.

        Raises
        ------
        RuntimeError
            If the timer was never started.
        """
        if self.timer_start_time == -1:
            raise RuntimeError(f"{self.name} timer checked but not started")
        return True

    def is_elapsed(self) -> bool:
        """
        Report whether more than ``timer_period_seconds`` have passed.

        A one-shot timer keeps returning True once elapsed. A periodic timer
        returns True once and restarts itself, so the next check is False
        until another full period has passed; the author describes this as
        convenient for breaking out of loops at a fixed frequency.

        NOTE(review): reviewers found the restart-on-check behaviour
        unintuitive and suggested dropping periodic mode until it is needed
        (YAGNI); the timeout in ActionWorker does not use it.

        Raises
        ------
        RuntimeError
            If the timer was never started.
        """
        elapsed = self.get_elapsed()
        if elapsed > self.timer_period_seconds:
            if self.is_periodic:
                self.timer_start_time = time.monotonic()
            return True
        return False

    def get_elapsed(self) -> float:
        """
        Return the seconds passed since the timer was (re)started.

        Raises
        ------
        RuntimeError
            If the timer was never started.
        """
        self.check_valid_timer()
        return time.monotonic() - self.timer_start_time
29 changes: 29 additions & 0 deletions beams/tests/test_leaf_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,35 @@ def comp_cond():
assert percentage_complete.value == 100


def test_action_node_timeout():
    """
    An action whose work cannot finish inside the timeout window must end in
    FAILURE without the completion condition having been reached.
    """
    # Progress counter created with multiprocessing.Value.
    percentage_complete = Value("i", 0)

    def comp_cond():
        return percentage_complete.value >= 100

    # Timeout (2 ms) is far shorter than the ten 1 ms steps needed to finish.
    @wrapped_action_work(loop_period_sec=0.001, work_function_timeout_period_sec=.002)
    def work_func(comp_condition: Callable) -> Status:
        percentage_complete.value += 10
        if comp_condition():
            return Status.SUCCESS
        logger.debug(f"pct complete -> {percentage_complete.value}")
        return Status.RUNNING

    action = ActionNode(name="action", work_func=work_func,
                        completion_condition=comp_cond)
    action.setup()

    # NOTE(review): the sleep below stands in for the tree's tick rate; the
    # timeout itself is enforced by the wrapped work loop, not by this
    # polling loop.
    terminal_states = (Status.SUCCESS, Status.FAILURE)
    while action.status not in terminal_states:
        time.sleep(0.01)
        action.tick_once()

    assert action.status == Status.FAILURE
    assert percentage_complete.value != 100


def test_condition_node():
def condition_fn():
return True
Expand Down
37 changes: 37 additions & 0 deletions beams/tests/test_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import time

import pytest

from beams.sequencer.helpers.Timer import Timer


def test_elapsed():
    """A one-shot timer is not elapsed right after start, and is elapsed after its period."""
    settings = {
        "name": "test_elapsed",
        "timer_period_seconds": 0.1,
        "is_periodic": False,
    }
    timer = Timer(**settings)
    timer.start_timer()
    assert not timer.is_elapsed()
    # Sleep well past the 0.1 s period before checking again.
    time.sleep(0.5)
    assert timer.is_elapsed()


def test_timer_error():
    """Querying a timer that was never started raises RuntimeError."""
    timer = Timer(
        name="test_error_not_started",
        timer_period_seconds=0.1,
        is_periodic=False,
    )
    # Both query methods must refuse to answer before start_timer is called.
    for query in (timer.get_elapsed, timer.is_elapsed):
        with pytest.raises(RuntimeError):
            query()


def test_periodic():
    """
    A periodic timer reports elapsed once per period: the check that returns
    True restarts it, so the very next check is False until another period
    has passed.
    """
    # Name the timer after this test so any RuntimeError message points here.
    t = Timer(name="test_periodic",
              timer_period_seconds=0.1,
              auto_start=True,
              is_periodic=True)
    time.sleep(0.2)
    assert t.is_elapsed()
    # The successful check above restarted the timer.
    assert not t.is_elapsed()
    time.sleep(0.1)
    assert t.is_elapsed()