Saturday, January 16, 2016

ElasticSearch Zen Discovery Internals


Elasticsearch is a very widely used open-source distributed search engine based on Lucene. Zen discovery is the algorithm Elasticsearch uses to discover nodes and react when their state changes in the cluster. It handles master election, fault detection, and cluster state maintenance and publishing. I needed to know the internals of Zen discovery, but the documentation on it is sparse. It turns out that the code is the source of truth, so I went to the code itself. This post is dedicated to reverse engineering Zen discovery using the unicast zen ping mechanism. So here we start.

High level components

Zen discovery has the following parts, as shown in the diagram below:
[Diagram: high-level components of Zen discovery]
  1. MembershipAction: Takes action on any node requesting to join the master, or leaving the master.
  2. UnicastZenPing (ZenPingService): Sends pings to other nodes and sends responses from the node that got pinged.
  3. PublishClusterStateAction: Publishes new cluster state from the master to all non-master nodes, and receives and applies the cluster state on non-master nodes.
  4. MasterFaultDetection: Runs on both master and non-master nodes. Non-master nodes use it to ping the master node at a regular interval to see if the master is alive. On the master side, it handles “internal:discovery/zen/fd/master_ping” and responds whether it is the master or not.
  5. Join process and NodeJoinController: The main thread that takes care of master election and of joining the node to the master.
  6. NodesFaultDetection: Runs on master and non-master nodes. The master pings all non-master nodes and looks for transport connection issues. It also runs on non-master nodes to respond to the fault detection pings.

Role of each component

ZenDiscovery provides the implementation of the org.elasticsearch.discovery.Discovery interface. Other discovery algorithms are available too. All of the components below run on all nodes.
MembershipAction
Role: Takes action on any node requesting to join the master, or leaving the master.
  • Any incoming join request to the master (internal:discovery/zen/join). This is used when a node joins a master after the master election has completed on that node.
  • Leave a cluster (internal:discovery/zen/leave). This action is called when ZenDiscovery stops.
  • Validate a join request (internal:discovery/zen/join/validate).
ZenDiscovery uses it to join a master after the master election is done. The election itself starts by pinging all the nodes and collecting PingResponses; each PingResponse contains the node id and the masterNodeId.
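To make the three actions concrete, here is a minimal Python sketch of a master-side dispatcher keyed by action name. Only the action names come from Elasticsearch; MembershipHandlers, its in-memory node set and the validation rule are hypothetical stand-ins, not the real transport handlers.

```python
from typing import Callable, Dict, Set

JOIN = "internal:discovery/zen/join"
LEAVE = "internal:discovery/zen/leave"
VALIDATE = "internal:discovery/zen/join/validate"


class MembershipHandlers:
    """Toy master-side dispatcher for the three membership actions.

    Only the action names come from Elasticsearch; the class, the
    in-memory node set and the validation rule are hypothetical.
    """

    def __init__(self) -> None:
        self.nodes: Set[str] = set()
        self.handlers: Dict[str, Callable[[str], bool]] = {
            JOIN: self._join,
            LEAVE: self._leave,
            VALIDATE: self._validate,
        }

    def handle(self, action: str, node_id: str) -> bool:
        # Route an incoming transport request by its action name.
        return self.handlers[action](node_id)

    def _join(self, node_id: str) -> bool:
        # A node that has finished its own election asks to join this master.
        self.nodes.add(node_id)
        return True

    def _leave(self, node_id: str) -> bool:
        # Sent by a node whose ZenDiscovery is stopping.
        if node_id not in self.nodes:
            return False
        self.nodes.remove(node_id)
        return True

    def _validate(self, node_id: str) -> bool:
        # Placeholder rule: accept any non-empty node id.
        return bool(node_id)
```

A joining node would typically be validated first, then joined; the leave action undoes the join when that node shuts down its discovery.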
UnicastZenPing( ZenPingService)
Role: Sends pings to other nodes and sends responses from the node that got pinged.
  1. Tied to the action “internal:discovery/zen/unicast”.
  2. This is used by ZenPingService when unicast is enabled.
  3. PingResponses include the node that received the ping, the master node, the cluster name, etc.
  4. It is used to send pings to other nodes in ZenDiscovery as part of “find the master”. For more details see step 2 of “Join process and NodeJoinController”.
  5. It gives preference to the unicast hosts that are configured in the “discovery.zen.ping.unicast.hosts” setting.
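A rough sketch of the two ideas above, assuming a host is just an address string: configured unicast hosts are pinged first, then any other known addresses, and each reply carries the pinged node, the master it knows of and the cluster name. The names ping_targets, collect_responses and the PingResponse fields are illustrative, and discarding replies that carry a different cluster name is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class PingResponse:
    node_id: str                   # node that received the ping
    master_node_id: Optional[str]  # master that node currently knows of, if any
    cluster_name: str


def ping_targets(configured_hosts: Iterable[str], known_nodes: Iterable[str]) -> List[str]:
    """Configured unicast hosts first, then other known addresses, without duplicates."""
    seen = set()
    targets: List[str] = []
    for address in list(configured_hosts) + list(known_nodes):
        if address not in seen:
            seen.add(address)
            targets.append(address)
    return targets


def collect_responses(responses: Iterable[PingResponse], cluster_name: str) -> List[PingResponse]:
    # Assumption: replies from a node in a different cluster are discarded.
    return [r for r in responses if r.cluster_name == cluster_name]
```

In this sketch configured_hosts plays the role of the discovery.zen.ping.unicast.hosts list, so those addresses always lead the ping order.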
PublishClusterStateAction
Role: Publishes new cluster state from the master to all non-master nodes, and receives and applies the cluster state on non-master nodes.
Action attached to: “internal:discovery/zen/publish”
  1. Sends the cluster state difference (diff) when the master has seen that node before.
  2. Sends the full state otherwise.
  3. The cluster state is serialized and compressed before being sent to non-master nodes.
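The diff-or-full decision can be sketched as below. This is not Elasticsearch's wire format: the state is a plain dict, JSON plus zlib stands in for the real serialization and compression, and build_publish_payload and apply_publish_payload are hypothetical names. The sketch also assumes the receiving node holds exactly the previous state the diff was computed against.

```python
import json
import zlib
from typing import Any, Dict, Set


def build_publish_payload(prev: Dict[str, Any], new: Dict[str, Any],
                          node_id: str, seen_nodes: Set[str]) -> bytes:
    """Diff for nodes the master has seen before, full state otherwise; always compressed."""
    if node_id in seen_nodes:
        changed = {k: v for k, v in new.items() if k not in prev or prev[k] != v}
        removed = [k for k in prev if k not in new]
        body = {"type": "diff", "changed": changed, "removed": removed}
    else:
        body = {"type": "full", "state": new}
    return zlib.compress(json.dumps(body, sort_keys=True).encode("utf-8"))


def apply_publish_payload(local: Dict[str, Any], payload: bytes) -> Dict[str, Any]:
    """Decompress, deserialize and apply the payload on a non-master node."""
    body = json.loads(zlib.decompress(payload).decode("utf-8"))
    if body["type"] == "full":
        return body["state"]
    # Assumption: `local` is exactly the state the diff was computed against.
    result = dict(local)
    for key in body["removed"]:
        result.pop(key, None)
    result.update(body["changed"])
    return result
```

The point of the diff path is that a node already holding the previous state only needs the changed entries, which keeps publish messages small in large clusters.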
MasterFaultDetection
Role: Runs on both master and non-master nodes. Non-master nodes use it to ping the master node at a regular interval to see if the master is alive. On the master side, it handles “internal:discovery/zen/fd/master_ping” and responds whether it is the master or not.
It also takes care of cases where a node that was master is no longer really the master but is still considered master by other nodes.
  1. If a non-master node is not able to reach the master node, a “master failure” event is initiated on that non-master node. When that happens:
    1. A master election is done.
    2. “Rejoin” is activated in cases where:
      1. rejoinOnMasterGone is true (this is a config setting).
      2. There are not enough master-capable nodes available in the cluster state, as calculated on the non-master node.
      3. No master is available.
      4. Rejoin acts as a reset button: the node starts over and joins the cluster fresh.
    3. If rejoin does not happen and the current node is chosen as master, it starts acting as master.
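A minimal sketch of the non-master side, assuming a simple consecutive-failure counter (the ping_retries parameter is loosely modeled on the discovery.zen.fd.ping_retries setting, but the logic is simplified) and an election that sorts the remaining master-eligible node ids and picks the first. MasterPinger, MasterGoneDecision and handle_master_gone are hypothetical names.

```python
from dataclasses import dataclass
from typing import List, Optional


class MasterPinger:
    """Counts consecutive failed pings to the master (hypothetical helper)."""

    def __init__(self, ping_retries: int) -> None:
        self.ping_retries = ping_retries
        self.failures = 0

    def on_ping_result(self, ok: bool) -> bool:
        """Return True once the master should be treated as failed."""
        if ok:
            self.failures = 0
            return False
        self.failures += 1
        return self.failures >= self.ping_retries


@dataclass
class MasterGoneDecision:
    action: str                      # "rejoin", "become_master" or "follow"
    new_master: Optional[str] = None


def handle_master_gone(local_node: str, master_eligible: List[str], failed_master: str,
                       rejoin_on_master_gone: bool,
                       minimum_master_nodes: int) -> MasterGoneDecision:
    remaining = sorted(n for n in master_eligible if n != failed_master)
    if rejoin_on_master_gone:
        return MasterGoneDecision("rejoin")
    if len(remaining) < minimum_master_nodes:
        # Not enough master-capable nodes left in this node's view of the cluster.
        return MasterGoneDecision("rejoin")
    if not remaining:
        # No master is available at all.
        return MasterGoneDecision("rejoin")
    # In-memory election over a sorted list: every node picks the same winner.
    new_master = remaining[0]
    if new_master == local_node:
        return MasterGoneDecision("become_master", new_master)
    return MasterGoneDecision("follow", new_master)
```

For example, if node1 fails and node1, node2 and node3 are master-eligible with minimum_master_nodes set to 2, node2 becomes master and node3 follows it.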
Before looking at how the join thread works, let's run through an example to understand the join/election scenario.
Note on “pending list of master eligible node joins”
    1. Say Node1, Node2, Node3, Node4 and Node5 are all master-eligible nodes.
    2. Assume they all start at the same time, and on all machines Node1 is chosen as the master.
    3. Now Node2, Node3, Node4 and Node5 send join requests to Node1, since according to them Node1 is the master.
    4. Meanwhile, Node1 runs the same master election process in parallel and finds out that it is the master.
    5. When Node1 sees itself as the master, it has to wait for at least “minimum_master_nodes” (say 2) master-eligible nodes to join it before it can mark itself as master.
    6. So Node1 waits for at least 2 (minimum_master_nodes) of Node2 to Node5 to join before victory is declared. It waits only for a limited time to do that.
Node1 uses this pending join list to keep track of the nodes that want to join it while the election is happening on Node1. Note that this pending list is kept on all nodes, but it is thrown away as soon as a node sees that it has not been elected as master.
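The pending join list from the Node1 example can be sketched with a condition variable: join requests are recorded as they arrive, and the would-be master blocks until enough master-eligible joins have come in or a timeout expires. PendingJoins and its methods are hypothetical, and whether the local node's own vote counts toward the threshold is left to the caller through required_joins.

```python
import threading
from typing import Dict


class PendingJoins:
    """Accumulates join requests while this node's own election is in progress.

    Hypothetical sketch: only master-eligible joins count toward the threshold.
    """

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._joins: Dict[str, bool] = {}  # node id -> master-eligible?
        self._accumulating = True

    def on_join_request(self, node_id: str, master_eligible: bool) -> None:
        with self._cond:
            if self._accumulating:
                self._joins[node_id] = master_eligible
                self._cond.notify_all()

    def _master_joins(self) -> int:
        return sum(1 for eligible in self._joins.values() if eligible)

    def wait_to_be_elected(self, required_joins: int, timeout_s: float) -> bool:
        """Block until enough master-eligible nodes have asked to join, or time out."""
        with self._cond:
            won = self._cond.wait_for(lambda: self._master_joins() >= required_joins,
                                      timeout=timeout_s)
            self._accumulating = False
            return won

    def stop_accumulating(self) -> None:
        """Called once this node learns it is not the master: drop the pending list."""
        with self._cond:
            self._accumulating = False
            self._joins.clear()
```

In the example, Node1 would call wait_to_be_elected(2, timeout_s) while join requests from Node2 to Node5 arrive through on_join_request; a node that loses the election calls stop_accumulating, which throws the list away.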
Join process and NodeJoinController
  1. Start accumulating joins from nodes that are sending requests to join the master. This forms a queue: while the election is happening, incoming join requests must be kept somewhere in case this node wins the master election. See “pending list of master eligible node joins” for details.
  2. Find the master by pinging all nodes through the ping service:
    1. Ping all the nodes and collect the PingResponses. Each PingResponse contains the node id and the master node id. UnicastZenPing is used here if unicast is enabled.
    2. Consider only master-eligible nodes.
    3. Three lists are created:
      1. ActiveNodes list: all nodes from the pinging process plus the local node.
      2. NodesJoinedAtLeastOnceBefore list: active nodes that have joined the cluster before, to distinguish them from new nodes. The local node is added to this list if it has joined the cluster before.
      3. The list of nodes that are masters (not just master-eligible) as reported by other nodes in their ping responses. Call it the masterNodes list.
    4. If the masterNodes list is not empty, meaning other nodes see some node(s) as master:
      1. Elect the master among these masterNodes.
      2. This list does not contain the local node.
    5. If the masterNodes list is empty, meaning no other node has seen any node as master:
      1. First, run the master election among the nodes that have been in the cluster before.
      2. If no master is elected, run the master election on all active nodes.
      3. This makes sure that existing nodes are preferred over new master-eligible nodes for becoming the master.
    6. If the LOCAL NODE is elected as master:
      1. It waits for joins from the other nodes so that victory can be declared.
      2. See “pending list of master eligible node joins” for details.
      3. Once it is elected, the cluster state is published.
      4. The logs will show “zen-disco-join(elected_as_master, [N] joins received)”, where N is the number of pending master joins.
    7. If the LOCAL NODE is not elected as master:
      1. Stop accumulating joins.
      2. Send a join request to the master (Node1) through the MembershipAction join action.
      3. “finalize_join (” will appear in the logs of non-master nodes once this is done.
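The master-finding step above, with its three lists and its order of elections, can be condensed into a small Python function. This is a sketch under stated assumptions, not ZenDiscovery's code: PingInfo and its joined_before flag are hypothetical, the local node is assumed to be master-eligible, and the election comparator is assumed to be "sort by node id, take the first". It also folds in the "enough master-capable nodes" check described in the startup data flow, modeled as a plain count against minimum_master_nodes.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class PingInfo:
    node_id: str
    master_node_id: Optional[str]  # master this node currently sees, if any
    master_eligible: bool
    joined_before: bool            # hypothetical flag: node has been in the cluster before


def elect_master(candidates: Iterable[str]) -> Optional[str]:
    # Assumed comparator: sort by node id and take the first entry.
    ordered = sorted(candidates)
    return ordered[0] if ordered else None


def find_master(local_id: str, local_joined_before: bool,
                responses: List[PingInfo], minimum_master_nodes: int) -> Optional[str]:
    eligible = [r for r in responses if r.master_eligible]
    # List 1: active master-eligible nodes plus the local node.
    active_nodes = {r.node_id for r in eligible} | {local_id}
    # List 2: active nodes that have joined the cluster at least once before.
    joined_once = {r.node_id for r in eligible if r.joined_before}
    if local_joined_before:
        joined_once.add(local_id)
    # List 3: masters reported by other nodes, never the local node.
    master_nodes = {r.master_node_id for r in eligible
                    if r.master_node_id is not None and r.master_node_id != local_id}
    if master_nodes:
        return elect_master(master_nodes)
    if len(active_nodes) < minimum_master_nodes:
        return None  # not enough master-capable nodes to hold an election
    winner = elect_master(joined_once)
    return winner if winner is not None else elect_master(active_nodes)
```

Because every node sorts the same candidate set the same way, nodes that see the same ping results independently pick the same master, which is what lets the election stay a purely in-memory step.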
NodesFaultDetection
Role: Runs on master and non-master nodes. The master pings all non-master nodes and looks for transport connection issues. It also runs on non-master nodes to respond to the fault detection pings.
On the master, when a non-master node stops answering pings:
  1. “zen-disco-node_failed” will appear in the master logs.
  2. The master checks whether there are still enough master-capable nodes.
    1. If not, the rejoin mechanism kicks in. This restarts the cluster join algorithm, which eventually publishes the cluster state.
    2. If there are enough master nodes:
      1. The cluster state is updated after refreshing the routing tables.
On non-master nodes, when a ping is received:
  1. This handles the case where this node was master before, some other node has since become master too, and a ping has arrived from that other node while this node still believes it is the master. In that case it waits for maxPingsFromAnotherMaster pings from the other master before handling it.
    1. Then it handles the anotherMaster scenario.
    2. internal:discovery/zen/rejoin is the action attached to it.
    3. It sends a request to the internal:discovery/zen/rejoin action on the new master.
      1. If the master ACKs this request, the cluster state on the master is republished.
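A sketch of the master-side handling, assuming the cluster state is reduced to a map of node ids (flagged master-eligible or not) and a routing table mapping shard ids to nodes. handle_node_failed is a hypothetical name, and "refreshing the routing tables" is simplified here to marking the failed node's shards as unassigned.

```python
from typing import Dict, Optional, Tuple

NodeMap = Dict[str, bool]            # node id -> master-eligible?
Routing = Dict[str, Optional[str]]   # shard id -> node id (None = unassigned)


def handle_node_failed(nodes: NodeMap, routing: Routing, failed: str,
                       minimum_master_nodes: int) -> Tuple[str, NodeMap, Routing]:
    """Master-side reaction to a node that stopped answering fault-detection pings."""
    remaining = {node: eligible for node, eligible in nodes.items() if node != failed}
    if sum(remaining.values()) < minimum_master_nodes:
        # Not enough master-capable nodes: start the join algorithm over.
        return "rejoin", nodes, routing
    # Refresh the routing table: shards on the failed node become unassigned.
    new_routing = {shard: (None if node == failed else node) for shard, node in routing.items()}
    return "update_cluster_state", remaining, new_routing
```

Losing a data-only node keeps the master in place and only changes routing, while losing a master-eligible node can push the count below minimum_master_nodes and force a rejoin.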
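The maxPingsFromAnotherMaster threshold amounts to a counter that only grows while the local node still believes it is master. In this sketch the send callback is a hypothetical stand-in for the transport request to internal:discovery/zen/rejoin, and the class name is illustrative too.

```python
from typing import Callable

REJOIN_ACTION = "internal:discovery/zen/rejoin"


class AnotherMasterPings:
    """Counts fault-detection pings from another master while this node still thinks it is master."""

    def __init__(self, max_pings_from_another_master: int) -> None:
        self.max_pings = max_pings_from_another_master
        self.pings_while_master = 0

    def on_ping(self, local_is_master: bool, other_master: str,
                send: Callable[[str, str], None]) -> bool:
        """Return True if a rejoin request was sent to the other master."""
        if not local_is_master:
            self.pings_while_master = 0  # nothing to resolve
            return False
        self.pings_while_master += 1
        if self.pings_while_master < self.max_pings:
            return False  # tolerate a few pings before acting
        send(other_master, REJOIN_ACTION)
        return True
```

Waiting for several pings instead of reacting to the first one avoids tearing down a cluster over a single stray message during a master handover.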

Data flow when the node starts up

[Diagram: data flow when a node starts up]
  1. ZenDiscovery.doStart()
  2. clusterService.submitStateUpdateTask("initial_join", new ClusterStateNonMasterUpdateTask() { ... })
  3. joinThreadControl.startNewThreadIfNotRunning()
  4. ZenDiscovery.innerJoinCluster()
    1. Starts looking for the master via ZenDiscovery.findMaster().
      1. Calls pingService.pingAndWait(pingTimeout).
      2. The ping service in use (the unicast ping service) gives preference to the hosts that are configured.
      3. Filters out all the non-master-eligible nodes.
      4. Each ping response contains the cluster name, the node info and the master node info.
      5. If none of the ping responses reports a master node, an election is needed.
        1. Here we check that the active nodes found in the responses include enough master-capable nodes (minimum_master_nodes); only then is an election run.
          1. ElectMasterService.electMaster() is called. It just runs a comparator over the list of master-eligible nodes and returns the first one.
          2. Preference is given to nodes that have already joined the cluster before. Those have a cluster state in memory, including an up-to-date routing table (which is not persisted to disk by the gateway).
      6. If there are masters in the ping responses, an election is still run among the masters that were discovered. This is just an in-memory ElectMasterService call: the master node list is sorted, so every node arrives at the same result.
    2. Now we should have a master node that can be pinged.
    3. If the current node itself is elected as master, either because it is the only node or because it legitimately won in ElectMasterService, then:
      1. It waits, via nodeJoinController.waitToBeElectedAsMaster, for enough nodes to join.
    4. If the current node is not the master, it sends a request to join the master:
      1. Stop accumulating join requests from other nodes, as it is not the master.
      2. Send a request to join the master: joinElectedMaster(masterNode).
        1. Make sure the master node is reachable.
        2. The MembershipAction service is called to perform a blocking join. Multiple attempts are made here.
        3. An internal:discovery/zen/join action request is sent to the master node.
          1. JoinRequestRequestHandler handles it on the master.
      3. clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateNonMasterUpdateTask() { ... })
Some liberty has been taken in the picture to avoid overcomplicating it. The main workflow is at the bottom of the diagram, and the numbers give the sequence.
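The join step above (joinElectedMaster, where multiple attempts are made) boils down to a bounded retry loop. This is a hypothetical sketch: send_join stands in for the internal:discovery/zen/join request, and the attempt count and delay are arbitrary defaults, not Elasticsearch's.

```python
import time
from typing import Callable


def join_elected_master(master: str, send_join: Callable[[str], bool],
                        attempts: int = 3, delay_s: float = 0.1) -> bool:
    """Blocking join with a bounded number of attempts (hypothetical helper).

    send_join stands in for the internal:discovery/zen/join request; it returns
    True when the master accepts the join and raises ConnectionError when the
    master cannot be reached.
    """
    for attempt in range(1, attempts + 1):
        try:
            if send_join(master):
                return True
        except ConnectionError:
            pass  # an unreachable master counts as a failed attempt
        if attempt < attempts:
            time.sleep(delay_s)
    return False
```

Once this returns True, the node would go on to submit the finalize_join state update task; if it returns False, the join thread has to start the whole master-finding process again.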
Comments welcome.

By 
Gaurav Kukal
www.linkedin.com/in/gauravkukal
