Skip to content

Commit 2075ea0

Browse files
author
Andres Correa Casablanca
committed
Isolate workaround nodes from experiment network
Signed-off-by: Andres Correa Casablanca <andres@thirdhash.com>
1 parent 0ffb472 commit 2075ea0

File tree

2 files changed

+142
-82
lines changed

2 files changed

+142
-82
lines changed

experiments/forking_simulation.py

Lines changed: 130 additions & 73 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,10 @@
3838
getLogger
3939
)
4040
from math import floor
41-
from os import environ
41+
from os import (
42+
environ,
43+
mkdir
44+
)
4245
from os.path import (
4346
dirname,
4447
exists as path_exists,
@@ -51,6 +54,8 @@
5154
from tempfile import mkdtemp
5255
from time import time as _time
5356
from typing import (
57+
Dict,
58+
Iterable,
5459
List,
5560
Optional,
5661
Set,
@@ -132,7 +137,7 @@ def __init__(
132137

133138
# Required to interact with the network & the nodes
134139
self.loop = loop
135-
self.nodes: List[TestNode] = []
140+
self.nodes: Dict[int, TestNode] = {}
136141
self.nodes_hub: Optional[NodesHub] = None
137142
self.proposer_node_ids: List[int] = []
138143
self.validator_node_ids: List[int] = []
@@ -145,11 +150,18 @@ def run(self) -> bool:
145150
self.setup_chain()
146151
self.setup_nodes()
147152

148-
try:
149-
if self.num_validator_nodes > 0:
153+
if self.num_validator_nodes > 0:
154+
try:
150155
self.autofinalization_workaround()
156+
except BaseException as e:
157+
self.logger.critical(
158+
'Workaround execution failure', exc_info=e
159+
)
160+
return False
161+
try:
151162
self.start_nodes()
152-
except (OSError, AssertionError):
163+
except (OSError, AssertionError) as e:
164+
self.logger.critical('Unable to start nodes', exc_info=e)
153165
return False # Early shutdown
154166

155167
self.nodes_hub = NodesHub(
@@ -163,14 +175,19 @@ def run(self) -> bool:
163175
self.nodes_hub.sync_start_proxies()
164176
self.nodes_hub.sync_connect_nodes_graph(self.graph_edges)
165177

166-
# Notice that the validators have already loaded their wallets
167178
self.logger.info('Importing wallets')
168-
for idx, proposer_id in enumerate(self.proposer_node_ids):
169-
if idx > 0:
170-
self.nodes[proposer_id].createwallet(f'n{proposer_id}')
179+
for node_id, node in self.nodes.items():
180+
node.createwallet(f'n{node_id}')
181+
tmp_wallet = node.get_wallet_rpc(f'n{node_id}')
171182

172-
tmp_wallet = self.nodes[proposer_id].get_wallet_rpc(f'n{proposer_id}')
173-
tmp_wallet.importwallet(normpath(self.tmp_dir + f'/n{proposer_id}.wallet'))
183+
if self.num_validator_nodes > 0:
184+
tmp_wallet.importwallet(
185+
normpath(self.tmp_dir + f'/n{node_id}.wallet')
186+
)
187+
else:
188+
tmp_wallet.importmasterkey(
189+
regtest_mnemonics[node_id]['mnemonics']
190+
)
174191

175192
self.loop.run_until_complete(self.trigger_simulation_stop())
176193
return True
@@ -183,10 +200,20 @@ def autofinalization_workaround(self):
183200
self.logger.info('Running auto-finalization workaround')
184201

185202
lucky_proposer_id = self.proposer_node_ids[0]
186-
validators = [self.nodes[i] for i in self.validator_node_ids]
203+
lucky_node_ids = [lucky_proposer_id] + self.validator_node_ids
187204

188-
self.start_node(lucky_proposer_id)
189-
self.start_nodes(validators)
205+
# We'll start nodes isolated from the experiment's network, and reload
206+
# their wallets later once the experiment starts after the workaround.
207+
if not path_exists(self.tmp_dir + '/workaround'):
208+
mkdir(self.tmp_dir + '/workaround')
209+
for node_id in lucky_node_ids:
210+
initialize_datadir(self.tmp_dir + '/workaround', node_id)
211+
212+
workaround_nodes = self.build_nodes_instances(
213+
base_dir=self.tmp_dir + '/workaround',
214+
node_ids=lucky_node_ids
215+
)
216+
self.start_nodes(workaround_nodes)
190217

191218
# Although we don't need to collect data during this initialization
192219
# phase, we'll connect the nodes through a NodesHub instance to ensure
@@ -195,48 +222,63 @@ def autofinalization_workaround(self):
195222
tmp_hub = NodesHub(
196223
loop=self.loop,
197224
latency_policy=StaticLatencyPolicy(0),
198-
nodes=self.nodes,
225+
nodes=workaround_nodes,
199226
network_stats_collector=NullNetworkStatsCollector()
200227
)
201-
lucky_node_ids = [lucky_proposer_id] + self.validator_node_ids
202228
tmp_hub.sync_start_proxies(lucky_node_ids)
203229
dense_graph = create_simple_dense_graph(node_ids=lucky_node_ids)
204230
tmp_hub.sync_connect_nodes_graph(dense_graph)
205231

206232
# We have to load some money into the nodes
207-
lucky_proposer = self.nodes[lucky_proposer_id]
233+
lucky_proposer = workaround_nodes[lucky_proposer_id]
208234
for proposer_id in self.proposer_node_ids:
209235
lucky_proposer.createwallet(f'n{proposer_id}')
210236
tmp_wallet = lucky_proposer.get_wallet_rpc(f'n{proposer_id}')
211237
tmp_wallet.importmasterkey(
212238
regtest_mnemonics[proposer_id]['mnemonics']
213239
)
214240
for validator_id in self.validator_node_ids:
215-
self.nodes[validator_id].createwallet(f'n{validator_id}')
216-
tmp_wallet = self.nodes[validator_id].get_wallet_rpc(f'n{validator_id}')
241+
workaround_nodes[validator_id].createwallet(f'n{validator_id}')
242+
tmp_wallet = workaround_nodes[validator_id].get_wallet_rpc(f'n{validator_id}')
217243
tmp_wallet.importmasterkey(
218244
regtest_mnemonics[validator_id]['mnemonics']
219245
)
220246

221-
self.loop.run_until_complete(self.ensure_autofinalization_is_off())
247+
self.logger.info('Imported mnemonics into workaround nodes')
222248

223-
# Unloading the wallets that don't belong to the lucky proposer
249+
self.loop.run_until_complete(self.ensure_autofinalization_is_off(
250+
workaround_nodes
251+
))
252+
253+
# Dumping wallets to be loaded later
224254
for proposer_id in self.proposer_node_ids:
225255
# The wallet file is created in the autofinalization_workaround method
226256
tmp_wallet = lucky_proposer.get_wallet_rpc(f'n{proposer_id}')
227257
tmp_wallet.dumpwallet(normpath(self.tmp_dir + f'/n{proposer_id}.wallet'))
228-
if proposer_id != lucky_proposer_id:
229-
lucky_proposer.unloadwallet(f'n{proposer_id}')
258+
lucky_proposer.unloadwallet(f'n{proposer_id}')
259+
for validator_id in self.validator_node_ids:
260+
tmp_wallet = workaround_nodes[validator_id].get_wallet_rpc(f'n{validator_id}')
261+
tmp_wallet.dumpwallet(normpath(self.tmp_dir + f'/n{validator_id}.wallet'))
262+
263+
self.logger.info('Dumped workaround wallets to be reused later')
264+
265+
# We close all temporary connections & shut down nodes
266+
tmp_hub.close()
267+
self.stop_nodes(workaround_nodes)
230268

231-
tmp_hub.close() # We close all temporary connections
269+
# Cleaning workaround stuff
270+
rmtree(self.tmp_dir + '/workaround')
232271

233272
# We recover the original topology for the full network
234273
# self.num_nodes, self.graph_edges = tmp_num_nodes, tmp_graph_edges
235274
self.logger.info('Finished auto-finalization workaround')
236275

237-
async def ensure_autofinalization_is_off(self):
276+
async def ensure_autofinalization_is_off(
277+
self,
278+
workaround_nodes: Dict[int, TestNode]
279+
):
238280
for validator_id in self.validator_node_ids:
239-
validator = self.nodes[validator_id]
281+
validator = workaround_nodes[validator_id]
240282
tmp_wallet = validator.get_wallet_rpc(f'n{validator_id}')
241283
tmp_wallet.deposit(
242284
tmp_wallet.getnewaddress('', 'legacy'),
@@ -250,7 +292,7 @@ async def ensure_autofinalization_is_off(self):
250292
# We have to wait at least for one epoch :( .
251293
await asyncio_sleep(1 + self.block_time_seconds * 50)
252294

253-
lucky_proposer = self.nodes[self.proposer_node_ids[0]]
295+
lucky_proposer = workaround_nodes[self.proposer_node_ids[0]]
254296
is_autofinalization_off = False
255297

256298
while not is_autofinalization_off:
@@ -266,6 +308,8 @@ def safe_run(self, close_loop=True) -> bool:
266308
successful_run = False
267309
try:
268310
successful_run = self.run()
311+
except BaseException as e:
312+
self.logger.critical('The sub-experiment failed', exc_info=e)
269313
finally:
270314
self.logger.info('Releasing resources')
271315
if self.nodes_hub is not None:
@@ -300,29 +344,13 @@ def cleanup_directories(self):
300344
if self.tmp_dir != '' and path_exists(self.tmp_dir):
301345
self.logger.info('Cleaning temporary directories')
302346
rmtree(self.tmp_dir)
303-
# TODO: Remove wallet.* files too
304347

305348
def setup_chain(self):
    """Prepare the per-node data directories of the experiment.

    Logs the start of the preparation and then calls
    ``initialize_datadir(self.tmp_dir, node_id)`` once for every node id in
    ``range(self.num_nodes)``.
    """
    self.logger.info('Preparing "empty" chain')

    # NOTE(review): ``initialize_datadir`` is defined outside this view;
    # presumably it creates the node's datadir under ``self.tmp_dir``.
    for node_id in range(self.num_nodes):
        initialize_datadir(self.tmp_dir, node_id)
309352

310-
def setup_nodes(self):
311-
if len(self.nodes) > 0:
312-
self.logger.info('Skipping nodes setup')
313-
return
314-
315-
self.logger.info('Creating node wrappers')
316-
317-
all_node_ids = set(range(self.num_nodes))
318-
self.proposer_node_ids = sample(
319-
all_node_ids, self.num_proposer_nodes
320-
)
321-
self.validator_node_ids = sample(
322-
all_node_ids.difference(self.proposer_node_ids),
323-
self.num_validator_nodes
324-
)
325-
353+
def get_node_args(self, node_id: int) -> List[str]:
326354
# Some values are copied from test_framework.util.initialize_datadir, so
327355
# they are redundant, but it's easier to see what's going on by having
328356
# all of them together.
@@ -354,36 +382,61 @@ def setup_nodes(self):
354382
for mnemonic in regtest_mnemonics
355383
]
356384
}
357-
}, separators=(",",":"))}'''
385+
}, separators=(",", ":"))}'''
358386
]
359387
relay_args = ['-proposing=0'] + node_args
360388
proposer_args = ['-proposing=1'] + node_args
361389
validator_args = ['-proposing=0', '-validating=1'] + node_args
362390

391+
if node_id in self.proposer_node_ids:
392+
_node_args = proposer_args
393+
elif node_id in self.validator_node_ids:
394+
_node_args = validator_args
395+
else:
396+
_node_args = relay_args
397+
return [
398+
f'-bind=127.0.0.1:{NodesHub.get_p2p_node_port(node_id)}',
399+
f'-rpcbind=127.0.0.1:{NodesHub.get_rpc_node_port(node_id)}',
400+
f'''-stats-log-output-file={
401+
self.nodes_stats_directory.joinpath(f"stats_{node_id}.csv")
402+
}''',
403+
f'-uacomment=simpatch{node_id}'
404+
] + _node_args
405+
406+
def setup_nodes(self):
    """Assign random node roles and build the experiment's node wrappers.

    When ``self.nodes`` is already populated the method only logs that the
    setup is skipped. Otherwise it:

    * samples ``self.num_proposer_nodes`` ids out of
      ``range(self.num_nodes)`` into ``self.proposer_node_ids``;
    * samples ``self.num_validator_nodes`` ids out of the remaining ids
      into ``self.validator_node_ids`` (so both lists are disjoint);
    * creates ``self.nodes_stats_directory`` if it does not exist yet;
    * stores in ``self.nodes`` the result of
      ``self.build_nodes_instances`` for every id in
      ``range(self.num_nodes)``, rooted at ``self.tmp_dir``.

    Raises:
        ValueError: propagated from ``sample`` when more ids are requested
            than are available.
    """
    if self.nodes:
        self.logger.info('Skipping nodes setup')
        return

    self.logger.info('Creating node wrappers')

    all_node_ids = set(range(self.num_nodes))

    # NOTE(review): assumes ``sample`` is ``random.sample`` (its import is
    # not visible here). That function requires a *sequence*: passing a set
    # is deprecated since Python 3.9 and raises TypeError since 3.11, hence
    # the explicit ``sorted(...)`` conversions below.
    self.proposer_node_ids = sample(
        sorted(all_node_ids), self.num_proposer_nodes
    )
    remaining_node_ids = all_node_ids.difference(self.proposer_node_ids)
    self.validator_node_ids = sample(
        sorted(remaining_node_ids), self.num_validator_nodes
    )

    # The nodes' extra args point their stats output files into this
    # directory (see ``get_node_args``), so it has to exist beforehand.
    if not self.nodes_stats_directory.exists():
        self.nodes_stats_directory.mkdir()

    self.nodes = self.build_nodes_instances(
        base_dir=self.tmp_dir,
        node_ids=range(self.num_nodes)
    )
429+
430+
def build_nodes_instances(
431+
self,
432+
base_dir: str,
433+
node_ids: Iterable[int]
434+
) -> Dict[int, TestNode]:
435+
return {
436+
i: TestNode(
384437
i=i,
385-
datadir=f'{self.tmp_dir}/node{i}',
386-
extra_args=get_node_args(i),
438+
datadir=f'{base_dir}/node{i}',
439+
extra_args=self.get_node_args(i),
387440
rpchost=None,
388441
timewait=60,
389442
unit_e=environ['UNIT_E'],
@@ -392,8 +445,8 @@ def get_node_args(node_id: int) -> List[str]:
392445
coverage_dir=None,
393446
use_cli=False
394447
)
395-
for i in range(self.num_nodes)
396-
]
448+
for i in node_ids
449+
}
397450

398451
def start_node(self, i: int):
399452
node = self.nodes[i]
@@ -404,20 +457,20 @@ def start_node(self, i: int):
404457
self.stop_nodes()
405458
raise
406459

407-
def start_nodes(self, nodes: Optional[List[TestNode]] = None):
460+
def start_nodes(self, nodes: Optional[Dict[int, TestNode]] = None):
408461
self.logger.info('Starting nodes')
409462

410463
if nodes is None:
411464
nodes = self.nodes
412465

413-
for node_id, node in enumerate(nodes):
466+
for node_id, node in nodes.items():
414467
try:
415468
if not node.running:
416469
node.start()
417470
except OSError as e:
418471
self.logger.critical(f'Node {node_id} failed to start', e)
419472
raise
420-
for node_id, node in enumerate(nodes):
473+
for node_id, node in nodes.items():
421474
try:
422475
node.wait_for_rpc_connection()
423476
except AssertionError as e:
@@ -429,14 +482,18 @@ def start_nodes(self, nodes: Optional[List[TestNode]] = None):
429482

430483
self.logger.info('Started nodes')
431484

432-
def stop_nodes(self, nodes: Optional[Dict[int, TestNode]] = None):
    """Stop a group of nodes and wait until all of them are down.

    Args:
        nodes: mapping from node id to node wrapper. When omitted (``None``)
            the experiment's own ``self.nodes`` mapping is used.

    The shutdown happens in two passes: first ``stop_node()`` is called on
    every node, then ``wait_until_stopped()`` is called on every node.
    """
    self.logger.info('Stopping nodes')

    targets = self.nodes if nodes is None else nodes

    # First pass: ask every node to stop. An AssertionError raised by
    # ``stop_node`` is deliberately ignored so that the remaining nodes are
    # still asked to stop.
    for node in targets.values():
        try:
            node.stop_node()
        except AssertionError:
            pass

    # Second pass: block until each node reports it has stopped.
    for node in targets.values():
        node.wait_until_stopped()
441498

442499
def define_network_topology(self):

0 commit comments

Comments
 (0)