Analysis of EOS Multi-Node Synchronization Code
Published: 2019-06-12


EOS version: 1.0.7

1. Modifying the configuration file

  EOS nodes synchronize with each other over p2p. Peers are listed in the nodeos configuration file config.ini, whose default location is the ~/.local/share/eosio/nodeos/config directory, using entries of the following form:

p2p-peer-address = 10.186.11.223:9876
p2p-peer-address = 10.186.11.220:9876
p2p-peer-address = 10.186.11.141:9876

Multiple p2p peer addresses may be listed, one per line.

2. The chain_id for node synchronization

  Every node carries exactly one chain_id. If two nodes' chain_ids differ, they cannot synchronize with each other; the code handles this as follows:

void net_plugin_impl::handle_message( connection_ptr c, const handshake_message &msg) {
   ...
   if( msg.chain_id != chain_id) {
      elog( "Peer on a different chain. Closing connection");
      c->enqueue( go_away_message(go_away_reason::wrong_chain) );
      return;
   }
   ...
}

  So how is this chain_id generated?

  The chain_id is defined on the chain side (chain_plugin/controller) and used by net_plugin. It is computed as follows:

// controller.cpp
chain_id( cfg.genesis.compute_chain_id() )

// genesis_state.cpp
chain::chain_id_type genesis_state::compute_chain_id() const {
   digest_type::encoder enc;
   fc::raw::pack( enc, *this );
   return chain_id_type{enc.result()};
}

  This is essentially a hash over the entire genesis state. By default the genesis data is hard-coded in the source:

chain_config   initial_configuration = {
   .max_block_net_usage                  = config::default_max_block_net_usage,
   .target_block_net_usage_pct           = config::default_target_block_net_usage_pct,
   .max_transaction_net_usage            = config::default_max_transaction_net_usage,
   .base_per_transaction_net_usage       = config::default_base_per_transaction_net_usage,
   .net_usage_leeway                     = config::default_net_usage_leeway,
   .context_free_discount_net_usage_num  = config::default_context_free_discount_net_usage_num,
   .context_free_discount_net_usage_den  = config::default_context_free_discount_net_usage_den,
   .max_block_cpu_usage                  = config::default_max_block_cpu_usage,
   .target_block_cpu_usage_pct           = config::default_target_block_cpu_usage_pct,
   .max_transaction_cpu_usage            = config::default_max_transaction_cpu_usage,
   .min_transaction_cpu_usage            = config::default_min_transaction_cpu_usage,
   .max_transaction_lifetime             = config::default_max_trx_lifetime,
   .deferred_trx_expiration_window       = config::default_deferred_trx_expiration_window,
   .max_transaction_delay                = config::default_max_trx_delay,
   .max_inline_action_size               = config::default_max_inline_action_size,
   .max_inline_action_depth              = config::default_max_inline_action_depth,
   .max_authority_depth                  = config::default_max_auth_depth,
};

  Alternatively, a specific genesis file can be loaded with the nodeos command-line option --genesis-json; such a genesis.json typically looks like this:

{
  "initial_timestamp": "2018-03-02T12:00:00.000",
  "initial_key": "EOS8Znrtgwt8TfpmbVpTKvA2oB8Nqey625CLN8bCN3TEbgx86Dsvr",
  "initial_configuration": {
    "max_block_net_usage": 1048576,
    "target_block_net_usage_pct": 1000,
    "max_transaction_net_usage": 524288,
    "base_per_transaction_net_usage": 12,
    "net_usage_leeway": 500,
    "context_free_discount_net_usage_num": 20,
    "context_free_discount_net_usage_den": 100,
    "max_block_cpu_usage": 100000,
    "target_block_cpu_usage_pct": 500,
    "max_transaction_cpu_usage": 50000,
    "min_transaction_cpu_usage": 100,
    "max_transaction_lifetime": 3600,
    "deferred_trx_expiration_window": 600,
    "max_transaction_delay": 3888000,
    "max_inline_action_size": 4096,
    "max_inline_action_depth": 4,
    "max_authority_depth": 6,
    "max_generated_transaction_count": 16
  },
  "initial_chain_id": "0000000000000000000000000000000000000000000000000000000000000000"
}

  Therefore, two nodes can only synchronize if their genesis parameters match exactly, since that is what makes their chain_ids equal.
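To make that dependence concrete, here is a toy model of compute_chain_id() in plain C++. It uses std::hash instead of fc's SHA-256 digest and a made-up toy_genesis struct, so it only illustrates the principle that any differing genesis field yields a different chain id; it is not EOS code:

// Illustrative only: a toy model of compute_chain_id(). It shows why two
// nodes whose genesis parameters differ in any field end up with different
// chain ids and therefore refuse to sync.
#include <cstdint>
#include <functional>
#include <iostream>
#include <sstream>
#include <string>

struct toy_genesis {
    std::string initial_timestamp;
    std::string initial_key;
    uint32_t    max_block_net_usage;
    uint32_t    max_transaction_lifetime;
};

// Serialize all fields, then hash the serialization (EOS uses fc::raw::pack
// plus a SHA-256 digest; std::hash stands in for the digest here).
std::size_t toy_chain_id(const toy_genesis& g) {
    std::ostringstream os;
    os << g.initial_timestamp << '|' << g.initial_key << '|'
       << g.max_block_net_usage << '|' << g.max_transaction_lifetime;
    return std::hash<std::string>{}(os.str());
}

int main() {
    toy_genesis a{"2018-03-02T12:00:00.000", "EOS8Znr...", 1048576, 3600};
    toy_genesis b = a;
    b.max_transaction_lifetime = 7200;   // one differing parameter

    std::cout << std::boolalpha
              << (toy_chain_id(a) == toy_chain_id(b)) << "\n";  // false -> no sync
    return 0;
}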

3. Block synchronization data flow

  Block synchronization involves the following messages:

  handshake_message      // "hello" handshake message
  chain_size_message     // not seen in use yet
  go_away_message        // stop-synchronizing message
  time_message           // timestamp / keepalive related
  notice_message         // block and transaction status synchronization
  request_message        // asks the peer to send blocks, carries block number data
  sync_request_message   // request_message plus a timer for timeout handling
  signed_block           // the actual block data
  packed_transaction     // transaction synchronization
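All of these message types travel over one connection as alternatives of a single variant type. In the net plugin's protocol header this is (quoted from memory for the 1.0.x line, so treat it as approximate) a static_variant along these lines:

using net_message = static_variant<handshake_message,
                                   chain_size_message,
                                   go_away_message,
                                   time_message,
                                   notice_message,
                                   request_message,
                                   sync_request_message,
                                   signed_block,
                                   packed_transaction>;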

 

  Now suppose there is a node M whose p2p-peer-address list contains three addresses a, b and c. Synchronization then roughly proceeds through the following steps.

 

  1. handshake_message processing flow

    First, node M connects to a, b and c in turn and sends each of them a handshake message, a struct handshake_message defined as follows:

struct handshake_message {
   uint16_t                   network_version = 0; // net version, requires M == a == b == c
   chain_id_type              chain_id;            // M == a == b == c
   fc::sha256                 node_id;             ///< used to identify peers and prevent self-connect
   chain::public_key_type     key;                 ///< authentication key; may be a producer or peer key, or empty
   tstamp                     time;
   fc::sha256                 token;               ///< digest of time to prove we own the private key of the key above
   chain::signature_type      sig;                 ///< signature for the digest
   string                     p2p_address;
   uint32_t                   last_irreversible_block_num = 0;
   block_id_type              last_irreversible_block_id;
   uint32_t                   head_num = 0;
   block_id_type              head_id;
   string                     os;
   string                     agent;
   int16_t                    generation;
};

It contains the basic parameters both sides must agree on for communication. After the message is constructed it is placed in a queue named write_queue and ultimately sent with asio::async_write; whether the send succeeded is handled in a callback.

void connection::do_queue_write() {
   ...
   while (write_queue.size() > 0) {
      auto& m = write_queue.front();
      bufs.push_back(boost::asio::buffer(*m.buff));
      out_queue.push_back(m);
      write_queue.pop_front();
   }
   boost::asio::async_write(*socket, bufs, [c](boost::system::error_code ec, std::size_t w) {
      try {
         for (auto& m: conn->out_queue) {
            m.callback(ec, w);
         }
         while (conn->out_queue.size() > 0) {
            conn->out_queue.pop_front();
         }
         conn->enqueue_sync_block();
         conn->do_queue_write();
      }
      ...
   });
   ...
}

When the peer receives the handshake_message, it processes it with the following code:

void sync_manager::recv_handshake (connection_ptr c, const handshake_message &msg) {
   controller& cc = chain_plug->chain();
   uint32_t lib_num = cc.last_irreversible_block_num( );
   uint32_t peer_lib = msg.last_irreversible_block_num;
   reset_lib_num(c);
   c->syncing = false;
   //--------------------------------
   // sync need checks; (lib == last irreversible block)
   //
   // 0. my head block id == peer head id means we are all caught up block wise
   // 1. my head block num < peer lib - start sync locally
   // 2. my lib > peer head num - send a last_irr_catch_up notice if not the first generation
   //
   // 3. my head block num <= peer head block num - update sync state and send a catchup request
   // 4. my head block num > peer block num - send a notice catchup if this is not the first generation
   //
   //-----------------------------
   uint32_t head = cc.head_block_num( );
   block_id_type head_id = cc.head_block_id();
   if (head_id == msg.head_id) {
      ...
   }
   ...
}

To summarize the flow (a simplified decision sketch follows the list):

  • If the two nodes' head block ids are equal, no synchronization is needed;
  • If node A's head_block_num is lower than node B's last irreversible block number, B sends A a notice_message containing the range of blocks A needs; each round syncs at most sync_req_span blocks, a parameter taken from the node's configuration or its built-in default;
  • If node A's last irreversible block number is greater than node B's head_block_num, A sends B a notice_message containing its reversible and irreversible block numbers;
  • If node A's head_block_num is lower than node B's head_block_num, A generates a request_message and sends it to B.
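That comparison can be condensed into a small, self-contained sketch. Everything below (the enum, the function, the numbers in main) is illustrative only; it models the head/lib checks in sync_manager::recv_handshake but is not the actual net_plugin code:

#include <cstdint>
#include <iostream>

// What the receiving node decides to do after comparing handshake state.
enum class sync_action { in_sync, request_blocks, notify_lib_catch_up,
                         request_catch_up, notify_catch_up };

// Simplified model of the comparison (head = newest block, lib = last
// irreversible block).
sync_action on_handshake(uint32_t my_head, uint32_t my_lib,
                         uint32_t peer_head, uint32_t peer_lib,
                         bool head_ids_equal) {
    if (head_ids_equal)       return sync_action::in_sync;             // case 0
    if (my_head < peer_lib)   return sync_action::request_blocks;      // case 1
    if (my_lib  > peer_head)  return sync_action::notify_lib_catch_up; // case 2
    if (my_head <= peer_head) return sync_action::request_catch_up;    // case 3
    return sync_action::notify_catch_up;                               // case 4
}

int main() {
    // Our head is 100 but the peer already has 500 irreversible blocks:
    // we should start pulling blocks from the peer.
    std::cout << (on_handshake(100, 90, 520, 500, false)
                  == sync_action::request_blocks) << "\n";   // prints 1
}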

2. go_away_message

  In certain abnormal situations a node will stop synchronizing with a peer and send a go_away_message carrying one of the following reason codes:

enum go_away_reason {
   no_reason,       ///< no reason to go away
   self,            ///< the connection is to itself
   duplicate,       ///< the connection is redundant
   wrong_chain,     ///< the peer's chain id doesn't match
   wrong_version,   ///< the peer's network version doesn't match
   forked,          ///< the peer's irreversible blocks are different
   unlinkable,      ///< the peer sent a block we couldn't use
   bad_transaction, ///< the peer sent a transaction that failed verification
   validation,      ///< the peer sent a block that failed validation
   benign_other,    ///< reasons such as a timeout. not fatal but warrant resetting
   fatal_other,     ///< a catch-all for errors we don't have discriminated
   authentication   ///< peer failed authentication
};

3. time_message

  This message is a keepalive packet carrying several timestamps; it is currently sent every 32 seconds.
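Concretely, the message carries four timestamps: when the request was sent, when the peer received it, when the peer replied, and when the reply arrived. From these the receiver can estimate the peer's clock offset the way NTP does. A minimal sketch, assuming nanosecond-resolution timestamps (the time_sample struct is a stand-in, not the actual time_message):

#include <cstdint>
#include <iostream>

using tstamp = int64_t;  // nanoseconds assumed

// The four timestamps a time_message round trip produces.
struct time_sample {
    tstamp org;  // when we sent our request
    tstamp rec;  // when the peer received it
    tstamp xmt;  // when the peer sent its reply
    tstamp dst;  // when we received the reply
};

// NTP-style estimate of the peer's clock offset relative to ours.
double clock_offset(const time_sample& s) {
    return (double(s.rec - s.org) + double(s.xmt - s.dst)) / 2.0;
}

int main() {
    // In this made-up sample the peer's clock appears ~5 ms ahead of ours.
    time_sample s{1'000'000'000, 1'006'000'000, 1'006'500'000, 1'002'500'000};
    std::cout << clock_offset(s) / 1e6 << " ms\n";   // ~5 ms
}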

 

4. notice_message

  The message is defined as follows:

struct notice_message {
   notice_message () : known_trx(), known_blocks() {}
   ordered_txn_ids known_trx;
   ordered_blk_ids known_blocks;
};

  It carries both block information and transaction information, so reversible blocks, reversible transactions, irreversible blocks and irreversible transactions can all be handled through this one message. For example, node A sends its latest block and transaction state (block_num) to node B; B compares that against its own local state and, based on the result, the two sides decide who sends data to whom.
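A rough model of what a receiver might do with the block half of such a notice; the types and helper below are simplified illustrations, not net_plugin's actual ordered_blk_ids handling:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <optional>

// Simplified view of the block half of a notice: the peer advertises the
// highest block number it currently knows about.
struct block_notice {
    uint32_t peer_known_block_num;
};

// The span of block numbers we would ask for in a follow-up request.
struct block_request {
    uint32_t start;
    uint32_t end;
};

// If the peer knows blocks beyond our head, answer with a request for the
// missing span, capped at sync_req_span blocks per round (as described above).
std::optional<block_request> on_block_notice(uint32_t my_head,
                                             const block_notice& n,
                                             uint32_t sync_req_span = 100) {
    if (n.peer_known_block_num <= my_head)
        return std::nullopt;                       // nothing to fetch from this peer
    uint32_t start = my_head + 1;
    uint32_t end   = std::min(n.peer_known_block_num, my_head + sync_req_span);
    return block_request{start, end};
}

int main() {
    auto req = on_block_notice(1200, block_notice{5000});
    if (req)
        std::cout << "request blocks " << req->start << " to " << req->end << "\n"; // 1201 to 1300
}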

 

5. request_message

  Requests from node A fall into four cases; node B, as the receiving side, answers each of them as follows (a dispatch sketch follows the two lists):

   For blocks:

  • catch_up: node B packs up all of its local reversible blocks and sends them to node A;
  • normal: node B looks up the block ids in A's request vector among its local irreversible blocks and, for each one it finds, sends that block to A;

  For transactions:

  • catch_up: node B sends A the requested reversible transaction data that it holds locally;
  • normal: node B sends A the requested irreversible transaction data that it holds locally;
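The four cases can be pictured as a dispatch on the request's block mode and transaction mode. The enum values and the toy_request type below are illustrative simplifications, not the real ordered_txn_ids/ordered_blk_ids handling in net_plugin:

#include <iostream>

// Simplified id-list modes (values are illustrative).
enum class id_list_mode { none, catch_up, last_irr_catch_up, normal };

// Toy request carrying only the two mode fields relevant to the four cases.
struct toy_request {
    id_list_mode req_blocks;
    id_list_mode req_trx;
};

// Dispatch mirroring the four responses described above.
void handle_request(const toy_request& req) {
    switch (req.req_blocks) {
        case id_list_mode::catch_up:
            std::cout << "send all locally known reversible blocks\n"; break;
        case id_list_mode::normal:
            std::cout << "look up the requested ids among irreversible blocks and send matches\n"; break;
        default: break;
    }
    switch (req.req_trx) {
        case id_list_mode::catch_up:
            std::cout << "send the requested reversible transactions held locally\n"; break;
        case id_list_mode::normal:
            std::cout << "send the requested irreversible transactions held locally\n"; break;
        default: break;
    }
}

int main() {
    handle_request({id_list_mode::catch_up, id_list_mode::normal});
}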

6. sync_request_message

  This message builds on request_message by adding a 5-second timer: if the sync request is not answered within 5 seconds, the current sync is cancelled and then requested again.
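A minimal sketch of that timeout pattern with boost::asio; the sync_requester class and its callbacks are illustrative (net_plugin wires the timer into its connection objects instead), and it assumes a Boost version providing steady_timer::expires_after:

#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <iostream>

// Arm a 5-second timer when a sync_request_message goes out; if no reply
// cancels it first, the timeout handler drops the current sync and retries.
class sync_requester {
public:
    explicit sync_requester(boost::asio::io_context& io) : timer_(io) {}

    void send_sync_request(std::function<void()> resend) {
        // ... serialize and transmit the sync_request_message here ...
        timer_.expires_after(std::chrono::seconds(5));
        timer_.async_wait([resend](const boost::system::error_code& ec) {
            if (ec == boost::asio::error::operation_aborted)
                return;                       // a reply arrived in time
            std::cout << "sync request timed out, re-requesting\n";
            resend();                         // cancel current sync and retry
        });
    }

    void on_sync_reply() {
        timer_.cancel();                      // reply received: disarm the timer
    }

private:
    boost::asio::steady_timer timer_;
};

int main() {
    boost::asio::io_context io;
    sync_requester r(io);
    r.send_sync_request([]{ std::cout << "retrying...\n"; });
    io.run();   // with no reply, the timeout fires after ~5 s
}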

7. signed_block

  This message carries the actual block data; it is typically sent after a request_message or sync_request_message is received, at which point the node ships its local blocks to the peer:

bool connection::enqueue_sync_block() {
   controller& cc = app().find_plugin<chain_plugin>()->chain();
   if (!peer_requested)
      return false;
   uint32_t num = ++peer_requested->last;
   bool trigger_send = num == peer_requested->start_block;
   if(num == peer_requested->end_block) {
      peer_requested.reset();
   }
   try {
      // fetch the block from local storage
      signed_block_ptr sb = cc.fetch_block_by_number(num);
      if(sb) {
         // put it on the message queue and send it asynchronously
         enqueue( *sb, trigger_send);
         return true;
      }
   } catch ( ... ) {
      wlog( "write loop exception" );
   }
   return false;
}

 

8. packed_transaction

  Node A serializes (packs) transaction data and sends it as a packed_transaction message. A node that receives a packed_transaction runs a series of checks on it; if validation succeeds, the data is cached locally and then broadcast to all of the node's own p2p-peer-address peers. This is how transaction data propagates quickly across many peers.

void net_plugin_impl::handle_message( connection_ptr c, const packed_transaction &msg) {
   fc_dlog(logger, "got a packed transaction, cancel wait");
   peer_ilog(c, "received packed_transaction");
   if( sync_master->is_active(c) ) {
      fc_dlog(logger, "got a txn during sync - dropping");
      return;
   }
   transaction_id_type tid = msg.id();
   // data has arrived, cancel the asynchronous wait timer
   c->cancel_wait();
   if(local_txns.get<by_id>().find(tid) != local_txns.end()) {
      fc_dlog(logger, "got a duplicate transaction - dropping");
      return;
   }
   // record the transaction in the local cache
   dispatcher->recv_transaction(c, tid);
   uint64_t code = 0;
   // validate the transaction; the result is delivered to the callback
   chain_plug->accept_transaction(msg,
         [=](const static_variant<fc::exception_ptr, transaction_trace_ptr>& result) {
      if (result.contains<fc::exception_ptr>()) {
         auto e_ptr = result.get<fc::exception_ptr>();
         if (e_ptr->code() != tx_duplicate::code_value && e_ptr->code() != expired_tx_exception::code_value)
            elog("accept txn threw ${m}",("m",result.get<fc::exception_ptr>()->to_detail_string()));
         peer_elog(c, "bad packed_transaction : ${m}", ("m",result.get<fc::exception_ptr>()->what()));
      } else {
         auto trace = result.get<transaction_trace_ptr>();
         if (!trace->except) {
            fc_dlog(logger, "chain accepted transaction");
            // broadcast to the other p2p-peer-address peers
            dispatcher->bcast_transaction(msg);
            return;
         }
         peer_elog(c, "bad packed_transaction : ${m}", ("m",trace->except->what()));
      }
      // validation failed: roll back the locally cached entry
      dispatcher->rejected_transaction(tid);
   });
}

Reposted from: https://www.cnblogs.com/hbright/p/9287669.html
