From d3b43fba79e7680b55c1606fc99cd70432f41bd8 Mon Sep 17 00:00:00 2001 From: "Li, Shulong" Date: Wed, 16 Oct 2019 11:11:45 +0800 Subject: [PATCH] classic paxos demo --- homework/lsl/acceptor.py | 79 +++++++++++++++++++++ homework/lsl/cluster.py | 72 +++++++++++++++++++ homework/lsl/message.py | 33 +++++++++ homework/lsl/paxos.py | 78 +++++++++++++++++++++ homework/lsl/proposer.py | 148 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 410 insertions(+) create mode 100644 homework/lsl/acceptor.py create mode 100644 homework/lsl/cluster.py create mode 100644 homework/lsl/message.py create mode 100644 homework/lsl/paxos.py create mode 100644 homework/lsl/proposer.py diff --git a/homework/lsl/acceptor.py b/homework/lsl/acceptor.py new file mode 100644 index 0000000..2ec6367 --- /dev/null +++ b/homework/lsl/acceptor.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- + +import cluster +import message + +import Queue +import logging + + +logger = logging.getLogger(__name__) + + +class Acceptor(cluster.Node): + def __init__(self, ident): + super(Acceptor, self).__init__(ident, cluster.Role.ACCEPTOR) + self.register() + + ''' pre_round_num is None means never receives msg ''' + self.pre_round_num = None + + ''' round_num and value accepted ''' + self.round_num = None + self.value = None + + def stable_store(self, round_num, value=None, accepted=False): + ''' simulate store info on stable storage ''' + self.pre_round_num = round_num + + if accepted: + self.round_num = round_num + self.value = value + + def handle_msg(self): + send_msg = None + recv_msg = self.receive() + + logger.info("{who} receives msg: {msg}".format(who=self.ident, msg=recv_msg)) + + if recv_msg.category == message.Category.PREPATR_REQUEST: + round_num = recv_msg.round_num + + if self.pre_round_num is None or round_num > self.pre_round_num: + send_msg = message.Message(self.ident, + recv_msg.sender, + self.round_num, + self.value, + message.Category.PREPATR_REQUEST, + message.MessageType.RESPONSE) + + self.stable_store(round_num) + logger.info("{who} accept prepare request: {m}".format(who=self.ident, m=recv_msg)) + else: + logger.info("{who} reject prepare request: {m}".format(who=self.ident, m=recv_msg)) + + elif recv_msg.category == message.Category.ACCEPT_REQUEST: + round_num = recv_msg.round_num + if self.pre_round_num is None or round_num >= self.pre_round_num: + send_msg = message.Message(self.ident, + recv_msg.sender, + round_num, + recv_msg.value, + message.Category.ACCEPT_REQUEST, + message.MessageType.RESPONSE) + self.stable_store(round_num, recv_msg.value, True) + logger.info("{who} accept accept request: {m}".format(who=self.ident, m=recv_msg)) + else: + logger.info("{who} reject accept request: {m}".format(who=self.ident, m=recv_msg)) + + else: + ''' ignore unkonwn msg ''' + logger.warn("received unknoen message: {msg}".format(msg=recv_msg)) + pass + + if send_msg is not None: + self.send(send_msg) + + def run(self): + while True: + self.handle_msg() diff --git a/homework/lsl/cluster.py b/homework/lsl/cluster.py new file mode 100644 index 0000000..2394762 --- /dev/null +++ b/homework/lsl/cluster.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +''' +nodes info in cluster +''' + +import copy +import Queue +import logging +import random +import time + + +class Role(object): + PROPOSER = "proposer" + ACCEPTOR = "acceptor" + + +random.seed(time.time()) +logger = logging.getLogger(__name__) +CLUSTER_NODES_BY_IDENT = {} +CLUSTER_NODES_BY_ROLE = { + Role.PROPOSER: {}, + Role.ACCEPTOR: {}, +} + + +class Node(object): + def __init__(self, ident, role): + self.role = role + self.ident = ident + self.channel = { + # TODO(lsl): one queue may enough + "send": Queue.Queue(), + "recv": Queue.Queue(), + } + + def receive(self, timeout=None): + msg = None + try: + msg = self.channel["recv"].get(timeout=timeout) + return msg + except Queue.Empty as err: + logger.info("timeout before receive any messages, {e}".format(e=err)) + return None + + + def send(self, msg): + rece_chan = self.getNodeChannel(msg.receiver) + rece_chan["recv"].put(msg) + + def register(self): + CLUSTER_NODES_BY_IDENT[self.ident] = self.channel + CLUSTER_NODES_BY_ROLE[self.role][self.ident] = self.channel + + def getNodeChannel(self, ident): + return CLUSTER_NODES_BY_IDENT[ident] + + +def getRandomNodesByRoleList(role): + idents = CLUSTER_NODES_BY_ROLE[role].keys() + random.seed(time.time()) + random.shuffle(idents) + + ret = [] + for ident in idents: + ret.append({ + "ident": ident, + "channel": CLUSTER_NODES_BY_IDENT[ident], + }) + + return ret + diff --git a/homework/lsl/message.py b/homework/lsl/message.py new file mode 100644 index 0000000..303d2b6 --- /dev/null +++ b/homework/lsl/message.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- + + +class Category(object): + ACCEPT_REQUEST = "accept" + PREPATR_REQUEST = "prepare" + + +class MessageType(object): + REQUEST = "request" + RESPONSE = "response" + + +class Message(object): + def __init__(self, sender, receiver, round_num, value, category, m_type): + self.sender = sender + self.receiver = receiver + + ''' round_num is None and value is None means never receives msg ''' + self.round_num = round_num + self.value = value + + ''' prepare request or accept request ''' + self.category = category + ''' request or response which is not a must ''' + self.m_type = m_type + + def __repr__(self): + return ""\ + "".format(s=self.sender, r=self.receiver, + n=self.round_num, v=self.value, + c=self.category, m=self.m_type) diff --git a/homework/lsl/paxos.py b/homework/lsl/paxos.py new file mode 100644 index 0000000..13ca0af --- /dev/null +++ b/homework/lsl/paxos.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +""" +main entry of paxos demo +""" + +import cluster +import proposer +import acceptor + +import time +import threading +import logging + + +logger = logging.getLogger(__name__) + + +def make_nodes(num, role): + ret= [] + for idx in range(num): + if role == cluster.Role.PROPOSER: + node = proposer.Proposer("proposer-{idx}".format(idx=idx)) + + elif role == cluster.Role.ACCEPTOR: + node = acceptor.Acceptor("acceptor-{idx}".format(idx=idx)) + + else: + raise Exception("unknown role: {r}".format(r=role)) + + ret.append(node) + + return ret + +def main(): + acceptors = make_nodes(5, cluster.Role.ACCEPTOR) + proposers = make_nodes(3, cluster.Role.PROPOSER) + + logger.info("cluster: {nodes}".format(nodes=cluster.CLUSTER_NODES_BY_IDENT)) + + ''' start acceptors ''' + for acptor in acceptors: + threading.Thread(target=acptor.run).start() + + logger.info("acceptors started") + time.sleep(0.1) + + proposer_ths = [] + value = 0 + ''' start proposers ''' + for pro in proposers: + th = threading.Thread(target=pro.propose, args=(value,)) + th.start() + proposer_ths.append(th) + value += 1 + + logger.info("proposers started") + + for th in proposer_ths: + th.join() + + logger.info("propose finished") + for acptor in acceptors: + logger.info("{who}: rnd: {rnd}, value: {val}".format(who=acptor.ident, + rnd=acptor.round_num, + val=acptor.value)) + + +def init_log(): + log_format = '%(asctime)s:%(levelname)s:%(filename)s:' \ + '%(funcName)s:%(lineno)s:%(message)s' + datefmt = '%a, %d %b %Y %H:%M:%S' + level = logging.DEBUG + + logging.basicConfig(format=log_format, datefmt=datefmt, level=level) + +if __name__ == "__main__": + init_log() + main() diff --git a/homework/lsl/proposer.py b/homework/lsl/proposer.py new file mode 100644 index 0000000..1f136dc --- /dev/null +++ b/homework/lsl/proposer.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- + +import cluster +import message + +import time +import random +import logging + + +random.seed(time.time()) +logger = logging.getLogger(__name__) + + +class Proposer(cluster.Node): + def __init__(self, ident): + super(Proposer, self).__init__(ident, cluster.Role.PROPOSER) + self.register() + + self.round_num = None + + def makeRoundNum(self): + self.round_num = "{tm}-{ident}".format(tm=int(time.time()*1000*1000), ident=self.ident) + + def unstable_send(self, msg, rato=0.1): + rnd = random.randint(1, 10) + if rnd < int(rato*10): + ''' ignore sending ''' + return + + self.send(msg) + + def unstable_receive(self, rato=0.1): + timeout = False + + msg = self.receive(timeout=0.1) + if msg is None: + timeout = True + + rnd = random.randint(1, 10) + if rnd < int(rato*10): + ''' ignore received msg ''' + msg = None + + return msg, timeout + + def send_reqs(self, acceptors, category, m_type, round_num, value=None): + for ident in acceptors.keys(): + msg = message.Message(self.ident, + ident, + round_num, + value, + category, + m_type) + self.unstable_send(msg) + + received = [] + for ident in acceptors.keys(): + msg, timeout = self.unstable_receive() + if timeout: + logger.error("some acceptor not response") + break + + if msg is not None: + received.append(msg) + + return received + + def prepare_reqs(self, acceptors, round_num): + return self.send_reqs(acceptors, + message.Category.PREPATR_REQUEST, + message.MessageType.REQUEST, + round_num) + + def accept_reqs(self, round_num, value, acceptors): + return self.send_reqs(acceptors, + message.Category.ACCEPT_REQUEST, + message.MessageType.REQUEST, + round_num, + value) + + def do_propose(self, value): + ''' + return: + whether accepted by majority and accepted value + ''' + self.makeRoundNum() + + all_acceptors = cluster.getRandomNodesByRoleList(cluster.Role.ACCEPTOR) + logger.info("{who} sees acceptors: {a}".format(who=self.ident, a=all_acceptors)) + + majority = {} + quorum = len(all_acceptors)/2 + 1 + + logger.info("{who} quorum: {q}".format(who=self.ident, q=quorum)) + + for acptor in all_acceptors: + majority[acptor["ident"]] = acptor["channel"] + + if len(majority) >= quorum: + break + + ''' send prepare requests ''' + received_msg = self.prepare_reqs(majority, self.round_num) + if len(received_msg) < quorum: + return False, None + + logger.info("{who} receives {n} prepare reponses".format(who=self.ident, n=len(received_msg))) + + acceptors = {} + max_rnd = None + max_rnd_value = None + for msg in received_msg: + acceptors[msg.sender] = self.getNodeChannel(msg.sender) + + if msg.round_num is None: + continue + + if max_rnd is None or msg.round_num > max_rnd: + max_rnd = msg.round_num + max_rnd_value = msg.value + + propose_value = value + if max_rnd_value is not None: + propose_value = max_rnd_value + + if max_rnd is not None and max_rnd > self.round_num: + logger.info("{who} finds greater round num: {g} > {s}, " + "increase self round number".format(who=self.ident, g=max_rnd, s=self.round_num)) + return False, None + + ''' accept request ''' + received_msg = self.accept_reqs(self.round_num, propose_value, acceptors) + if len(received_msg) < quorum: + return False, None + + return True, propose_value + + def propose(self, value): + while True: + accepted, acpt_val = self.do_propose(value) + if accepted: + logger.info("{who}: value {v} accepted".format(who=self.ident, v=acpt_val)) + break + + time.sleep((random.random()+0.1)/100) + + logger.info("{who} quits".format(who=self.ident))