zhaohuanjun add pbft

This commit is contained in:
zhaohj@sina.com 2020-04-02 11:14:15 +08:00
parent db713450bd
commit a5c05987fa
24 changed files with 1830 additions and 15 deletions

View File

@ -175,3 +175,20 @@ message BlockchainInfo {
bytes currentStateHash = 5;
}
//zhj
message MPbftPrepare {
Signature signature = 1;
}
//zhj
message MPbftCommit {
repeated MPbftPrepare prepares = 1;
Signature signature = 2;
}
//zhj
message MPbftReply {
repeated MPbftCommit commits = 1;
Signature signature = 2;
}

View File

@ -59,8 +59,11 @@ object SystemProfile {
private[this] var _HAS_PRELOAD_TRANS_OF_API = true
private[this] var _IS_VERIFY_OF_ENDORSEMENT = true//is_verify_of_endorsement
private[this] var _NUMBER_OF_ENDORSEMENT: Int = 2
private[this] var _TYPE_OF_CONSENSUS:String = "CFRD"
private[this] var _TYPE_OF_CONSENSUS:String = "PBFT"
//zhj
private[this] var _PBFT_F: Int = 1
private[this] var _BLOCKNUMBER_OF_RAFT: Int = 100
private[this] var _DBPATH:String = "" //leveldb数据库文件路径
@ -97,7 +100,10 @@ object SystemProfile {
private def REALTIMEGRAPH_ENABLE = _REALTIMEGRAPH_ENABLE
private def TYPE_OF_CONSENSUS : String = _TYPE_OF_CONSENSUS
//zhj
private def PBFT_F = _PBFT_F
private def DBPATH:String = _DBPATH
private def BLOCKPATH:String = _BLOCKPATH
private def FILEMAX: Int = _FILEMAX
@ -263,7 +269,10 @@ object SystemProfile {
FILEMAX_=(config.getInt("system.storage.filemax"))
REALTIMEGRAPH_ENABLE_=(config.getInt("system.realtimegraph_enable"))
}
//zhj
def getPbftF = PBFT_F
def getRealtimeGraph = REALTIMEGRAPH_ENABLE
def getDBPath = DBPATH

View File

@ -0,0 +1,51 @@
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//zhj
package rep.network.cache.pbft
import akka.actor.Props
import rep.network.cache.ITransactionPool
import rep.network.consensus.pbft.MsgOfPBFT
import rep.network.consensus.pbft.MsgOfPBFT.VoteOfBlocker
import rep.network.consensus.pbft.vote.VoterOfPBFT
import rep.network.module.ModuleActorType
import rep.network.module.pbft.PBFTActorType
/**
* 交易缓冲池伴生对象
*
* @author shidianyue
* @version 1.0
*/
object TransactionPoolOfPBFT {
def props(name: String): Props = Props(classOf[TransactionPoolOfPBFT], name)
//交易检查结果
case class CheckedTransactionResult(result: Boolean, msg: String)
}
/**
* 交易缓冲池类
*
* @author shidianyue
* @version 1.0
* @param moduleName
*/
class TransactionPoolOfPBFT(moduleName: String) extends ITransactionPool(moduleName) {
override protected def sendVoteMessage: Unit = {
pe.getActorRef(PBFTActorType.ActorType.voter) ! VoteOfBlocker("cache")
}
}

View File

@ -0,0 +1,155 @@
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//zhj
package rep.network.confirmblock.pbft
import akka.actor.{ActorRef, Props}
import akka.util.Timeout
import rep.app.conf.SystemProfile
import rep.log.{RepLogger, RepTimeTracer}
import rep.network.autotransaction.Topic
import rep.network.base.ModuleBase
import rep.network.confirmblock.IConfirmOfBlock
import rep.network.consensus.common.MsgOfConsensus
import rep.network.consensus.common.MsgOfConsensus.{BatchStore, BlockRestore}
import rep.network.consensus.pbft.MsgOfPBFT
import rep.network.consensus.util.BlockVerify
import rep.network.module.ModuleActorType.ActorType
import rep.network.persistence.IStorager.SourceOfBlock
import rep.utils.GlobalUtils.EventType
import scala.concurrent._
object ConfirmOfBlockOfPBFT {
def props(name: String): Props = Props(classOf[ConfirmOfBlockOfPBFT], name)
}
class ConfirmOfBlockOfPBFT(moduleName: String) extends IConfirmOfBlock(moduleName) {
import context.dispatcher
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("confirm Block module start"))
SubscribeTopic(mediator, self, selfAddr, Topic.Block, false)
}
import rep.protos.peer._
import scala.concurrent.duration._
case class DataSig(data:Array[Byte], sig : Signature)
private def asyncVerifyEndorses(block: Block, replies : Seq[MPbftReply]): Boolean = {
val b = block.clearEndorsements.toByteArray
val ds = scala.collection.mutable.Buffer[DataSig]()
replies.foreach( r => {
ds += DataSig(r.clearSignature.toByteArray, r.signature.get)
for (c <- r.commits) {
ds += DataSig(c.clearSignature.toByteArray, c.signature.get)
c.prepares.foreach(p=>{
ds += DataSig(b, p.signature.get)
})
}
})
/*val listOfFuture: Seq[Future[Boolean]] = block.endorsements.map(x => {
asyncVerifyEndorse(x, b)
}) */
val listOfFuture: Seq[Future[Boolean]] = ds.map(x => {
asyncVerifyEndorse(x.sig, x.data)
})
val futureOfList: Future[List[Boolean]] = Future.sequence(listOfFuture.toList).recover({
case e: Exception =>
null
})
val result1 = Await.result(futureOfList, timeout.duration).asInstanceOf[List[Boolean]]
var result = true
if (result1 == null) {
result = false
} else {
result1.foreach(f => {
if (!f) {
result = false
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"comfirmOfBlock verify endorse is error, break,block height=${block.height},local height=${pe.getCurrentHeight}"))
}
})
}
result
}
protected def handler(block: Block, actRefOfBlock: ActorRef): Unit ={
RepLogger.error(RepLogger.Consensus_Logger,pe.getSysTag + ", Internal error, ConfirmOfBlockOfPBFT.handler")
}
private def handler(block: Block, actRefOfBlock: ActorRef, replies : Seq[MPbftReply]) = {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"confirm verify endorsement start,height=${block.height}"))
var b = true
if (SystemProfile.getIsVerifyOfEndorsement)
b = asyncVerifyEndorses(block,replies)
if (b) {
pe.getBlockCacheMgr.addToCache(BlockRestore(block, SourceOfBlock.CONFIRMED_BLOCK, actRefOfBlock))
pe.getActorRef(ActorType.storager) ! BatchStore
sendEvent(EventType.RECEIVE_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.BLOCK_NEW)
}
}
override protected def checkedOfConfirmBlock(block: Block, actRefOfBlock: ActorRef): Unit ={
RepLogger.error(RepLogger.Consensus_Logger,pe.getSysTag + ", Internal error, ConfirmOfBlockOfPBFT.checkedOfConfirmBlock")
}
private def checkedOfConfirmBlock(block: Block, actRefOfBlock: ActorRef, replies : Seq[MPbftReply]) = {
if (pe.getCurrentBlockHash == "" && block.previousBlockHash.isEmpty()) {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"confirm verify blockhash,height=${block.height}"))
handler(block, actRefOfBlock, replies)
} else {
//与上一个块一致
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"confirm verify blockhash,height=${block.height}"))
/* if (SystemProfile.getNumberOfEndorsement == 1) {
if (block.height > pe.getCurrentHeight + 1) {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"confirm verify height,height=${block.height}localheight=${pe.getCurrentHeight }"))
pe.getActorRef(ActorType.synchrequester) ! SyncRequestOfStorager(sender,block.height)
} else {
handler(block, actRefOfBlock)
pe.setConfirmHeight(block.height)
}
} else { */
if ( replies.size >= (SystemProfile.getPbftF + 1))
handler(block, actRefOfBlock, replies)
//}
}
}
override def receive = {
//Endorsement block
case MsgOfConsensus.ConfirmedBlock(block, actRefOfBlock) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", ConfirmOfBlockOfPBFT recv ConfirmedBlock(2 params): " + ", " + block.hashOfBlock)
RepTimeTracer.setStartTime(pe.getSysTag, "blockconfirm", System.currentTimeMillis(), block.height, block.transactions.size)
checkedOfConfirmBlock(block, actRefOfBlock, Seq.empty)
RepTimeTracer.setEndTime(pe.getSysTag, "blockconfirm", System.currentTimeMillis(), block.height, block.transactions.size)
case MsgOfPBFT.ConfirmedBlock(block, actRefOfBlock, replies) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", ConfirmOfBlockOfPBFT recv ConfirmedBlock: " + ", " + block.hashOfBlock)
RepTimeTracer.setStartTime(pe.getSysTag, "blockconfirm", System.currentTimeMillis(), block.height, block.transactions.size)
checkedOfConfirmBlock(block, actRefOfBlock, replies)
RepTimeTracer.setEndTime(pe.getSysTag, "blockconfirm", System.currentTimeMillis(), block.height, block.transactions.size)
case _ => //ignore
}
}

View File

@ -1,5 +1,5 @@
package rep.network.consensus.common.algorithm
//zhj
import rep.log.RepLogger
import rep.storage.util.pathUtil

View File

@ -76,7 +76,7 @@ abstract class IVoter(moduleName: String) extends ModuleBase(moduleName) {
//系统属于初始化状态
if (NodeHelp.isSeedNode(pe.getSysTag)) {
// 建立创世块消息
pe.getActorRef(CFRDActorType.ActorType.gensisblock) ! GenesisBlock
pe.getActorRef(CFRDActorType.ActorType.gensisblock) ! GenesisBlock //zhj CFRD?
}
} else {
if (!pe.isSynching) {

View File

@ -0,0 +1,68 @@
package rep.network.consensus.pbft
import akka.actor.{ActorRef, Address}
import rep.protos.peer.{Block, BlockchainInfo, MPbftCommit, MPbftPrepare, MPbftReply, Signature}
import rep.utils.GlobalUtils.BlockerInfo
//zhj
/**
* Created by zhaohuanjun on 2020/03/20.
* PBFT共识协议的各类消息汇总
*/
object MsgOfPBFT {
////////////////////////////////Vote抽签消息开始//////////////////////////////
//通知抽签模块可以抽签的消息
//case object VoteOfBlocker
case class VoteOfBlocker(flag:String)
//通知抽签模块需要强制抽签
case object VoteOfForce
////////////////////////////////Vote抽签消息结束//////////////////////////////
case class ConfirmedBlock(blc: Block, actRef: ActorRef, replies : Seq[MPbftReply])
///////////////////////////////Block出块消息开始//////////////////////////////
//抽签成功之后向预出块的actor发送建立新块的消息该消息由抽签actor发出
case object CreateBlock
///////////////////////////////Block出块消息结束//////////////////////////////
//////////////////////////////endorsement共识消息开始/////////////////////////
//背书结果消息
case object ResultFlagOfEndorse{
val BlockerSelfError = 1
val CandidatorError = 2
val BlockHeightError = 3
val VerifyError = 4
val EnodrseNodeIsSynching = 5
val success = 0
}
//背书请求者消息
case class RequesterOfEndorsement(blc: Block, blocker: String, endorer: Address)
case class ResendEndorseInfo(endorer: Address)
//给背书人的背书消息
//case class EndorsementInfo(blc: Block, blocker: String)
//EndorsementInfo => MsgPbftPrePrepare
//给背书人的背书消息
case class MsgPbftPrePrepare(senderPath:String,block: Block, blocker: String)
//背书收集者消息
case class CollectEndorsement(blc: Block, blocker: String)
//背书人返回的背书结果
case class ResultOfEndorsed(result: Int, endor: Signature, BlockHash: String,endorserOfChainInfo:BlockchainInfo,endorserOfVote:BlockerInfo)
//背书请求者返回的结果
case class ResultOfEndorseRequester(result: Boolean, endor: Signature, BlockHash: String, endorser: Address)
//////////////////////////////endorsement共识消息结束/////////////////////////
case class MsgPbftPrepare(senderPath:String,result: Int, block:Block, blocker: String, prepare: MPbftPrepare, chainInfo : BlockchainInfo)
case class MsgPbftCommit(senderPath:String,block: Block, blocker: String, commit: MPbftCommit, chainInfo : BlockchainInfo)
case class MsgPbftReply(block: Block, reply: MPbftReply, chainInfo : BlockchainInfo)
case class MsgPbftReplyOk(block: Block, replies : Seq[MPbftReply])
}

View File

@ -0,0 +1,249 @@
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//zhj
package rep.network.consensus.pbft.block
import akka.actor.{ActorRef, Props}
import akka.pattern.{AskTimeoutException, ask}
import akka.util.Timeout
import rep.app.conf.{SystemProfile, TimePolicy}
import rep.log.{RepLogger, RepTimeTracer}
import rep.network.autotransaction.Topic
import rep.network.base.ModuleBase
import rep.network.consensus.common.MsgOfConsensus.{PreTransBlock, PreTransBlockResult}
import rep.network.consensus.pbft.MsgOfPBFT
import rep.network.consensus.pbft.MsgOfPBFT.{CollectEndorsement, VoteOfBlocker}
import rep.network.consensus.util.BlockHelp
import rep.network.module.ModuleActorType.ActorType
import rep.network.module.pbft.PBFTActorType
import rep.network.util.NodeHelp
import rep.protos.peer._
import rep.storage.ImpDataAccess
import rep.utils.GlobalUtils.EventType
import scala.concurrent._
import scala.util.control.Breaks._
object BlockerOfPBFT {
def props(name: String): Props = Props(classOf[BlockerOfPBFT], name)
}
/**
* 出块模块
*
* @author shidianyue
* @version 1.0
* @since 1.0
* @param moduleName 模块名称
*/
class BlockerOfPBFT(moduleName: String) extends ModuleBase(moduleName) {
import rep.protos.peer.Transaction
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
val dataaccess: ImpDataAccess = ImpDataAccess.GetDataAccess(pe.getSysTag)
implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds)
var preblock: Block = null
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("Block module start"))
super.preStart()
}
private def CollectedTransOfBlock(start: Int, num: Int, limitsize: Int): ArrayBuffer[Transaction] = {
var result = ArrayBuffer.empty[Transaction]
try {
val tmplist = pe.getTransPoolMgr.getTransListClone(start, num, pe.getSysTag)
if (tmplist.size > 0) {
val currenttime = System.currentTimeMillis() / 1000
var transsize = 0
breakable(
tmplist.foreach(f => {
transsize += f.toByteArray.size
if (transsize * 3 > limitsize) {
//区块的长度限制
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"block too length,txid=${f.id}" + "~" + selfAddr))
break
} else {
f +=: result
}
}))
if (result.isEmpty && tmplist.size >= SystemProfile.getMinBlockTransNum) {
result = CollectedTransOfBlock(start + num, num, limitsize)
}
}
} finally {
}
result
}
private def ExecuteTransactionOfBlock(block: Block): Block = {
try {
//val future = pe.getActorRef(ActorType.preloaderoftransaction) ? Blocker.PreTransBlock(block, "preload")
val future = pe.getActorRef(ActorType.dispatchofpreload) ? PreTransBlock(block, "preload")
val result = Await.result(future, timeout.duration).asInstanceOf[PreTransBlockResult]
if (result.result) {
result.blc
} else {
null
}
} catch {
case e: AskTimeoutException => null
}
}
private def CreateBlock(start: Int = 0): Block = {
RepTimeTracer.setStartTime(pe.getSysTag, "Block", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0)
RepTimeTracer.setStartTime(pe.getSysTag, "createBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0)
RepTimeTracer.setStartTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0)
val trans = CollectedTransOfBlock(start, SystemProfile.getLimitBlockTransNum, SystemProfile.getBlockLength).reverse
//todo 交易排序
if (trans.size >= SystemProfile.getMinBlockTransNum) {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,CollectedTransOfBlock success,height=${pe.getBlocker.VoteHeight + 1},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr))
RepTimeTracer.setEndTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, trans.size)
//此处建立新块必须采用抽签模块的抽签结果来进行出块否则出现刚抽完签马上有新块的存储完成就会出现错误
var blc = BlockHelp.WaitingForExecutionOfBlock(pe.getBlocker.voteBlockHash, pe.getBlocker.VoteHeight + 1, trans.toSeq)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr))
RepTimeTracer.setStartTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(), blc.height, blc.transactions.size)
blc = ExecuteTransactionOfBlock(blc)
if (blc != null) {
RepTimeTracer.setEndTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(), blc.height, blc.transactions.size)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,prelaod success,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr))
blc = BlockHelp.AddBlockHash(blc)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,AddBlockHash success,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr))
BlockHelp.AddSignToBlock(blc, pe.getSysTag)
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,preload error" + "~" + selfAddr))
CreateBlock(start + trans.size)
}
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,trans count error" + "~" + selfAddr))
null
}
}
private def CreateBlock4One(start: Int = 0): Block = {
RepTimeTracer.setStartTime(pe.getSysTag, "Block", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0)
RepTimeTracer.setStartTime(pe.getSysTag, "createBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0)
RepTimeTracer.setStartTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0)
val trans = CollectedTransOfBlock(start, SystemProfile.getLimitBlockTransNum, SystemProfile.getBlockLength).reverse.toSeq
//todo 交易排序
if (trans.size >= SystemProfile.getMinBlockTransNum) {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,CollectedTransOfBlock success,height=${pe.getBlocker.VoteHeight },local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr))
RepTimeTracer.setEndTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, trans.size)
//此处建立新块必须采用抽签模块的抽签结果来进行出块否则出现刚抽完签马上有新块的存储完成就会出现错误
var blc = BlockHelp.WaitingForExecutionOfBlock(pe.getCurrentBlockHash, pe.getCurrentHeight + 1, trans)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,height=${blc.height},local height=${pe.getCurrentHeight}" + "~" + selfAddr))
RepTimeTracer.setStartTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(), blc.height, blc.transactions.size)
blc = ExecuteTransactionOfBlock(blc)
if (blc != null) {
RepTimeTracer.setEndTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(), blc.height, blc.transactions.size)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,prelaod success,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr))
blc = BlockHelp.AddBlockHash(blc)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,AddBlockHash success,height=${blc.height},local height=${pe.getBlocker.VoteHeight}" + "~" + selfAddr))
BlockHelp.AddSignToBlock(blc, pe.getSysTag)
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,preload error" + "~" + selfAddr))
CreateBlock(start + trans.size)
}
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,trans count error" + "~" + selfAddr))
null
}
}
private def CreateBlockHandler = {
//if (preblock == null) {
var blc : Block = null
//if(SystemProfile.getNumberOfEndorsement == 1){
// blc = CreateBlock4One(0)
//}else{
blc = CreateBlock(0)
//}
if (blc != null) {
RepTimeTracer.setEndTime(pe.getSysTag, "createBlock", System.currentTimeMillis(), blc.height, blc.transactions.size)
this.preblock = blc
schedulerLink = clearSched()
// if (SystemProfile.getNumberOfEndorsement == 1) {
// pe.setCreateHeight(preblock.height)
// mediator ! Publish(Topic.Block, ConfirmedBlock(preblock, self))
//}else{
//在发出背书时告诉对方我是当前出块人取出系统的名称
RepTimeTracer.setStartTime(pe.getSysTag, "Endorsement", System.currentTimeMillis(), blc.height, blc.transactions.size)
val ar = pe.getActorRef(PBFTActorType.ActorType.endorsementcollectioner)
//RepLogger.print(RepLogger.zLogger, pe.getSysTag + ", send CollectEndorsement to " + ar )
ar ! CollectEndorsement(this.preblock, pe.getSysTag)
//}
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("create new block error,CreateBlock is null" + "~" + selfAddr))
pe.getActorRef(PBFTActorType.ActorType.voter) ! VoteOfBlocker("blocker")
}
//}
}
override def receive = {
//创建块请求给出块人
case MsgOfPBFT.CreateBlock =>
// //RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Blocker recv CreateBlock: " + "Now blocker=" + pe.getBlocker.blocker)
/*if(pe.getSysTag == "121000005l35120456.node1" && pe.count <= 10){
pe.count = pe.count + 1
throw new Exception("^^^^^^^^^^^^^^^^exception^^^^^^^^^^")
}*/
if (!pe.isSynching) {
//
/*if(SystemProfile.getNumberOfEndorsement == 1){
if (NodeHelp.isBlocker(pe.getBlocker.blocker, pe.getSysTag)){
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.CANDIDATOR)
if (preblock == null || (preblock.previousBlockHash.toStringUtf8() != pe.getCurrentBlockHash)) {
//是出块节点
CreateBlockHandler
}
}
}else{ */
////RepLogger.print(RepLogger.zLogger, pe.getBlocker.voteBlockHash)
////RepLogger.print(RepLogger.zLogger, pe.getCurrentBlockHash)
////RepLogger.print(RepLogger.zLogger, if (preblock == null) null else preblock.previousBlockHash.toStringUtf8)
if (NodeHelp.isBlocker(pe.getBlocker.blocker, pe.getSysTag)
&& pe.getBlocker.voteBlockHash == pe.getCurrentBlockHash) {
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.CANDIDATOR)
//是出块节点
if (preblock == null || (preblock.previousBlockHash.toStringUtf8() != pe.getBlocker.voteBlockHash)) {
//RepLogger.print(RepLogger.zLogger, "CreateBlockHandler, " + "Me: "+pe.getSysTag)
CreateBlockHandler
}
} else {
//出块标识错误,暂时不用做任何处理
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,do not blocker or blocker hash not equal current hash,height=${pe.getCurrentHeight}" + "~" + selfAddr))
}
//}
} else {
//节点状态不对
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,node status error,status is synching,height=${pe.getCurrentHeight}" + "~" + selfAddr))
}
case _ => //ignore
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rep.network.consensus.pbft.block
import akka.actor.Props
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.routing._
import rep.app.conf.SystemProfile
import rep.log.RepLogger
import rep.network.autotransaction.Topic
import rep.network.base.ModuleBase
import rep.network.consensus.pbft.MsgOfPBFT
import rep.network.consensus.pbft.MsgOfPBFT.{CollectEndorsement, MsgPbftReplyOk, RequesterOfEndorsement}
import rep.protos.peer._
import rep.utils.GlobalUtils.EventType
object EndorseCollector {
def props(name: String): Props = Props(classOf[EndorseCollector], name)
}
class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) {
import scala.collection.immutable._
private var router: Router = null
private var block: Block = null
private var blocker: String = null
private var recvedEndorse = new HashMap[String, Signature]()
override def preStart(): Unit = {
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorseCollector Start: " + ", " + self)
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "EndorseCollector Start"))
}
private def createRouter = {
if (router == null) {
var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size()*2)
for (i <- 0 to SystemProfile.getVoteNodeList.size()*2 - 1) {
var ca = context.actorOf(EndorsementRequest4Future.props("endorsementrequester" + i), "endorsementrequester" + i)
context.watch(ca)
list(i) = new ActorRefRoutee(ca)
}
val rlist: IndexedSeq[Routee] = list.toIndexedSeq
router = Router(SmallestMailboxRoutingLogic(), rlist)
}
}
private def resetEndorseInfo(block: Block, blocker: String) = {
this.block = block
this.blocker = blocker
this.recvedEndorse = this.recvedEndorse.empty
}
private def clearEndorseInfo = {
this.block = null
this.blocker = null
this.recvedEndorse = this.recvedEndorse.empty
}
override def receive = {
case CollectEndorsement(block, blocker) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorseCollector recv CollectEndorsement: " + ", " + block.hashOfBlock)
if(!pe.isSynching){
createRouter
if (this.block != null && this.block.hashOfBlock.toStringUtf8() == block.hashOfBlock.toStringUtf8()) {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"collectioner is waiting endorse result,height=${block.height},local height=${pe.getCurrentHeight}"))
} else {
if(block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash){
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner recv endorsement,height=${block.height},local height=${pe.getCurrentHeight}"))
resetEndorseInfo(block, blocker)
pe.getNodeMgr.getStableNodes.foreach(f => {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner send endorsement to requester,height=${block.height},local height=${pe.getCurrentHeight}"))
router.route(RequesterOfEndorsement(block, blocker, f), self)
})
}else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out endorsement request,height=${block.height},local height=${pe.getCurrentHeight}"))
}
}
}
case MsgPbftReplyOk(block, replies) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorseCollector recv replyok: " + ", " + block.hashOfBlock)
if(!pe.isSynching) {
//block不空该块的上一个块等于最后存储的hash背书结果的块hash跟当前发出的块hash一致
val hash = pe.getCurrentBlockHash
if (this.block != null)
if (this.block.previousBlockHash.toStringUtf8 == hash)
if (this.block.hashOfBlock == block.hashOfBlock) {
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Endorsement, Event.Action.ENDORSEMENT)
//this.block = this.block.withReplies(block.replies)
mediator ! Publish(Topic.Block, new MsgOfPBFT.ConfirmedBlock(this.block, sender, replies))
clearEndorseInfo
}
} else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out endorsement result,local height=${pe.getCurrentHeight}"))
}
case _ =>
//RepLogger.print(RepLogger.zLogger, pe.getSysTag + ", EndorseCollector recv other message")
}
}

View File

@ -0,0 +1,117 @@
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//zhj
package rep.network.consensus.pbft.block
import akka.actor.{ActorSelection, Address, Props}
import akka.pattern.AskTimeoutException
import akka.util.Timeout
import com.google.protobuf.ByteString
import rep.app.conf.{SystemProfile, TimePolicy}
import rep.log.RepLogger
import rep.network.base.ModuleBase
import rep.network.consensus.pbft.MsgOfPBFT
import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftPrePrepare, MsgPbftReply, MsgPbftReplyOk, RequesterOfEndorsement}
import rep.network.consensus.util.BlockVerify
import rep.protos.peer._
import scala.concurrent._
object EndorsementRequest4Future {
def props(name: String): Props = Props(classOf[EndorsementRequest4Future], name)
}
class EndorsementRequest4Future(moduleName: String) extends ModuleBase(moduleName) {
import scala.concurrent.duration._
case class HashReply(hash:ByteString, reply:MPbftReply)
private var recvedReplies = scala.collection.mutable.Buffer[HashReply]()
private var recvedRepliesCount = scala.collection.mutable.HashMap[ByteString, Int]()
implicit val timeout = Timeout(TimePolicy.getTimeoutEndorse.seconds)
//private val endorsementActorName = "/user/modulemanager/endorser"
private val endorsementActorName = "/user/modulemanager/dispatchofRecvendorsement"
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "EndorsementRequest4Future Start"))
}
private def toAkkaUrl(addr: Address, actorName: String): String = {
return addr.toString + "/" + actorName;
}
private def handler(reqinfo: RequesterOfEndorsement) = {
schedulerLink = clearSched()
try {
val selection: ActorSelection = context.actorSelection(toAkkaUrl(reqinfo.endorer, endorsementActorName));
val data = MsgPbftPrePrepare("", reqinfo.blc, reqinfo.blocker)
selection ! data
//val future1 = selection ? data
//Await.result(future1, timeout.duration).asInstanceOf[MsgPbftPrepare]
} catch {
case e: AskTimeoutException =>
case te: TimeoutException =>
}
}
//reply start-------------------------------------------
private def VerifyReply(block: Block, reply: MPbftReply): Boolean = {
val bb = reply.clearSignature.toByteArray
val signature = reply.signature.get//todo get?
val ev = BlockVerify.VerifyOneEndorseOfBlock(signature, bb, pe.getSysTag)
ev._1
}
private def ProcessMsgPbftReply(reply: MsgPbftReply){
if (VerifyReply(reply.block,reply.reply)) {
val hash = reply.block.hashOfBlock
recvedReplies += HashReply(hash, reply.reply)
var count = 1
if (recvedRepliesCount.contains(hash)) {
count = recvedRepliesCount.get(hash).get + 1
}
recvedRepliesCount.put(hash, count)
if (count >= (SystemProfile.getPbftF + 1)) {
val replies = recvedReplies.filter(_.hash == reply.block.hashOfBlock).map(f=>f.reply)
.sortWith( (left,right)=> left.signature.get.certId.toString < right.signature.get.certId.toString)
//val blockWithReplies = reply.block.withReplies(replies)
context.parent ! MsgPbftReplyOk(reply.block, replies)
recvedRepliesCount.remove(reply.block.hashOfBlock)
replies.foreach(f=> recvedReplies -= HashReply(reply.block.hashOfBlock, f))
}
}
}
//reply end-------------------------------------------
override def receive = {
case MsgPbftReply(block,reply,chainInfo) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorsementRequest4Future recv reply: " + ", " + block.hashOfBlock)
ProcessMsgPbftReply(MsgPbftReply(block,reply,chainInfo))
case RequesterOfEndorsement(block, blocker, addr) =>
//待请求背书的块的上一个块的hash不等于系统最新的上一个块的hash停止发送背书
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", EndorsementRequest4Future recv RequesterOfEndorsement: " + ", " + block.hashOfBlock)
if(block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash){
handler(RequesterOfEndorsement(block, blocker, addr))
}else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future back out endorsement,prehash not equal pe.currenthash ,height=${block.height},local height=${pe.getCurrentHeight} "))
}
case _ => //ignore
}
}

View File

@ -0,0 +1,129 @@
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rep.network.consensus.pbft.block
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.pattern.AskTimeoutException
import scala.concurrent._
import akka.actor.{ActorRef, Address, Props}
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import com.google.protobuf.ByteString
import rep.app.conf.{SystemProfile, TimePolicy}
import rep.crypto.Sha256
import rep.network._
import rep.network.base.ModuleBase
import rep.protos.peer._
import rep.storage.ImpDataAccess
import rep.utils.GlobalUtils.{BlockEvent, EventType, NodeStatus}
import scala.collection.mutable
import com.sun.beans.decoder.FalseElementHandler
import scala.util.control.Breaks
import rep.utils.IdTool
import scala.util.control.Breaks._
import rep.network.consensus.util.{BlockHelp, BlockVerify}
import rep.network.util.NodeHelp
import rep.log.RepLogger
import rep.network.autotransaction.Topic
import rep.network.consensus.common.MsgOfConsensus.{PreTransBlock, PreTransBlockResult}
import rep.network.consensus.pbft.MsgOfPBFT
import rep.network.consensus.pbft.MsgOfPBFT.ConfirmedBlock
import rep.network.module.ModuleActorType
import rep.network.module.pbft.PBFTActorType
object GenesisBlocker {
def props(name: String): Props = Props(classOf[GenesisBlocker], name)
case object GenesisBlock
}
/**
* 出块模块
*
* @author shidianyue
* @version 1.0
* @since 1.0
* @param moduleName 模块名称
*/
class GenesisBlocker(moduleName: String) extends ModuleBase(moduleName) {
import context.dispatcher
import scala.concurrent.duration._
import akka.actor.ActorSelection
import scala.collection.mutable.ArrayBuffer
import rep.protos.peer.{ Transaction }
val dataaccess: ImpDataAccess = ImpDataAccess.GetDataAccess(pe.getSysTag)
implicit val timeout = Timeout(TimePolicy.getTimeoutPreload*20.seconds)
var preblock: Block = null
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "Block module start"))
SubscribeTopic(mediator, self, selfAddr, Topic.Block, true)
}
private def ExecuteTransactionOfBlock(block: Block): Block = {
try {
//val future = pe.getActorRef(ActorType.preloaderoftransaction) ? PreTransBlock(block, "preload")
val future = pe.getActorRef(ModuleActorType.ActorType.dispatchofpreload) ? PreTransBlock(block, "preload")
val result = Await.result(future, timeout.duration).asInstanceOf[PreTransBlockResult]
if (result.result) {
result.blc
} else {
null
}
} catch {
case e: AskTimeoutException => null
}
}
override def receive = {
//创建块请求给出块人
case GenesisBlocker.GenesisBlock =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", pbft.block.GenesisBlock recv GenesisBlock: ")
if(dataaccess.getBlockChainInfo().height == 0 && NodeHelp.isSeedNode(pe.getSysTag) ){
if(this.preblock != null){
mediator ! Publish(Topic.Block, MsgOfPBFT.ConfirmedBlock(preblock, sender, Seq.empty))
}else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "Create genesis block"))
preblock = BlockHelp.CreateGenesisBlock
preblock = ExecuteTransactionOfBlock(preblock)
if (preblock != null) {
preblock = BlockHelp.AddBlockHash(preblock)
preblock = BlockHelp.AddSignToBlock(preblock, pe.getSysTag)
//sendEvent(EventType.RECEIVE_INFO, mediator, selfAddr, Topic.Block, Event.Action.BLOCK_NEW)
mediator ! Publish(Topic.Block, ConfirmedBlock(preblock, self, Seq.empty))
//getActorRef(pe.getSysTag, ActorType.PERSISTENCE_MODULE) ! BlockRestore(blc, SourceOfBlock.CONFIRMED_BLOCK, self)
}
}
}
case _ => //ignore
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//zhj
package rep.network.consensus.pbft.endorse
import akka.actor.Props
import akka.routing._
import rep.app.conf.SystemProfile
import rep.log.RepLogger
import rep.network.base.ModuleBase
import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftCommit, MsgPbftPrePrepare, MsgPbftPrepare}
object DispatchOfRecvEndorsement {
def props(name: String): Props = Props(classOf[DispatchOfRecvEndorsement], name)
}
class DispatchOfRecvEndorsement(moduleName: String) extends ModuleBase(moduleName) {
import scala.collection.immutable._
private var router: Router = null
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "DispatchOfRecvEndorsement Start"))
}
private def createRouter = {
if (router == null) {
var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size())
for (i <- 0 to SystemProfile.getVoteNodeList.size() - 1) {
var ca = context.actorOf(Endorser4Future.props("endorser" + i), "endorser" + i)
context.watch(ca)
list(i) = new ActorRefRoutee(ca)
}
val rlist: IndexedSeq[Routee] = list.toIndexedSeq
router = Router(SmallestMailboxRoutingLogic(), rlist)
}
}
override def receive = {
case MsgPbftPrePrepare(senderPath,block, blocker) =>
createRouter
router.route(MsgPbftPrePrepare(senderPath,block, blocker), sender)
case MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo) =>
createRouter
router.route(MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo), sender)
case MsgPbftCommit(senderPath,block,blocker,commit,chainInfo) =>
createRouter
router.route(MsgPbftCommit(senderPath,block,blocker,commit,chainInfo), sender)
case _ => //ignore
}
}

View File

@ -0,0 +1,230 @@
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//zhj
package rep.network.consensus.pbft.endorse
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import rep.app.conf.{SystemCertList, SystemProfile, TimePolicy}
import rep.network.base.ModuleBase
import rep.network.consensus.common.MsgOfConsensus.{PreTransBlock, PreTransBlockResult}
import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftCommit, MsgPbftPrePrepare, MsgPbftPrepare, ResultFlagOfEndorse}
import rep.network.module.ModuleActorType
import rep.network.module.pbft.PBFTActorType
import rep.network.util.NodeHelp
//import rep.network.consensus.vote.Voter.VoteOfBlocker
import rep.log.RepLogger
import rep.network.consensus.util.BlockVerify
import rep.network.sync.SyncMsg.StartSync
import scala.util.control.Breaks._
object Endorser4Future {
def props(name: String): Props = Props(classOf[Endorser4Future], name)
}
class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) {
import context.dispatcher
import rep.protos.peer._
import rep.storage.ImpDataAccess
import scala.concurrent._
import scala.concurrent.duration._
implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds)
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("Endorser4Future Start"))
}
//preprepare start-------------------------------------------
private def AskPreloadTransactionOfBlock(block: Block): Future[Boolean] =
pe.getActorRef(ModuleActorType.ActorType.dispatchofpreload).ask(PreTransBlock(block, "endors"))(timeout).
mapTo[PreTransBlockResult].flatMap(f => {
val result = Promise[Boolean]
var tmpblock = f.blc.withHashOfBlock(block.hashOfBlock)
if (BlockVerify.VerifyHashOfBlock(tmpblock)) {
result.success(true)
} else {
result.success(false)
}
result.future
}).recover({
case e: Throwable =>
RepLogger.error(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"${pe.getSysTag}:entry AskPreloadTransactionOfBlock error"))
false
})
private def checkRepeatOfTrans(trans: Seq[Transaction]): Future[Boolean] = Future {
var isRepeat: Boolean = false
val aliaslist = trans.distinct
if (aliaslist.size != trans.size) {
isRepeat = true
} else {
val sr: ImpDataAccess = ImpDataAccess.GetDataAccess(pe.getSysTag)
breakable(
trans.foreach(f => {
if (sr.isExistTrans4Txid(f.id)) {
isRepeat = true
break
}
}))
}
isRepeat
}
private def asyncVerifyTransaction(t: Transaction): Future[Boolean] = Future {
var result = false
if (pe.getTransPoolMgr.findTrans(t.id)) {
result = true
} else {
val tmp = BlockVerify.VerifyOneSignOfTrans(t, pe.getSysTag)
if (tmp._1) {
result = true
}
}
result
}
private def asyncVerifyTransactions(block: Block): Future[Boolean] = Future {
var result = true
val listOfFuture: Seq[Future[Boolean]] = block.transactions.map(x => {
asyncVerifyTransaction(x)
})
val futureOfList: Future[List[Boolean]] = Future.sequence(listOfFuture.toList)
futureOfList.map(x => {
x.foreach(f => {
if (f) {
result = false
}
})
})
result
}
private def checkEndorseSign(block: Block): Future[Boolean] = Future {
var result = false
val r = BlockVerify.VerifyAllEndorseOfBlock(block, pe.getSysTag)
result = r._1
result
}
private def VerifyInfo(info: MsgPbftPrePrepare) = {
val transSign = asyncVerifyTransactions(info.block)
val transRepeat = checkRepeatOfTrans(info.block.transactions)
val endorseSign = checkEndorseSign(info.block)
val transExe = AskPreloadTransactionOfBlock(info.block)
val result = for {
v1 <- transSign
v2 <- transRepeat
v3 <- endorseSign
v4 <- transExe
} yield (v1 && !v2 && v3 && v4)
Await.result(result, timeout.duration).asInstanceOf[Boolean]
}
private def CheckMessage(block : Block, blocker: String):Boolean = {
var r = false
if (block.height > pe.getCurrentHeight + 1) {
pe.getActorRef(PBFTActorType.ActorType.synchrequester) ! StartSync(false)
} else {
if (NodeHelp.isCandidateNow(pe.getSysTag, SystemCertList.getSystemCertList)
//是候选节点可以背书
&& (!pe.isSynching)
&& (block.previousBlockHash.toStringUtf8 == pe.getBlocker.voteBlockHash)
&& NodeHelp.isBlocker(blocker, pe.getBlocker.blocker)) {
r = true
}
}
r
}
private def ProcessMsgPbftPrePrepare(prePrepare: MsgPbftPrePrepare): Unit = {
if (CheckMessage(prePrepare.block,prePrepare.blocker))
if (prePrepare.blocker != pe.getSysTag) {
var b = true;
if (SystemProfile.getIsVerifyOfEndorsement)
b = VerifyInfo(prePrepare)
if (b)
pe.getActorRef(PBFTActorType.ActorType.pbftpreprepare) ! prePrepare
else
sender ! MsgPbftPrepare("",ResultFlagOfEndorse.VerifyError, null, null,null, pe.getSystemCurrentChainStatus)
}
}
//preprepare end-------------------------------------------
//prepare start-------------------------------------------
private def VerifyPrepare(block: Block, prepare: MPbftPrepare): Boolean = {
val bb = block.clearEndorsements.toByteArray
val signature = prepare.signature.get//todo get?
val ev = BlockVerify.VerifyOneEndorseOfBlock(signature, bb, pe.getSysTag)
ev._1
}
private def ProcessMsgPbftPepare(prepare:MsgPbftPrepare){
if (CheckMessage(prepare.block,prepare.blocker))
if (prepare.result == ResultFlagOfEndorse.success) {
if (VerifyPrepare(prepare.block, prepare.prepare)) {
pe.getActorRef(PBFTActorType.ActorType.pbftprepare) ! prepare
}
}
}
//prepare end-------------------------------------------
//commit start-------------------------------------------
private def VerifyCommit(block: Block, commit: MPbftCommit): Boolean = {
val bb = commit.clearSignature.toByteArray
val signature = commit.signature.get//todo get?
val ev = BlockVerify.VerifyOneEndorseOfBlock(signature, bb, pe.getSysTag)
ev._1
}
private def ProcessMsgPbftCommit(commit: MsgPbftCommit){
if (CheckMessage(commit.block,commit.blocker))
if (VerifyCommit(commit.block, commit.commit)) {
pe.getActorRef(PBFTActorType.ActorType.pbftcommit) ! commit
}
}
//commit end-------------------------------------------
override def receive = {
//Endorsement block
case MsgPbftPrePrepare(senderPath,block, blocker) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Endorser4Future recv preprepare: " + blocker + ", " + block.hashOfBlock)
ProcessMsgPbftPrePrepare(MsgPbftPrePrepare(sender.path.toString, block, blocker))
case MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Endorser4Future recv prepare: " + blocker + ", " + block.hashOfBlock)
ProcessMsgPbftPepare(MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo))
case MsgPbftCommit(senderPath,block,blocker,commit,chainInfo) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Endorser4Future recv commit: " + blocker + ", " + block.hashOfBlock)
ProcessMsgPbftCommit(MsgPbftCommit(senderPath,block,blocker,commit,chainInfo))
case _ => //ignore
}
}

View File

@ -0,0 +1,78 @@
/**
* @created zhaohuanjun 2020-03
*/
//zhj
package rep.network.consensus.pbft.endorse
import akka.actor.Props
import akka.util.Timeout
import com.google.protobuf.ByteString
import com.google.protobuf.timestamp.Timestamp
import rep.app.conf.{SystemProfile, TimePolicy}
import rep.crypto.cert.SignTool
import rep.log.RepLogger
import rep.network.base.ModuleBase
import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftCommit, MsgPbftReply}
import rep.utils.{IdTool, TimeUtils}
case object PbftCommit {
def props(name: String): Props = Props(classOf[PbftCommit], name)
}
class PbftCommit(moduleName: String) extends ModuleBase(moduleName) {
import rep.protos.peer._
import scala.concurrent.duration._
case class HashCommit(hash:ByteString, commit:MPbftCommit)
private var recvedCommits = scala.collection.mutable.Buffer[HashCommit]()
private var recvedCommitsCount = scala.collection.mutable.HashMap[ByteString, Int]()
implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds)
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("PbftCommit Start"))
}
private def ProcessMsgPbftCommit(commit: MsgPbftCommit){
val commits = recvedCommits.filter(_.hash == commit.block.hashOfBlock).map(f=>f.commit)
.sortWith( (left,right)=> left.signature.get.certId.toString < right.signature.get.certId.toString)
val bytes = MPbftReply().withCommits(commits).toByteArray
//commits.map(f=>f.toByteString).reduce((a,f)=>a.concat(f))
val certId = IdTool.getCertIdFromName(pe.getSysTag)
val millis = TimeUtils.getCurrentTime()
val sig = Signature(Option(certId),Option(Timestamp(millis / 1000, ((millis % 1000) * 1000000).toInt)),
ByteString.copyFrom(SignTool.sign(pe.getSysTag, bytes)))
var reply : MPbftReply = MPbftReply()
.withCommits(commits)
.withSignature(sig)
val actor = context.actorSelection(commit.senderPath)
actor ! MsgPbftReply(commit.block,reply,pe.getSystemCurrentChainStatus)
recvedCommitsCount.remove(commit.block.hashOfBlock)
commits.foreach(f=> recvedCommits -= HashCommit(commit.block.hashOfBlock, f))
}
override def receive = {
case MsgPbftCommit(senderPath,block,blocker,commit,chainInfo) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", PbftCommit recv commit: " + blocker + ", " + block.hashOfBlock)
//already verified
val hash = block.hashOfBlock
recvedCommits += HashCommit(hash, commit)
var count = 1
if (recvedCommitsCount.contains(hash)) {
count = recvedCommitsCount.get(hash).get +1
}
recvedCommitsCount.put(hash,count)
if ( count >= (2*SystemProfile.getPbftF+1))
ProcessMsgPbftCommit(MsgPbftCommit(senderPath,block,blocker,commit,chainInfo))
case _ => //ignore
}
}

View File

@ -0,0 +1,63 @@
/**
* @created zhaohuanjun 2020-03
*/
package rep.network.consensus.pbft.endorse
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import akka.actor.{ActorSelection, Props}
import akka.util.Timeout
import rep.app.conf.TimePolicy
import rep.network.base.ModuleBase
import rep.log.RepLogger
import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftPrePrepare, MsgPbftPrepare, ResultFlagOfEndorse}
import rep.network.consensus.util.BlockHelp
case object PbftPrePrepare {
def props(name: String): Props = Props(classOf[PbftPrePrepare], name)
}
class PbftPrePrepare(moduleName: String) extends ModuleBase(moduleName) {
import rep.protos.peer._
import scala.concurrent.duration._
implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds)
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("PbftPrePrepare Start"))
}
private def ProcessMsgPbftPrePrepare(prePrepare: MsgPbftPrePrepare): Unit = {
pe.getNodeMgr.getStableNodes.foreach(f => {
val actorPath = f.toString + "/user/modulemanager/dispatchofRecvendorsement"
val actor : ActorSelection = context.actorSelection(actorPath)
val prepare : MPbftPrepare = MPbftPrepare().withSignature(BlockHelp.SignBlock(prePrepare.block, pe.getSysTag))
actor ! MsgPbftPrepare(prePrepare.senderPath, ResultFlagOfEndorse.success, prePrepare.block, prePrepare.blocker,prepare, pe.getSystemCurrentChainStatus)
})
}
override def receive = {
case MsgPbftPrePrepare(senderPath,block, blocker) =>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", PbftPrePrepare recv preprepare: " + blocker + ", " + block.hashOfBlock)
ProcessMsgPbftPrePrepare(MsgPbftPrePrepare(senderPath, block, blocker))
case _ => //ignore
}
}

View File

@ -0,0 +1,94 @@
/**
* @created zhaohuanjun 2020-03
*/
//zhj
package rep.network.consensus.pbft.endorse
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import akka.actor.{ActorSelection, Props}
import akka.util.Timeout
import com.google.protobuf.ByteString
import com.google.protobuf.timestamp.Timestamp
import rep.app.conf.{SystemProfile, TimePolicy}
import rep.crypto.cert.SignTool
import rep.log.RepLogger
import rep.network.base.ModuleBase
import rep.network.consensus.pbft.MsgOfPBFT.{MsgPbftCommit, MsgPbftPrepare}
import rep.utils.{IdTool, TimeUtils}
case object PbftPrepare {
def props(name: String): Props = Props(classOf[PbftPrepare], name)
}
class PbftPrepare(moduleName: String) extends ModuleBase(moduleName) {
import rep.protos.peer._
import scala.concurrent.duration._
case class HashPrepare(hash:ByteString, prepare:MPbftPrepare)
private var recvedPrepares = scala.collection.mutable.Buffer[HashPrepare]()
private val recvedPreparesCount = scala.collection.mutable.HashMap[ByteString, Int]()
implicit val timeout = Timeout(TimePolicy.getTimeoutPreload.seconds)
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("PbftPrepare Start"))
}
private def ProcessMsgPbftPepare(prepare:MsgPbftPrepare) = {
val prepares = recvedPrepares.filter(_.hash == prepare.block.hashOfBlock).map(f=>f.prepare)
.sortWith( (left,right)=> left.signature.get.certId.toString < right.signature.get.certId.toString)
val bytes = MPbftCommit().withPrepares(prepares).toByteArray//prepares.reduce((a,f)=>a.toByteString.concat(f.toByteString))
val certId = IdTool.getCertIdFromName(pe.getSysTag)
val millis = TimeUtils.getCurrentTime()
val sig = Signature(Option(certId),Option(Timestamp(millis / 1000, ((millis % 1000) * 1000000).toInt)),
ByteString.copyFrom(SignTool.sign(pe.getSysTag, bytes)))
var commit : MPbftCommit = MPbftCommit()
.withPrepares(prepares)
.withSignature(sig)
pe.getNodeMgr.getStableNodes.foreach(f => {
val actorPath = f.toString + "/user/modulemanager/dispatchofRecvendorsement"
val actor : ActorSelection = context.actorSelection(actorPath)
actor ! MsgPbftCommit(prepare.senderPath,prepare.block,prepare.blocker,commit,pe.getSystemCurrentChainStatus)
})
recvedPreparesCount.remove(prepare.block.hashOfBlock)
prepares.foreach(f=> recvedPrepares -= HashPrepare(prepare.block.hashOfBlock, f))
}
override def receive = {
case MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo) =>
//already verified
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", PbftPrepare recv prepare: " + blocker + ", " + block.hashOfBlock)
val hash = block.hashOfBlock
recvedPrepares += HashPrepare(hash, prepare)
var count = 1
if (recvedPreparesCount.contains(hash)) {
count = recvedPreparesCount.get(hash).get +1
}
recvedPreparesCount.put(hash,count)
if ( count >= 2*SystemProfile.getPbftF)
ProcessMsgPbftPepare(MsgPbftPrepare(senderPath,result, block, blocker, prepare, chainInfo))
case _ => //ignore
}
}

View File

@ -0,0 +1,93 @@
//zhj
package rep.network.consensus.pbft.vote
import akka.actor.Props
import rep.app.conf.{SystemCertList, TimePolicy}
import rep.log.RepLogger
import rep.network.consensus.pbft.MsgOfPBFT.{CreateBlock, VoteOfBlocker, VoteOfForce}
import rep.network.consensus.common.algorithm.{IRandomAlgorithmOfVote, ISequencialAlgorithmOfVote}
import rep.network.consensus.common.vote.IVoter
import rep.network.module.pbft.PBFTActorType
import rep.network.util.NodeHelp
object VoterOfPBFT{
def props(name: String): Props = Props(classOf[VoterOfPBFT],name)
}
class VoterOfPBFT(moduleName: String) extends IVoter(moduleName: String) {
import context.dispatcher
import scala.concurrent.duration._
this.algorithmInVoted = new ISequencialAlgorithmOfVote
override def preStart(): Unit = {
RepLogger.info(RepLogger.Vote_Logger, this.getLogMsgPrefix("VoterOfPBFT module start"))
}
override protected def NoticeBlockerMsg: Unit = {
if (this.Blocker.blocker.equals(pe.getSysTag)) {
//发送建立新块的消息
pe.getActorRef(PBFTActorType.ActorType.blocker) ! CreateBlock
}
}
override protected def DelayVote: Unit = {
this.voteCount += 1
var time = this.voteCount * TimePolicy.getVoteRetryDelay
schedulerLink = clearSched()
schedulerLink = scheduler.scheduleOnce(TimePolicy.getVoteRetryDelay.millis, self, VoteOfBlocker("voter"))
}
override protected def vote(isForce: Boolean): Unit = {
if (checkTranNum || isForce) {
val currentblockhash = pe.getCurrentBlockHash
val currentheight = pe.getCurrentHeight
if (this.Blocker.voteBlockHash == "") {
this.cleanVoteInfo
this.resetCandidator(currentblockhash)
this.resetBlocker(0, currentblockhash, currentheight)
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},first voter,blocker=${this.Blocker.blocker},voteidx=${this.Blocker.VoteIndex}" + "~" + selfAddr))
} else {
if (!this.Blocker.voteBlockHash.equals(currentblockhash)) {
//抽签的基础块已经变化需要重续选择候选人
this.cleanVoteInfo
this.resetCandidator(currentblockhash)
this.resetBlocker(0, currentblockhash, currentheight)
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},hash change,reset voter,height=${currentheight},hash=${currentblockhash},blocker=${this.Blocker.blocker},voteidx=${this.Blocker.VoteIndex}" + "~" + selfAddr))
} else {
if (this.Blocker.blocker == "") {
this.cleanVoteInfo
this.resetCandidator(currentblockhash)
this.resetBlocker(0, currentblockhash, currentheight)
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},blocker=null,reset voter,height=${currentheight},blocker=${this.Blocker.blocker},voteidx=${this.Blocker.VoteIndex}" + "~" + selfAddr))
} else {
if (((System.currentTimeMillis() - this.Blocker.voteTime) / 1000 > TimePolicy.getTimeOutBlock)
||(!pe.getNodeMgr.getStableNodeNames.contains(this.Blocker.blocker))) { //zhj
//说明出块超时
this.voteCount = 0
this.resetBlocker(this.Blocker.VoteIndex + 1, currentblockhash, currentheight)
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},block timeout,reset voter,height=${currentheight},blocker=${this.Blocker.blocker},voteidx=${this.Blocker.VoteIndex}" + "~" + selfAddr))
} else {
NoticeBlockerMsg
}
}
}
}
} else {
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag},transaction is not enough,waiting transaction,height=${pe.getCurrentHeight}" + "~" + selfAddr))
}
}
override def receive: Receive = {
case VoteOfBlocker(flag:String) =>
////RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Voter recv VoteOfBlocker: " + flag)
if (NodeHelp.isCandidateNow(pe.getSysTag, SystemCertList.getSystemCertList)) {
voteMsgHandler(false)
}
case VoteOfForce=>
//RepLogger.print(RepLogger.zLogger,pe.getSysTag + ", Voter recv VoteOfForce: ")
voteMsgHandler(true)
}
}

View File

@ -0,0 +1,59 @@
//zhj
package rep.network.module.pbft
import akka.actor.Props
import rep.log.RepLogger
import rep.network.cache.pbft.TransactionPoolOfPBFT
import rep.network.confirmblock.common.ConfirmOfBlock
import rep.network.confirmblock.pbft.ConfirmOfBlockOfPBFT
import rep.network.consensus.pbft.block.{BlockerOfPBFT, EndorseCollector}
import rep.network.module.{IModuleManager, ModuleActorType}
import rep.network.sync.response.SynchronizeResponser
import rep.network.consensus.pbft.endorse.{PbftCommit, PbftPrePrepare, PbftPrepare}
import rep.network.consensus.pbft.MsgOfPBFT.VoteOfBlocker
import rep.network.consensus.pbft.endorse.DispatchOfRecvEndorsement
import rep.network.persistence.StoragerOfPBFT
import rep.network.consensus.pbft.vote
import rep.network.consensus.pbft.vote.VoterOfPBFT
import rep.network.sync.request.pbft.SynchRequesterOfPBFT
/**
* Created by zhaohuanjun on 2020/03/30.
* PBFT的模块管理类继承公共的模块管理类实现PBFT的自己的模块
*/
object ModuleManagerOfPBFT{
def props(name: String, sysTag: String, enableStatistic: Boolean, enableWebSocket: Boolean, isStartup: Boolean): Props = Props(classOf[ModuleManagerOfPBFT], name, sysTag, enableStatistic: Boolean, enableWebSocket: Boolean, isStartup: Boolean)
}
class ModuleManagerOfPBFT(moduleName: String, sysTag: String, enableStatistic: Boolean, enableWebSocket: Boolean, isStartup: Boolean) extends IModuleManager(moduleName,sysTag, enableStatistic, enableWebSocket, isStartup){
override def preStart(): Unit = {
RepLogger.info(RepLogger.System_Logger, this.getLogMsgPrefix( "ModuleManagerOfPBFT Start"))
}
override def startupConsensus: Unit = {
pe.getActorRef(PBFTActorType.ActorType.voter) ! VoteOfBlocker("startup")
}
override def loadConsensusModule = {
pe.register(ModuleActorType.ActorType.transactionpool, context.actorOf(TransactionPoolOfPBFT.props("transactionpool"), "transactionpool"))//zhj
pe.register(ModuleActorType.ActorType.storager,context.actorOf(StoragerOfPBFT.props("storager"), "storager"))
pe.register(PBFTActorType.ActorType.blocker,context.actorOf(BlockerOfPBFT.props("blocker"), "blocker"))
pe.register(PBFTActorType.ActorType.confirmerofblock,context.actorOf(ConfirmOfBlockOfPBFT.props("confirmerofblock"), "confirmerofblock"))
pe.register(PBFTActorType.ActorType.endorsementcollectioner,context.actorOf(EndorseCollector.props("endorsementcollectioner"), "endorsementcollectioner"))
pe.register(PBFTActorType.ActorType.dispatchofRecvendorsement,context.actorOf(DispatchOfRecvEndorsement.props("dispatchofRecvendorsement"), "dispatchofRecvendorsement"))
pe.register(PBFTActorType.ActorType.voter,context.actorOf(VoterOfPBFT.props("voter"), "voter"))
pe.register(PBFTActorType.ActorType.synchrequester,context.actorOf(SynchRequesterOfPBFT.props("synchrequester"), "synchrequester"))
pe.register(PBFTActorType.ActorType.synchresponser,context.actorOf(SynchronizeResponser.props("synchresponser"), "synchresponser"))
pe.register(PBFTActorType.ActorType.pbftpreprepare, context.actorOf(PbftPrePrepare.props("pbftpreprepare"), "pbftpreprepare"))
pe.register(PBFTActorType.ActorType.pbftprepare, context.actorOf(PbftPrepare.props("pbftprepare"), "pbftprepare"))
pe.register(PBFTActorType.ActorType.pbftcommit, context.actorOf(PbftCommit.props("pbftcommit"), "pbftcommit"))
}
}

View File

@ -0,0 +1,28 @@
//zhj
package rep.network.module.pbft
/**
* Created by jiangbuyun on 2020/03/15.
* CFRD管理的actor
*/
object PBFTActorType {
//cfrd共识模式的actor类型的注册关键字以30开头
case object ActorType{
val blocker = 201
val voter = 202
val endorsementcollectioner = 203
val confirmerofblock = 204
val dispatchofRecvendorsement = 205
val gensisblock = 206
val synchrequester = 207
val synchresponser = 208
val pbftpreprepare = 209
val pbftprepare = 210
val pbftcommit = 211
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BA SIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rep.network.persistence
import akka.actor.Props
import rep.log.RepLogger
import rep.network.consensus.pbft.MsgOfPBFT.VoteOfBlocker
import rep.network.module.pbft.PBFTActorType
object StoragerOfPBFT {
def props(name: String): Props = Props(classOf[StoragerOfPBFT], name)
}
class StoragerOfPBFT(moduleName: String) extends IStorager (moduleName: String) {
override def preStart(): Unit = {
RepLogger.info(RepLogger.Storager_Logger, this.getLogMsgPrefix( "Storager Start"))
}
override protected def sendVoteMessage: Unit = {
pe.getActorRef( PBFTActorType.ActorType.voter) ! VoteOfBlocker("persistence")
}
}

View File

@ -0,0 +1,67 @@
package rep.network.sync.parser.pbft
import rep.log.RepLogger
import rep.network.sync.SyncMsg.AnalysisResult
import rep.network.sync.SyncMsg
import rep.network.sync.parser.ISynchAnalyzer
import rep.network.tools.NodeMgr
import rep.network.util.NodeHelp
import rep.protos.peer.BlockchainInfo
/**
* Created by jiangbuyun on 2020/03/18.
* 基于PBF共识协议的同步区块信息分析的实现类
*/
class IPBFTOfSynchAnalyzer(systemName: String, lchaininfo: BlockchainInfo, nodemgr: NodeMgr) extends ISynchAnalyzer( systemName, lchaininfo, nodemgr) {
override def Parser(reslist: List[SyncMsg.ResponseInfo], isStartupSynch: Boolean): Unit = {
val lh = lchaininfo.height
val nodes = nodemgr.getStableNodes
if (NodeHelp.ConsensusConditionChecked(reslist.length, nodes.size)) {
//获取到到chaininfo信息的数量得到大多数节点的响应进一步判断同步的策略
//获取返回的chaininfo信息中大多数节点的相同高度的最大值
val heightStatis = reslist.groupBy(x => x.response.height).map(x => (x._1, x._2.length)).toList.sortBy(x => -x._2)
val maxheight = heightStatis.head._1
var nodesOfmaxHeight = heightStatis.head._2
RepLogger.debug(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"--------Info,获取同步的返回信息,结果=${reslist.mkString("|")}"))
RepLogger.info(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"--------Info,获取同步的返回信息,统计结果=${heightStatis.mkString("|")}"))
if (NodeHelp.ConsensusConditionChecked(nodesOfmaxHeight, nodes.size)) {
//得到了真正大多数节点相同的高度
val agreementResult = checkHashAgreement(maxheight, reslist, nodes.size, 1)
if (agreementResult._1) {
//当前同步高度的最后高度的块hash一致
if (maxheight > lh) {
this.greaterThanLocalHeight(reslist, maxheight, agreementResult._2)
} else if (maxheight == lh) {
this.equalLocalHeight(reslist, maxheight, agreementResult._2)
} else {
if (isStartupSynch) {
this.lessThanLocalHeight(maxheight, agreementResult._2)
} else {
this.aresult = AnalysisResult(1, "")
}
RepLogger.error(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix("--------info,本地高度大于远端高度,停止同步"))
}
} else {
//当前同步高度的最后高度的块hash不一致输出错误信息停止同步
this.aresult = AnalysisResult(0, "当前同步高度的最后高度的块hash不一致输出错误信息停止同步")
RepLogger.error(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix("--------error,当前同步高度的最后高度的块hash不一致输出错误信息停止同步"))
}
} else {
//最多数量的高度达不到共识的要求输出错误信息停止同步
this.aresult = AnalysisResult(2, s"最多数量的高度,达不到共识的要求,输出错误信息停止同步 response size=${reslist.size}")
RepLogger.error(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"--------error,最多数量的高度,达不到共识的要求,输出错误信息停止同步 response size=${reslist.size}"))
}
} else {
//获取到到chaininfo信息的数量没有得到大多数节点的响应输出错误信息停止同步
this.aresult = AnalysisResult(0, s"获取到到chaininfo信息的数量没有得到大多数节点的响应输出错误信息停止同步 response size=${reslist.size}")
RepLogger.error(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"--------error,获取到到chaininfo信息的数量没有得到大多数节点的响应输出错误信息停止同步 response size=${reslist.size}"))
}
}
}

View File

@ -0,0 +1,76 @@
package rep.network.sync.request.pbft
import akka.actor.Props
import rep.app.conf.SystemProfile
import rep.log.RepLogger
import rep.network.module.{IModuleManager, ModuleActorType}
import rep.network.sync.SyncMsg.{MaxBlockInfo, StartSync, SyncRequestOfStorager}
import rep.network.sync.parser.ISynchAnalyzer
import rep.network.sync.request.ISynchRequester
import rep.network.sync.parser.pbft.IPBFTOfSynchAnalyzer
import rep.network.util.NodeHelp
import scala.concurrent.duration._
/**
* Created by zhaohuanjun on 2020/03/30.
* 基于PBFT共识协议的同步actor的实现类
*/
object SynchRequesterOfPBFT{
def props(name: String): Props = Props(classOf[SynchRequesterOfPBFT], name)
}
class SynchRequesterOfPBFT(moduleName: String) extends ISynchRequester(moduleName: String) {
import context.dispatcher
override protected def getAnalyzerInSynch: ISynchAnalyzer = {
new IPBFTOfSynchAnalyzer(pe.getSysTag, pe.getSystemCurrentChainStatus, pe.getNodeMgr)
}
override def receive: Receive = {
case StartSync(isNoticeModuleMgr: Boolean) =>
schedulerLink = clearSched()
var rb = true
initSystemChainInfo
if (pe.getNodeMgr.getStableNodes.size >= SystemProfile.getVoteNoteMin && !pe.isSynching) {
pe.setSynching(true)
try {
rb = Handler(isNoticeModuleMgr)
} catch {
case e: Exception =>
rb = false
RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"request synch excep,msg=${e.getMessage}"))
}
pe.setSynching(false)
if (rb) {
if (isNoticeModuleMgr)
pe.getActorRef(ModuleActorType.ActorType.modulemanager) ! IModuleManager.startup_Consensus
} else {
schedulerLink = scheduler.scheduleOnce(1.second, self, StartSync(isNoticeModuleMgr))
}
} else {
RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"too few node,min=${SystemProfile.getVoteNoteMin} or synching from actorAddr" + "" + NodeHelp.getNodePath(sender())))
}
case SyncRequestOfStorager(responser, maxHeight) =>
RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"entry blockdata synch,maxheight=${maxHeight},responser=${responser}"))
if (!pe.isSynching) {
pe.setSynching(true)
RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"start blockdata synch,currentheight=${pe.getCurrentHeight},maxheight=${maxHeight}"))
try {
getBlockDatas(pe.getCurrentHeight, maxHeight, responser)
} catch {
case e: Exception =>
pe.setSynching(false)
}
RepLogger.trace(RepLogger.BlockSyncher_Logger, this.getLogMsgPrefix(s"stop blockdata synch,maxheight=${maxHeight}"))
pe.setSynching(false)
}
}
override protected def setStartVoteInfo(maxblockinfo:MaxBlockInfo): Unit = {}
}

View File

@ -64,6 +64,11 @@ class NodeMgr {
}
}
//zhj add
def getStableNodeNames: Set[String] = {
stableNodes.values.toSet
}
def getStableNodes: Set[Address] = {
stableNodes.keys.toSet
}

View File

@ -55,15 +55,19 @@ object NodeHelp {
}
def ConsensusConditionChecked(inputNumber: Int, nodeNumber: Int): Boolean = {
var scaledata = SystemProfile.getNumberOfEndorsement
if(SystemProfile.getNumberOfEndorsement == 1){
scaledata = 2
}
if(scaledata == 2){
(inputNumber - 1) >= Math.floor(((nodeNumber) * 1.0) / scaledata)
}else{
(inputNumber - 1) >= Math.floor((((nodeNumber) * 1.0) / scaledata)*2)
if (SystemProfile.getTypeOfConsensus == "PBFT") { //zhj
true
} else {
var scaledata = SystemProfile.getNumberOfEndorsement
if (SystemProfile.getNumberOfEndorsement == 1) {
scaledata = 2
}
if (scaledata == 2) {
(inputNumber - 1) >= Math.floor(((nodeNumber) * 1.0) / scaledata)
} else {
(inputNumber - 1) >= Math.floor((((nodeNumber) * 1.0) / scaledata) * 2)
}
}
}