解决出块中间停顿,不能连续出块的问题。

This commit is contained in:
wuwei1972 2019-05-16 16:59:25 +08:00
parent c0a3dabca2
commit a971b1ccd9
16 changed files with 273 additions and 154 deletions

View File

@ -76,6 +76,7 @@ abstract class ModuleBase(name: String) extends Actor with ClusterActor with B
case "gensisblock" => 17
case "api" => 18
case "transactiondispatcher" => 19
case "dispatchofRecvendorsement" => 20
case _ => 0
}
}

View File

@ -144,7 +144,8 @@ class Blocker(moduleName: String) extends ModuleBase(moduleName) {
//todo 交易排序
if (trans.size >= SystemProfile.getMinBlockTransNum) {
RepTimeTracer.setEndTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(),pe.getCurrentHeight + 1,trans.size)
var blc = BlockHelp.WaitingForExecutionOfBlock(pe.getCurrentBlockHash, pe.getCurrentHeight + 1, trans)
//此处建立新块必须采用抽签模块的抽签结果来进行出块否则出现刚抽完签马上有新块的存储完成就会出现错误
var blc = BlockHelp.WaitingForExecutionOfBlock(pe.getBlocker.voteBlockHash, pe.getBlocker.VoteHeight + 1, trans)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,height=${blc.height},local height=${pe.getCurrentHeight}" + "~" + selfAddr))
RepTimeTracer.setStartTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(),blc.height,blc.transactions.size)
blc = ExecuteTransactionOfBlock(blc)
@ -185,30 +186,18 @@ class Blocker(moduleName: String) extends ModuleBase(moduleName) {
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.CANDIDATOR)
//是出块节点
if (preblock == null) {
if (preblock == null || (preblock.previousBlockHash.toStringUtf8() != pe.getBlocker.voteBlockHash)) {
CreateBlockHandler
} else {
if (preblock.previousBlockHash.toStringUtf8() != pe.getBlocker.voteBlockHash) {
//上一个块已经变化需要重新出块
CreateBlockHandler
}
}
} else {
//出块标识错误,暂时不用做任何处理
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,do not blocker or blocker hash not equal current hash,height=${this.preblock.height}" + "~" + selfAddr))
}
} else {
//节点状态不对
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,node status error,status is synching,height=${this.preblock.height}" + "~" + selfAddr))
}
//出块超时
/*case Blocker.EndorseOfBlockTimeOut =>
schedulerLink = clearSched()
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,send endorse collector,height=${this.preblock.height},local height=${pe.getCurrentHeight}" + "~" + selfAddr))
pe.getActorRef(ActorType.endorsementcollectioner) ! EndorseMsg.CollectEndorsement(this.preblock, pe.getBlocker.blocker)
schedulerLink = scheduler.scheduleOnce(TimePolicy.getTimeoutEndorse seconds, self, Blocker.EndorseOfBlockTimeOut)*/
case _ => //ignore
}

View File

@ -34,7 +34,7 @@ import scala.util.control.Breaks._
import scala.util.control.Exception.Finally
import java.util.concurrent.ConcurrentHashMap
import rep.network.consensus.block.Blocker.{ ConfirmedBlock }
import rep.network.persistence.Storager.{ BlockRestore, SourceOfBlock }
import rep.network.persistence.Storager.{ BlockRestore, SourceOfBlock ,BatchStore}
import rep.network.consensus.util.{ BlockVerify, BlockHelp }
import rep.log.RepLogger
import rep.log.RepTimeTracer
@ -102,8 +102,10 @@ class ConfirmOfBlock(moduleName: String) extends ModuleBase(moduleName) {
if (BlockVerify.VerifyEndorserSorted(block.endorsements.toArray[Signature]) == 1 || (block.height==1 && pe.getCurrentBlockHash == "" && block.previousBlockHash.isEmpty())) {
//背书信息排序正确
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "confirm verify endorsement sort"))
pe.getBlockCacheMgr.addToCache(BlockRestore(block, SourceOfBlock.CONFIRMED_BLOCK, actRefOfBlock))
pe.getActorRef(ActorType.storager) ! BatchStore
sendEvent(EventType.RECEIVE_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.BLOCK_NEW)
pe.getActorRef(ActorType.storager) ! BlockRestore(block, SourceOfBlock.CONFIRMED_BLOCK, actRefOfBlock)
//pe.getActorRef(ActorType.storager) ! BlockRestore(block, SourceOfBlock.CONFIRMED_BLOCK, actRefOfBlock)
} else {
////背书信息排序错误
}

View File

@ -21,7 +21,7 @@ import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.routing._;
import rep.app.conf.{ SystemProfile, TimePolicy }
import rep.network.base.ModuleBase
import rep.network.consensus.endorse.EndorseMsg.{ RequesterOfEndorsement, ResultOfEndorseRequester, CollectEndorsement }
import rep.network.consensus.endorse.EndorseMsg.{ RequesterOfEndorsement, ResultOfEndorseRequester, CollectEndorsement,ResendEndorseInfo }
import rep.network.consensus.block.Blocker.ConfirmedBlock
import rep.network.tools.PeerExtension
import rep.network.Topic
@ -38,7 +38,6 @@ import rep.log.RepLogger
import rep.log.RepTimeTracer
object EndorseCollector {
case object ResendEndorseInfo
def props(name: String): Props = Props(classOf[EndorseCollector], name)
}
@ -58,10 +57,8 @@ class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) {
private def createRouter = {
if (router == null) {
var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size())
for (i <- 0 to SystemProfile.getVoteNodeList.size() - 1) {
//EndorsementRequest4Future
//var ca = context.actorOf(EnodorsementRequester.props("endorsementrequester" + i), "endorsementrequester" + i)
var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size()*3)
for (i <- 0 to SystemProfile.getVoteNodeList.size()*3 - 1) {
var ca = context.actorOf(EndorsementRequest4Future.props("endorsementrequester" + i), "endorsementrequester" + i)
context.watch(ca)
list(i) = new ActorRefRoutee(ca)
@ -75,31 +72,20 @@ class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) {
this.block = block
this.blocker = blocker
this.recvedEndorse = this.recvedEndorse.empty
//schedulerLink = clearSched()
}
private def clearEndorseInfo = {
this.block = null
this.blocker = null
this.recvedEndorse = this.recvedEndorse.empty
//schedulerLink = clearSched()
}
private def resendEndorser = {
//schedulerLink = clearSched()
pe.getNodeMgr.getStableNodes.foreach(f => {
if (!recvedEndorse.contains(f.toString)) {
router.route(RequesterOfEndorsement(block, blocker, f), self)
}
})
//schedulerLink = scheduler.scheduleOnce(TimePolicy.getTimeoutEndorse seconds, self, EndorseCollector.ResendEndorseInfo)
}
private def CheckAndFinishHandler {
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Endorsement, Event.Action.ENDORSEMENT)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("collectioner check is finish "))
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("entry collectioner check "))
if (NodeHelp.ConsensusConditionChecked(this.recvedEndorse.size + 1, pe.getNodeMgr.getNodes.size)) {
//schedulerLink = clearSched()
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("collectioner package endorsement to block"))
this.recvedEndorse.foreach(f => {
this.block = BlockHelp.AddEndorsementToBlock(this.block, f._2)
@ -120,26 +106,24 @@ class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) {
override def receive = {
case CollectEndorsement(block, blocker) =>
createRouter
if (this.block != null && this.block.hashOfBlock.toStringUtf8().equals(block.hashOfBlock.toStringUtf8())) {
if (this.block != null && this.block.hashOfBlock.toStringUtf8() == block.hashOfBlock.toStringUtf8()) {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"collectioner is waiting endorse result,height=${block.height},local height=${pe.getCurrentHeight}"))
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner recv endorsement,height=${block.height},local height=${pe.getCurrentHeight}"))
resetEndorseInfo(block, blocker)
pe.getNodeMgr.getStableNodes.foreach(f => {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner send endorsement to requester,height=${block.height},local height=${pe.getCurrentHeight}"))
router.route(RequesterOfEndorsement(block, blocker, f), self)
})
//schedulerLink = scheduler.scheduleOnce(TimePolicy.getTimeoutEndorse seconds, self, EndorseCollector.ResendEndorseInfo)
if(block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash){
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner recv endorsement,height=${block.height},local height=${pe.getCurrentHeight}"))
resetEndorseInfo(block, blocker)
pe.getNodeMgr.getStableNodes.foreach(f => {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner send endorsement to requester,height=${block.height},local height=${pe.getCurrentHeight}"))
router.route(RequesterOfEndorsement(block, blocker, f), self)
})
}else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out endorsement request,height=${block.height},local height=${pe.getCurrentHeight}"))
}
}
/*case EndorseCollector.ResendEndorseInfo =>
if (this.block != null) {
logMsg(LogType.INFO, "collectioner resend endorsement")
resendEndorser
}*/
case ResultOfEndorseRequester(result, endors, blockhash, endorser) =>
if (this.block != null) {
if (this.block.hashOfBlock.toStringUtf8().equals(blockhash)) {
//block不空该块的上一个块等于最后存储的hash背书结果的块hash跟当前发出的块hash一致
if (this.block != null && this.block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash && this.block.hashOfBlock.toStringUtf8() == blockhash) {
if (result) {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner recv endorsement result,height=${block.height},local height=${pe.getCurrentHeight}"))
recvedEndorse += endorser.toString -> endors
@ -147,7 +131,18 @@ class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) {
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"collectioner recv endorsement result,is error,height=${block.height},local height=${pe.getCurrentHeight}"))
}
}else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out endorsement result,local height=${pe.getCurrentHeight}"))
}
case ResendEndorseInfo(endorer)=>
if (this.block != null && this.block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash ) {
if(this.router != null){
router.route(RequesterOfEndorsement(this.block, this.blocker, endorer), self)
}else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner's router is null,height=${block.height},local height=${pe.getCurrentHeight}"))
}
}else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out resend endorsement request,local height=${pe.getCurrentHeight}"))
}
case _ => //ignore
}

View File

@ -27,7 +27,7 @@ import com.google.protobuf.ByteString
import com.google.protobuf.timestamp.Timestamp
import rep.app.conf.{ SystemProfile, TimePolicy }
import rep.network.base.ModuleBase
import rep.network.consensus.endorse.EndorseMsg.{ EndorsementInfo, ResultOfEndorsed, RequesterOfEndorsement, ResultOfEndorseRequester, ResultFlagOfEndorse }
import rep.network.consensus.endorse.EndorseMsg.{ EndorsementInfo, ResultOfEndorsed, RequesterOfEndorsement, ResultOfEndorseRequester, ResultFlagOfEndorse,ResendEndorseInfo }
import rep.network.tools.PeerExtension
import rep.network.Topic
import rep.protos.peer._
@ -38,6 +38,7 @@ import scala.util.control.Breaks
import rep.utils.GlobalUtils.{ EventType, ActorType }
import rep.network.sync.SyncMsg.StartSync
import rep.log.RepLogger
import rep.log.RepTimeTracer
object EndorsementRequest4Future {
def props(name: String): Props = Props(classOf[EndorsementRequest4Future], name)
@ -49,7 +50,8 @@ class EndorsementRequest4Future(moduleName: String) extends ModuleBase(moduleNam
import scala.concurrent.duration._
implicit val timeout = Timeout(TimePolicy.getTimeoutEndorse seconds)
private val endorsementActorName = "/user/modulemanager/endorser"
//private val endorsementActorName = "/user/modulemanager/endorser"
private val endorsementActorName = "/user/modulemanager/dispatchofRecvendorsement"
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "EndorsementRequest4Future Start"))
@ -78,51 +80,67 @@ class EndorsementRequest4Future(moduleName: String) extends ModuleBase(moduleNam
private def EndorsementVerify(block: Block, result: ResultOfEndorsed): Boolean = {
val bb = block.clearEndorsements.toByteArray
val ev = BlockVerify.VerifyOneEndorseOfBlock(result.endor, bb, pe.getSysTag)
if (ev._1) {
true
} else {
false
}
ev._1
}
private def handler(reqinfo: RequesterOfEndorsement) = {
schedulerLink = clearSched()
val result = this.ExecuteOfEndorsement(reqinfo.endorer, EndorsementInfo(reqinfo.blc, reqinfo.blocker))
if (result != null) {
if (result.result == ResultFlagOfEndorse.success) {
if (EndorsementVerify(reqinfo.blc, result)) {
val re = ResultOfEndorseRequester(true, result.endor, result.BlockHash, reqinfo.endorer)
context.parent ! re
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future, send endorsement, height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} "))
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future recv endorsement result is error, result=${result.result},height=${reqinfo.blc.height},local height=${pe.getCurrentHeight}"))
context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer)
}
} else {
if (result.result == ResultFlagOfEndorse.BlockHeightError) {
if (result.endorserOfChainInfo.height > pe.getCurrentHeight + 1) {
pe.getActorRef(ActorType.synchrequester) ! StartSync(false)
context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future recv endorsement result must synch,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} "))
RepTimeTracer.setStartTime(pe.getSysTag, s"Endorsement-request-${moduleName}", System.currentTimeMillis(),reqinfo.blc.height,reqinfo.blc.transactions.size)
val result = this.ExecuteOfEndorsement(reqinfo.endorer, EndorsementInfo(reqinfo.blc, reqinfo.blocker))
if (result != null) {
if (result.result == ResultFlagOfEndorse.success) {
if (EndorsementVerify(reqinfo.blc, result)) {
val re = ResultOfEndorseRequester(true, result.endor, result.BlockHash, reqinfo.endorer)
context.parent ! re
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future, send endorsement, height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} "))
} else {
schedulerLink = scheduler.scheduleOnce(TimePolicy.getTimeoutEndorse seconds, self, RequesterOfEndorsement(reqinfo.blc, reqinfo.blocker, reqinfo.endorer))
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future recv endorsement result is error, result=${result.result},height=${reqinfo.blc.height},local height=${pe.getCurrentHeight}"))
context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer)
}
} else {
context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer)
if (result.result == ResultFlagOfEndorse.BlockHeightError) {
if (result.endorserOfChainInfo.height > pe.getCurrentHeight + 1) {
//todo 需要从块缓冲判断是否启动块同步
pe.getActorRef(ActorType.synchrequester) ! StartSync(false)
context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future recv endorsement result must synch,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} "))
} else {
//if(pe.getCurrentHeight >= reqinfo.blc.height){
//schedulerLink = scheduler.scheduleOnce(30 milliseconds, self, RequesterOfEndorsement(reqinfo.blc, reqinfo.blocker, reqinfo.endorer))
// schedulerLink = scheduler.scheduleOnce(30 milliseconds, self, RequesterOfEndorsement(reqinfo.blc, reqinfo.blocker, reqinfo.endorer))
//}
/*try{
Thread.sleep(30)
context.parent ! ResendEndorseInfo(reqinfo.endorer)
}catch{
case e:Exception =>
RepLogger.error(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future sleep happen error,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} "))
}*/
context.parent ! ResendEndorseInfo(reqinfo.endorer)
}
} else {
//context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer)
context.parent ! ResendEndorseInfo(reqinfo.endorer)
}
}
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future recv endorsement result is null,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} "))
//context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer)
context.parent ! ResendEndorseInfo(reqinfo.endorer)
}
} else {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future recv endorsement result is null,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} "))
context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer)
}
RepTimeTracer.setEndTime(pe.getSysTag, s"Endorsement-request-${moduleName}", System.currentTimeMillis(),reqinfo.blc.height,reqinfo.blc.transactions.size)
}
override def receive = {
case RequesterOfEndorsement(block, blocker, addr) =>
handler(RequesterOfEndorsement(block, blocker, addr))
//case ResultOfEndorsed(result, endor, blockhash)=>
// handlerOfResult(ResultOfEndorsed(result, endor, blockhash))
//待请求背书的块的上一个块的hash不等于系统最新的上一个块的hash停止发送背书
if(block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash){
handler(RequesterOfEndorsement(block, blocker, addr))
}else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future back out endorsement,prehash not equal pe.currenthash ,height=${block.height},local height=${pe.getCurrentHeight} "))
}
case _ => //ignore
}
}

View File

@ -0,0 +1,60 @@
package rep.network.consensus.endorse
import akka.actor.{ Actor, ActorRef, Props, Address, ActorSelection }
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.routing._;
import rep.app.conf.{ SystemProfile, TimePolicy }
import rep.network.base.ModuleBase
import rep.network.tools.PeerExtension
import rep.network.Topic
import rep.protos.peer._
import rep.utils.GlobalUtils.{ EventType }
import rep.utils._
import scala.collection.mutable._
import rep.network.consensus.util.BlockVerify
import scala.util.control.Breaks
import rep.network.util.NodeHelp
import rep.network.consensus.util.BlockHelp
import rep.network.consensus.util.BlockVerify
import rep.log.RepLogger
import rep.log.RepTimeTracer
import rep.network.consensus.endorse.EndorseMsg.{ EndorsementInfo}
object DispatchOfRecvEndorsement {
def props(name: String): Props = Props(classOf[DispatchOfRecvEndorsement], name)
}
class DispatchOfRecvEndorsement(moduleName: String) extends ModuleBase(moduleName) {
import context.dispatcher
import scala.concurrent.duration._
import scala.collection.immutable._
private var router: Router = null
override def preStart(): Unit = {
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "DispatchOfRecvEndorsement Start"))
}
private def createRouter = {
if (router == null) {
var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size())
for (i <- 0 to SystemProfile.getVoteNodeList.size() - 1) {
var ca = context.actorOf(Endorser4Future.props("endorser" + i), "endorser" + i)
context.watch(ca)
list(i) = new ActorRefRoutee(ca)
}
val rlist: IndexedSeq[Routee] = list.toIndexedSeq
router = Router(SmallestMailboxRoutingLogic(), rlist)
}
}
override def receive = {
case EndorsementInfo(block, blocker) =>
createRouter
router.route(EndorsementInfo(block, blocker), sender)
case _ => //ignore
}
}

View File

@ -33,6 +33,8 @@ object EndorseMsg {
//背书请求者消息
case class RequesterOfEndorsement(blc: Block, blocker: String, endorer: Address)
case class ResendEndorseInfo(endorer: Address)
//给背书人的背书消息
case class EndorsementInfo(blc: Block, blocker: String)

View File

@ -158,14 +158,15 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) {
//if (info.blc.previousBlockHash.toStringUtf8 == pe.getCurrentBlockHash && NodeHelp.isBlocker(info.blocker, pe.getBlocker.blocker)) {
if (info.blc.previousBlockHash.toStringUtf8 == pe.getBlocker.voteBlockHash && NodeHelp.isBlocker(info.blocker, pe.getBlocker.blocker)) {
//可以进入背书
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "vote result equalallow entry endorse,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}"))
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"vote result equalallow entry endorse,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}"))
0
} else {
//todo 需要判断区块缓存再决定是否需要启动同步
if(info.blc.height > pe.getCurrentHeight+1){
pe.getActorRef(ActorType.synchrequester) ! StartSync(false)
}
//当前块hash和抽签的出块人都不一致暂时不能够进行背书可以进行缓存
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "block hash is not equal or blocker is not equal,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}"))
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"block hash is not equal or blocker is not equal,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}"))
2
}
} else {
@ -199,8 +200,6 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) {
//println(s"${pe.getSysTag}:entry 6")
if (result1) {
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"${pe.getSysTag}:entry 7"))
//if(AskPreloadTransactionOfBlock(info.blc)){
//println("entry 9")
val endtime = System.currentTimeMillis()
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"#################endorsement spent time=${endtime-starttime},,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}"))
sendEvent(EventType.RECEIVE_INFO, mediator, pe.getSysTag, Topic.Endorsement,Event.Action.ENDORSEMENT)
@ -218,7 +217,7 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) {
r match {
case 0 =>
//entry endorse
VerifyInfo(info: EndorsementInfo)
VerifyInfo(info)
case 2 =>
//cache endorse,waiting revote
sender ! ResultOfEndorsed(ResultFlagOfEndorse.BlockHeightError, null, info.blc.hashOfBlock.toStringUtf8(),pe.getSystemCurrentChainStatus,pe.getBlocker)
@ -226,7 +225,7 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) {
case 1 =>
//do not endorse
sender ! ResultOfEndorsed(ResultFlagOfEndorse.BlockerSelfError, null, info.blc.hashOfBlock.toStringUtf8(),pe.getSystemCurrentChainStatus,pe.getBlocker)
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"it is blocker,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}"))
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"itself,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}"))
case 3 =>
//do not endorse
sender ! ResultOfEndorsed(ResultFlagOfEndorse.CandidatorError, null, info.blc.hashOfBlock.toStringUtf8(),pe.getSystemCurrentChainStatus,pe.getBlocker)
@ -238,9 +237,9 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) {
//Endorsement block
case EndorsementInfo(block, blocker) =>
if(!pe.isSynching){
RepTimeTracer.setStartTime(pe.getSysTag, "recvendorsement", System.currentTimeMillis(),block.height,block.transactions.size)
RepTimeTracer.setStartTime(pe.getSysTag, s"recvendorsement-${moduleName}", System.currentTimeMillis(),block.height,block.transactions.size)
EndorseHandler(EndorsementInfo(block, blocker))
RepTimeTracer.setEndTime(pe.getSysTag, "recvendorsement", System.currentTimeMillis(),block.height,block.transactions.size)
RepTimeTracer.setEndTime(pe.getSysTag, s"recvendorsement-${moduleName}", System.currentTimeMillis(),block.height,block.transactions.size)
}else{
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"do not endorse,it is synching,recv endorse request,endorse height=${block.height},local height=${pe.getCurrentHeight}"))
}

View File

@ -129,10 +129,8 @@ class PreloaderForTransaction(moduleName: String) extends ModuleBase(moduleName)
override def receive = {
case PreTransBlock(block,prefixOfDbTag) =>
RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "entry preload"))
//logTime("block preload inner time", System.currentTimeMillis(), true)
if ((block.previousBlockHash.toStringUtf8().equals(pe.getCurrentBlockHash) || block.previousBlockHash == ByteString.EMPTY) &&
if ((block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash || block.previousBlockHash == ByteString.EMPTY) &&
block.height == (pe.getCurrentHeight + 1)) {
var preLoadTrans = mutable.HashMap.empty[String, Transaction]
preLoadTrans = block.transactions.map(trans => (trans.id, trans))(breakOut): mutable.HashMap[String, Transaction]
@ -156,7 +154,6 @@ class PreloaderForTransaction(moduleName: String) extends ModuleBase(moduleName)
//全部或者部分交易成功
sender ! PreTransBlockResult(newblock.get,true)
}
//logTime("block preload inner time", System.currentTimeMillis(), false)
}
case _ => //ignore
}

View File

@ -29,7 +29,7 @@ import rep.network.cluster.MemberListener
import rep.network.sync.{ SynchronizeResponser, SynchronizeRequester4Future }
import rep.sc.TransactionDispatcher
import rep.network.consensus.block.{ GenesisBlocker, ConfirmOfBlock, EndorseCollector, Blocker }
import rep.network.consensus.endorse.{Endorser4Future}
import rep.network.consensus.endorse.{Endorser4Future,DispatchOfRecvEndorsement}
import rep.network.consensus.transaction.{PreloaderForTransaction}
import rep.network.consensus.vote.Voter
@ -102,8 +102,11 @@ class ModuleManager(moduleName: String, sysTag: String, enableStatistic: Boolean
context.actorOf(ConfirmOfBlock.props("confirmerofblock"), "confirmerofblock")
context.actorOf(EndorseCollector.props("endorsementcollectioner"), "endorsementcollectioner")
//context.actorOf(Endorser.props("endorser"), "endorser")
context.actorOf(Endorser4Future.props("endorser"), "endorser")
//context.actorOf(Endorser4Future.props("endorser"), "endorser")
context.actorOf(DispatchOfRecvEndorsement.props("dispatchofRecvendorsement"), "dispatchofRecvendorsement")
if(this.isStartup){
context.actorOf(TransactionDispatcher.props("transactiondispatcher"), "transactiondispatcher")
//context.actorOf(PreloaderForTransaction.props("preloaderoftransaction"),"preloaderoftransaction")

View File

@ -40,6 +40,7 @@ object Storager {
def props(name: String): Props = Props(classOf[Storager], name)
final case class BlockRestore(blk: Block, SourceOfBlock: Int, blker: ActorRef)
final case object BatchStore
case object SourceOfBlock {
val CONFIRMED_BLOCK = 1
@ -59,7 +60,7 @@ object Storager {
class Storager(moduleName: String) extends ModuleBase(moduleName) {
import context.dispatcher
import scala.concurrent.duration._
import rep.network.persistence.Storager.{ BlockRestore, SourceOfBlock }
import rep.network.persistence.Storager.{ BlockRestore, SourceOfBlock ,BatchStore}
override def preStart(): Unit = {
RepLogger.info(RepLogger.Storager_Logger, this.getLogMsgPrefix( "Storager Start"))
@ -72,16 +73,17 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) {
private def SaveBlock(blkRestore: BlockRestore): Integer = {
var re: Integer = 0
try {
RepTimeTracer.setStartTime(pe.getSysTag, "storage-save", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
RepLogger.trace(RepLogger.Storager_Logger, this.getLogMsgPrefix( s"PreBlockHash(Before presistence): ${pe.getCurrentBlockHash}" + "~" + selfAddr))
val result = dataaccess.restoreBlock(blkRestore.blk)
if (result._1) {
RepLogger.trace(RepLogger.Storager_Logger, this.getLogMsgPrefix( s"Restore blocks success,node number: ${pe.getSysTag},block number=${blkRestore.blk.height}" + "~" + selfAddr))
RepTimeTracer.setEndTime(pe.getSysTag, "storage", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
if(pe.getSysTag == pe.getBlocker.blocker && pe.getBlocker.VoteHeight+1 == blkRestore.blk.height){
RepTimeTracer.setEndTime(pe.getSysTag, "Block", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
}
RepTimeTracer.setEndTime(pe.getSysTag, "storage-save", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
if(blkRestore.SourceOfBlock == SourceOfBlock.CONFIRMED_BLOCK && pe.getSysTag == pe.getBlocker.blocker && pe.getBlocker.VoteHeight+1 == blkRestore.blk.height){
RepTimeTracer.setEndTime(pe.getSysTag, "Block", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
}
pe.getTransPoolMgr.removeTrans(blkRestore.blk.transactions)
pe.resetSystemCurrentChainStatus(new BlockchainInfo(result._2, result._3, ByteString.copyFromUtf8(result._4), ByteString.copyFromUtf8(result._5),ByteString.copyFromUtf8(result._6)))
@ -127,8 +129,7 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) {
}
}
private def Handler(_blkRestore: BlockRestore)={
pe.getBlockCacheMgr.addToCache(_blkRestore)
private def Handler={
try {
var localchaininfo = pe.getSystemCurrentChainStatus
if (localchaininfo.height <= 0) {
@ -142,6 +143,7 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) {
breakable(
while(loop <= maxheight){
val _blkRestore = pe.getBlockCacheMgr.getBlockFromCache(loop)
if(loop > localchaininfo.height+1){
//发送同步消息
if(!pe.isSynching){
@ -149,7 +151,7 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) {
}
break
}else{
RestoreBlock(pe.getBlockCacheMgr.getBlockFromCache(loop))
RestoreBlock(_blkRestore)
}
loop += 1l
}
@ -196,12 +198,15 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) {
override def receive = {
case blkRestore: BlockRestore =>
RepLogger.trace(RepLogger.Storager_Logger, this.getLogMsgPrefix( s"node number:${pe.getSysTag},restore single block,height:${blkRestore.blk.height}" + "~" + selfAddr))
RepTimeTracer.setStartTime(pe.getSysTag, "storage", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
Handler(blkRestore)
/*RepTimeTracer.setEndTime(pe.getSysTag, "storage", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
if(pe.getSysTag == pe.getBlocker.blocker){
RepTimeTracer.setEndTime(pe.getSysTag, "Block", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
}*/
RepTimeTracer.setStartTime(pe.getSysTag, "storage-handle", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
pe.getBlockCacheMgr.addToCache(blkRestore)
Handler
RepTimeTracer.setEndTime(pe.getSysTag, "storage-handle", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size)
case BatchStore =>
RepTimeTracer.setStartTime(pe.getSysTag, "storage-handle-noarg", System.currentTimeMillis(),pe.getCurrentHeight,120)
Handler
RepTimeTracer.setEndTime(pe.getSysTag, "storage-handle-noarg", System.currentTimeMillis(),pe.getCurrentHeight,120)
case _ => //ignore
}

View File

@ -35,6 +35,7 @@ import rep.network.consensus.util.BlockHelp
import rep.log.RepLogger
import rep.utils.SerializeUtils.deserialise
import rep.storage.util.pathUtil
import rep.log.RepTimeTracer
/**
@ -413,7 +414,7 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
* @param newblock Block 待判断的块,lastblock Block 已知的最后区块
* @return 如果是最后的区块返回true否则返回false
*/
private def isLastBlock(newblock: Block, lastblock: Block): Boolean = {
private def isLastBlock(newblock: Block, lastblock: blockindex): Boolean = {
var b: Boolean = false
if (lastblock == null) {
if (newblock.previousBlockHash.isEmpty()) {
@ -421,12 +422,12 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
} else {
b = false
}
} else {
val prve = newblock.previousBlockHash.toStringUtf8()
//目前直接从上一个块中获取
val cur = lastblock.hashOfBlock.toStringUtf8()
if (prve.equals(cur) && (lastblock.height + 1) == newblock.height) {
//val cur = lastblock.hashOfBlock.toStringUtf8()
//if (prve.equals(cur) && (lastblock.height + 1) == newblock.height) {
if(prve == lastblock.getBlockHash()){
b = true
} else {
b = false
@ -537,18 +538,25 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
val oldh = getBlockHeight()
val oldno = this.getMaxFileNo()
val oldtxnumber = this.getBlockAllTxNumber()
var prevblock: Block = null
var prevblock: blockindex = null
if (oldh > 0) {
val tbs = this.getBlockByHeight(oldh)
prevblock = Block.parseFrom(tbs)
RepTimeTracer.setStartTime(this.SystemName, "storage-save-get-preblock", System.currentTimeMillis(),block.height,block.transactions.size)
//val tbs = this.getBlockByHeight(oldh)
prevblock = getBlockIdxByHeight(block.height-1)
//prevblock = Block.parseFrom(tbs)
RepTimeTracer.setEndTime(this.SystemName, "storage-save-get-preblock", System.currentTimeMillis(),block.height,block.transactions.size)
}
if (isLastBlock(block, prevblock)) {
try {
this.BeginTrans
RepTimeTracer.setStartTime(this.SystemName, "storage-save-write-operlog", System.currentTimeMillis(),block.height,block.transactions.size)
WriteOperLogToDBWithRestoreBlock(block)
RepTimeTracer.setEndTime(this.SystemName, "storage-save-write-operlog", System.currentTimeMillis(),block.height,block.transactions.size)
RepTimeTracer.setStartTime(this.SystemName, "storage-save-commit", System.currentTimeMillis(),block.height,block.transactions.size)
if (this.commitAndAddBlock(block, oldh, oldno, oldtxnumber)) {
RepTimeTracer.setEndTime(this.SystemName, "storage-save-commit", System.currentTimeMillis(),block.height,block.transactions.size)
this.CommitTrans
(true, block.height, oldtxnumber + block.transactions.length, block.hashOfBlock.toStringUtf8(), block.previousBlockHash.toStringUtf8(), block.stateHash.toStringUtf8())
} else {
@ -635,8 +643,9 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
this.setBlockAllTxNumber(newtxnumber)
//jiangbuyun modify 20180430,块写入文件系统时增加块长度写入文件中方便以后没有leveldb时可以完全依靠块文件快速恢复系统,该位置实现字节数组的合并
//bhelp.writeBlock(bidx.getBlockFileNo(), bidx.getBlockFilePos(), rbb)
RepTimeTracer.setStartTime(this.SystemName, "storage-save-write-file", System.currentTimeMillis(),block.height,block.transactions.size)
filemgr.writeBlock(bidx.getBlockFileNo(), bidx.getBlockFilePos() - 8, pathUtil.longToByte(blenght) ++ rbb)
RepTimeTracer.setEndTime(this.SystemName, "storage-save-write-file", System.currentTimeMillis(),block.height,block.transactions.size)
b = true
RepLogger.trace(
RepLogger.Storager_Logger,

View File

@ -6,6 +6,7 @@ import rep.storage.util.pathUtil
class BlockFileMgr(val SystemName: String) {
private var bw: BlockFileWriter = null
private var br: BlockFileReader = new BlockFileReader(this.SystemName)
private def checkFileWriter(fileno: Long) = {
synchronized {
@ -84,8 +85,8 @@ class BlockFileMgr(val SystemName: String) {
* @return 返回读取的区块字节数组
*/
def readBlock(fileno: Long, startpos: Long, length: Int): Array[Byte] = {
var reader = new BlockFileReader(this.SystemName)
reader.readBlock(fileno, startpos, length)
//var reader = new BlockFileReader(this.SystemName)
br.readBlock(fileno, startpos, length)
}
def longToByte(number:Long):Array[Byte]={

View File

@ -8,7 +8,35 @@ import java.nio.channels.FileChannel;
class BlockFileReader(val SystemName:String) {
private val FileName = "Repchain_BlockFile_"
private val BlockDataPath = StoreConfig4Scala.getBlockPath(SystemName)
private var rf: RandomAccessFile = null;
private var channel: FileChannel = null;
private var fileindex:Long = 0
private def openChannel(fileno:Long)={
if(rf == null || (rf != null && fileindex != fileno)){
createChannel(fileno)
}
}
private def createChannel(fileno:Long)={
synchronized {
this.FreeResouce
val fn4path = this.BlockDataPath + File.separator + FileName + fileno;
try {
var f = new File(fn4path)
if (f.exists()) {
rf = new RandomAccessFile(fn4path, "r");
channel = rf.getChannel();
fileindex = fileno
}
} catch {
case e: Exception => throw e
}
}
}
/**
* @author jiangbuyun
* @version 1.0
@ -19,17 +47,12 @@ class BlockFileReader(val SystemName:String) {
*/
def readBlock(fileno: Long, startpos: Long, length: Int): Array[Byte] = {
var rb: Array[Byte] = null
val np = StoreConfig4Scala.getBlockPath(SystemName) + File.separator + FileName + fileno
synchronized {
var rf: RandomAccessFile = null
var channel: FileChannel = null
try {
var f = new File(np)
if (!f.exists()) {
rb
} else {
rf = new RandomAccessFile(np, "r")
channel = rf.getChannel()
openChannel(fileno)
if(channel != null){
channel.position(startpos)
var buf: ByteBuffer = ByteBuffer.allocate(length)
channel.read(buf)
@ -40,27 +63,34 @@ class BlockFileReader(val SystemName:String) {
case e: Exception =>
e.printStackTrace()
throw e
} finally {
if (channel != null) {
try {
channel.close();
} catch {
case e: Exception =>
e.printStackTrace()
}
}
if (rf != null) {
try {
rf.close();
} catch {
case e: Exception =>
e.printStackTrace()
}
}
}
}
rb
}
def FreeResouce = {
if (channel != null) {
try {
channel.close();
channel = null
} catch {
case e: Exception =>
channel = null
e.printStackTrace()
}
}
if (rf != null) {
try {
rf.close();
rf = null
} catch {
case e: Exception =>
rf = null
e.printStackTrace();
}
}
this.fileindex = 0
}
}

View File

@ -81,6 +81,7 @@ object GlobalUtils {
val gensisblock = 17
val api = 18
val transactiondispatcher = 19
val dispatchofRecvendorsement = 20
}

View File

@ -253,9 +253,9 @@ object blockDataCheck extends App {
//writePretty(b.endorsements)
}
println(readBlockToString(da4,1))
/*println(readBlockToString(da4,1))
println(readBlockToString(da4,2))
println(readBlockToString(da4,3))
println(readBlockToString(da4,3))*/
/*da5.FindByLike("rechain_", 1).foreach(f=>{
println(f._1)
@ -282,6 +282,13 @@ object blockDataCheck extends App {
}
}
var l : Long = 119649
for(i<-0 to 10){
getblockerForheight(da1, l)
l = l + 1
}
def getblockerForheight(da: ImpDataAccess, h: Long) = {
var nodes = new Array[String](5)
nodes(0) = "12110107bi45jh675g.node2"
@ -292,7 +299,7 @@ object blockDataCheck extends App {
val b = da.getBlock4ObjectByHeight(h)
val candidatorCur = candidators(da.getSystemName, b.hashOfBlock.toStringUtf8(), nodes.toSet, Sha256.hash(b.hashOfBlock.toStringUtf8()))
println(s"height=$ch,systemname=${da.SystemName},${candidatorCur.mkString("|")}")
println(s"height=$h,systemname=${da.SystemName},${candidatorCur.mkString("|")}")
}
///////////////////////////////////////vote code/////////////////////////////////////////////////