diff --git a/beams/behavior_tree/ActionNode.py b/beams/behavior_tree/ActionNode.py index 2300dfd..0599aa8 100644 --- a/beams/behavior_tree/ActionNode.py +++ b/beams/behavior_tree/ActionNode.py @@ -1,71 +1,17 @@ import atexit import logging import os -import time -from multiprocessing import Event, Queue, Value -from typing import Callable +from multiprocessing import Event import py_trees -from beams.behavior_tree.ActionWorker import ActionWorker +from beams.behavior_tree.ActionWorker import ActionWorker, wrapped_action_work # the latter is grabbed as a pas through from beams.behavior_tree.VolatileStatus import VolatileStatus -from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop, - Evaluatable) +from beams.typing_helper import ActionNodeWorkLoop, Evaluatable logger = logging.getLogger(__name__) -def wrapped_action_work(loop_period_sec: float = 0.1): - def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop: - def work_wrapper( - do_work: Value, - name: str, - work_gate: Event, - volatile_status: VolatileStatus, - completion_condition: Evaluatable, - log_queue: Queue, - log_configurer: Callable) -> None: - """ - Wrap self.work_func, and set up logging / status communication - InterProcess Communication performed by shared memory objects: - - volatile status - - logging queue - - Runs a persistent while loop, in which the work func is called repeatedly - """ - log_configurer(log_queue) - while (do_work.value): - logger.debug(f"WAITING FOR INIT from node: {name}") - work_gate.wait() - work_gate.clear() - - # Set to running - volatile_status.set_value(py_trees.common.Status.RUNNING) - while not completion_condition(): - logger.debug(f"CALLING CAGET FROM from node ({name})") - try: - status = func(completion_condition) - except Exception as ex: - volatile_status.set_value(py_trees.common.Status.FAILURE) - logger.error(f"Work function failed, setting node ({name}) " - f"as FAILED. ({ex})") - break - - volatile_status.set_value(status) - logger.debug(f"Setting node ({name}): {volatile_status.get_value()}") - time.sleep(loop_period_sec) - - # one last check - if completion_condition(): - volatile_status.set_value(py_trees.common.Status.SUCCESS) - else: - volatile_status.set_value(py_trees.common.Status.FAILURE) - - logger.debug(f"Worker for node ({name}) completed.") - return work_wrapper - return action_worker_work_function_generator - - class ActionNode(py_trees.behaviour.Behaviour): def __init__( self, diff --git a/beams/behavior_tree/ActionWorker.py b/beams/behavior_tree/ActionWorker.py index da41f8f..e68b940 100644 --- a/beams/behavior_tree/ActionWorker.py +++ b/beams/behavior_tree/ActionWorker.py @@ -11,14 +11,81 @@ * LOGGER_QUEUE: instance of the logging queue * worker_logging_configurer: utility functuon to register log queue with handler """ -from multiprocessing import Event -from typing import Any, Callable, Optional +import logging +import time +from multiprocessing import Event, Value, Queue +from typing import Callable, Optional from epics.multiproc import CAProcess +import py_trees + from beams.behavior_tree.VolatileStatus import VolatileStatus from beams.logging import LOGGER_QUEUE, worker_logging_configurer +from beams.typing_helper import (ActionNodeWorkFunction, ActionNodeWorkLoop, + Evaluatable) from beams.sequencer.helpers.Worker import Worker +from beams.sequencer.helpers.Timer import Timer + +logger = logging.getLogger(__name__) + + +def wrapped_action_work(loop_period_sec: float = 0.1, work_function_timeout_period_sec: float = 2): + def action_worker_work_function_generator(func: ActionNodeWorkFunction) -> ActionNodeWorkLoop: + def work_wrapper( + do_work: Value, + name: str, + work_gate: Event, + volatile_status: VolatileStatus, + completion_condition: Evaluatable, + log_queue: Queue, + log_configurer: Callable) -> None: + """ + Wrap self.work_func, and set up logging / status communication + InterProcess Communication performed by shared memory objects: + - volatile status + - logging queue + + Runs a persistent while loop, in which the work func is called repeatedly + """ + log_configurer(log_queue) + work_loop_timeout_timer = Timer(name=name, + timer_period_seconds=work_function_timeout_period_sec, + auto_start=False, + is_periodic=True) + while (do_work.value): + logger.debug(f"WAITING FOR INIT from node: {name}") + work_gate.wait() + work_gate.clear() + + # Set to running + volatile_status.set_value(py_trees.common.Status.RUNNING) + # Start timer + work_loop_timeout_timer.start_timer() + while not completion_condition() and not work_loop_timeout_timer.is_elapsed(): + logger.debug(f"CALLING CAGET FROM from node ({name})") + try: + status = func(completion_condition) + except Exception as ex: + volatile_status.set_value(py_trees.common.Status.FAILURE) + logger.error(f"Work function failed, setting node ({name}) " + f"as FAILED. ({ex})") + break + + volatile_status.set_value(status) + logger.debug(f"Setting node ({name}): {volatile_status.get_value()}") + time.sleep(loop_period_sec) + + # check if we exited loop because we timed out or we succeeded at task + if completion_condition(): + logger.debug(f"Worker for node ({name}) completed.") + volatile_status.set_value(py_trees.common.Status.SUCCESS) + else: + logger.debug(f"Worker for node ({name}) failed.") + volatile_status.set_value(py_trees.common.Status.FAILURE) + + return work_wrapper + return action_worker_work_function_generator class ActionWorker(Worker): @@ -27,8 +94,8 @@ def __init__( proc_name: str, volatile_status: VolatileStatus, work_gate: Event, - work_func: Callable[[Any], None], - comp_cond: Callable[[Any], bool], + work_func: Callable[..., None], + comp_cond: Callable[..., bool], stop_func: Optional[Callable[[None], None]] = None ): super().__init__( diff --git a/beams/tests/test_leaf_node.py b/beams/tests/test_leaf_node.py index 674ef7d..a834194 100644 --- a/beams/tests/test_leaf_node.py +++ b/beams/tests/test_leaf_node.py @@ -35,6 +35,35 @@ def comp_cond(): assert percentage_complete.value == 100 +def test_action_node_timeout(): + # For test + percentage_complete = Value("i", 0) + + @wrapped_action_work(loop_period_sec=0.001, work_function_timeout_period_sec=.002) + def work_func(comp_condition: Callable) -> Status: + percentage_complete.value += 10 + if comp_condition(): + return Status.SUCCESS + logger.debug(f"pct complete -> {percentage_complete.value}") + return Status.RUNNING + + def comp_cond(): + return percentage_complete.value >= 100 + + action = ActionNode(name="action", work_func=work_func, + completion_condition=comp_cond) + action.setup() + + while action.status not in ( + Status.SUCCESS, + Status.FAILURE, + ): + time.sleep(0.01) + action.tick_once() + assert action.status == Status.FAILURE + assert percentage_complete.value != 100 + + def test_condition_node(): def condition_fn(): return True