Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid busy-waiting in SimVisaLibrary.read #114

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "PyVISA-sim"
description = "Simulated backend for PyVISA implementing TCPIP, GPIB, RS232, and USB resources"
readme = "README.rst"
requires-python = ">=3.7"
requires-python = ">=3.8"
license = {file = "LICENSE.txt"}
authors = [
{name = "Hernan E. Grecco", email = "hernan.grecco@gmail.com"},
Expand Down
69 changes: 53 additions & 16 deletions pyvisa_sim/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

"""

from typing import Deque, Dict, List, Optional, Tuple, Union
from threading import Lock
from typing import Deque, Dict, Final, List, Optional, Tuple, Union

from pyvisa import constants, rname

Expand Down Expand Up @@ -56,6 +57,51 @@ def clear(self) -> None:
_value: int


class OutputQueue:
"""Store output in a FIFO queue."""

# N.B.: We don't use a `deque[tuple[bytes, bool]]` or `SimpleQueue[tuple[bytes,
# bool]]` for this as
# * It would require splitting each bytearray message and pushing (length-1 bytes
# object, bool) to the deque for each byte in the message.
# * We would still need a separate lock, as they do not have thread-safe `.extend`
# methods.

def __init__(self) -> None:
self._message_buffers: Final[Deque[bytearray]] = Deque()
self._lock: Final = Lock()
self._readable = False

def append_message(self, message: bytearray) -> None:
"""Append a message to the output queue."""
with self._lock:
self._message_buffers.append(message)
self._readable = True

def read(self, timeout: float) -> Tuple[bytes, bool]:
"""
Return a single byte from the output queue and whether it is accompanied by an
END indicator.
"""
if not self._lock.acquire(timeout=timeout):
return b"", False
try:
if not self._readable:
return b"", False

current_message_buffer = self._message_buffers[0]
b = int_to_byte(current_message_buffer.pop(0))
if current_message_buffer:
return b, False
else:
self._message_buffers.popleft()
if not self._message_buffers:
self._readable = False
return b, True
finally:
self._lock.release()


class ErrorQueue:
"""Store error messages in a FIFO queue.

Expand Down Expand Up @@ -134,7 +180,7 @@ def __init__(self, name: str, delimiter: bytes) -> None:
self._status_registers = {}
self._error_map = {}
self._eoms = {}
self._output_buffers = Deque()
self._output_queue = OutputQueue()
self._input_buffer = bytearray()
self._error_queues = {}

Expand Down Expand Up @@ -251,26 +297,17 @@ def write(self, data: bytes) -> None:
assert response is not None

if response is not NoResponse:
self._output_buffers.append(bytearray(response) + eom)
self._output_queue.append_message(bytearray(response) + eom)

finally:
self._input_buffer = bytearray()

def read(self) -> Tuple[bytes, bool]:
def read(self, timeout: float) -> Tuple[bytes, bool]:
"""
Return a single byte from the output buffer and whether it is accompanied by an
END indicator.
"""
if not self._output_buffers:
return b"", False

output_buffer = self._output_buffers[0]
b = int_to_byte(output_buffer.pop(0))
if output_buffer:
return b, False
else:
self._output_buffers.popleft()
return b, True
return self._output_queue.read(timeout=timeout)

# --- Private API

Expand Down Expand Up @@ -300,8 +337,8 @@ def read(self) -> Tuple[bytes, bool]:
#: TYPE CLASS -> (query termination, response termination)
_eoms: Dict[Tuple[constants.InterfaceType, str], Tuple[bytes, bytes]]

#: Deque of buffers in which the user can read
_output_buffers: Deque[bytearray]
#: Queue in which the user can read
_output_buffer: OutputQueue

#: Buffer in which the user can write
_input_buffer: bytearray
Expand Down
8 changes: 2 additions & 6 deletions pyvisa_sim/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ def read(self, count: int) -> Tuple[bytes, constants.StatusCode]:

out = bytearray()

while time.monotonic() - start <= timeout:
last, end_indicator = self.device.read()
while (timeout_remaining := (timeout - (time.monotonic() - start))) >= 0:
last, end_indicator = self.device.read(timeout=timeout_remaining)

out += last

Expand Down Expand Up @@ -270,10 +270,6 @@ def read(self, count: int) -> Tuple[bytes, constants.StatusCode]:
elif len(out) == count:
# Rule 6.1.3.
return out, constants.StatusCode.success_max_count_read

# Busy-wait only if the device's output buffer was empty.
if not last:
time.sleep(0.01)
else:
return out, constants.StatusCode.error_timeout

Expand Down
Loading