-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnet.py
32 lines (23 loc) · 905 Bytes
/
net.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import ipaddress
import socket
from typing import Optional
from _logger import get_logger, VERBOSE
from exceptions import ParseErrorBadIPv6, SocketClosedByPeer
logger = get_logger()
def read_socket(sock: socket.socket, byte_count: int) -> Optional[bytes]: # ideally this should either raise or return None, not both
data = bytearray()
while len(data) < byte_count:
try:
packet = sock.recv(byte_count - len(data))
except OSError:
logger.log(VERBOSE, f"Error while reading {byte_count} bytes", exc_info=True)
return None
if len(packet) > 0:
data.extend(packet)
else:
raise SocketClosedByPeer('read_socket: data=%s' % data)
return bytes(data)
def parse_ipv6(data: bytes) -> ipaddress.IPv6Address:
if len(data) != 16:
raise ParseErrorBadIPv6()
return ipaddress.IPv6Address(data)