From a971b1ccd942710deac89fbe6af82de74172451a Mon Sep 17 00:00:00 2001 From: wuwei1972 Date: Thu, 16 May 2019 16:59:25 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=87=BA=E5=9D=97=E4=B8=AD?= =?UTF-8?q?=E9=97=B4=E5=81=9C=E9=A1=BF=EF=BC=8C=E4=B8=8D=E8=83=BD=E8=BF=9E?= =?UTF-8?q?=E7=BB=AD=E5=87=BA=E5=9D=97=E7=9A=84=E9=97=AE=E9=A2=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scala/rep/network/base/ModuleBase.scala | 1 + .../rep/network/consensus/block/Blocker.scala | 17 +--- .../consensus/block/ConfirmOfBlock.scala | 6 +- .../consensus/block/EndorseCollector.scala | 63 +++++++------- .../block/EndorsementRequest4Future.scala | 86 +++++++++++-------- .../endorse/DispatchOfRecvEndorsement.scala | 60 +++++++++++++ .../consensus/endorse/EndorseMsg.scala | 2 + .../consensus/endorse/Endorser4Future.scala | 15 ++-- .../transaction/PreloaderForTransaction.scala | 5 +- .../rep/network/module/ModuleManager.scala | 9 +- .../rep/network/persistence/Storager.scala | 35 ++++---- .../scala/rep/storage/ImpDataAccess.scala | 25 ++++-- .../rep/storage/block/BlockFileMgr.scala | 5 +- .../rep/storage/block/BlockFileReader.scala | 84 ++++++++++++------ src/main/scala/rep/utils/GlobalUtils.scala | 1 + .../rep/storage/test/blockDataCheck.scala | 13 ++- 16 files changed, 273 insertions(+), 154 deletions(-) create mode 100644 src/main/scala/rep/network/consensus/endorse/DispatchOfRecvEndorsement.scala diff --git a/src/main/scala/rep/network/base/ModuleBase.scala b/src/main/scala/rep/network/base/ModuleBase.scala index 5440aebc..e70555e5 100644 --- a/src/main/scala/rep/network/base/ModuleBase.scala +++ b/src/main/scala/rep/network/base/ModuleBase.scala @@ -76,6 +76,7 @@ abstract class ModuleBase(name: String) extends Actor with ClusterActor with B case "gensisblock" => 17 case "api" => 18 case "transactiondispatcher" => 19 + case "dispatchofRecvendorsement" => 20 case _ => 0 } } diff --git a/src/main/scala/rep/network/consensus/block/Blocker.scala b/src/main/scala/rep/network/consensus/block/Blocker.scala index b1cbe3de..996a761d 100644 --- a/src/main/scala/rep/network/consensus/block/Blocker.scala +++ b/src/main/scala/rep/network/consensus/block/Blocker.scala @@ -144,7 +144,8 @@ class Blocker(moduleName: String) extends ModuleBase(moduleName) { //todo 交易排序 if (trans.size >= SystemProfile.getMinBlockTransNum) { RepTimeTracer.setEndTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(),pe.getCurrentHeight + 1,trans.size) - var blc = BlockHelp.WaitingForExecutionOfBlock(pe.getCurrentBlockHash, pe.getCurrentHeight + 1, trans) + //此处建立新块必须采用抽签模块的抽签结果来进行出块,否则出现刚抽完签,马上有新块的存储完成,就会出现错误 + var blc = BlockHelp.WaitingForExecutionOfBlock(pe.getBlocker.voteBlockHash, pe.getBlocker.VoteHeight + 1, trans) RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,height=${blc.height},local height=${pe.getCurrentHeight}" + "~" + selfAddr)) RepTimeTracer.setStartTime(pe.getSysTag, "PreloadTrans", System.currentTimeMillis(),blc.height,blc.transactions.size) blc = ExecuteTransactionOfBlock(blc) @@ -185,30 +186,18 @@ class Blocker(moduleName: String) extends ModuleBase(moduleName) { sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.CANDIDATOR) //是出块节点 - if (preblock == null) { + if (preblock == null || (preblock.previousBlockHash.toStringUtf8() != pe.getBlocker.voteBlockHash)) { CreateBlockHandler - } else { - if (preblock.previousBlockHash.toStringUtf8() != pe.getBlocker.voteBlockHash) { - //上一个块已经变化,需要重新出块 - CreateBlockHandler - } } } else { //出块标识错误,暂时不用做任何处理 RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,do not blocker or blocker hash not equal current hash,height=${this.preblock.height}" + "~" + selfAddr)) } - } else { //节点状态不对 RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,node status error,status is synching,height=${this.preblock.height}" + "~" + selfAddr)) } - //出块超时 - /*case Blocker.EndorseOfBlockTimeOut => - schedulerLink = clearSched() - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"create new block,send endorse collector,height=${this.preblock.height},local height=${pe.getCurrentHeight}" + "~" + selfAddr)) - pe.getActorRef(ActorType.endorsementcollectioner) ! EndorseMsg.CollectEndorsement(this.preblock, pe.getBlocker.blocker) - schedulerLink = scheduler.scheduleOnce(TimePolicy.getTimeoutEndorse seconds, self, Blocker.EndorseOfBlockTimeOut)*/ case _ => //ignore } diff --git a/src/main/scala/rep/network/consensus/block/ConfirmOfBlock.scala b/src/main/scala/rep/network/consensus/block/ConfirmOfBlock.scala index fa55436a..04b7c70d 100644 --- a/src/main/scala/rep/network/consensus/block/ConfirmOfBlock.scala +++ b/src/main/scala/rep/network/consensus/block/ConfirmOfBlock.scala @@ -34,7 +34,7 @@ import scala.util.control.Breaks._ import scala.util.control.Exception.Finally import java.util.concurrent.ConcurrentHashMap import rep.network.consensus.block.Blocker.{ ConfirmedBlock } -import rep.network.persistence.Storager.{ BlockRestore, SourceOfBlock } +import rep.network.persistence.Storager.{ BlockRestore, SourceOfBlock ,BatchStore} import rep.network.consensus.util.{ BlockVerify, BlockHelp } import rep.log.RepLogger import rep.log.RepTimeTracer @@ -102,8 +102,10 @@ class ConfirmOfBlock(moduleName: String) extends ModuleBase(moduleName) { if (BlockVerify.VerifyEndorserSorted(block.endorsements.toArray[Signature]) == 1 || (block.height==1 && pe.getCurrentBlockHash == "" && block.previousBlockHash.isEmpty())) { //背书信息排序正确 RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "confirm verify endorsement sort")) + pe.getBlockCacheMgr.addToCache(BlockRestore(block, SourceOfBlock.CONFIRMED_BLOCK, actRefOfBlock)) + pe.getActorRef(ActorType.storager) ! BatchStore sendEvent(EventType.RECEIVE_INFO, mediator, pe.getSysTag, Topic.Block, Event.Action.BLOCK_NEW) - pe.getActorRef(ActorType.storager) ! BlockRestore(block, SourceOfBlock.CONFIRMED_BLOCK, actRefOfBlock) + //pe.getActorRef(ActorType.storager) ! BlockRestore(block, SourceOfBlock.CONFIRMED_BLOCK, actRefOfBlock) } else { ////背书信息排序错误 } diff --git a/src/main/scala/rep/network/consensus/block/EndorseCollector.scala b/src/main/scala/rep/network/consensus/block/EndorseCollector.scala index 3989703f..83651794 100644 --- a/src/main/scala/rep/network/consensus/block/EndorseCollector.scala +++ b/src/main/scala/rep/network/consensus/block/EndorseCollector.scala @@ -21,7 +21,7 @@ import akka.cluster.pubsub.DistributedPubSubMediator.Publish import akka.routing._; import rep.app.conf.{ SystemProfile, TimePolicy } import rep.network.base.ModuleBase -import rep.network.consensus.endorse.EndorseMsg.{ RequesterOfEndorsement, ResultOfEndorseRequester, CollectEndorsement } +import rep.network.consensus.endorse.EndorseMsg.{ RequesterOfEndorsement, ResultOfEndorseRequester, CollectEndorsement,ResendEndorseInfo } import rep.network.consensus.block.Blocker.ConfirmedBlock import rep.network.tools.PeerExtension import rep.network.Topic @@ -38,7 +38,6 @@ import rep.log.RepLogger import rep.log.RepTimeTracer object EndorseCollector { - case object ResendEndorseInfo def props(name: String): Props = Props(classOf[EndorseCollector], name) } @@ -58,10 +57,8 @@ class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) { private def createRouter = { if (router == null) { - var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size()) - for (i <- 0 to SystemProfile.getVoteNodeList.size() - 1) { - //EndorsementRequest4Future - //var ca = context.actorOf(EnodorsementRequester.props("endorsementrequester" + i), "endorsementrequester" + i) + var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size()*3) + for (i <- 0 to SystemProfile.getVoteNodeList.size()*3 - 1) { var ca = context.actorOf(EndorsementRequest4Future.props("endorsementrequester" + i), "endorsementrequester" + i) context.watch(ca) list(i) = new ActorRefRoutee(ca) @@ -75,31 +72,20 @@ class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) { this.block = block this.blocker = blocker this.recvedEndorse = this.recvedEndorse.empty - //schedulerLink = clearSched() } private def clearEndorseInfo = { this.block = null this.blocker = null this.recvedEndorse = this.recvedEndorse.empty - //schedulerLink = clearSched() } - private def resendEndorser = { - //schedulerLink = clearSched() - pe.getNodeMgr.getStableNodes.foreach(f => { - if (!recvedEndorse.contains(f.toString)) { - router.route(RequesterOfEndorsement(block, blocker, f), self) - } - }) - //schedulerLink = scheduler.scheduleOnce(TimePolicy.getTimeoutEndorse seconds, self, EndorseCollector.ResendEndorseInfo) - } + private def CheckAndFinishHandler { sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Endorsement, Event.Action.ENDORSEMENT) - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("collectioner check is finish ")) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("entry collectioner check ")) if (NodeHelp.ConsensusConditionChecked(this.recvedEndorse.size + 1, pe.getNodeMgr.getNodes.size)) { - //schedulerLink = clearSched() RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix("collectioner package endorsement to block")) this.recvedEndorse.foreach(f => { this.block = BlockHelp.AddEndorsementToBlock(this.block, f._2) @@ -120,26 +106,24 @@ class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) { override def receive = { case CollectEndorsement(block, blocker) => createRouter - if (this.block != null && this.block.hashOfBlock.toStringUtf8().equals(block.hashOfBlock.toStringUtf8())) { + if (this.block != null && this.block.hashOfBlock.toStringUtf8() == block.hashOfBlock.toStringUtf8()) { RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"collectioner is waiting endorse result,height=${block.height},local height=${pe.getCurrentHeight}")) } else { - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner recv endorsement,height=${block.height},local height=${pe.getCurrentHeight}")) - resetEndorseInfo(block, blocker) - pe.getNodeMgr.getStableNodes.foreach(f => { - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner send endorsement to requester,height=${block.height},local height=${pe.getCurrentHeight}")) - router.route(RequesterOfEndorsement(block, blocker, f), self) - }) - //schedulerLink = scheduler.scheduleOnce(TimePolicy.getTimeoutEndorse seconds, self, EndorseCollector.ResendEndorseInfo) + if(block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash){ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner recv endorsement,height=${block.height},local height=${pe.getCurrentHeight}")) + resetEndorseInfo(block, blocker) + pe.getNodeMgr.getStableNodes.foreach(f => { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner send endorsement to requester,height=${block.height},local height=${pe.getCurrentHeight}")) + router.route(RequesterOfEndorsement(block, blocker, f), self) + }) + }else{ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out endorsement request,height=${block.height},local height=${pe.getCurrentHeight}")) + } } - /*case EndorseCollector.ResendEndorseInfo => - if (this.block != null) { - logMsg(LogType.INFO, "collectioner resend endorsement") - resendEndorser - }*/ case ResultOfEndorseRequester(result, endors, blockhash, endorser) => - if (this.block != null) { - if (this.block.hashOfBlock.toStringUtf8().equals(blockhash)) { + //block不空,该块的上一个块等于最后存储的hash,背书结果的块hash跟当前发出的块hash一致 + if (this.block != null && this.block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash && this.block.hashOfBlock.toStringUtf8() == blockhash) { if (result) { RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner recv endorsement result,height=${block.height},local height=${pe.getCurrentHeight}")) recvedEndorse += endorser.toString -> endors @@ -147,7 +131,18 @@ class EndorseCollector(moduleName: String) extends ModuleBase(moduleName) { } else { RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"collectioner recv endorsement result,is error,height=${block.height},local height=${pe.getCurrentHeight}")) } + }else{ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out endorsement result,local height=${pe.getCurrentHeight}")) + } + case ResendEndorseInfo(endorer)=> + if (this.block != null && this.block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash ) { + if(this.router != null){ + router.route(RequesterOfEndorsement(this.block, this.blocker, endorer), self) + }else{ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner's router is null,height=${block.height},local height=${pe.getCurrentHeight}")) } + }else{ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"collectioner back out resend endorsement request,local height=${pe.getCurrentHeight}")) } case _ => //ignore } diff --git a/src/main/scala/rep/network/consensus/block/EndorsementRequest4Future.scala b/src/main/scala/rep/network/consensus/block/EndorsementRequest4Future.scala index d86378c5..85ef49a2 100644 --- a/src/main/scala/rep/network/consensus/block/EndorsementRequest4Future.scala +++ b/src/main/scala/rep/network/consensus/block/EndorsementRequest4Future.scala @@ -27,7 +27,7 @@ import com.google.protobuf.ByteString import com.google.protobuf.timestamp.Timestamp import rep.app.conf.{ SystemProfile, TimePolicy } import rep.network.base.ModuleBase -import rep.network.consensus.endorse.EndorseMsg.{ EndorsementInfo, ResultOfEndorsed, RequesterOfEndorsement, ResultOfEndorseRequester, ResultFlagOfEndorse } +import rep.network.consensus.endorse.EndorseMsg.{ EndorsementInfo, ResultOfEndorsed, RequesterOfEndorsement, ResultOfEndorseRequester, ResultFlagOfEndorse,ResendEndorseInfo } import rep.network.tools.PeerExtension import rep.network.Topic import rep.protos.peer._ @@ -38,6 +38,7 @@ import scala.util.control.Breaks import rep.utils.GlobalUtils.{ EventType, ActorType } import rep.network.sync.SyncMsg.StartSync import rep.log.RepLogger +import rep.log.RepTimeTracer object EndorsementRequest4Future { def props(name: String): Props = Props(classOf[EndorsementRequest4Future], name) @@ -49,7 +50,8 @@ class EndorsementRequest4Future(moduleName: String) extends ModuleBase(moduleNam import scala.concurrent.duration._ implicit val timeout = Timeout(TimePolicy.getTimeoutEndorse seconds) - private val endorsementActorName = "/user/modulemanager/endorser" + //private val endorsementActorName = "/user/modulemanager/endorser" + private val endorsementActorName = "/user/modulemanager/dispatchofRecvendorsement" override def preStart(): Unit = { RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "EndorsementRequest4Future Start")) @@ -78,51 +80,67 @@ class EndorsementRequest4Future(moduleName: String) extends ModuleBase(moduleNam private def EndorsementVerify(block: Block, result: ResultOfEndorsed): Boolean = { val bb = block.clearEndorsements.toByteArray val ev = BlockVerify.VerifyOneEndorseOfBlock(result.endor, bb, pe.getSysTag) - if (ev._1) { - true - } else { - false - } + ev._1 } private def handler(reqinfo: RequesterOfEndorsement) = { schedulerLink = clearSched() - val result = this.ExecuteOfEndorsement(reqinfo.endorer, EndorsementInfo(reqinfo.blc, reqinfo.blocker)) - if (result != null) { - if (result.result == ResultFlagOfEndorse.success) { - if (EndorsementVerify(reqinfo.blc, result)) { - val re = ResultOfEndorseRequester(true, result.endor, result.BlockHash, reqinfo.endorer) - context.parent ! re - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future, send endorsement, height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} ")) - } else { - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future recv endorsement result is error, result=${result.result},height=${reqinfo.blc.height},local height=${pe.getCurrentHeight}")) - context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer) - } - } else { - if (result.result == ResultFlagOfEndorse.BlockHeightError) { - if (result.endorserOfChainInfo.height > pe.getCurrentHeight + 1) { - pe.getActorRef(ActorType.synchrequester) ! StartSync(false) - context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer) - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future recv endorsement result must synch,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} ")) + + RepTimeTracer.setStartTime(pe.getSysTag, s"Endorsement-request-${moduleName}", System.currentTimeMillis(),reqinfo.blc.height,reqinfo.blc.transactions.size) + val result = this.ExecuteOfEndorsement(reqinfo.endorer, EndorsementInfo(reqinfo.blc, reqinfo.blocker)) + if (result != null) { + if (result.result == ResultFlagOfEndorse.success) { + if (EndorsementVerify(reqinfo.blc, result)) { + val re = ResultOfEndorseRequester(true, result.endor, result.BlockHash, reqinfo.endorer) + context.parent ! re + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future, send endorsement, height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} ")) } else { - schedulerLink = scheduler.scheduleOnce(TimePolicy.getTimeoutEndorse seconds, self, RequesterOfEndorsement(reqinfo.blc, reqinfo.blocker, reqinfo.endorer)) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future recv endorsement result is error, result=${result.result},height=${reqinfo.blc.height},local height=${pe.getCurrentHeight}")) + context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer) } } else { - context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer) + if (result.result == ResultFlagOfEndorse.BlockHeightError) { + if (result.endorserOfChainInfo.height > pe.getCurrentHeight + 1) { + //todo 需要从块缓冲判断是否启动块同步 + pe.getActorRef(ActorType.synchrequester) ! StartSync(false) + context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future recv endorsement result must synch,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} ")) + } else { + //if(pe.getCurrentHeight >= reqinfo.blc.height){ + //schedulerLink = scheduler.scheduleOnce(30 milliseconds, self, RequesterOfEndorsement(reqinfo.blc, reqinfo.blocker, reqinfo.endorer)) + // schedulerLink = scheduler.scheduleOnce(30 milliseconds, self, RequesterOfEndorsement(reqinfo.blc, reqinfo.blocker, reqinfo.endorer)) + //} + /*try{ + Thread.sleep(30) + context.parent ! ResendEndorseInfo(reqinfo.endorer) + }catch{ + case e:Exception => + RepLogger.error(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"--------endorsementRequest4Future sleep happen error,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} ")) + }*/ + context.parent ! ResendEndorseInfo(reqinfo.endorer) + } + } else { + //context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer) + context.parent ! ResendEndorseInfo(reqinfo.endorer) + } } + } else { + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future recv endorsement result is null,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} ")) + //context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer) + context.parent ! ResendEndorseInfo(reqinfo.endorer) } - } else { - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future recv endorsement result is null,height=${reqinfo.blc.height},local height=${pe.getCurrentHeight} ")) - context.parent ! ResultOfEndorseRequester(false, null, reqinfo.blc.hashOfBlock.toStringUtf8(), reqinfo.endorer) - } + + RepTimeTracer.setEndTime(pe.getSysTag, s"Endorsement-request-${moduleName}", System.currentTimeMillis(),reqinfo.blc.height,reqinfo.blc.transactions.size) } override def receive = { case RequesterOfEndorsement(block, blocker, addr) => - handler(RequesterOfEndorsement(block, blocker, addr)) - - //case ResultOfEndorsed(result, endor, blockhash)=> - // handlerOfResult(ResultOfEndorsed(result, endor, blockhash)) + //待请求背书的块的上一个块的hash不等于系统最新的上一个块的hash,停止发送背书 + if(block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash){ + handler(RequesterOfEndorsement(block, blocker, addr)) + }else{ + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"--------endorsementRequest4Future back out endorsement,prehash not equal pe.currenthash ,height=${block.height},local height=${pe.getCurrentHeight} ")) + } case _ => //ignore } } \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/endorse/DispatchOfRecvEndorsement.scala b/src/main/scala/rep/network/consensus/endorse/DispatchOfRecvEndorsement.scala new file mode 100644 index 00000000..6ba1a304 --- /dev/null +++ b/src/main/scala/rep/network/consensus/endorse/DispatchOfRecvEndorsement.scala @@ -0,0 +1,60 @@ +package rep.network.consensus.endorse + +import akka.actor.{ Actor, ActorRef, Props, Address, ActorSelection } +import akka.cluster.pubsub.DistributedPubSubMediator.Publish +import akka.routing._; +import rep.app.conf.{ SystemProfile, TimePolicy } +import rep.network.base.ModuleBase +import rep.network.tools.PeerExtension +import rep.network.Topic +import rep.protos.peer._ +import rep.utils.GlobalUtils.{ EventType } +import rep.utils._ +import scala.collection.mutable._ +import rep.network.consensus.util.BlockVerify +import scala.util.control.Breaks +import rep.network.util.NodeHelp +import rep.network.consensus.util.BlockHelp +import rep.network.consensus.util.BlockVerify +import rep.log.RepLogger +import rep.log.RepTimeTracer +import rep.network.consensus.endorse.EndorseMsg.{ EndorsementInfo} + +object DispatchOfRecvEndorsement { + def props(name: String): Props = Props(classOf[DispatchOfRecvEndorsement], name) +} + + +class DispatchOfRecvEndorsement(moduleName: String) extends ModuleBase(moduleName) { + import context.dispatcher + import scala.concurrent.duration._ + import scala.collection.immutable._ + + private var router: Router = null + + override def preStart(): Unit = { + RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "DispatchOfRecvEndorsement Start")) + } + + private def createRouter = { + if (router == null) { + var list: Array[Routee] = new Array[Routee](SystemProfile.getVoteNodeList.size()) + for (i <- 0 to SystemProfile.getVoteNodeList.size() - 1) { + var ca = context.actorOf(Endorser4Future.props("endorser" + i), "endorser" + i) + context.watch(ca) + list(i) = new ActorRefRoutee(ca) + } + val rlist: IndexedSeq[Routee] = list.toIndexedSeq + router = Router(SmallestMailboxRoutingLogic(), rlist) + } + } + + + + override def receive = { + case EndorsementInfo(block, blocker) => + createRouter + router.route(EndorsementInfo(block, blocker), sender) + case _ => //ignore + } +} \ No newline at end of file diff --git a/src/main/scala/rep/network/consensus/endorse/EndorseMsg.scala b/src/main/scala/rep/network/consensus/endorse/EndorseMsg.scala index e7014b46..71696e60 100644 --- a/src/main/scala/rep/network/consensus/endorse/EndorseMsg.scala +++ b/src/main/scala/rep/network/consensus/endorse/EndorseMsg.scala @@ -33,6 +33,8 @@ object EndorseMsg { //背书请求者消息 case class RequesterOfEndorsement(blc: Block, blocker: String, endorer: Address) + case class ResendEndorseInfo(endorer: Address) + //给背书人的背书消息 case class EndorsementInfo(blc: Block, blocker: String) diff --git a/src/main/scala/rep/network/consensus/endorse/Endorser4Future.scala b/src/main/scala/rep/network/consensus/endorse/Endorser4Future.scala index c0df86c9..c3eed449 100644 --- a/src/main/scala/rep/network/consensus/endorse/Endorser4Future.scala +++ b/src/main/scala/rep/network/consensus/endorse/Endorser4Future.scala @@ -158,14 +158,15 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) { //if (info.blc.previousBlockHash.toStringUtf8 == pe.getCurrentBlockHash && NodeHelp.isBlocker(info.blocker, pe.getBlocker.blocker)) { if (info.blc.previousBlockHash.toStringUtf8 == pe.getBlocker.voteBlockHash && NodeHelp.isBlocker(info.blocker, pe.getBlocker.blocker)) { //可以进入背书 - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "vote result equal,allow entry endorse,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}")) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"vote result equal,allow entry endorse,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}")) 0 } else { + //todo 需要判断区块缓存,再决定是否需要启动同步 if(info.blc.height > pe.getCurrentHeight+1){ pe.getActorRef(ActorType.synchrequester) ! StartSync(false) } //当前块hash和抽签的出块人都不一致,暂时不能够进行背书,可以进行缓存 - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "block hash is not equal or blocker is not equal,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}")) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"block hash is not equal or blocker is not equal,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}")) 2 } } else { @@ -199,8 +200,6 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) { //println(s"${pe.getSysTag}:entry 6") if (result1) { RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"${pe.getSysTag}:entry 7")) - //if(AskPreloadTransactionOfBlock(info.blc)){ - //println("entry 9") val endtime = System.currentTimeMillis() RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( s"#################endorsement spent time=${endtime-starttime},,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}")) sendEvent(EventType.RECEIVE_INFO, mediator, pe.getSysTag, Topic.Endorsement,Event.Action.ENDORSEMENT) @@ -218,7 +217,7 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) { r match { case 0 => //entry endorse - VerifyInfo(info: EndorsementInfo) + VerifyInfo(info) case 2 => //cache endorse,waiting revote sender ! ResultOfEndorsed(ResultFlagOfEndorse.BlockHeightError, null, info.blc.hashOfBlock.toStringUtf8(),pe.getSystemCurrentChainStatus,pe.getBlocker) @@ -226,7 +225,7 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) { case 1 => //do not endorse sender ! ResultOfEndorsed(ResultFlagOfEndorse.BlockerSelfError, null, info.blc.hashOfBlock.toStringUtf8(),pe.getSystemCurrentChainStatus,pe.getBlocker) - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"it is blocker,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}")) + RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"itself,recv endorse request,endorse height=${info.blc.height},local height=${pe.getCurrentHeight}")) case 3 => //do not endorse sender ! ResultOfEndorsed(ResultFlagOfEndorse.CandidatorError, null, info.blc.hashOfBlock.toStringUtf8(),pe.getSystemCurrentChainStatus,pe.getBlocker) @@ -238,9 +237,9 @@ class Endorser4Future(moduleName: String) extends ModuleBase(moduleName) { //Endorsement block case EndorsementInfo(block, blocker) => if(!pe.isSynching){ - RepTimeTracer.setStartTime(pe.getSysTag, "recvendorsement", System.currentTimeMillis(),block.height,block.transactions.size) + RepTimeTracer.setStartTime(pe.getSysTag, s"recvendorsement-${moduleName}", System.currentTimeMillis(),block.height,block.transactions.size) EndorseHandler(EndorsementInfo(block, blocker)) - RepTimeTracer.setEndTime(pe.getSysTag, "recvendorsement", System.currentTimeMillis(),block.height,block.transactions.size) + RepTimeTracer.setEndTime(pe.getSysTag, s"recvendorsement-${moduleName}", System.currentTimeMillis(),block.height,block.transactions.size) }else{ RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix(s"do not endorse,it is synching,recv endorse request,endorse height=${block.height},local height=${pe.getCurrentHeight}")) } diff --git a/src/main/scala/rep/network/consensus/transaction/PreloaderForTransaction.scala b/src/main/scala/rep/network/consensus/transaction/PreloaderForTransaction.scala index 1f125b7d..e7217297 100644 --- a/src/main/scala/rep/network/consensus/transaction/PreloaderForTransaction.scala +++ b/src/main/scala/rep/network/consensus/transaction/PreloaderForTransaction.scala @@ -129,10 +129,8 @@ class PreloaderForTransaction(moduleName: String) extends ModuleBase(moduleName) override def receive = { case PreTransBlock(block,prefixOfDbTag) => - RepLogger.trace(RepLogger.Consensus_Logger, this.getLogMsgPrefix( "entry preload")) - //logTime("block preload inner time", System.currentTimeMillis(), true) - if ((block.previousBlockHash.toStringUtf8().equals(pe.getCurrentBlockHash) || block.previousBlockHash == ByteString.EMPTY) && + if ((block.previousBlockHash.toStringUtf8() == pe.getCurrentBlockHash || block.previousBlockHash == ByteString.EMPTY) && block.height == (pe.getCurrentHeight + 1)) { var preLoadTrans = mutable.HashMap.empty[String, Transaction] preLoadTrans = block.transactions.map(trans => (trans.id, trans))(breakOut): mutable.HashMap[String, Transaction] @@ -156,7 +154,6 @@ class PreloaderForTransaction(moduleName: String) extends ModuleBase(moduleName) //全部或者部分交易成功 sender ! PreTransBlockResult(newblock.get,true) } - //logTime("block preload inner time", System.currentTimeMillis(), false) } case _ => //ignore } diff --git a/src/main/scala/rep/network/module/ModuleManager.scala b/src/main/scala/rep/network/module/ModuleManager.scala index 9b113a35..691384e5 100644 --- a/src/main/scala/rep/network/module/ModuleManager.scala +++ b/src/main/scala/rep/network/module/ModuleManager.scala @@ -29,7 +29,7 @@ import rep.network.cluster.MemberListener import rep.network.sync.{ SynchronizeResponser, SynchronizeRequester4Future } import rep.sc.TransactionDispatcher import rep.network.consensus.block.{ GenesisBlocker, ConfirmOfBlock, EndorseCollector, Blocker } -import rep.network.consensus.endorse.{Endorser4Future} +import rep.network.consensus.endorse.{Endorser4Future,DispatchOfRecvEndorsement} import rep.network.consensus.transaction.{PreloaderForTransaction} import rep.network.consensus.vote.Voter @@ -102,8 +102,11 @@ class ModuleManager(moduleName: String, sysTag: String, enableStatistic: Boolean context.actorOf(ConfirmOfBlock.props("confirmerofblock"), "confirmerofblock") context.actorOf(EndorseCollector.props("endorsementcollectioner"), "endorsementcollectioner") - //context.actorOf(Endorser.props("endorser"), "endorser") - context.actorOf(Endorser4Future.props("endorser"), "endorser") + + //context.actorOf(Endorser4Future.props("endorser"), "endorser") + context.actorOf(DispatchOfRecvEndorsement.props("dispatchofRecvendorsement"), "dispatchofRecvendorsement") + + if(this.isStartup){ context.actorOf(TransactionDispatcher.props("transactiondispatcher"), "transactiondispatcher") //context.actorOf(PreloaderForTransaction.props("preloaderoftransaction"),"preloaderoftransaction") diff --git a/src/main/scala/rep/network/persistence/Storager.scala b/src/main/scala/rep/network/persistence/Storager.scala index eced38c1..c4543355 100644 --- a/src/main/scala/rep/network/persistence/Storager.scala +++ b/src/main/scala/rep/network/persistence/Storager.scala @@ -40,6 +40,7 @@ object Storager { def props(name: String): Props = Props(classOf[Storager], name) final case class BlockRestore(blk: Block, SourceOfBlock: Int, blker: ActorRef) + final case object BatchStore case object SourceOfBlock { val CONFIRMED_BLOCK = 1 @@ -59,7 +60,7 @@ object Storager { class Storager(moduleName: String) extends ModuleBase(moduleName) { import context.dispatcher import scala.concurrent.duration._ - import rep.network.persistence.Storager.{ BlockRestore, SourceOfBlock } + import rep.network.persistence.Storager.{ BlockRestore, SourceOfBlock ,BatchStore} override def preStart(): Unit = { RepLogger.info(RepLogger.Storager_Logger, this.getLogMsgPrefix( "Storager Start")) @@ -72,16 +73,17 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) { private def SaveBlock(blkRestore: BlockRestore): Integer = { var re: Integer = 0 try { - + RepTimeTracer.setStartTime(pe.getSysTag, "storage-save", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) RepLogger.trace(RepLogger.Storager_Logger, this.getLogMsgPrefix( s"PreBlockHash(Before presistence): ${pe.getCurrentBlockHash}" + "~" + selfAddr)) val result = dataaccess.restoreBlock(blkRestore.blk) if (result._1) { RepLogger.trace(RepLogger.Storager_Logger, this.getLogMsgPrefix( s"Restore blocks success,node number: ${pe.getSysTag},block number=${blkRestore.blk.height}" + "~" + selfAddr)) - RepTimeTracer.setEndTime(pe.getSysTag, "storage", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) - if(pe.getSysTag == pe.getBlocker.blocker && pe.getBlocker.VoteHeight+1 == blkRestore.blk.height){ - RepTimeTracer.setEndTime(pe.getSysTag, "Block", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) - } + RepTimeTracer.setEndTime(pe.getSysTag, "storage-save", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) + + if(blkRestore.SourceOfBlock == SourceOfBlock.CONFIRMED_BLOCK && pe.getSysTag == pe.getBlocker.blocker && pe.getBlocker.VoteHeight+1 == blkRestore.blk.height){ + RepTimeTracer.setEndTime(pe.getSysTag, "Block", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) + } pe.getTransPoolMgr.removeTrans(blkRestore.blk.transactions) pe.resetSystemCurrentChainStatus(new BlockchainInfo(result._2, result._3, ByteString.copyFromUtf8(result._4), ByteString.copyFromUtf8(result._5),ByteString.copyFromUtf8(result._6))) @@ -127,8 +129,7 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) { } } - private def Handler(_blkRestore: BlockRestore)={ - pe.getBlockCacheMgr.addToCache(_blkRestore) + private def Handler={ try { var localchaininfo = pe.getSystemCurrentChainStatus if (localchaininfo.height <= 0) { @@ -142,6 +143,7 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) { breakable( while(loop <= maxheight){ + val _blkRestore = pe.getBlockCacheMgr.getBlockFromCache(loop) if(loop > localchaininfo.height+1){ //发送同步消息 if(!pe.isSynching){ @@ -149,7 +151,7 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) { } break }else{ - RestoreBlock(pe.getBlockCacheMgr.getBlockFromCache(loop)) + RestoreBlock(_blkRestore) } loop += 1l } @@ -196,12 +198,15 @@ class Storager(moduleName: String) extends ModuleBase(moduleName) { override def receive = { case blkRestore: BlockRestore => RepLogger.trace(RepLogger.Storager_Logger, this.getLogMsgPrefix( s"node number:${pe.getSysTag},restore single block,height:${blkRestore.blk.height}" + "~" + selfAddr)) - RepTimeTracer.setStartTime(pe.getSysTag, "storage", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) - Handler(blkRestore) - /*RepTimeTracer.setEndTime(pe.getSysTag, "storage", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) - if(pe.getSysTag == pe.getBlocker.blocker){ - RepTimeTracer.setEndTime(pe.getSysTag, "Block", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) - }*/ + RepTimeTracer.setStartTime(pe.getSysTag, "storage-handle", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) + pe.getBlockCacheMgr.addToCache(blkRestore) + Handler + RepTimeTracer.setEndTime(pe.getSysTag, "storage-handle", System.currentTimeMillis(),blkRestore.blk.height,blkRestore.blk.transactions.size) + + case BatchStore => + RepTimeTracer.setStartTime(pe.getSysTag, "storage-handle-noarg", System.currentTimeMillis(),pe.getCurrentHeight,120) + Handler + RepTimeTracer.setEndTime(pe.getSysTag, "storage-handle-noarg", System.currentTimeMillis(),pe.getCurrentHeight,120) case _ => //ignore } diff --git a/src/main/scala/rep/storage/ImpDataAccess.scala b/src/main/scala/rep/storage/ImpDataAccess.scala index 30162cd2..de82ce8f 100644 --- a/src/main/scala/rep/storage/ImpDataAccess.scala +++ b/src/main/scala/rep/storage/ImpDataAccess.scala @@ -35,6 +35,7 @@ import rep.network.consensus.util.BlockHelp import rep.log.RepLogger import rep.utils.SerializeUtils.deserialise import rep.storage.util.pathUtil +import rep.log.RepTimeTracer /** @@ -413,7 +414,7 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName) * @param newblock Block 待判断的块,lastblock Block 已知的最后区块 * @return 如果是最后的区块,返回true;否则,返回false */ - private def isLastBlock(newblock: Block, lastblock: Block): Boolean = { + private def isLastBlock(newblock: Block, lastblock: blockindex): Boolean = { var b: Boolean = false if (lastblock == null) { if (newblock.previousBlockHash.isEmpty()) { @@ -421,12 +422,12 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName) } else { b = false } - } else { val prve = newblock.previousBlockHash.toStringUtf8() //目前直接从上一个块中获取 - val cur = lastblock.hashOfBlock.toStringUtf8() - if (prve.equals(cur) && (lastblock.height + 1) == newblock.height) { + //val cur = lastblock.hashOfBlock.toStringUtf8() + //if (prve.equals(cur) && (lastblock.height + 1) == newblock.height) { + if(prve == lastblock.getBlockHash()){ b = true } else { b = false @@ -537,18 +538,25 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName) val oldh = getBlockHeight() val oldno = this.getMaxFileNo() val oldtxnumber = this.getBlockAllTxNumber() - var prevblock: Block = null + var prevblock: blockindex = null if (oldh > 0) { - val tbs = this.getBlockByHeight(oldh) - prevblock = Block.parseFrom(tbs) + RepTimeTracer.setStartTime(this.SystemName, "storage-save-get-preblock", System.currentTimeMillis(),block.height,block.transactions.size) + //val tbs = this.getBlockByHeight(oldh) + prevblock = getBlockIdxByHeight(block.height-1) + //prevblock = Block.parseFrom(tbs) + RepTimeTracer.setEndTime(this.SystemName, "storage-save-get-preblock", System.currentTimeMillis(),block.height,block.transactions.size) } if (isLastBlock(block, prevblock)) { try { this.BeginTrans + RepTimeTracer.setStartTime(this.SystemName, "storage-save-write-operlog", System.currentTimeMillis(),block.height,block.transactions.size) WriteOperLogToDBWithRestoreBlock(block) + RepTimeTracer.setEndTime(this.SystemName, "storage-save-write-operlog", System.currentTimeMillis(),block.height,block.transactions.size) + RepTimeTracer.setStartTime(this.SystemName, "storage-save-commit", System.currentTimeMillis(),block.height,block.transactions.size) if (this.commitAndAddBlock(block, oldh, oldno, oldtxnumber)) { + RepTimeTracer.setEndTime(this.SystemName, "storage-save-commit", System.currentTimeMillis(),block.height,block.transactions.size) this.CommitTrans (true, block.height, oldtxnumber + block.transactions.length, block.hashOfBlock.toStringUtf8(), block.previousBlockHash.toStringUtf8(), block.stateHash.toStringUtf8()) } else { @@ -635,8 +643,9 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName) this.setBlockAllTxNumber(newtxnumber) //jiangbuyun modify 20180430,块写入文件系统时,增加块长度写入文件中,方便以后没有leveldb时,可以完全依靠块文件快速恢复系统,该位置实现字节数组的合并 //bhelp.writeBlock(bidx.getBlockFileNo(), bidx.getBlockFilePos(), rbb) + RepTimeTracer.setStartTime(this.SystemName, "storage-save-write-file", System.currentTimeMillis(),block.height,block.transactions.size) filemgr.writeBlock(bidx.getBlockFileNo(), bidx.getBlockFilePos() - 8, pathUtil.longToByte(blenght) ++ rbb) - + RepTimeTracer.setEndTime(this.SystemName, "storage-save-write-file", System.currentTimeMillis(),block.height,block.transactions.size) b = true RepLogger.trace( RepLogger.Storager_Logger, diff --git a/src/main/scala/rep/storage/block/BlockFileMgr.scala b/src/main/scala/rep/storage/block/BlockFileMgr.scala index 5a383e30..21f7f4d7 100644 --- a/src/main/scala/rep/storage/block/BlockFileMgr.scala +++ b/src/main/scala/rep/storage/block/BlockFileMgr.scala @@ -6,6 +6,7 @@ import rep.storage.util.pathUtil class BlockFileMgr(val SystemName: String) { private var bw: BlockFileWriter = null + private var br: BlockFileReader = new BlockFileReader(this.SystemName) private def checkFileWriter(fileno: Long) = { synchronized { @@ -84,8 +85,8 @@ class BlockFileMgr(val SystemName: String) { * @return 返回读取的区块字节数组 */ def readBlock(fileno: Long, startpos: Long, length: Int): Array[Byte] = { - var reader = new BlockFileReader(this.SystemName) - reader.readBlock(fileno, startpos, length) + //var reader = new BlockFileReader(this.SystemName) + br.readBlock(fileno, startpos, length) } def longToByte(number:Long):Array[Byte]={ diff --git a/src/main/scala/rep/storage/block/BlockFileReader.scala b/src/main/scala/rep/storage/block/BlockFileReader.scala index 9f66b597..599025e0 100644 --- a/src/main/scala/rep/storage/block/BlockFileReader.scala +++ b/src/main/scala/rep/storage/block/BlockFileReader.scala @@ -8,7 +8,35 @@ import java.nio.channels.FileChannel; class BlockFileReader(val SystemName:String) { private val FileName = "Repchain_BlockFile_" + private val BlockDataPath = StoreConfig4Scala.getBlockPath(SystemName) + private var rf: RandomAccessFile = null; + private var channel: FileChannel = null; + private var fileindex:Long = 0 + + private def openChannel(fileno:Long)={ + if(rf == null || (rf != null && fileindex != fileno)){ + createChannel(fileno) + } + } + + private def createChannel(fileno:Long)={ + synchronized { + this.FreeResouce + val fn4path = this.BlockDataPath + File.separator + FileName + fileno; + try { + var f = new File(fn4path) + if (f.exists()) { + rf = new RandomAccessFile(fn4path, "r"); + channel = rf.getChannel(); + fileindex = fileno + } + } catch { + case e: Exception => throw e + } + } + } + /** * @author jiangbuyun * @version 1.0 @@ -19,17 +47,12 @@ class BlockFileReader(val SystemName:String) { */ def readBlock(fileno: Long, startpos: Long, length: Int): Array[Byte] = { var rb: Array[Byte] = null - val np = StoreConfig4Scala.getBlockPath(SystemName) + File.separator + FileName + fileno + synchronized { - var rf: RandomAccessFile = null - var channel: FileChannel = null + try { - var f = new File(np) - if (!f.exists()) { - rb - } else { - rf = new RandomAccessFile(np, "r") - channel = rf.getChannel() + openChannel(fileno) + if(channel != null){ channel.position(startpos) var buf: ByteBuffer = ByteBuffer.allocate(length) channel.read(buf) @@ -40,27 +63,34 @@ class BlockFileReader(val SystemName:String) { case e: Exception => e.printStackTrace() throw e - } finally { - if (channel != null) { - try { - channel.close(); - } catch { - case e: Exception => - e.printStackTrace() - } - } - - if (rf != null) { - try { - rf.close(); - } catch { - case e: Exception => - e.printStackTrace() - } - } } } rb } + + def FreeResouce = { + if (channel != null) { + try { + channel.close(); + channel = null + } catch { + case e: Exception => + channel = null + e.printStackTrace() + } + } + + if (rf != null) { + try { + rf.close(); + rf = null + } catch { + case e: Exception => + rf = null + e.printStackTrace(); + } + } + this.fileindex = 0 + } } \ No newline at end of file diff --git a/src/main/scala/rep/utils/GlobalUtils.scala b/src/main/scala/rep/utils/GlobalUtils.scala index 5b76ca95..87632b3a 100644 --- a/src/main/scala/rep/utils/GlobalUtils.scala +++ b/src/main/scala/rep/utils/GlobalUtils.scala @@ -81,6 +81,7 @@ object GlobalUtils { val gensisblock = 17 val api = 18 val transactiondispatcher = 19 + val dispatchofRecvendorsement = 20 } diff --git a/src/test/scala/rep/storage/test/blockDataCheck.scala b/src/test/scala/rep/storage/test/blockDataCheck.scala index 16630f7b..f835d083 100644 --- a/src/test/scala/rep/storage/test/blockDataCheck.scala +++ b/src/test/scala/rep/storage/test/blockDataCheck.scala @@ -253,9 +253,9 @@ object blockDataCheck extends App { //writePretty(b.endorsements) } - println(readBlockToString(da4,1)) + /*println(readBlockToString(da4,1)) println(readBlockToString(da4,2)) - println(readBlockToString(da4,3)) + println(readBlockToString(da4,3))*/ /*da5.FindByLike("rechain_", 1).foreach(f=>{ println(f._1) @@ -282,6 +282,13 @@ object blockDataCheck extends App { } } + var l : Long = 119649 + for(i<-0 to 10){ + + getblockerForheight(da1, l) + l = l + 1 + } + def getblockerForheight(da: ImpDataAccess, h: Long) = { var nodes = new Array[String](5) nodes(0) = "12110107bi45jh675g.node2" @@ -292,7 +299,7 @@ object blockDataCheck extends App { val b = da.getBlock4ObjectByHeight(h) val candidatorCur = candidators(da.getSystemName, b.hashOfBlock.toStringUtf8(), nodes.toSet, Sha256.hash(b.hashOfBlock.toStringUtf8())) - println(s"height=$ch,systemname=${da.SystemName},${candidatorCur.mkString("|")}") + println(s"height=$h,systemname=${da.SystemName},${candidatorCur.mkString("|")}") } ///////////////////////////////////////vote code/////////////////////////////////////////////////