forked from mariolpantunes/chord
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDHTNode.py
275 lines (230 loc) · 9.32 KB
/
DHTNode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
""" Chord DHT node implementation. """
import socket
import threading
import logging
import pickle
from utils import dht_hash, contains
class FingerTable:
"""Finger Table."""
def __init__(self, node_id, node_addr, m_bits=10):
""" Initialize Finger Table."""
pass
def fill(self, node_id, node_addr):
""" Fill all entries of finger_table with node_id, node_addr."""
pass
def update(self, index, node_id, node_addr):
"""Update index of table with node_id and node_addr."""
pass
def find(self, identification):
""" Get node address of closest preceding node (in finger table) of identification. """
pass
def refresh(self):
""" Retrieve finger table entries requiring refresh."""
pass
def getIdxFromId(self, id):
pass
def __repr__(self):
pass
@property
def as_list(self):
"""return the finger table as a list of tuples: (identifier, (host, port)).
NOTE: list index 0 corresponds to finger_table index 1
"""
pass
class DHTNode(threading.Thread):
""" DHT Node Agent. """
def __init__(self, address, dht_address=None, timeout=3):
"""Constructor
Parameters:
address: self's address
dht_address: address of a node in the DHT
timeout: impacts how often stabilize algorithm is carried out
"""
threading.Thread.__init__(self)
self.done = False
self.identification = dht_hash(address.__str__())
self.addr = address # My address
self.dht_address = dht_address # Address of the initial Node
if dht_address is None:
self.inside_dht = True
# I'm my own successor
self.successor_id = self.identification
self.successor_addr = address
self.predecessor_id = None
self.predecessor_addr = None
else:
self.inside_dht = False
self.successor_id = None
self.successor_addr = None
self.predecessor_id = None
self.predecessor_addr = None
self.finger_table = None #TODO create finger_table
self.keystore = {} # Where all data is stored
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(timeout)
self.logger = logging.getLogger("Node {}".format(self.identification))
def send(self, address, msg):
""" Send msg to address. """
payload = pickle.dumps(msg)
self.socket.sendto(payload, address)
def recv(self):
""" Retrieve msg payload and from address."""
try:
payload, addr = self.socket.recvfrom(1024)
except socket.timeout:
return None, None
if len(payload) == 0:
return None, addr
return payload, addr
def node_join(self, args):
"""Process JOIN_REQ message.
Parameters:
args (dict): addr and id of the node trying to join
"""
self.logger.debug("Node join: %s", args)
addr = args["addr"]
identification = args["id"]
if self.identification == self.successor_id: # I'm the only node in the DHT
self.successor_id = identification
self.successor_addr = addr
#TODO update finger table
args = {"successor_id": self.identification, "successor_addr": self.addr}
self.send(addr, {"method": "JOIN_REP", "args": args})
elif contains(self.identification, self.successor_id, identification):
args = {
"successor_id": self.successor_id,
"successor_addr": self.successor_addr,
}
self.successor_id = identification
self.successor_addr = addr
#TODO update finger table
self.send(addr, {"method": "JOIN_REP", "args": args})
else:
self.logger.debug("Find Successor(%d)", args["id"])
self.send(self.successor_addr, {"method": "JOIN_REQ", "args": args})
self.logger.info(self)
def get_successor(self, args):
"""Process SUCCESSOR message.
Parameters:
args (dict): addr and id of the node asking
"""
self.logger.debug("Get successor: %s", args)
#TODO Implement processing of SUCCESSOR message
pass
def notify(self, args):
"""Process NOTIFY message.
Updates predecessor pointers.
Parameters:
args (dict): id and addr of the predecessor node
"""
self.logger.debug("Notify: %s", args)
if self.predecessor_id is None or contains(
self.predecessor_id, self.identification, args["predecessor_id"]
):
self.predecessor_id = args["predecessor_id"]
self.predecessor_addr = args["predecessor_addr"]
self.logger.info(self)
def stabilize(self, from_id, addr):
"""Process STABILIZE protocol.
Updates all successor pointers.
Parameters:
from_id: id of the predecessor of node with address addr
addr: address of the node sending stabilize message
"""
self.logger.debug("Stabilize: %s %s", from_id, addr)
if from_id is not None and contains(
self.identification, self.successor_id, from_id
):
# Update our successor
self.successor_id = from_id
self.successor_addr = addr
#TODO update finger table
# notify successor of our existence, so it can update its predecessor record
args = {"predecessor_id": self.identification, "predecessor_addr": self.addr}
self.send(self.successor_addr, {"method": "NOTIFY", "args": args})
# TODO refresh finger_table
def put(self, key, value, address):
"""Store value in DHT.
Parameters:
key: key of the data
value: data to be stored
address: address where to send ack/nack
"""
key_hash = dht_hash(key)
self.logger.debug("Put: %s %s", key, key_hash)
#TODO Replace next code:
self.send(address, {"method": "NACK"})
def get(self, key, address):
"""Retrieve value from DHT.
Parameters:
key: key of the data
address: address where to send ack/nack
"""
key_hash = dht_hash(key)
self.logger.debug("Get: %s %s", key, key_hash)
#TODO Replace next code:
self.send(address, {"method": "NACK"})
def run(self):
self.socket.bind(self.addr)
# Loop untiln joining the DHT
while not self.inside_dht:
join_msg = {
"method": "JOIN_REQ",
"args": {"addr": self.addr, "id": self.identification},
}
self.send(self.dht_address, join_msg)
payload, addr = self.recv()
if payload is not None:
output = pickle.loads(payload)
self.logger.debug("O: %s", output)
if output["method"] == "JOIN_REP":
args = output["args"]
self.successor_id = args["successor_id"]
self.successor_addr = args["successor_addr"]
#TODO fill finger table
self.inside_dht = True
self.logger.info(self)
while not self.done:
payload, addr = self.recv()
if payload is not None:
output = pickle.loads(payload)
self.logger.info("O: %s", output)
if output["method"] == "JOIN_REQ":
self.node_join(output["args"])
elif output["method"] == "NOTIFY":
self.notify(output["args"])
elif output["method"] == "PUT":
self.put(
output["args"]["key"],
output["args"]["value"],
output["args"].get("from", addr),
)
elif output["method"] == "GET":
self.get(output["args"]["key"], output["args"].get("from", addr))
elif output["method"] == "PREDECESSOR":
# Reply with predecessor id
self.send(
addr, {"method": "STABILIZE", "args": self.predecessor_id}
)
elif output["method"] == "SUCCESSOR":
# Reply with successor of id
self.get_successor(output["args"])
elif output["method"] == "STABILIZE":
# Initiate stabilize protocol
self.stabilize(output["args"], addr)
elif output["method"] == "SUCCESSOR_REP":
#TODO Implement processing of SUCCESSOR_REP
pass
else: # timeout occurred, lets run the stabilize algorithm
# Ask successor for predecessor, to start the stabilize process
self.send(self.successor_addr, {"method": "PREDECESSOR"})
def __str__(self):
return "Node ID: {}; DHT: {}; Successor: {}; Predecessor: {}; FingerTable: {}".format(
self.identification,
self.inside_dht,
self.successor_id,
self.predecessor_id,
self.finger_table,
)
def __repr__(self):
return self.__str__()