From a5c05987fa97b10f73f417635480e14d0877781f Mon Sep 17 00:00:00 2001 From: "zhaohj@sina.com" <81799935> Date: Thu, 2 Apr 2020 11:14:15 +0800 Subject: [PATCH] zhaohuanjun add pbft --- src/main/protobuf/peer.proto | 17 ++ .../scala/rep/app/conf/SystemProfile.scala | 17 +- .../cache/pbft/TransactionPoolOfPBFT.scala | 51 ++++ .../pbft/ConfirmOfBlockOfPBFT.scala | 155 +++++++++++ .../ISequencialAlgorithmOfVote.scala | 2 +- .../consensus/common/vote/IVoter.scala | 2 +- .../network/consensus/pbft/MsgOfPBFT.scala | 68 +++++ .../consensus/pbft/block/BlockerOfPBFT.scala | 249 ++++++++++++++++++ .../pbft/block/EndorseCollector.scala | 114 ++++++++ .../block/EndorsementRequest4Future.scala | 117 ++++++++ .../consensus/pbft/block/GenesisBlocker.scala | 129 +++++++++ .../endorse/DispatchOfRecvEndorsement.scala | 71 +++++ .../pbft/endorse/Endorser4Future.scala | 230 ++++++++++++++++ .../consensus/pbft/endorse/PbftCommit.scala | 78 ++++++ .../pbft/endorse/PbftPrePrepare.scala | 63 +++++ .../consensus/pbft/endorse/PbftPrepare.scala | 94 +++++++ .../consensus/pbft/vote/VoterOfPBFT.scala | 93 +++++++ .../module/pbft/ModuleManagerOfPBFT.scala | 59 +++++ .../network/module/pbft/PBFTActorType.scala | 28 ++ .../persistence/pbft/StoragerOfPBFT.scala | 38 +++ .../parser/pbft/IPBFTOfSynchAnalyzer.scala | 67 +++++ .../request/pbft/SynchRequesterOfPBFT.scala | 76 ++++++ .../scala/rep/network/tools/NodeMgr.scala | 5 + .../scala/rep/network/util/NodeHelp.scala | 22 +- 24 files changed, 1830 insertions(+), 15 deletions(-) create mode 100644 src/main/scala/rep/network/cache/pbft/TransactionPoolOfPBFT.scala create mode 100644 src/main/scala/rep/network/confirmblock/pbft/ConfirmOfBlockOfPBFT.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/MsgOfPBFT.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/block/BlockerOfPBFT.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/block/EndorseCollector.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/block/EndorsementRequest4Future.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/block/GenesisBlocker.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/endorse/DispatchOfRecvEndorsement.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/endorse/Endorser4Future.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/endorse/PbftCommit.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/endorse/PbftPrePrepare.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/endorse/PbftPrepare.scala create mode 100644 src/main/scala/rep/network/consensus/pbft/vote/VoterOfPBFT.scala create mode 100644 src/main/scala/rep/network/module/pbft/ModuleManagerOfPBFT.scala create mode 100644 src/main/scala/rep/network/module/pbft/PBFTActorType.scala create mode 100644 src/main/scala/rep/network/persistence/pbft/StoragerOfPBFT.scala create mode 100644 src/main/scala/rep/network/sync/parser/pbft/IPBFTOfSynchAnalyzer.scala create mode 100644 src/main/scala/rep/network/sync/request/pbft/SynchRequesterOfPBFT.scala diff --git a/src/main/protobuf/peer.proto b/src/main/protobuf/peer.proto index e9db293e..12114321 100644 --- a/src/main/protobuf/peer.proto +++ b/src/main/protobuf/peer.proto @@ -175,3 +175,20 @@ message BlockchainInfo { bytes currentStateHash = 5; } + +//zhj +message MPbftPrepare { + Signature signature = 1; +} + +//zhj +message MPbftCommit { + repeated MPbftPrepare prepares = 1; + Signature signature = 2; +} + +//zhj +message MPbftReply { + repeated MPbftCommit commits = 1; + Signature signature = 2; +} diff --git a/src/main/scala/rep/app/conf/SystemProfile.scala b/src/main/scala/rep/app/conf/SystemProfile.scala index 3af0c450..2c85f7e2 100644 --- a/src/main/scala/rep/app/conf/SystemProfile.scala +++ b/src/main/scala/rep/app/conf/SystemProfile.scala @@ -59,8 +59,11 @@ object SystemProfile { private[this] var _HAS_PRELOAD_TRANS_OF_API = true private[this] var _IS_VERIFY_OF_ENDORSEMENT = true//is_verify_of_endorsement private[this] var _NUMBER_OF_ENDORSEMENT: Int = 2 - private[this] var _TYPE_OF_CONSENSUS:String = "CFRD" - + private[this] var _TYPE_OF_CONSENSUS:String = "PBFT" + + //zhj + private[this] var _PBFT_F: Int = 1 + private[this] var _BLOCKNUMBER_OF_RAFT: Int = 100 private[this] var _DBPATH:String = "" //leveldb数据库文件路径 @@ -97,7 +100,10 @@ object SystemProfile { private def REALTIMEGRAPH_ENABLE = _REALTIMEGRAPH_ENABLE private def TYPE_OF_CONSENSUS : String = _TYPE_OF_CONSENSUS - + + //zhj + private def PBFT_F = _PBFT_F + private def DBPATH:String = _DBPATH private def BLOCKPATH:String = _BLOCKPATH private def FILEMAX: Int = _FILEMAX @@ -263,7 +269,10 @@ object SystemProfile { FILEMAX_=(config.getInt("system.storage.filemax")) REALTIMEGRAPH_ENABLE_=(config.getInt("system.realtimegraph_enable")) } - + + //zhj + def getPbftF = PBFT_F + def getRealtimeGraph = REALTIMEGRAPH_ENABLE def getDBPath = DBPATH diff --git a/src/main/scala/rep/network/cache/pbft/TransactionPoolOfPBFT.scala b/src/main/scala/rep/network/cache/pbft/TransactionPoolOfPBFT.scala new file mode 100644 index 00000000..f7f492fc --- /dev/null +++ b/src/main/scala/rep/network/cache/pbft/TransactionPoolOfPBFT.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +//zhj +package rep.network.cache.pbft + +import akka.actor.Props +import rep.network.cache.ITransactionPool +import rep.network.consensus.pbft.MsgOfPBFT +import rep.network.consensus.pbft.MsgOfPBFT.VoteOfBlocker +import rep.network.consensus.pbft.vote.VoterOfPBFT +import rep.network.module.ModuleActorType +import rep.network.module.pbft.PBFTActorType + +/** + * 交易缓冲池伴生对象 + * + * @author shidianyue + * @version 1.0 + */ +object TransactionPoolOfPBFT { + def props(name: String): Props = Props(classOf[TransactionPoolOfPBFT], name) + //交易检查结果 + case class CheckedTransactionResult(result: Boolean, msg: String) + +} +/** + * 交易缓冲池类 + * + * @author shidianyue + * @version 1.0 + * @param moduleName + */ + +class TransactionPoolOfPBFT(moduleName: String) extends ITransactionPool(moduleName) { + override protected def sendVoteMessage: Unit = { + pe.getActorRef(PBFTActorType.ActorType.voter) ! VoteOfBlocker("cache") + } +} diff --git a/src/main/scala/rep/network/confirmblock/pbft/ConfirmOfBlockOfPBFT.scala b/src/main/scala/rep/network/confirmblock/pbft/ConfirmOfBlockOfPBFT.scala new file mode 100644 index 00000000..0625bde9 --- /dev/null +++ b/src/main/scala/rep/network/confirmblock/pbft/ConfirmOfBlockOfPBFT.scala @@ -0,0 +1,155 @@ +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +//zhj +package rep.network.confirmblock.pbft + +import akka.actor.{ActorRef, Props} +import akka.util.Timeout +import rep.app.conf.SystemProfile +import rep.log.{RepLogger, RepTimeTracer} +import rep.network.autotransaction.Topic +import rep.network.base.ModuleBase +import rep.network.confirmblock.IConfirmOfBlock +import rep.network.consensus.common.MsgOfConsensus +import rep.network.consensus.common.MsgOfConsensus.{BatchStore, BlockRestore} +import rep.network.consensus.pbft.MsgOfPBFT +import rep.network.consensus.util.BlockVerify +import rep.network.module.ModuleActorType.ActorType +import rep.network.persistence.IStorager.SourceOfBlock +import rep.utils.GlobalUtils.EventType + +import scala.concurrent._ + +object ConfirmOfBlockOfPBFT { + def props(name: String): Props = Props(classOf[ConfirmOfBlockOfPBFT], name) +} + +class ConfirmOfBlockOfPBFT(moduleName: String) extends IConfirmOfBlock(moduleName) { + import context.dispatcher + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("confirm Block module start")) + SubscribeTopic(mediator, self, selfAddr, Topic.Block, false) + } + import rep.protos.peer._ + + import scala.concurrent.duration._ + + case class DataSig(data:Array[Byte], sig : Signature) + + private def asyncVerifyEndorses(block: Block, replies : Seq[MPbftReply]): Boolean = { + val b = block.clearEndorsements.toByteArray + + val ds = scala.collection.mutable.Buffer[DataSig]() + replies.foreach( r => { + ds += DataSig(r.clearSignature.toByteArray, r.signature.get) + for (c <- r.commits) { + ds += DataSig(c.clearSignature.toByteArray, c.signature.get) + c.prepares.foreach(p=>{ + ds += DataSig(b, p.signature.get) + }) + } + }) + + /*val listOfFuture: Seq[Future[Boolean]] = block.endorsements.map(x => { + asyncVerifyEndorse(x, b) + }) */ + + val listOfFuture: Seq[Future[Boolean]] = ds.map(x => { + asyncVerifyEndorse(x.sig, x.data) + }) + + val futureOfList: Future[List[Boolean]] = Future.sequence(listOfFuture.toList).recover({ + case e: Exception => + null + }) + + val result1 = Await.result(futureOfList, timeout.duration).asInstanceOf[List[Boolean]] + + var result = true + if (result1 == null) { + result = false + } else { + result1.foreach(f => { + if (!f) { + result = false + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"comfirmOfBlock verify endorse is error, break,block height=${block.height},local height=${pe.getCurrentHeight}")) + } + }) + } + + result + } + + protected def handler(block: Block, actRefOfBlock: ActorRef): Unit ={ + RepLogger.error(RepLogger.Consensus_Logger,pe.getSysTag + ", Internal error, ConfirmOfBlockOfPBFT.handler") + } + + private def handler(block: Block, actRefOfBlock: ActorRef, replies : Seq[MPbftReply]) = { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"confirm verify endorsement start,height=${block.height}")) + var b = true + if (SystemProfile.getIsVerifyOfEndorsement) + b = asyncVerifyEndorses(block,replies) + if (b) { + pe.getBlockCacheMgr.addToCache(BlockRestore(block, SourceOfBlock.CONFIRMED_BLOCK, actRefOfBlock)) + pe.getActorRef(ActorType.storager) ! BatchStore + sendEvent(EventType.RECEIVE_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.BLOCK_NEW) + } + } + + override protected def checkedOfConfirmBlock(block: Block, actRefOfBlock: ActorRef): Unit ={ + RepLogger.error(RepLogger.Consensus_Logger,pe.getSysTag + ", Internal error, ConfirmOfBlockOfPBFT.checkedOfConfirmBlock") + } + + private def checkedOfConfirmBlock(block: Block, actRefOfBlock: ActorRef, replies : Seq[MPbftReply]) = { + if (pe.getCurrentBlockHash == "" && block.previousBlockHash.isEmpty()) { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"confirm verify blockhash,height=${block.height}")) + handler(block, actRefOfBlock, replies) + } else { + //与上一个块一致 + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"confirm verify blockhash,height=${block.height}")) + + /* if (SystemProfile.getNumberOfEndorsement == 1) { + if (block.height > pe.getCurrentHeight + 1) { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"confirm verify height,height=${block.height},localheight=${pe.getCurrentHeight }")) + pe.getActorRef(ActorType.synchrequester) ! SyncRequestOfStorager(sender,block.height) + } else { + handler(block, actRefOfBlock) + pe.setConfirmHeight(block.height) + } + } else { */ + if ( replies.size >= (SystemProfile.getPbftF + 1)) + handler(block, actRefOfBlock, replies) + //} + } + } + + override def receive = { + //Endorsement block + case MsgOfConsensus.ConfirmedBlock(block, actRefOfBlock) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", ConfirmOfBlockOfPBFT recv ConfirmedBlock(2 params): " + ", " + block.hashOfBlock) + RepTimeTracer.setStartTime(pe.getSysTag, "blockconfirm", System.currentTimeMillis(), block.height, block.transactions.size) + checkedOfConfirmBlock(block, actRefOfBlock, Seq.empty) + RepTimeTracer.setEndTime(pe.getSysTag, "blockconfirm", System.currentTimeMillis(), block.height, block.transactions.size) + case MsgOfPBFT.ConfirmedBlock(block, actRefOfBlock, replies) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", ConfirmOfBlockOfPBFT recv ConfirmedBlock: " + ", " + block.hashOfBlock) + RepTimeTracer.setStartTime(pe.getSysTag, "blockconfirm", System.currentTimeMillis(), block.height, block.transactions.size) + checkedOfConfirmBlock(block, actRefOfBlock, replies) + RepTimeTracer.setEndTime(pe.getSysTag, "blockconfirm", System.currentTimeMillis(), block.height, block.transactions.size) + case _ => //ignore + } + +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/common/algorithm/ISequencialAlgorithmOfVote.scala b/src/main/scala/rep/network/consensus/common/algorithm/ISequencialAlgorithmOfVote.scala index ef498e88..ab48cd47 100644 --- a/src/main/scala/rep/network/consensus/common/algorithm/ISequencialAlgorithmOfVote.scala +++ b/src/main/scala/rep/network/consensus/common/algorithm/ISequencialAlgorithmOfVote.scala @@ -1,5 +1,5 @@ package rep.network.consensus.common.algorithm - +//zhj import rep.log.RepLogger import rep.storage.util.pathUtil diff --git a/src/main/scala/rep/network/consensus/common/vote/IVoter.scala b/src/main/scala/rep/network/consensus/common/vote/IVoter.scala index 17635a5c..cc7f6606 100644 --- a/src/main/scala/rep/network/consensus/common/vote/IVoter.scala +++ b/src/main/scala/rep/network/consensus/common/vote/IVoter.scala @@ -76,7 +76,7 @@ abstract class IVoter(moduleName: String) extends ModuleBase(moduleName) { //系统属于初始化状态 if (NodeHelp.isSeedNode(pe.getSysTag)) { // 建立创世块消息 - pe.getActorRef(CFRDActorType.ActorType.gensisblock) ! GenesisBlock + pe.getActorRef(CFRDActorType.ActorType.gensisblock) ! GenesisBlock //zhj CFRD? } } else { if (!pe.isSynching) { diff --git a/src/main/scala/rep/network/consensus/pbft/MsgOfPBFT.scala b/src/main/scala/rep/network/consensus/pbft/MsgOfPBFT.scala new file mode 100644 index 00000000..1897f174 --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/MsgOfPBFT.scala @@ -0,0 +1,68 @@ +package rep.network.consensus.pbft + +import akka.actor.{ActorRef, Address} +import rep.protos.peer.{Block, BlockchainInfo, MPbftCommit, MPbftPrepare, MPbftReply, Signature} +import rep.utils.GlobalUtils.BlockerInfo + +//zhj +/** + * Created by zhaohuanjun on 2020/03/20. + * PBFT共识协议的各类消息汇总 + */ +object MsgOfPBFT { + ////////////////////////////////Vote(抽签)消息,开始////////////////////////////// + //通知抽签模块可以抽签的消息 + //case object VoteOfBlocker + + case class VoteOfBlocker(flag:String) + + //通知抽签模块,需要强制抽签 + case object VoteOfForce + ////////////////////////////////Vote(抽签)消息,结束////////////////////////////// + + case class ConfirmedBlock(blc: Block, actRef: ActorRef, replies : Seq[MPbftReply]) + + ///////////////////////////////Block(出块)消息,开始////////////////////////////// + //抽签成功之后,向预出块的actor发送建立新块的消息,该消息由抽签actor发出 + case object CreateBlock + ///////////////////////////////Block(出块)消息,结束////////////////////////////// + + + //////////////////////////////endorsement(共识)消息,开始///////////////////////// + //背书结果消息 + case object ResultFlagOfEndorse{ + val BlockerSelfError = 1 + val CandidatorError = 2 + val BlockHeightError = 3 + val VerifyError = 4 + val EnodrseNodeIsSynching = 5 + val success = 0 + } + + //背书请求者消息 + case class RequesterOfEndorsement(blc: Block, blocker: String, endorer: Address) + case class ResendEndorseInfo(endorer: Address) + + //给背书人的背书消息 + //case class EndorsementInfo(blc: Block, blocker: String) + //EndorsementInfo => MsgPbftPrePrepare + //给背书人的背书消息 + case class MsgPbftPrePrepare(senderPath:String,block: Block, blocker: String) + + //背书收集者消息 + case class CollectEndorsement(blc: Block, blocker: String) + + //背书人返回的背书结果 + case class ResultOfEndorsed(result: Int, endor: Signature, BlockHash: String,endorserOfChainInfo:BlockchainInfo,endorserOfVote:BlockerInfo) + + //背书请求者返回的结果 + case class ResultOfEndorseRequester(result: Boolean, endor: Signature, BlockHash: String, endorser: Address) + //////////////////////////////endorsement(共识)消息,结束///////////////////////// + + + case class MsgPbftPrepare(senderPath:String,result: Int, block:Block, blocker: String, prepare: MPbftPrepare, chainInfo : BlockchainInfo) + case class MsgPbftCommit(senderPath:String,block: Block, blocker: String, commit: MPbftCommit, chainInfo : BlockchainInfo) + case class MsgPbftReply(block: Block, reply: MPbftReply, chainInfo : BlockchainInfo) + case class MsgPbftReplyOk(block: Block, replies : Seq[MPbftReply]) + +} diff --git a/src/main/scala/rep/network/consensus/pbft/block/BlockerOfPBFT.scala b/src/main/scala/rep/network/consensus/pbft/block/BlockerOfPBFT.scala new file mode 100644 index 00000000..5f77c93c --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/block/BlockerOfPBFT.scala @@ -0,0 +1,249 @@ +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +//zhj +package rep.network.consensus.pbft.block + +import akka.actor.{ActorRef, Props} +import akka.pattern.{AskTimeoutException, ask} +import akka.util.Timeout +import rep.app.conf.{SystemProfile, TimePolicy} +import rep.log.{RepLogger, RepTimeTracer} +import rep.network.autotransaction.Topic +import rep.network.base.ModuleBase +import rep.network.consensus.common.MsgOfConsensus.{PreTransBlock, PreTransBlockResult} +import rep.network.consensus.pbft.MsgOfPBFT +import rep.network.consensus.pbft.MsgOfPBFT.{CollectEndorsement, VoteOfBlocker} +import rep.network.consensus.util.BlockHelp +import rep.network.module.ModuleActorType.ActorType +import rep.network.module.pbft.PBFTActorType +import rep.network.util.NodeHelp +import rep.protos.peer._ +import rep.storage.ImpDataAccess +import rep.utils.GlobalUtils.EventType + +import scala.concurrent._ +import scala.util.control.Breaks._ + +object BlockerOfPBFT { + def props(name: String): Props = Props(classOf[BlockerOfPBFT], name) +} + +/** + * 出块模块 + * + * @author shidianyue + * @version 1.0 + * @since 1.0 + * @param moduleName 模块名称 + */ +class BlockerOfPBFT(moduleName: String) extends ModuleBase(moduleName) { + + import rep.protos.peer.Transaction + + import scala.collection.mutable.ArrayBuffer + import scala.concurrent.duration._ + + val dataaccess: ImpDataAccess = ImpDataAccess.GetDataAccess(pe.getSysTag) + implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds) + + var preblock: Block = null + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("Block module start")) + super.preStart() + } + + private def CollectedTransOfBlock(start: Int, num: Int, limitsize: Int): ArrayBuffer[Transaction] = { + var result = ArrayBuffer.empty[Transaction] + try { + val tmplist = pe.getTransPoolMgr.getTransListClone(start, num, pe.getSysTag) + if (tmplist.size > 0) { + val currenttime = System.currentTimeMillis() / 1000 + var transsize = 0 + breakable( + tmplist.foreach(f => { + transsize += f.toByteArray.size + if (transsize * 3 > limitsize) { + //区块的长度限制 + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"block too length,txid=${f.id}" + "~" + selfAddr)) + break + } else { + f +=: result + } + })) + if (result.isEmpty && tmplist.size >= SystemProfile.getMinBlockTransNum) { + result = CollectedTransOfBlock(start + num, num, limitsize) + } + } + } finally { + } + result + } + + private def ExecuteTransactionOfBlock(block: Block): Block = { + try { + //val future = pe.getActorRef(ActorType.preloaderoftransaction) ? Blocker.PreTransBlock(block, "preload") + val future = pe.getActorRef(ActorType.dispatchofpreload) ? PreTransBlock(block, "preload") + val result = Await.result(future, timeout.duration).asInstanceOf[PreTransBlockResult] + if (result.result) { + result.blc + } else { + null + } + } catch { + case e: AskTimeoutException => null + } + } + + private def CreateBlock(start: Int = 0): Block = { + RepTimeTracer.setStartTime(pe.getSysTag, "Block", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0) + RepTimeTracer.setStartTime(pe.getSysTag, "createBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0) + RepTimeTracer.setStartTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0) + val trans = CollectedTransOfBlock(start, SystemProfile.getLimitBlockTransNum, SystemProfile.getBlockLength).reverse + //todo 交易排序 + if (trans.size >= SystemProfile.getMinBlockTransNum) { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,CollectedTransOfBlock success,height=${pe.getBlocker.VoteHeight + 1},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr)) + RepTimeTracer.setEndTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, trans.size) + //此处建立新块必须采用抽签模块的抽签结果来进行出块,否则出现刚抽完签,马上有新块的存储完成,就会出现错误 + var blc = BlockHelp.WaitingForExecutionOfBlock(pe.getBlocker.voteBlockHash, pe.getBlocker.VoteHeight + 1, trans.toSeq) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr)) + RepTimeTracer.setStartTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(), blc.height, blc.transactions.size) + blc = ExecuteTransactionOfBlock(blc) + if (blc != null) { + RepTimeTracer.setEndTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(), blc.height, blc.transactions.size) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,prelaod success,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr)) + blc = BlockHelp.AddBlockHash(blc) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,AddBlockHash success,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr)) + BlockHelp.AddSignToBlock(blc, pe.getSysTag) + } else { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,preload error" + "~" + selfAddr)) + CreateBlock(start + trans.size) + } + } else { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,trans count error" + "~" + selfAddr)) + null + } + } + + + private def CreateBlock4One(start: Int = 0): Block = { + RepTimeTracer.setStartTime(pe.getSysTag, "Block", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0) + RepTimeTracer.setStartTime(pe.getSysTag, "createBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0) + RepTimeTracer.setStartTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0) + val trans = CollectedTransOfBlock(start, SystemProfile.getLimitBlockTransNum, SystemProfile.getBlockLength).reverse.toSeq + //todo 交易排序 + if (trans.size >= SystemProfile.getMinBlockTransNum) { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,CollectedTransOfBlock success,height=${pe.getBlocker.VoteHeight },local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr)) + RepTimeTracer.setEndTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, trans.size) + //此处建立新块必须采用抽签模块的抽签结果来进行出块,否则出现刚抽完签,马上有新块的存储完成,就会出现错误 + var blc = BlockHelp.WaitingForExecutionOfBlock(pe.getCurrentBlockHash, pe.getCurrentHeight + 1, trans) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,height=${blc.height},local height=${pe.getCurrentHeight}" + "~" + selfAddr)) + RepTimeTracer.setStartTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(), blc.height, blc.transactions.size) + blc = ExecuteTransactionOfBlock(blc) + if (blc != null) { + RepTimeTracer.setEndTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(), blc.height, blc.transactions.size) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,prelaod success,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr)) + blc = BlockHelp.AddBlockHash(blc) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,AddBlockHash success,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr)) + BlockHelp.AddSignToBlock(blc, pe.getSysTag) + } else { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,preload error" + "~" + selfAddr)) + CreateBlock(start + trans.size) + } + } else { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,trans count error" + "~" + selfAddr)) + null + } + } + + private def CreateBlockHandler = { + //if (preblock == null) { + var blc : Block = null + //if(SystemProfile.getNumberOfEndorsement == 1){ + // blc = CreateBlock4One(0) + //}else{ + blc = CreateBlock(0) + //} + + if (blc != null) { + RepTimeTracer.setEndTime(pe.getSysTag, "createBlock", System.currentTimeMillis(), blc.height, blc.transactions.size) + this.preblock = blc + schedulerLink = clearSched() + + // if (SystemProfile.getNumberOfEndorsement == 1) { + // pe.setCreateHeight(preblock.height) + // mediator ! Publish(Topic.Block, ConfirmedBlock(preblock, self)) + //}else{ + //在发出背书时,告诉对方我是当前出块人,取出系统的名称 + RepTimeTracer.setStartTime(pe.getSysTag, "Endorsement", System.currentTimeMillis(), blc.height, blc.transactions.size) + val ar = pe.getActorRef(PBFTActorType.ActorType.endorsementcollectioner) + //RepLogger.print(RepLogger.zLogger, pe.getSysTag + ", send CollectEndorsement to " + ar ) + ar ! CollectEndorsement(this.preblock, pe.getSysTag) + //} + } else { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,CreateBlock is null" + "~" + selfAddr)) + pe.getActorRef(PBFTActorType.ActorType.voter) ! VoteOfBlocker("blocker") + } + //} + } + + override def receive = { + //创建块请求(给出块人) + case MsgOfPBFT.CreateBlock => + // //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Blocker recv CreateBlock: " + "Now blocker=" + pe.getBlocker.blocker) + /*if(pe.getSysTag == "121000005l35120456.node1" && pe.count <= 10){ + pe.count = pe.count + 1 + throw new Exception("^^^^^^^^^^^^^^^^exception^^^^^^^^^^") + }*/ + if (!pe.isSynching) { + + // + /*if(SystemProfile.getNumberOfEndorsement == 1){ + if (NodeHelp.isBlocker(pe.getBlocker.blocker, pe.getSysTag)){ + sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.CANDIDATOR) + if (preblock == null || (preblock.previousBlockHash.toStringUtf8() != pe.getCurrentBlockHash)) { + //是出块节点 + CreateBlockHandler + } + } + + }else{ */ + ////RepLogger.print(RepLogger.zLogger, pe.getBlocker.voteBlockHash) + ////RepLogger.print(RepLogger.zLogger, pe.getCurrentBlockHash) + ////RepLogger.print(RepLogger.zLogger, if (preblock == null) null else preblock.previousBlockHash.toStringUtf8) + if (NodeHelp.isBlocker(pe.getBlocker.blocker, pe.getSysTag) + && pe.getBlocker.voteBlockHash == pe.getCurrentBlockHash) { + sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.CANDIDATOR) + + //是出块节点 + if (preblock == null || (preblock.previousBlockHash.toStringUtf8() != pe.getBlocker.voteBlockHash)) { + //RepLogger.print(RepLogger.zLogger, "CreateBlockHandler, " + "Me: "+pe.getSysTag) + CreateBlockHandler + } + } else { + //出块标识错误,暂时不用做任何处理 + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,do not blocker or blocker hash not equal current hash,height=${pe.getCurrentHeight}" + "~" + selfAddr)) + } + //} + } else { + //节点状态不对 + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,node status error,status is synching,height=${pe.getCurrentHeight}" + "~" + selfAddr)) + } + + case _ => //ignore + } + +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/pbft/block/EndorseCollector.scala b/src/main/scala/rep/network/consensus/pbft/block/EndorseCollector.scala new file mode 100644 index 00000000..26fb0262 --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/block/EndorseCollector.scala @@ -0,0 +1,114 @@ +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package rep.network.consensus.pbft.block + +import akka.actor.Props +import akka.cluster.pubsub.DistributedPubSubMediator.Publish +import akka.routing._ +import rep.app.conf.SystemProfile +import rep.log.RepLogger +import rep.network.autotransaction.Topic +import rep.network.base.ModuleBase +import rep.network.consensus.pbft.MsgOfPBFT +import rep.network.consensus.pbft.MsgOfPBFT.{CollectEndorsement, MsgPbftReplyOk, RequesterOfEndorsement} +import rep.protos.peer._ +import rep.utils.GlobalUtils.EventType + +object EndorseCollector { + def props(name: String): Props = Props(classOf[EndorseCollector], name) +} + +class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) { + import scala.collection.immutable._ + + private var router: Router = null + private var block: Block = null + private var blocker: String = null + private var recvedEndorse = new HashMap[String, Signature]() + + override def preStart(): Unit = { + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorseCollector Start: " + ", " + self) + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "EndorseCollector Start")) + } + + private def createRouter = { + if (router == null) { + var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size()*2) + for (i <- 0 to SystemProfile.getVoteNodeList.size()*2 - 1) { + var ca = context.actorOf(EndorsementRequest4Future.props("endorsementrequester" + i), "endorsementrequester" + i) + context.watch(ca) + list(i) = new ActorRefRoutee(ca) + } + val rlist: IndexedSeq[Routee] = list.toIndexedSeq + router = Router(SmallestMailboxRoutingLogic(), rlist) + } + } + + private def resetEndorseInfo(block: Block, blocker: String) = { + this.block = block + this.blocker = blocker + this.recvedEndorse = this.recvedEndorse.empty + } + + private def clearEndorseInfo = { + this.block = null + this.blocker = null + this.recvedEndorse = this.recvedEndorse.empty + } + + override def receive = { + case CollectEndorsement(block, blocker) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorseCollector recv CollectEndorsement: " + ", " + block.hashOfBlock) + if(!pe.isSynching){ + createRouter + if (this.block != null && this.block.hashOfBlock.toStringUtf8() == block.hashOfBlock.toStringUtf8()) { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"collectioner is waiting endorse result,height=${block.height},local height=${pe.getCurrentHeight}")) + } else { + if(block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash){ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner recv endorsement,height=${block.height},local height=${pe.getCurrentHeight}")) + resetEndorseInfo(block, blocker) + pe.getNodeMgr.getStableNodes.foreach(f => { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner send endorsement to requester,height=${block.height},local height=${pe.getCurrentHeight}")) + router.route(RequesterOfEndorsement(block, blocker, f), self) + }) + }else{ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out endorsement request,height=${block.height},local height=${pe.getCurrentHeight}")) + } + } + } + + case MsgPbftReplyOk(block, replies) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorseCollector recv replyok: " + ", " + block.hashOfBlock) + if(!pe.isSynching) { + //block不空,该块的上一个块等于最后存储的hash,背书结果的块hash跟当前发出的块hash一致 + val hash = pe.getCurrentBlockHash + if (this.block != null) + if (this.block.previousBlockHash.toStringUtf8 == hash) + if (this.block.hashOfBlock == block.hashOfBlock) { + sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Endorsement, Event.Action.ENDORSEMENT) + //this.block = this.block.withReplies(block.replies) + mediator ! Publish(Topic.Block, new MsgOfPBFT.ConfirmedBlock(this.block, sender, replies)) + clearEndorseInfo + } + } else{ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out endorsement result,local height=${pe.getCurrentHeight}")) + } + + case _ => + //RepLogger.print(RepLogger.zLogger, pe.getSysTag + ", EndorseCollector recv other message") + } +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/pbft/block/EndorsementRequest4Future.scala b/src/main/scala/rep/network/consensus/pbft/block/EndorsementRequest4Future.scala new file mode 100644 index 00000000..7261256b --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/block/EndorsementRequest4Future.scala @@ -0,0 +1,117 @@ +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +//zhj +package rep.network.consensus.pbft.block + +import akka.actor.{ActorSelection, Address, Props} +import akka.pattern.AskTimeoutException +import akka.util.Timeout +import com.google.protobuf.ByteString +import rep.app.conf.{SystemProfile, TimePolicy} +import rep.log.RepLogger +import rep.network.base.ModuleBase +import rep.network.consensus.pbft.MsgOfPBFT +import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftPrePrepare, MsgPbftReply, MsgPbftReplyOk, RequesterOfEndorsement} +import rep.network.consensus.util.BlockVerify +import rep.protos.peer._ + +import scala.concurrent._ + +object EndorsementRequest4Future { + def props(name: String): Props = Props(classOf[EndorsementRequest4Future], name) +} + +class EndorsementRequest4Future(moduleName: String) extends ModuleBase(moduleName) { + import scala.concurrent.duration._ + case class HashReply(hash:ByteString, reply:MPbftReply) + private var recvedReplies = scala.collection.mutable.Buffer[HashReply]() + private var recvedRepliesCount = scala.collection.mutable.HashMap[ByteString, Int]() + + implicit val timeout = Timeout(TimePolicy.getTimeoutEndorse.seconds) + //private val endorsementActorName = "/user/modulemanager/endorser" + private val endorsementActorName = "/user/modulemanager/dispatchofRecvendorsement" + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "EndorsementRequest4Future Start")) + } + + private def toAkkaUrl(addr: Address, actorName: String): String = { + return addr.toString + "/" + actorName; + } + + + private def handler(reqinfo: RequesterOfEndorsement) = { + schedulerLink = clearSched() + try { + val selection: ActorSelection = context.actorSelection(toAkkaUrl(reqinfo.endorer, endorsementActorName)); + val data = MsgPbftPrePrepare("", reqinfo.blc, reqinfo.blocker) + selection ! data + //val future1 = selection ? data + //Await.result(future1, timeout.duration).asInstanceOf[MsgPbftPrepare] + } catch { + case e: AskTimeoutException => + + case te: TimeoutException => + } + } + + //reply start------------------------------------------- + private def VerifyReply(block: Block, reply: MPbftReply): Boolean = { + val bb = reply.clearSignature.toByteArray + val signature = reply.signature.get//todo get? + val ev = BlockVerify.VerifyOneEndorseOfBlock(signature, bb, pe.getSysTag) + ev._1 + } + + private def ProcessMsgPbftReply(reply: MsgPbftReply){ + if (VerifyReply(reply.block,reply.reply)) { + val hash = reply.block.hashOfBlock + recvedReplies += HashReply(hash, reply.reply) + var count = 1 + if (recvedRepliesCount.contains(hash)) { + count = recvedRepliesCount.get(hash).get + 1 + } + recvedRepliesCount.put(hash, count) + if (count >= (SystemProfile.getPbftF + 1)) { + val replies = recvedReplies.filter(_.hash == reply.block.hashOfBlock).map(f=>f.reply) + .sortWith( (left,right)=> left.signature.get.certId.toString < right.signature.get.certId.toString) + //val blockWithReplies = reply.block.withReplies(replies) + context.parent ! MsgPbftReplyOk(reply.block, replies) + + recvedRepliesCount.remove(reply.block.hashOfBlock) + replies.foreach(f=> recvedReplies -= HashReply(reply.block.hashOfBlock, f)) + } + } + } + //reply end------------------------------------------- + + override def receive = { + case MsgPbftReply(block,reply,chainInfo) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorsementRequest4Future recv reply: " + ", " + block.hashOfBlock) + ProcessMsgPbftReply(MsgPbftReply(block,reply,chainInfo)) + + case RequesterOfEndorsement(block, blocker, addr) => + //待请求背书的块的上一个块的hash不等于系统最新的上一个块的hash,停止发送背书 + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorsementRequest4Future recv RequesterOfEndorsement: " + ", " + block.hashOfBlock) + if(block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash){ + handler(RequesterOfEndorsement(block, blocker, addr)) + }else{ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future back out endorsement,prehash not equal pe.currenthash ,height=${block.height},local height=${pe.getCurrentHeight} ")) + } + + case _ => //ignore + } +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/pbft/block/GenesisBlocker.scala b/src/main/scala/rep/network/consensus/pbft/block/GenesisBlocker.scala new file mode 100644 index 00000000..bb5af3cb --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/block/GenesisBlocker.scala @@ -0,0 +1,129 @@ +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package rep.network.consensus.pbft.block + +import akka.util.Timeout + +import scala.concurrent.duration._ +import akka.pattern.ask +import akka.pattern.AskTimeoutException + +import scala.concurrent._ +import akka.actor.{ActorRef, Address, Props} +import akka.cluster.pubsub.DistributedPubSubMediator.Publish +import com.google.protobuf.ByteString +import rep.app.conf.{SystemProfile, TimePolicy} +import rep.crypto.Sha256 +import rep.network._ +import rep.network.base.ModuleBase +import rep.protos.peer._ +import rep.storage.ImpDataAccess +import rep.utils.GlobalUtils.{BlockEvent, EventType, NodeStatus} + +import scala.collection.mutable +import com.sun.beans.decoder.FalseElementHandler + +import scala.util.control.Breaks +import rep.utils.IdTool + +import scala.util.control.Breaks._ +import rep.network.consensus.util.{BlockHelp, BlockVerify} +import rep.network.util.NodeHelp +import rep.log.RepLogger +import rep.network.autotransaction.Topic +import rep.network.consensus.common.MsgOfConsensus.{PreTransBlock, PreTransBlockResult} +import rep.network.consensus.pbft.MsgOfPBFT +import rep.network.consensus.pbft.MsgOfPBFT.ConfirmedBlock +import rep.network.module.ModuleActorType +import rep.network.module.pbft.PBFTActorType + +object GenesisBlocker { + def props(name: String): Props = Props(classOf[GenesisBlocker], name) + + case object GenesisBlock + +} + +/** + * 出块模块 + * + * @author shidianyue + * @version 1.0 + * @since 1.0 + * @param moduleName 模块名称 + */ +class GenesisBlocker(moduleName: String) extends ModuleBase(moduleName) { + + import context.dispatcher + import scala.concurrent.duration._ + import akka.actor.ActorSelection + import scala.collection.mutable.ArrayBuffer + import rep.protos.peer.{ Transaction } + + val dataaccess: ImpDataAccess = ImpDataAccess.GetDataAccess(pe.getSysTag) + implicit val timeout = Timeout(TimePolicy.getTimeoutPreload*20.seconds) + + var preblock: Block = null + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "Block module start")) + SubscribeTopic(mediator, self, selfAddr, Topic.Block, true) + } + + + + private def ExecuteTransactionOfBlock(block: Block): Block = { + try { + //val future = pe.getActorRef(ActorType.preloaderoftransaction) ? PreTransBlock(block, "preload") + val future = pe.getActorRef(ModuleActorType.ActorType.dispatchofpreload) ? PreTransBlock(block, "preload") + val result = Await.result(future, timeout.duration).asInstanceOf[PreTransBlockResult] + if (result.result) { + result.blc + } else { + null + } + } catch { + case e: AskTimeoutException => null + } + } + + override def receive = { + //创建块请求(给出块人) + case GenesisBlocker.GenesisBlock => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", pbft.block.GenesisBlock recv GenesisBlock: ") + if(dataaccess.getBlockChainInfo().height == 0 && NodeHelp.isSeedNode(pe.getSysTag) ){ + if(this.preblock != null){ + mediator ! Publish(Topic.Block, MsgOfPBFT.ConfirmedBlock(preblock, sender, Seq.empty)) + }else{ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "Create genesis block")) + preblock = BlockHelp.CreateGenesisBlock + preblock = ExecuteTransactionOfBlock(preblock) + if (preblock != null) { + preblock = BlockHelp.AddBlockHash(preblock) + preblock = BlockHelp.AddSignToBlock(preblock, pe.getSysTag) + //sendEvent(EventType.RECEIVE_INFO, mediator, selfAddr, Topic.Block, Event.Action.BLOCK_NEW) + mediator ! Publish(Topic.Block, ConfirmedBlock(preblock, self, Seq.empty)) + //getActorRef(pe.getSysTag, ActorType.PERSISTENCE_MODULE) ! BlockRestore(blc, SourceOfBlock.CONFIRMED_BLOCK, self) + } + } + + } + + case _ => //ignore + } + +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/pbft/endorse/DispatchOfRecvEndorsement.scala b/src/main/scala/rep/network/consensus/pbft/endorse/DispatchOfRecvEndorsement.scala new file mode 100644 index 00000000..05b3e27f --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/endorse/DispatchOfRecvEndorsement.scala @@ -0,0 +1,71 @@ +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +//zhj +package rep.network.consensus.pbft.endorse + +import akka.actor.Props +import akka.routing._ +import rep.app.conf.SystemProfile +import rep.log.RepLogger +import rep.network.base.ModuleBase +import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftCommit, MsgPbftPrePrepare, MsgPbftPrepare} + +object DispatchOfRecvEndorsement { + def props(name: String): Props = Props(classOf[DispatchOfRecvEndorsement], name) +} + + +class DispatchOfRecvEndorsement(moduleName: String) extends ModuleBase(moduleName) { + import scala.collection.immutable._ + + private var router: Router = null + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "DispatchOfRecvEndorsement Start")) + } + + private def createRouter = { + if (router == null) { + var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size()) + for (i <- 0 to SystemProfile.getVoteNodeList.size() - 1) { + var ca = context.actorOf(Endorser4Future.props("endorser" + i), "endorser" + i) + context.watch(ca) + list(i) = new ActorRefRoutee(ca) + } + val rlist: IndexedSeq[Routee] = list.toIndexedSeq + router = Router(SmallestMailboxRoutingLogic(), rlist) + } + } + + + + override def receive = { + + case MsgPbftPrePrepare(senderPath,block, blocker) => + createRouter + router.route(MsgPbftPrePrepare(senderPath,block, blocker), sender) + + case MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo) => + createRouter + router.route(MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo), sender) + + case MsgPbftCommit(senderPath,block,blocker,commit,chainInfo) => + createRouter + router.route(MsgPbftCommit(senderPath,block,blocker,commit,chainInfo), sender) + + case _ => //ignore + } +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/pbft/endorse/Endorser4Future.scala b/src/main/scala/rep/network/consensus/pbft/endorse/Endorser4Future.scala new file mode 100644 index 00000000..c78b09ed --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/endorse/Endorser4Future.scala @@ -0,0 +1,230 @@ +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +//zhj + +package rep.network.consensus.pbft.endorse + + +import akka.actor.Props +import akka.pattern.ask +import akka.util.Timeout +import rep.app.conf.{SystemCertList, SystemProfile, TimePolicy} +import rep.network.base.ModuleBase +import rep.network.consensus.common.MsgOfConsensus.{PreTransBlock, PreTransBlockResult} +import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftCommit, MsgPbftPrePrepare, MsgPbftPrepare, ResultFlagOfEndorse} +import rep.network.module.ModuleActorType +import rep.network.module.pbft.PBFTActorType +import rep.network.util.NodeHelp +//import rep.network.consensus.vote.Voter.VoteOfBlocker +import rep.log.RepLogger +import rep.network.consensus.util.BlockVerify +import rep.network.sync.SyncMsg.StartSync + +import scala.util.control.Breaks._ + +object Endorser4Future { + def props(name: String): Props = Props(classOf[Endorser4Future], name) +} + +class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) { + import context.dispatcher + import rep.protos.peer._ + import rep.storage.ImpDataAccess + + import scala.concurrent._ + import scala.concurrent.duration._ + + implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds) + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("Endorser4Future Start")) + } + //preprepare start------------------------------------------- + private def AskPreloadTransactionOfBlock(block: Block): Future[Boolean] = + pe.getActorRef(ModuleActorType.ActorType.dispatchofpreload).ask(PreTransBlock(block, "endors"))(timeout). + mapTo[PreTransBlockResult].flatMap(f => { + val result = Promise[Boolean] + var tmpblock = f.blc.withHashOfBlock(block.hashOfBlock) + if (BlockVerify.VerifyHashOfBlock(tmpblock)) { + result.success(true) + } else { + result.success(false) + } + result.future + }).recover({ + case e: Throwable => + RepLogger.error(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"${pe.getSysTag}:entry AskPreloadTransactionOfBlock error")) + false + }) + + private def checkRepeatOfTrans(trans: Seq[Transaction]): Future[Boolean] = Future { + var isRepeat: Boolean = false + val aliaslist = trans.distinct + if (aliaslist.size != trans.size) { + isRepeat = true + } else { + val sr: ImpDataAccess = ImpDataAccess.GetDataAccess(pe.getSysTag) + breakable( + trans.foreach(f => { + if (sr.isExistTrans4Txid(f.id)) { + isRepeat = true + break + } + })) + } + isRepeat + } + + private def asyncVerifyTransaction(t: Transaction): Future[Boolean] = Future { + var result = false + + if (pe.getTransPoolMgr.findTrans(t.id)) { + result = true + } else { + val tmp = BlockVerify.VerifyOneSignOfTrans(t, pe.getSysTag) + if (tmp._1) { + result = true + } + } + result + } + + private def asyncVerifyTransactions(block: Block): Future[Boolean] = Future { + var result = true + val listOfFuture: Seq[Future[Boolean]] = block.transactions.map(x => { + asyncVerifyTransaction(x) + }) + + val futureOfList: Future[List[Boolean]] = Future.sequence(listOfFuture.toList) + + futureOfList.map(x => { + x.foreach(f => { + if (f) { + result = false + } + }) + }) + result + } + + + private def checkEndorseSign(block: Block): Future[Boolean] = Future { + var result = false + val r = BlockVerify.VerifyAllEndorseOfBlock(block, pe.getSysTag) + result = r._1 + result + } + + private def VerifyInfo(info: MsgPbftPrePrepare) = { + val transSign = asyncVerifyTransactions(info.block) + val transRepeat = checkRepeatOfTrans(info.block.transactions) + val endorseSign = checkEndorseSign(info.block) + val transExe = AskPreloadTransactionOfBlock(info.block) + val result = for { + v1 <- transSign + v2 <- transRepeat + v3 <- endorseSign + v4 <- transExe + } yield (v1 && !v2 && v3 && v4) + + Await.result(result, timeout.duration).asInstanceOf[Boolean] + } + + private def CheckMessage(block : Block, blocker: String):Boolean = { + var r = false + if (block.height > pe.getCurrentHeight + 1) { + pe.getActorRef(PBFTActorType.ActorType.synchrequester) ! StartSync(false) + } else { + if (NodeHelp.isCandidateNow(pe.getSysTag, SystemCertList.getSystemCertList) + //是候选节点,可以背书 + && (!pe.isSynching) + && (block.previousBlockHash.toStringUtf8 == pe.getBlocker.voteBlockHash) + && NodeHelp.isBlocker(blocker, pe.getBlocker.blocker)) { + r = true + } + } + r + } + + private def ProcessMsgPbftPrePrepare(prePrepare: MsgPbftPrePrepare): Unit = { + if (CheckMessage(prePrepare.block,prePrepare.blocker)) + if (prePrepare.blocker != pe.getSysTag) { + var b = true; + if (SystemProfile.getIsVerifyOfEndorsement) + b = VerifyInfo(prePrepare) + if (b) + pe.getActorRef(PBFTActorType.ActorType.pbftpreprepare) ! prePrepare + else + sender ! MsgPbftPrepare("",ResultFlagOfEndorse.VerifyError, null, null,null, pe.getSystemCurrentChainStatus) + } + } + //preprepare end------------------------------------------- + + //prepare start------------------------------------------- + private def VerifyPrepare(block: Block, prepare: MPbftPrepare): Boolean = { + val bb = block.clearEndorsements.toByteArray + val signature = prepare.signature.get//todo get? + val ev = BlockVerify.VerifyOneEndorseOfBlock(signature, bb, pe.getSysTag) + ev._1 + } + + private def ProcessMsgPbftPepare(prepare:MsgPbftPrepare){ + if (CheckMessage(prepare.block,prepare.blocker)) + if (prepare.result == ResultFlagOfEndorse.success) { + if (VerifyPrepare(prepare.block, prepare.prepare)) { + pe.getActorRef(PBFTActorType.ActorType.pbftprepare) ! prepare + } + } + } + //prepare end------------------------------------------- + + + //commit start------------------------------------------- + private def VerifyCommit(block: Block, commit: MPbftCommit): Boolean = { + val bb = commit.clearSignature.toByteArray + val signature = commit.signature.get//todo get? + val ev = BlockVerify.VerifyOneEndorseOfBlock(signature, bb, pe.getSysTag) + ev._1 + } + + private def ProcessMsgPbftCommit(commit: MsgPbftCommit){ + if (CheckMessage(commit.block,commit.blocker)) + if (VerifyCommit(commit.block, commit.commit)) { + pe.getActorRef(PBFTActorType.ActorType.pbftcommit) ! commit + } + } + //commit end------------------------------------------- + + override def receive = { + //Endorsement block + case MsgPbftPrePrepare(senderPath,block, blocker) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Endorser4Future recv preprepare: " + blocker + ", " + block.hashOfBlock) + ProcessMsgPbftPrePrepare(MsgPbftPrePrepare(sender.path.toString, block, blocker)) + + + case MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Endorser4Future recv prepare: " + blocker + ", " + block.hashOfBlock) + ProcessMsgPbftPepare(MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo)) + + + case MsgPbftCommit(senderPath,block,blocker,commit,chainInfo) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Endorser4Future recv commit: " + blocker + ", " + block.hashOfBlock) + ProcessMsgPbftCommit(MsgPbftCommit(senderPath,block,blocker,commit,chainInfo)) + + case _ => //ignore + } + +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/pbft/endorse/PbftCommit.scala b/src/main/scala/rep/network/consensus/pbft/endorse/PbftCommit.scala new file mode 100644 index 00000000..1321a827 --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/endorse/PbftCommit.scala @@ -0,0 +1,78 @@ +/** + * @created zhaohuanjun 2020-03 +*/ +//zhj +package rep.network.consensus.pbft.endorse + +import akka.actor.Props +import akka.util.Timeout +import com.google.protobuf.ByteString +import com.google.protobuf.timestamp.Timestamp +import rep.app.conf.{SystemProfile, TimePolicy} +import rep.crypto.cert.SignTool +import rep.log.RepLogger +import rep.network.base.ModuleBase +import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftCommit, MsgPbftReply} +import rep.utils.{IdTool, TimeUtils} + +case object PbftCommit { + def props(name: String): Props = Props(classOf[PbftCommit], name) +} + +class PbftCommit(moduleName: String) extends ModuleBase(moduleName) { + import rep.protos.peer._ + + import scala.concurrent.duration._ + + case class HashCommit(hash:ByteString, commit:MPbftCommit) + + + private var recvedCommits = scala.collection.mutable.Buffer[HashCommit]() + private var recvedCommitsCount = scala.collection.mutable.HashMap[ByteString, Int]() + + implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds) + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("PbftCommit Start")) + } + + private def ProcessMsgPbftCommit(commit: MsgPbftCommit){ + val commits = recvedCommits.filter(_.hash == commit.block.hashOfBlock).map(f=>f.commit) + .sortWith( (left,right)=> left.signature.get.certId.toString < right.signature.get.certId.toString) + val bytes = MPbftReply().withCommits(commits).toByteArray + //commits.map(f=>f.toByteString).reduce((a,f)=>a.concat(f)) + val certId = IdTool.getCertIdFromName(pe.getSysTag) + val millis = TimeUtils.getCurrentTime() + val sig = Signature(Option(certId),Option(Timestamp(millis / 1000, ((millis % 1000) * 1000000).toInt)), + ByteString.copyFrom(SignTool.sign(pe.getSysTag, bytes))) + var reply : MPbftReply = MPbftReply() + .withCommits(commits) + .withSignature(sig) + + val actor = context.actorSelection(commit.senderPath) + actor ! MsgPbftReply(commit.block,reply,pe.getSystemCurrentChainStatus) + + recvedCommitsCount.remove(commit.block.hashOfBlock) + commits.foreach(f=> recvedCommits -= HashCommit(commit.block.hashOfBlock, f)) + + } + + override def receive = { + + case MsgPbftCommit(senderPath,block,blocker,commit,chainInfo) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", PbftCommit recv commit: " + blocker + ", " + block.hashOfBlock) + //already verified + val hash = block.hashOfBlock + recvedCommits += HashCommit(hash, commit) + var count = 1 + if (recvedCommitsCount.contains(hash)) { + count = recvedCommitsCount.get(hash).get +1 + } + recvedCommitsCount.put(hash,count) + if ( count >= (2*SystemProfile.getPbftF+1)) + ProcessMsgPbftCommit(MsgPbftCommit(senderPath,block,blocker,commit,chainInfo)) + + case _ => //ignore + } + +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/pbft/endorse/PbftPrePrepare.scala b/src/main/scala/rep/network/consensus/pbft/endorse/PbftPrePrepare.scala new file mode 100644 index 00000000..591a4f4d --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/endorse/PbftPrePrepare.scala @@ -0,0 +1,63 @@ +/** + * @created zhaohuanjun 2020-03 +*/ + +package rep.network.consensus.pbft.endorse + +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import akka.actor.{ActorSelection, Props} +import akka.util.Timeout +import rep.app.conf.TimePolicy +import rep.network.base.ModuleBase +import rep.log.RepLogger +import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftPrePrepare, MsgPbftPrepare, ResultFlagOfEndorse} +import rep.network.consensus.util.BlockHelp + +case object PbftPrePrepare { + def props(name: String): Props = Props(classOf[PbftPrePrepare], name) +} + +class PbftPrePrepare(moduleName: String) extends ModuleBase(moduleName) { + import rep.protos.peer._ + + import scala.concurrent.duration._ + + implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds) + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("PbftPrePrepare Start")) + } + + private def ProcessMsgPbftPrePrepare(prePrepare: MsgPbftPrePrepare): Unit = { + pe.getNodeMgr.getStableNodes.foreach(f => { + val actorPath = f.toString + "/user/modulemanager/dispatchofRecvendorsement" + val actor : ActorSelection = context.actorSelection(actorPath) + val prepare : MPbftPrepare = MPbftPrepare().withSignature(BlockHelp.SignBlock(prePrepare.block, pe.getSysTag)) + actor ! MsgPbftPrepare(prePrepare.senderPath, ResultFlagOfEndorse.success, prePrepare.block, prePrepare.blocker,prepare, pe.getSystemCurrentChainStatus) + }) + } + + override def receive = { + case MsgPbftPrePrepare(senderPath,block, blocker) => + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", PbftPrePrepare recv preprepare: " + blocker + ", " + block.hashOfBlock) + ProcessMsgPbftPrePrepare(MsgPbftPrePrepare(senderPath, block, blocker)) + + case _ => //ignore + } + +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/pbft/endorse/PbftPrepare.scala b/src/main/scala/rep/network/consensus/pbft/endorse/PbftPrepare.scala new file mode 100644 index 00000000..2cc41e32 --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/endorse/PbftPrepare.scala @@ -0,0 +1,94 @@ +/** + * @created zhaohuanjun 2020-03 +*/ +//zhj +package rep.network.consensus.pbft.endorse + +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import akka.actor.{ActorSelection, Props} +import akka.util.Timeout +import com.google.protobuf.ByteString +import com.google.protobuf.timestamp.Timestamp +import rep.app.conf.{SystemProfile, TimePolicy} +import rep.crypto.cert.SignTool +import rep.log.RepLogger +import rep.network.base.ModuleBase +import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftCommit, MsgPbftPrepare} +import rep.utils.{IdTool, TimeUtils} + +case object PbftPrepare { + def props(name: String): Props = Props(classOf[PbftPrepare], name) +} + +class PbftPrepare(moduleName: String) extends ModuleBase(moduleName) { + import rep.protos.peer._ + + import scala.concurrent.duration._ + + case class HashPrepare(hash:ByteString, prepare:MPbftPrepare) + + private var recvedPrepares = scala.collection.mutable.Buffer[HashPrepare]() + private val recvedPreparesCount = scala.collection.mutable.HashMap[ByteString, Int]() + + implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds) + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("PbftPrepare Start")) + } + + private def ProcessMsgPbftPepare(prepare:MsgPbftPrepare) = { + val prepares = recvedPrepares.filter(_.hash == prepare.block.hashOfBlock).map(f=>f.prepare) + .sortWith( (left,right)=> left.signature.get.certId.toString < right.signature.get.certId.toString) + val bytes = MPbftCommit().withPrepares(prepares).toByteArray//prepares.reduce((a,f)=>a.toByteString.concat(f.toByteString)) + val certId = IdTool.getCertIdFromName(pe.getSysTag) + val millis = TimeUtils.getCurrentTime() + val sig = Signature(Option(certId),Option(Timestamp(millis / 1000, ((millis % 1000) * 1000000).toInt)), + ByteString.copyFrom(SignTool.sign(pe.getSysTag, bytes))) + var commit : MPbftCommit = MPbftCommit() + .withPrepares(prepares) + .withSignature(sig) + + pe.getNodeMgr.getStableNodes.foreach(f => { + val actorPath = f.toString + "/user/modulemanager/dispatchofRecvendorsement" + val actor : ActorSelection = context.actorSelection(actorPath) + actor ! MsgPbftCommit(prepare.senderPath,prepare.block,prepare.blocker,commit,pe.getSystemCurrentChainStatus) + }) + + recvedPreparesCount.remove(prepare.block.hashOfBlock) + prepares.foreach(f=> recvedPrepares -= HashPrepare(prepare.block.hashOfBlock, f)) + } + + override def receive = { + + case MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo) => + //already verified + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", PbftPrepare recv prepare: " + blocker + ", " + block.hashOfBlock) + val hash = block.hashOfBlock + recvedPrepares += HashPrepare(hash, prepare) + var count = 1 + if (recvedPreparesCount.contains(hash)) { + count = recvedPreparesCount.get(hash).get +1 + } + recvedPreparesCount.put(hash,count) + if ( count >= 2*SystemProfile.getPbftF) + ProcessMsgPbftPepare(MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo)) + + case _ => //ignore + } + +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/pbft/vote/VoterOfPBFT.scala b/src/main/scala/rep/network/consensus/pbft/vote/VoterOfPBFT.scala new file mode 100644 index 00000000..64bf1278 --- /dev/null +++ b/src/main/scala/rep/network/consensus/pbft/vote/VoterOfPBFT.scala @@ -0,0 +1,93 @@ +//zhj + +package rep.network.consensus.pbft.vote + +import akka.actor.Props +import rep.app.conf.{SystemCertList, TimePolicy} +import rep.log.RepLogger +import rep.network.consensus.pbft.MsgOfPBFT.{CreateBlock, VoteOfBlocker, VoteOfForce} +import rep.network.consensus.common.algorithm.{IRandomAlgorithmOfVote, ISequencialAlgorithmOfVote} +import rep.network.consensus.common.vote.IVoter +import rep.network.module.pbft.PBFTActorType +import rep.network.util.NodeHelp + +object VoterOfPBFT{ + def props(name: String): Props = Props(classOf[VoterOfPBFT],name) +} + +class VoterOfPBFT(moduleName: String) extends IVoter(moduleName: String) { + import context.dispatcher + + import scala.concurrent.duration._ + + this.algorithmInVoted = new ISequencialAlgorithmOfVote + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Vote_Logger, this.getLogMsgPrefix("VoterOfPBFT module start")) + } + + override protected def NoticeBlockerMsg: Unit = { + if (this.Blocker.blocker.equals(pe.getSysTag)) { + //发送建立新块的消息 + pe.getActorRef(PBFTActorType.ActorType.blocker) ! CreateBlock + } + } + + override protected def DelayVote: Unit = { + this.voteCount += 1 + var time = this.voteCount * TimePolicy.getVoteRetryDelay + schedulerLink = clearSched() + schedulerLink = scheduler.scheduleOnce(TimePolicy.getVoteRetryDelay.millis, self, VoteOfBlocker("voter")) + } + + override protected def vote(isForce: Boolean): Unit = { + if (checkTranNum || isForce) { + val currentblockhash = pe.getCurrentBlockHash + val currentheight = pe.getCurrentHeight + if (this.Blocker.voteBlockHash == "") { + this.cleanVoteInfo + this.resetCandidator(currentblockhash) + this.resetBlocker(0, currentblockhash, currentheight) + RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},first voter,blocker=${this.Blocker.blocker},voteidx=${this.Blocker.VoteIndex}" + "~" + selfAddr)) + } else { + if (!this.Blocker.voteBlockHash.equals(currentblockhash)) { + //抽签的基础块已经变化,需要重续选择候选人 + this.cleanVoteInfo + this.resetCandidator(currentblockhash) + this.resetBlocker(0, currentblockhash, currentheight) + RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},hash change,reset voter,height=${currentheight},hash=${currentblockhash},blocker=${this.Blocker.blocker},voteidx=${this.Blocker.VoteIndex}" + "~" + selfAddr)) + } else { + if (this.Blocker.blocker == "") { + this.cleanVoteInfo + this.resetCandidator(currentblockhash) + this.resetBlocker(0, currentblockhash, currentheight) + RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},blocker=null,reset voter,height=${currentheight},blocker=${this.Blocker.blocker},voteidx=${this.Blocker.VoteIndex}" + "~" + selfAddr)) + } else { + if (((System.currentTimeMillis() - this.Blocker.voteTime) / 1000 > TimePolicy.getTimeOutBlock) + ||(!pe.getNodeMgr.getStableNodeNames.contains(this.Blocker.blocker))) { //zhj + //说明出块超时 + this.voteCount = 0 + this.resetBlocker(this.Blocker.VoteIndex + 1, currentblockhash, currentheight) + RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},block timeout,reset voter,height=${currentheight},blocker=${this.Blocker.blocker},voteidx=${this.Blocker.VoteIndex}" + "~" + selfAddr)) + } else { + NoticeBlockerMsg + } + } + } + } + } else { + RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},transaction is not enough,waiting transaction,height=${pe.getCurrentHeight}" + "~" + selfAddr)) + } + } + + override def receive: Receive = { + case VoteOfBlocker(flag:String) => + ////RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Voter recv VoteOfBlocker: " + flag) + if (NodeHelp.isCandidateNow(pe.getSysTag, SystemCertList.getSystemCertList)) { + voteMsgHandler(false) + } + case VoteOfForce=> + //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Voter recv VoteOfForce: ") + voteMsgHandler(true) + } +} diff --git a/src/main/scala/rep/network/module/pbft/ModuleManagerOfPBFT.scala b/src/main/scala/rep/network/module/pbft/ModuleManagerOfPBFT.scala new file mode 100644 index 00000000..5c48749c --- /dev/null +++ b/src/main/scala/rep/network/module/pbft/ModuleManagerOfPBFT.scala @@ -0,0 +1,59 @@ +//zhj + +package rep.network.module.pbft + +import akka.actor.Props +import rep.log.RepLogger +import rep.network.cache.pbft.TransactionPoolOfPBFT +import rep.network.confirmblock.common.ConfirmOfBlock +import rep.network.confirmblock.pbft.ConfirmOfBlockOfPBFT +import rep.network.consensus.pbft.block.{BlockerOfPBFT, EndorseCollector} +import rep.network.module.{IModuleManager, ModuleActorType} +import rep.network.sync.response.SynchronizeResponser +import rep.network.consensus.pbft.endorse.{PbftCommit, PbftPrePrepare, PbftPrepare} +import rep.network.consensus.pbft.MsgOfPBFT.VoteOfBlocker +import rep.network.consensus.pbft.endorse.DispatchOfRecvEndorsement +import rep.network.persistence.StoragerOfPBFT +import rep.network.consensus.pbft.vote +import rep.network.consensus.pbft.vote.VoterOfPBFT +import rep.network.sync.request.pbft.SynchRequesterOfPBFT + +/** + * Created by zhaohuanjun on 2020/03/30. + * PBFT的模块管理类,继承公共的模块管理类,实现PBFT的自己的模块 + */ +object ModuleManagerOfPBFT{ + def props(name: String, sysTag: String, enableStatistic: Boolean, enableWebSocket: Boolean, isStartup: Boolean): Props = Props(classOf[ModuleManagerOfPBFT], name, sysTag, enableStatistic: Boolean, enableWebSocket: Boolean, isStartup: Boolean) + +} + +class ModuleManagerOfPBFT(moduleName: String, sysTag: String, enableStatistic: Boolean, enableWebSocket: Boolean, isStartup: Boolean) extends IModuleManager(moduleName,sysTag, enableStatistic, enableWebSocket, isStartup){ + + override def preStart(): Unit = { + RepLogger.info(RepLogger.System_Logger, this.getLogMsgPrefix( "ModuleManagerOfPBFT Start")) + } + + override def startupConsensus: Unit = { + pe.getActorRef(PBFTActorType.ActorType.voter) ! VoteOfBlocker("startup") + } + + override def loadConsensusModule = { + pe.register(ModuleActorType.ActorType.transactionpool, context.actorOf(TransactionPoolOfPBFT.props("transactionpool"), "transactionpool"))//zhj + pe.register(ModuleActorType.ActorType.storager,context.actorOf(StoragerOfPBFT.props("storager"), "storager")) + pe.register(PBFTActorType.ActorType.blocker,context.actorOf(BlockerOfPBFT.props("blocker"), "blocker")) + pe.register(PBFTActorType.ActorType.confirmerofblock,context.actorOf(ConfirmOfBlockOfPBFT.props("confirmerofblock"), "confirmerofblock")) + pe.register(PBFTActorType.ActorType.endorsementcollectioner,context.actorOf(EndorseCollector.props("endorsementcollectioner"), "endorsementcollectioner")) + pe.register(PBFTActorType.ActorType.dispatchofRecvendorsement,context.actorOf(DispatchOfRecvEndorsement.props("dispatchofRecvendorsement"), "dispatchofRecvendorsement")) + pe.register(PBFTActorType.ActorType.voter,context.actorOf(VoterOfPBFT.props("voter"), "voter")) + + + pe.register(PBFTActorType.ActorType.synchrequester,context.actorOf(SynchRequesterOfPBFT.props("synchrequester"), "synchrequester")) + pe.register(PBFTActorType.ActorType.synchresponser,context.actorOf(SynchronizeResponser.props("synchresponser"), "synchresponser")) + + pe.register(PBFTActorType.ActorType.pbftpreprepare, context.actorOf(PbftPrePrepare.props("pbftpreprepare"), "pbftpreprepare")) + pe.register(PBFTActorType.ActorType.pbftprepare, context.actorOf(PbftPrepare.props("pbftprepare"), "pbftprepare")) + pe.register(PBFTActorType.ActorType.pbftcommit, context.actorOf(PbftCommit.props("pbftcommit"), "pbftcommit")) + + } + +} diff --git a/src/main/scala/rep/network/module/pbft/PBFTActorType.scala b/src/main/scala/rep/network/module/pbft/PBFTActorType.scala new file mode 100644 index 00000000..fcec2143 --- /dev/null +++ b/src/main/scala/rep/network/module/pbft/PBFTActorType.scala @@ -0,0 +1,28 @@ +//zhj + +package rep.network.module.pbft + +/** + * Created by jiangbuyun on 2020/03/15. + * CFRD管理的actor + */ +object PBFTActorType { + //cfrd共识模式的actor类型的注册,关键字以30开头 + case object ActorType{ + val blocker = 201 + val voter = 202 + val endorsementcollectioner = 203 + + val confirmerofblock = 204 + val dispatchofRecvendorsement = 205 + val gensisblock = 206 + + val synchrequester = 207 + val synchresponser = 208 + + val pbftpreprepare = 209 + val pbftprepare = 210 + val pbftcommit = 211 + + } +} diff --git a/src/main/scala/rep/network/persistence/pbft/StoragerOfPBFT.scala b/src/main/scala/rep/network/persistence/pbft/StoragerOfPBFT.scala new file mode 100644 index 00000000..7130305a --- /dev/null +++ b/src/main/scala/rep/network/persistence/pbft/StoragerOfPBFT.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BA SIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package rep.network.persistence + +import akka.actor.Props +import rep.log.RepLogger +import rep.network.consensus.pbft.MsgOfPBFT.VoteOfBlocker +import rep.network.module.pbft.PBFTActorType + +object StoragerOfPBFT { + def props(name: String): Props = Props(classOf[StoragerOfPBFT], name) +} + +class StoragerOfPBFT(moduleName: String) extends IStorager (moduleName: String) { + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Storager_Logger, this.getLogMsgPrefix( "Storager Start")) + } + + override protected def sendVoteMessage: Unit = { + pe.getActorRef( PBFTActorType.ActorType.voter) ! VoteOfBlocker("persistence") + } + +} \ No newline at end of file diff --git a/src/main/scala/rep/network/sync/parser/pbft/IPBFTOfSynchAnalyzer.scala b/src/main/scala/rep/network/sync/parser/pbft/IPBFTOfSynchAnalyzer.scala new file mode 100644 index 00000000..67fea553 --- /dev/null +++ b/src/main/scala/rep/network/sync/parser/pbft/IPBFTOfSynchAnalyzer.scala @@ -0,0 +1,67 @@ +package rep.network.sync.parser.pbft + +import rep.log.RepLogger +import rep.network.sync.SyncMsg.AnalysisResult +import rep.network.sync.SyncMsg +import rep.network.sync.parser.ISynchAnalyzer +import rep.network.tools.NodeMgr +import rep.network.util.NodeHelp +import rep.protos.peer.BlockchainInfo + +/** + * Created by jiangbuyun on 2020/03/18. + * 基于PBF共识协议的同步区块信息分析的实现类 + */ + +class IPBFTOfSynchAnalyzer(systemName: String, lchaininfo: BlockchainInfo, nodemgr: NodeMgr) extends ISynchAnalyzer( systemName, lchaininfo, nodemgr) { + + override def Parser(reslist: List[SyncMsg.ResponseInfo], isStartupSynch: Boolean): Unit = { + val lh = lchaininfo.height + val nodes = nodemgr.getStableNodes + + if (NodeHelp.ConsensusConditionChecked(reslist.length, nodes.size)) { + //获取到到chaininfo信息的数量,得到大多数节点的响应,进一步判断同步的策略 + //获取返回的chaininfo信息中,大多数节点的相同高度的最大值 + val heightStatis = reslist.groupBy(x => x.response.height).map(x => (x._1, x._2.length)).toList.sortBy(x => -x._2) + val maxheight = heightStatis.head._1 + var nodesOfmaxHeight = heightStatis.head._2 + + RepLogger.debug(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"--------Info,获取同步的返回信息,结果=${reslist.mkString("|")}")) + + RepLogger.info(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"--------Info,获取同步的返回信息,统计结果=${heightStatis.mkString("|")}")) + + if (NodeHelp.ConsensusConditionChecked(nodesOfmaxHeight, nodes.size)) { + //得到了真正大多数节点相同的高度 + val agreementResult = checkHashAgreement(maxheight, reslist, nodes.size, 1) + if (agreementResult._1) { + //当前同步高度的最后高度的块hash一致 + if (maxheight > lh) { + this.greaterThanLocalHeight(reslist, maxheight, agreementResult._2) + } else if (maxheight == lh) { + this.equalLocalHeight(reslist, maxheight, agreementResult._2) + } else { + if (isStartupSynch) { + this.lessThanLocalHeight(maxheight, agreementResult._2) + } else { + this.aresult = AnalysisResult(1, "") + } + RepLogger.error(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix("--------info,本地高度大于远端高度,停止同步")) + } + } else { + //当前同步高度的最后高度的块hash不一致,输出错误信息,停止同步 + this.aresult = AnalysisResult(0, "当前同步高度的最后高度的块hash不一致,输出错误信息,停止同步") + RepLogger.error(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix("--------error,当前同步高度的最后高度的块hash不一致,输出错误信息,停止同步")) + } + } else { + //最多数量的高度,达不到共识的要求,输出错误信息停止同步 + this.aresult = AnalysisResult(2, s"最多数量的高度,达不到共识的要求,输出错误信息停止同步 response size=${reslist.size}") + RepLogger.error(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"--------error,最多数量的高度,达不到共识的要求,输出错误信息停止同步 response size=${reslist.size}")) + } + } else { + //获取到到chaininfo信息的数量,没有得到大多数节点的响应,输出错误信息停止同步 + this.aresult = AnalysisResult(0, s"获取到到chaininfo信息的数量,没有得到大多数节点的响应,输出错误信息停止同步 response size=${reslist.size}") + RepLogger.error(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"--------error,获取到到chaininfo信息的数量,没有得到大多数节点的响应,输出错误信息停止同步 response size=${reslist.size}")) + } + } + +} diff --git a/src/main/scala/rep/network/sync/request/pbft/SynchRequesterOfPBFT.scala b/src/main/scala/rep/network/sync/request/pbft/SynchRequesterOfPBFT.scala new file mode 100644 index 00000000..c8bed4c1 --- /dev/null +++ b/src/main/scala/rep/network/sync/request/pbft/SynchRequesterOfPBFT.scala @@ -0,0 +1,76 @@ +package rep.network.sync.request.pbft + + +import akka.actor.Props +import rep.app.conf.SystemProfile +import rep.log.RepLogger +import rep.network.module.{IModuleManager, ModuleActorType} +import rep.network.sync.SyncMsg.{MaxBlockInfo, StartSync, SyncRequestOfStorager} +import rep.network.sync.parser.ISynchAnalyzer +import rep.network.sync.request.ISynchRequester +import rep.network.sync.parser.pbft.IPBFTOfSynchAnalyzer + +import rep.network.util.NodeHelp + +import scala.concurrent.duration._ + +/** + * Created by zhaohuanjun on 2020/03/30. + * 基于PBFT共识协议的同步actor的实现类 + */ + +object SynchRequesterOfPBFT{ + def props(name: String): Props = Props(classOf[SynchRequesterOfPBFT], name) +} + +class SynchRequesterOfPBFT(moduleName: String) extends ISynchRequester(moduleName: String) { + import context.dispatcher + + override protected def getAnalyzerInSynch: ISynchAnalyzer = { + new IPBFTOfSynchAnalyzer(pe.getSysTag, pe.getSystemCurrentChainStatus, pe.getNodeMgr) + } + + override def receive: Receive = { + case StartSync(isNoticeModuleMgr: Boolean) => + schedulerLink = clearSched() + var rb = true + initSystemChainInfo + if (pe.getNodeMgr.getStableNodes.size >= SystemProfile.getVoteNoteMin && !pe.isSynching) { + pe.setSynching(true) + try { + rb = Handler(isNoticeModuleMgr) + } catch { + case e: Exception => + rb = false + RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"request synch excep,msg=${e.getMessage}")) + } + pe.setSynching(false) + if (rb) { + if (isNoticeModuleMgr) + pe.getActorRef(ModuleActorType.ActorType.modulemanager) ! IModuleManager.startup_Consensus + } else { + schedulerLink = scheduler.scheduleOnce(1.second, self, StartSync(isNoticeModuleMgr)) + } + + } else { + RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"too few node,min=${SystemProfile.getVoteNoteMin} or synching from actorAddr" + "~" + NodeHelp.getNodePath(sender()))) + } + + case SyncRequestOfStorager(responser, maxHeight) => + RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"entry blockdata synch,maxheight=${maxHeight},responser=${responser}")) + if (!pe.isSynching) { + pe.setSynching(true) + RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"start blockdata synch,currentheight=${pe.getCurrentHeight},maxheight=${maxHeight}")) + try { + getBlockDatas(pe.getCurrentHeight, maxHeight, responser) + } catch { + case e: Exception => + pe.setSynching(false) + } + RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"stop blockdata synch,maxheight=${maxHeight}")) + pe.setSynching(false) + } + } + + override protected def setStartVoteInfo(maxblockinfo:MaxBlockInfo): Unit = {} +} diff --git a/src/main/scala/rep/network/tools/NodeMgr.scala b/src/main/scala/rep/network/tools/NodeMgr.scala index 2cce88ec..4230b6a6 100644 --- a/src/main/scala/rep/network/tools/NodeMgr.scala +++ b/src/main/scala/rep/network/tools/NodeMgr.scala @@ -64,6 +64,11 @@ class NodeMgr { } } + //zhj add + def getStableNodeNames: Set[String] = { + stableNodes.values.toSet + } + def getStableNodes: Set[Address] = { stableNodes.keys.toSet } diff --git a/src/main/scala/rep/network/util/NodeHelp.scala b/src/main/scala/rep/network/util/NodeHelp.scala index 9eb29c45..8b8ecdf9 100644 --- a/src/main/scala/rep/network/util/NodeHelp.scala +++ b/src/main/scala/rep/network/util/NodeHelp.scala @@ -55,15 +55,19 @@ object NodeHelp { } def ConsensusConditionChecked(inputNumber: Int, nodeNumber: Int): Boolean = { - var scaledata = SystemProfile.getNumberOfEndorsement - if(SystemProfile.getNumberOfEndorsement == 1){ - scaledata = 2 - } - - if(scaledata == 2){ - (inputNumber - 1) >= Math.floor(((nodeNumber) * 1.0) / scaledata) - }else{ - (inputNumber - 1) >= Math.floor((((nodeNumber) * 1.0) / scaledata)*2) + if (SystemProfile.getTypeOfConsensus == "PBFT") { //zhj + true + } else { + var scaledata = SystemProfile.getNumberOfEndorsement + if (SystemProfile.getNumberOfEndorsement == 1) { + scaledata = 2 + } + + if (scaledata == 2) { + (inputNumber - 1) >= Math.floor(((nodeNumber) * 1.0) / scaledata) + } else { + (inputNumber - 1) >= Math.floor((((nodeNumber) * 1.0) / scaledata) * 2) + } } }