for comp in self.roles[:]: if not hasattr(comp, handler_name): continue comp.logger.debug("received %s from %s", message, sender) fn = getattr(comp, handler_name) fn(sender=sender, **message._asdict())
def do_Prepare(self, sender, ballot_num):
    """Handle a Prepare message and always answer with a Promise.

    If the incoming ballot number is higher than the one currently held,
    it is adopted and an Accepting message naming the sender is delivered
    to this node's own address.  The Promise sent back to the sender
    carries the (possibly updated) ballot number and every proposal
    accepted so far.
    """
    node = self.node
    if ballot_num > self.ballot_num:
        self.ballot_num = ballot_num
        # we've heard from a scout, so it might be the next leader
        node.send([node.address], Accepting(leader=sender))

    promise = Promise(
        ballot_num=self.ballot_num,
        accepted_proposals=self.accepted_proposals,
    )
    node.send([sender], promise)


def do_Accept(self, sender, ballot_num, slot, proposal):
    """Handle an Accept message and always answer with an Accepted.

    The proposal is recorded only when the ballot is at least as high as
    the one currently held, and only if nothing with an equal or higher
    ballot is already stored for that slot.  The reply reports the ballot
    number held after processing, whether or not the proposal was stored.
    """
    if ballot_num >= self.ballot_num:
        self.ballot_num = ballot_num
        accepted = self.accepted_proposals
        # keep an existing entry unless the new ballot strictly beats it
        keep_existing = (slot in accepted
                         and not (accepted[slot][0] < ballot_num))
        if not keep_existing:
            accepted[slot] = (ballot_num, proposal)

    reply = Accepted(slot=slot, ballot_num=self.ballot_num)
    self.node.send([sender], reply)
活跃的Leader会以心跳的形式发送Active消息。如果在LEADER_TIMEOUT到期之前没有收到此类消息,Replica将假定该Leader已经失效,并转向下一个Leader。在这种情况下,重要的是所有副本都选择相同的新Leader;我们通过对成员进行排序、并选择列表中的下一个成员作为Leader来做到这一点。 当节点加入网络时,Bootstrap将发送一条Join消息(见下文)。Replica以一条包含其最新状态的Welcome消息作为响应,从而使新节点能够快速跟上进度。
def __init__(self, node, execute_fn, state, slot, decisions, peers):
    """Set up a replica from a starting state.

    node       -- passed through to the base-class initializer
    execute_fn -- callable stored for later use when committing proposals
    state      -- initial value of the replicated state
    slot       -- starting slot number; also seeds ``next_slot``
    decisions  -- mapping of already-decided proposals, stored as given
    peers      -- cluster members, stored as given
    """
    super(Replica, self).__init__(node)

    # values handed in by the caller
    self.execute_fn = execute_fn
    self.peers = peers
    self.state = state
    self.decisions = decisions

    # next slot num for a proposal (may lead slot)
    self.slot = self.next_slot = slot

    # proposals made by this replica; starts empty
    self.proposals = {}

    # no leader is known yet, so there is no timeout to track either
    self.latest_leader = self.latest_leader_timeout = None
# making proposals
def do_Invoke(self, sender, caller, client_id, input_value):
    """Handle an Invoke message by proposing the requested operation.

    A Proposal is built from ``caller``, ``client_id`` and ``input_value``.
    If an equal proposal is already recorded in ``self.proposals``, its
    slot is reused so the request is re-proposed rather than duplicated;
    otherwise ``None`` is passed and ``propose`` chooses the slot.
    """
    proposal = Proposal(caller, client_id, input_value)
    # items() instead of the Python-2-only iteritems(): works on 2 and 3
    slot = next(
        (s for s, p in self.proposals.items() if p == proposal), None)
    # propose, or re-propose if this proposal already has a slot
    self.propose(proposal, slot)
def propose(self, proposal, slot=None):
    """Send (or resend, if slot is specified) a proposal to the leader

    proposal -- the proposal to send
    slot     -- slot to (re)use; when None, the next free slot is taken
                and ``self.next_slot`` is advanced
    """
    # Compare against None explicitly: a plain truthiness test would treat
    # slot 0 as "not specified" and silently move a resend to a new slot.
    if slot is None:
        slot, self.next_slot = self.next_slot, self.next_slot + 1
    self.proposals[slot] = proposal
    # find a leader we think is working - either the latest we know of, or
    # ourselves (which may trigger a scout to make us the leader)
    leader = self.latest_leader or self.node.address
    self.logger.info(
        "proposing %s at slot %d to leader %s" % (proposal, slot, leader))
    self.node.send([leader], Propose(slot=slot, proposal=proposal))
# handling decided proposals
def do_Decision(self, sender, slot, proposal):
    """Record a decided proposal and commit everything that is now in order.

    A repeated decision for an already-decided slot must carry the same
    proposal and is otherwise ignored.  If this replica had proposed
    something different for the slot (and it has a caller), that proposal
    is proposed again in a new slot.  Finally, consecutive decided slots
    starting at ``self.slot`` are committed one by one.
    """
    assert not self.decisions.get(self.slot, None), \
        "next slot to commit is already decided"
    if slot in self.decisions:
        assert self.decisions[slot] == proposal, \
            "slot %d already decided with %r!" % (slot, self.decisions[slot])
        return
    self.decisions[slot] = proposal
    self.next_slot = max(self.next_slot, slot + 1)

    # re-propose our proposal in a new slot if it lost its slot and wasn't
    # a no-op
    our_proposal = self.proposals.get(slot)
    if (our_proposal is not None and
            our_proposal != proposal and our_proposal.caller):
        self.propose(our_proposal)

    # execute any pending, decided proposals
    while True:
        commit_proposal = self.decisions.get(self.slot)
        if not commit_proposal:
            break  # not decided yet
        # advance self.slot before committing the slot just taken
        commit_slot, self.slot = self.slot, self.slot + 1
        self.commit(commit_slot, commit_proposal)


def commit(self, slot, proposal):
    """Actually commit a proposal that is decided and in sequence

    A proposal equal to one already decided in an earlier slot is logged
    and skipped.  Otherwise, if the proposal has a caller, ``execute_fn``
    is applied to the current state and the output is sent to the caller
    in an Invoked message.
    """
    # items() instead of the Python-2-only iteritems(): works on 2 and 3
    decided_proposals = [p for s, p in self.decisions.items() if s < slot]
    if proposal in decided_proposals:
        self.logger.info(
            "not committing duplicate proposal %r, slot %d", proposal, slot)
        return  # duplicate

    self.logger.info("committing %r at slot %d" % (proposal, slot))
    if proposal.caller is not None:
        # perform a client operation
        self.state, output = self.execute_fn(self.state, proposal.input)
        self.node.send([proposal.caller],
                       Invoked(client_id=proposal.client_id, output=output))


# tracking the leader

def do_Adopted(self, sender, ballot_num, accepted_proposals):
    """On Adopted, treat this node as the latest leader and restart the timeout."""
    self.latest_leader = self.node.address
    self.leader_alive()


def do_Accepting(self, sender, leader):
    """On Accepting, record the named leader and restart the timeout."""
    self.latest_leader = leader
    self.leader_alive()


def do_Active(self, sender):
    """On Active, restart the timeout only if it came from the latest leader."""
    if sender != self.latest_leader:
        return
    self.leader_alive()


def leader_alive(self):
    """Cancel any pending leader timeout and arm a fresh one.

    When the timer fires, ``latest_leader`` moves to the entry following
    the current leader in ``self.peers``, wrapping around at the end.
    """
    if self.latest_leader_timeout:
        self.latest_leader_timeout.cancel()

    def reset_leader():
        idx = self.peers.index(self.latest_leader)
        self.latest_leader = self.peers[(idx + 1) % len(self.peers)]
        self.logger.debug("leader timed out; tring the next one, %s",
                          self.latest_leader)
    self.latest_leader_timeout = self.set_timer(LEADER_TIMEOUT, reset_leader)


# adding new cluster members

def do_Join(self, sender):
    """Answer a Join from a known peer with a Welcome carrying our state.

    Senders that are not in ``self.peers`` get no reply.
    """
    if sender in self.peers:
        self.node.send([sender], Welcome(
            state=self.state, slot=self.slot, decisions=self.decisions))
def do_Preempted(self, sender, slot, preempted_by):
    """Handle a Preempted message: go inactive and pick a higher ballot.

    sender       -- originator of the message (unused)
    slot         -- slot the preemption refers to, or None when the
                    message carries no slot (the scout case)
    preempted_by -- the ballot that won; a falsy value falls back to our
                    own ballot when computing the next ballot number
    """
    # Compare against None explicitly so a preemption reported for slot 0
    # is not mistaken for the slot-less scout case.
    if slot is None:  # from the scout
        self.scouting = False
    # The ballot computation below tolerates a falsy preempted_by, so the
    # log line must not dereference it unconditionally.
    preempting_leader = preempted_by.leader if preempted_by else None
    self.logger.info("leader preempted by %s", preempting_leader)
    self.active = False
    self.ballot_num = Ballot((preempted_by or self.ballot_num).n + 1,
                             self.ballot_num.leader)
def do_Propose(self, sender, slot, proposal):
    """Handle a Propose message for ``slot``.

    * slot already in ``self.proposals``: only logged.
    * active: the proposal is recorded and a commander is spawned for it
      with the current ballot number.
    * inactive and not scouting: a scout is spawned; the proposal itself
      is not recorded.
    * inactive and already scouting: only logged.
    """
    log = self.logger.info

    if slot in self.proposals:
        log("got PROPOSE for a slot already being proposed")
        return

    if self.active:
        self.proposals[slot] = proposal
        log("spawning commander for slot %d" % (slot,))
        self.spawn_commander(self.ballot_num, slot)
        return

    if self.scouting:
        log("got PROPOSE while scouting; ignored")
        return

    log("got PROPOSE when not active - scouting")
    self.spawn_scout()
def update_accepted(self, accepted_proposals):
    """Merge ``accepted_proposals`` into ``self.accepted_proposals``.

    accepted_proposals -- mapping of slot -> (ballot_num, proposal)

    For each slot, the incoming entry replaces the stored one only when
    the slot is new or the incoming ballot number is strictly higher.
    The stored mapping is updated in place; nothing is returned.
    """
    acc = self.accepted_proposals
    # items() instead of the Python-2-only iteritems(): works on 2 and 3
    for slot, (ballot_num, proposal) in accepted_proposals.items():
        if slot not in acc or acc[slot][0] < ballot_num:
            acc[slot] = (ballot_num, proposal)
def do_Promise(self, sender, ballot_num, accepted_proposals):
    """Handle a Promise message from an acceptor.

    sender             -- the acceptor that sent the promise
    ballot_num         -- ballot number the acceptor reports
    accepted_proposals -- mapping of slot -> (ballot_num, proposal)

    A promise for our own ballot is merged into our record and the sender
    counted; once ``self.quorum`` acceptors have answered, an Adopted
    message is sent to this node's own address and ``self.stop()`` is
    called.  A promise for any other ballot results in a Preempted message
    (with ``slot=None``) to this node's own address, followed by
    ``self.stop()``.
    """
    if ballot_num == self.ballot_num:
        self.logger.info("got matching promise; need %d" % self.quorum)
        self.update_accepted(accepted_proposals)
        self.acceptors.add(sender)
        if len(self.acceptors) >= self.quorum:
            # strip the ballot numbers from self.accepted_proposals, now
            # that it represents a majority
            # (items() instead of the Python-2-only iteritems())
            accepted_proposals = dict(
                (s, p) for s, (b, p) in self.accepted_proposals.items())
            # We're adopted; note that this does *not* mean that no other
            # leader is active.  Any such conflicts will be handled by the
            # commanders.
            self.node.send([self.node.address],
                           Adopted(ballot_num=ballot_num,
                                   accepted_proposals=accepted_proposals))
            self.stop()
    else:
        # this acceptor has promised another leader a higher ballot number,
        # so we've lost
        self.node.send([self.node.address],
                       Preempted(slot=None, preempted_by=ballot_num))
        self.stop()