重构代码19--增加背书的actor种类,背书签名actor,背书交易执行actor,交易签名验证actor

This commit is contained in:
wuwei1972 2019-04-16 09:57:20 +08:00
parent c1164c0ccc
commit 5a1139b2d8
10 changed files with 357 additions and 61 deletions

View File

@ -128,7 +128,7 @@ system {
block {
//投票选举重试间隔
vote_retry_delay = 100
vote_retry_delay = 200
//投票重试无果后等待时间
//waiting_delay = 3600000
waiting_delay = 360

View File

@ -76,6 +76,12 @@ abstract class ModuleBase(name: String) extends Actor with ClusterActor with B
case "confirmerofblock" => 16
case "gensisblock" => 17
case "api" => 18
case "verifytransofsigner" => 19
case "verifytransofexecutor" => 20
case "verifyblockendorser" => 21
case _ => 0
}
}

View File

@ -0,0 +1,40 @@
package rep.network.consensus.endorse
import akka.actor.{ Address}
import rep.protos.peer.{Signature,Block}
object EndorseMsg {
//背书请求者消息
case class RequesterOfEndorsement(blc: Block, blocker: String, endorer: Address)
//给背书人的背书消息
case class EndorsementInfo(blc: Block, blocker: String)
//背书收集者消息
case class CollectEndorsement(blc: Block, blocker: String)
//背书人返回的背书结果
case class ResultOfEndorsed(result: Boolean, endor: Signature, BlockHash: String)
//背书请求者返回的结果
case class ResultOfEndorseRequester(result: Boolean, endor: Signature, BlockHash: String, endorser: Address)
case class verifyTransSignOfEndorsement(blc: Block, blocker: String)
case class verifyTransExeOfEndorsement(blc: Block, blocker: String)
case class VerfiyBlockEndorseOfEndorsement(blc: Block, blocker: String)
case class VerifyResultOfEndorsement(blockhash:String,blocker:String,verifyType:Int,result:Boolean)
case object VerifyTypeOfEndorsement{
val transSignVerify = 1
val transExeVerify = 2
val endorsementVerify = 3
}
case class VerifyCacher(blc: Block, blocker: String,result:VerifyResultOfEndorsement)
case object ConsensusOfVote
}

View File

@ -22,15 +22,15 @@ import scala.util.control.Breaks._
import scala.util.control.Exception.Finally
import java.util.concurrent.ConcurrentHashMap
import rep.log.trace.LogType
import rep.network.consensus.endorse.EndorseMsg.{ EndorsementInfo, ResultOfEndorsed }
import rep.network.consensus.endorse.EndorseMsg.{ EndorsementInfo, ResultOfEndorsed,verifyTransSignOfEndorsement,verifyTransExeOfEndorsement,VerfiyBlockEndorseOfEndorsement,VerifyResultOfEndorsement,VerifyTypeOfEndorsement,ConsensusOfVote }
import rep.network.consensus.block.Blocker.{PreTransBlock, PreTransBlockResult}
import rep.network.consensus.util.{ BlockVerify, BlockHelp }
object Endorser {
def props(name: String): Props = Props(classOf[Endorser], name)
case class CacheOfEndorsement(endorseinfo:EndorsementInfo,requester:ActorRef,
repeatOfChecked:Boolean,transSignOfChecked:Boolean,EndorseSignOfChecked:Boolean,
transExeOfChecked:Boolean,result:ResultOfEndorsed)
repeatOfChecked:Int,transSignOfChecked:Int,EndorseSignOfChecked:Int,
transExeOfChecked:Int,result:ResultOfEndorsed)
}
class Endorser(moduleName: String) extends ModuleBase(moduleName) {
@ -62,57 +62,8 @@ class Endorser(moduleName: String) extends ModuleBase(moduleName) {
}
isRepeat
}
private def ExecuteTransactionOfBlock(block: Block): Boolean = {
try {
val future = pe.getActorRef(ActorType.preloaderoftransaction) ? PreTransBlock(block,"endorser")
val result = Await.result(future, timeout.duration).asInstanceOf[PreTransBlockResult]
if (result.result) {
var tmpblock = result.blc.withHashOfBlock(block.hashOfBlock)
BlockVerify.VerifyHashOfBlock(tmpblock)
} else {
false
}
} catch {
case e: AskTimeoutException => false
}
}
private def asyncVerifyTransaction(t: Transaction): Future[Boolean] = {
val result = Promise[Boolean]
if (pe.getTransPoolMgr.findTrans(t.id)) {
result.success(true)
} else {
val tmp = BlockVerify.VerifyOneSignOfTrans(t, pe.getSysTag)
if (tmp._1) {
result.success(true)
}else{
result.success(false)
}
}
result.future
}
private def asyncVerifyTransactions(block: Block): Boolean = {
val listOfFuture: Seq[Future[Boolean]] = block.transactions.map(x => {
asyncVerifyTransaction(x)
})
val futureOfList: Future[List[Boolean]] = Future.sequence(listOfFuture.toList)
var result = true
//breakable(
futureOfList.map(x => {
x.foreach(f => {
if (!f) {
result = false
logMsg(LogType.INFO, "endorser verify tran sign is error, break")
//break
}
})
})
//)
result
}
/*
private def checkedOfEndorseCondition(block: Block, blocker: String) = {
if (pe.getSystemStatus == NodeStatus.Endorsing) {
logMsg(LogType.INFO, "endorser recv endorsement")
@ -178,7 +129,7 @@ class Endorser(moduleName: String) extends ModuleBase(moduleName) {
logMsg(LogType.INFO, "endorser verify itself ")
//sender ! ResultOfEndorsed(false, null, block.hashOfBlock.toStringUtf8())
}
}
}*/
private def isAllowEndorse(info:EndorsementInfo):Int={
if(info.blocker == pe.getSysTag){
@ -210,7 +161,17 @@ class Endorser(moduleName: String) extends ModuleBase(moduleName) {
case 0 =>
//entry endorse
if(this.cache.result == null){
if(this.cache.transSignOfChecked == 0) pe.getActorRef(ActorType.verifytransofsigner) ! verifyTransSignOfEndorsement(this.cache.endorseinfo.blc, this.cache.endorseinfo.blocker)
if(this.cache.transExeOfChecked == 0) pe.getActorRef(ActorType.verifytransofexecutor) ! verifyTransExeOfEndorsement(this.cache.endorseinfo.blc, this.cache.endorseinfo.blocker)
if(this.cache.EndorseSignOfChecked == 0) pe.getActorRef(ActorType.verifyblockendorser) ! VerfiyBlockEndorseOfEndorsement(this.cache.endorseinfo.blc, this.cache.endorseinfo.blocker)
if(this.cache.repeatOfChecked == 0){
val r = this.hasRepeatOfTrans(this.cache.endorseinfo.blc.transactions)
if(r){
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,1,this.cache.transSignOfChecked,this.cache.EndorseSignOfChecked,this.cache.transExeOfChecked,null)
}else{
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,2,this.cache.transSignOfChecked,this.cache.EndorseSignOfChecked,this.cache.transExeOfChecked,null)
}
}
}else{
//send result to endorse requester
if(this.cache.result.result){
@ -236,7 +197,7 @@ class Endorser(moduleName: String) extends ModuleBase(moduleName) {
private def CheckEndorseInfo(info:EndorsementInfo)={
if(this.cache == null){
//直接接收背书并进行处理
this.cache = CacheOfEndorsement(info,sender,false,false,false,false,null)
this.cache = CacheOfEndorsement(info,sender,0,0,0,0,null)
logMsg(LogType.INFO, "new endorsment")
}else{
//已经缓存背书了检查是否是同一个背书请求
@ -245,17 +206,57 @@ class Endorser(moduleName: String) extends ModuleBase(moduleName) {
logMsg(LogType.INFO, "endorsement is same")
}else{
//不是同一个背书
this.cache = CacheOfEndorsement(info,sender,false,false,false,false,null)
this.cache = CacheOfEndorsement(info,sender,0,0,0,0,null)
logMsg(LogType.INFO, "new endorsment,refresh cache")
}
}
EndorseHandler
}
private def VerifyOfHandler(vr:VerifyResultOfEndorsement)={
if(this.cache != null && this.cache.endorseinfo.blc != null && this.cache.endorseinfo.blc.hashOfBlock.toStringUtf8() == vr.blockhash && this.cache.endorseinfo.blocker==vr.blocker){
vr.verifyType match{
case VerifyTypeOfEndorsement.transSignVerify=>
if(vr.result){
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,this.cache.repeatOfChecked,1,this.cache.EndorseSignOfChecked,this.cache.transExeOfChecked,null)
}else{
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,this.cache.repeatOfChecked,2,this.cache.EndorseSignOfChecked,this.cache.transExeOfChecked,null)
}
case VerifyTypeOfEndorsement.transExeVerify=>
if(vr.result){
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,this.cache.repeatOfChecked,this.cache.transSignOfChecked,this.cache.EndorseSignOfChecked,1,null)
}else{
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,this.cache.repeatOfChecked,this.cache.transSignOfChecked,this.cache.EndorseSignOfChecked,2,null)
}
case VerifyTypeOfEndorsement.endorsementVerify=>
if(vr.result){
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,this.cache.repeatOfChecked,this.cache.transSignOfChecked,1,this.cache.transExeOfChecked,null)
}else{
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,this.cache.repeatOfChecked,this.cache.transSignOfChecked,2,this.cache.transExeOfChecked,null)
}
}
if(this.cache.repeatOfChecked == 1 && this.cache.transSignOfChecked == 1 && this.cache.transExeOfChecked == 1 && this.cache.EndorseSignOfChecked == 1){
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,this.cache.repeatOfChecked,this.cache.transSignOfChecked,this.cache.EndorseSignOfChecked,this.cache.transExeOfChecked,
ResultOfEndorsed(true, BlockHelp.SignBlock(this.cache.endorseinfo.blc, pe.getSysTag), this.cache.endorseinfo.blc.hashOfBlock.toStringUtf8()))
}else if(this.cache.repeatOfChecked == 2 || this.cache.transSignOfChecked == 2 || this.cache.transExeOfChecked == 2 || this.cache.EndorseSignOfChecked == 2){
this.cache = CacheOfEndorsement(this.cache.endorseinfo,this.cache.requester,this.cache.repeatOfChecked,this.cache.transSignOfChecked,this.cache.EndorseSignOfChecked,this.cache.transExeOfChecked,
ResultOfEndorsed(false, BlockHelp.SignBlock(this.cache.endorseinfo.blc, pe.getSysTag), this.cache.endorseinfo.blc.hashOfBlock.toStringUtf8()))
}
EndorseHandler
}
}
override def receive = {
//Endorsement block
case EndorsementInfo(block, blocker) =>
checkedOfEndorseCondition(block, blocker)
CheckEndorseInfo(EndorsementInfo(block, blocker))
case VerifyResultOfEndorsement(blockhash:String,blocker:String,verifyType:Int,result:Boolean) =>
VerifyOfHandler(VerifyResultOfEndorsement(blockhash,blocker,verifyType,result))
case ConsensusOfVote=>
if(this.cache != null){
EndorseHandler
}
case _ => //ignore
}

View File

@ -0,0 +1,63 @@
package rep.network.consensus.endorse
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.pattern.AskTimeoutException
import scala.concurrent._
import akka.actor.{ Actor, ActorRef, Props, Address, ActorSelection }
import com.google.protobuf.ByteString
import com.google.protobuf.timestamp.Timestamp
import rep.app.conf.{ SystemProfile, TimePolicy }
import rep.network.base.ModuleBase
import rep.network.consensus.endorse.EndorseMsg.{ VerifyResultOfEndorsement, VerfiyBlockEndorseOfEndorsement,VerifyTypeOfEndorsement,VerifyCacher}
import rep.network.tools.PeerExtension
import rep.network.Topic
import rep.protos.peer._
import rep.utils._
import rep.log.trace.LogType
import akka.pattern.AskTimeoutException
import rep.network.consensus.util.BlockVerify
import scala.util.control.Breaks
import rep.utils.GlobalUtils.EventType
object VerifyBlockEndorser {
def props(name: String): Props = Props(classOf[VerifyBlockEndorser], name)
}
class VerifyBlockEndorser(moduleName: String) extends ModuleBase(moduleName) {
import context.dispatcher
import scala.collection.breakOut
import scala.concurrent.duration._
var cache : VerifyCacher = null
override def preStart(): Unit = {
logMsg(LogType.INFO, "VerifyTransOfSigner Start")
}
private def Handler={
if(this.cache.result == null){
val result = BlockVerify.VerifyAllEndorseOfBlock(this.cache.blc, pe.getSysTag)
this.cache = VerifyCacher(this.cache.blc, this.cache.blocker,VerifyResultOfEndorsement(this.cache.blc.hashOfBlock.toStringUtf8(),this.cache.blocker,VerifyTypeOfEndorsement.endorsementVerify,result._1))
}
sender ! this.cache.result
}
override def receive = {
case VerfiyBlockEndorseOfEndorsement(blc: Block, blocker: String) =>
if(this.cache == null){
logMsg(LogType.INFO, "endorse sign verify is new")
this.cache = VerifyCacher(blc, blocker,null)
}else if(blc.hashOfBlock.toStringUtf8() == this.cache.blc.hashOfBlock.toStringUtf8() && blocker == this.cache.blocker){
logMsg(LogType.INFO, "endorse sign verify is exist")
}else{
logMsg(LogType.INFO, "endorse sign verify is repeat")
this.cache = VerifyCacher(blc, blocker,null)
}
Handler
case _ => //ignore
}
}

View File

@ -0,0 +1,81 @@
package rep.network.consensus.endorse
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.pattern.AskTimeoutException
import scala.concurrent._
import akka.actor.{ Actor, ActorRef, Props, Address, ActorSelection }
import com.google.protobuf.ByteString
import com.google.protobuf.timestamp.Timestamp
import rep.app.conf.{ SystemProfile, TimePolicy }
import rep.network.base.ModuleBase
import rep.network.consensus.endorse.EndorseMsg.{ VerifyResultOfEndorsement, verifyTransExeOfEndorsement,VerifyTypeOfEndorsement,VerifyCacher}
import rep.network.tools.PeerExtension
import rep.network.Topic
import rep.protos.peer._
import rep.utils._
import rep.log.trace.LogType
import akka.pattern.AskTimeoutException
import scala.util.control.Breaks
import rep.utils.GlobalUtils.EventType
import rep.network.consensus.block.Blocker.{PreTransBlock, PreTransBlockResult}
import rep.network.consensus.util.{ BlockVerify, BlockHelp }
import rep.utils.GlobalUtils.{ ActorType}
object VerifyTransOfExecutor {
def props(name: String): Props = Props(classOf[VerifyTransOfExecutor], name)
}
class VerifyTransOfExecutor(moduleName: String) extends ModuleBase(moduleName) {
import context.dispatcher
import scala.collection.breakOut
import scala.concurrent.duration._
implicit val timeout = Timeout(TimePolicy.getTimeoutPreload seconds)
var cache : VerifyCacher = null
override def preStart(): Unit = {
logMsg(LogType.INFO, "VerifyTransOfSigner Start")
}
private def ExecuteTransactionOfBlock(block: Block): Boolean = {
try {
val future = pe.getActorRef(ActorType.preloaderoftransaction) ? PreTransBlock(block,"endorser")
val result = Await.result(future, timeout.duration).asInstanceOf[PreTransBlockResult]
if (result.result) {
var tmpblock = result.blc.withHashOfBlock(block.hashOfBlock)
BlockVerify.VerifyHashOfBlock(tmpblock)
} else {
false
}
} catch {
case e: AskTimeoutException => false
}
}
private def Handler={
if(this.cache.result == null){
val result = ExecuteTransactionOfBlock(this.cache.blc)
this.cache = VerifyCacher(this.cache.blc, this.cache.blocker,VerifyResultOfEndorsement(this.cache.blc.hashOfBlock.toStringUtf8(),this.cache.blocker,VerifyTypeOfEndorsement.transExeVerify,result))
}
sender ! this.cache.result
}
override def receive = {
case verifyTransExeOfEndorsement(blc: Block, blocker: String) =>
if(this.cache == null){
logMsg(LogType.INFO, "trans exe verify is new")
this.cache = VerifyCacher(blc, blocker,null)
}else if(blc.hashOfBlock.toStringUtf8() == this.cache.blc.hashOfBlock.toStringUtf8() && blocker == this.cache.blocker){
logMsg(LogType.INFO, "trans exe verify is exist")
}else{
logMsg(LogType.INFO, "trans exe verify is repeat")
this.cache = VerifyCacher(blc, blocker,null)
}
Handler
case _ => //ignore
}
}

View File

@ -0,0 +1,95 @@
package rep.network.consensus.endorse
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.pattern.AskTimeoutException
import scala.concurrent._
import akka.actor.{ Actor, ActorRef, Props, Address, ActorSelection }
import com.google.protobuf.ByteString
import com.google.protobuf.timestamp.Timestamp
import rep.app.conf.{ SystemProfile, TimePolicy }
import rep.network.base.ModuleBase
import rep.network.consensus.endorse.EndorseMsg.{ VerifyResultOfEndorsement, verifyTransSignOfEndorsement,VerifyTypeOfEndorsement,VerifyCacher}
import rep.network.tools.PeerExtension
import rep.network.Topic
import rep.protos.peer._
import rep.utils._
import rep.log.trace.LogType
import akka.pattern.AskTimeoutException
import rep.network.consensus.util.BlockVerify
import scala.util.control.Breaks
import rep.utils.GlobalUtils.EventType
object VerifyTransOfSigner {
def props(name: String): Props = Props(classOf[VerifyTransOfSigner], name)
}
class VerifyTransOfSigner(moduleName: String) extends ModuleBase(moduleName) {
import context.dispatcher
import scala.collection.breakOut
import scala.concurrent.duration._
var cache : VerifyCacher = null
override def preStart(): Unit = {
logMsg(LogType.INFO, "VerifyTransOfSigner Start")
}
private def asyncVerifyTransaction(t: Transaction): Future[Boolean] = {
val result = Promise[Boolean]
if (pe.getTransPoolMgr.findTrans(t.id)) {
result.success(true)
} else {
val tmp = BlockVerify.VerifyOneSignOfTrans(t, pe.getSysTag)
if (tmp._1) {
result.success(true)
}else{
result.success(false)
}
}
result.future
}
private def asyncVerifyTransactions(block: Block): Boolean = {
val listOfFuture: Seq[Future[Boolean]] = block.transactions.map(x => {
asyncVerifyTransaction(x)
})
val futureOfList: Future[List[Boolean]] = Future.sequence(listOfFuture.toList)
var result = true
futureOfList.map(x => {
x.foreach(f => {
if (!f) {
result = false
logMsg(LogType.INFO, "endorser verify tran sign is error, break")
}
})
})
result
}
private def Handler={
if(this.cache.result == null){
val result = asyncVerifyTransactions(this.cache.blc)
this.cache = VerifyCacher(this.cache.blc, this.cache.blocker,VerifyResultOfEndorsement(this.cache.blc.hashOfBlock.toStringUtf8(),this.cache.blocker,VerifyTypeOfEndorsement.transSignVerify,result))
}
sender ! this.cache.result
}
override def receive = {
case verifyTransSignOfEndorsement(blc: Block, blocker: String) =>
if(this.cache == null){
logMsg(LogType.INFO, "trans sign verify is new")
this.cache = VerifyCacher(blc, blocker,null)
}else if(blc.hashOfBlock.toStringUtf8() == this.cache.blc.hashOfBlock.toStringUtf8() && blocker == this.cache.blocker){
logMsg(LogType.INFO, "trans sign verify is exist")
}else{
logMsg(LogType.INFO, "trans sign verify is repeat")
this.cache = VerifyCacher(blc, blocker,null)
}
Handler
case _ => //ignore
}
}

View File

@ -13,6 +13,7 @@ import rep.network.util.NodeHelp
import rep.network.consensus.block.Blocker.{ CreateBlock }
import rep.network.sync.SyncMsg.StartSync
import rep.network.consensus.block.GenesisBlocker.GenesisBlock
import rep.network.consensus.endorse.EndorseMsg.ConsensusOfVote
object Voter {
@ -76,6 +77,7 @@ class Voter(moduleName: String) extends ModuleBase(moduleName) with CRFDVoter {
pe.getActorRef(ActorType.blocker) ! CreateBlock
} else {
//自己是背书人
pe.getActorRef(ActorType.endorser) ! ConsensusOfVote
pe.setSystemStatus(NodeStatus.Endorsing)
}
}

View File

@ -30,7 +30,7 @@ import rep.network.cluster.MemberListener
import rep.network.sync.{ SynchronizeResponser, SynchronizeRequester }
import rep.network.consensus.block.{ GenesisBlocker, ConfirmOfBlock, EndorseCollector, Blocker }
import rep.network.consensus.endorse.Endorser
import rep.network.consensus.endorse.{Endorser,VerifyBlockEndorser,VerifyTransOfExecutor,VerifyTransOfSigner}
import rep.network.consensus.transaction.PreloaderForTransaction
import rep.network.consensus.vote.Voter
import rep.sc.TransProcessor
@ -90,6 +90,11 @@ class ModuleManager(moduleName: String, sysTag: String, enableStatistic: Boolean
context.actorOf(GenesisBlocker.props("gensisblock"), "gensisblock")
context.actorOf(ConfirmOfBlock.props("confirmerofblock"), "confirmerofblock")
context.actorOf(EndorseCollector.props("endorsementcollectioner"), "endorsementcollectioner")
context.actorOf(VerifyTransOfSigner.props("verifytransofsigner"), "verifytransofsigner")
context.actorOf(VerifyTransOfExecutor.props("verifytransofexecutor"), "verifytransofexecutor")
context.actorOf(VerifyBlockEndorser.props("verifyblockendorser"), "verifyblockendorser")
context.actorOf(Endorser.props("endorser"), "endorser")
context.actorOf(PreloaderForTransaction.props("preloaderoftransaction", context.actorOf(TransProcessor.props("sandbox_for_Preload", self), "sandboxProcessor")), "preloaderoftransaction")
//context.actorOf(Endorser.props("endorser"), "endorser")

View File

@ -80,6 +80,9 @@ object GlobalUtils {
val confirmerofblock = 16
val gensisblock = 17
val api = 18
val verifytransofsigner=19
val verifytransofexecutor=20
val verifyblockendorser=21
}