repchain2.0-update91:完善CFRD共识算法,当某些节点没有交易,其他节点有交易时,有交易的节点将广播交易到其他节点。

This commit is contained in:
jiangbuyun 2022-07-25 15:27:20 +08:00
parent ffc7a0e69d
commit 5f78895c12
3 changed files with 106 additions and 14 deletions

View File

@ -41,6 +41,7 @@ object Topic {
val SyncOfBlock = "SyncOfBlock"
val VoteTransform = "VoteTransform"
val VoteSynchronized = "VoteSynchronized"
val MessageWithZeroTransaction = "MessageWithZeroTransaction"
}
object InnerTopic {
@ -103,7 +104,10 @@ class PeerHelper(name: String) extends ModuleBase(name) {
"transfer", Seq(li2))
//pe.getActorRef(ModuleActorType.ActorType.transactionpool) ! t3
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Transaction, Event.Action.TRANSACTION)
mediator ! Publish(Topic.Transaction, t3)
if(!pe.getRepChainContext.getTransactionPool.hasOverflowed)pe.getRepChainContext.getTransactionPool.addTransactionToCache(t3)
if(pe.getRepChainContext.getConfig.isBroadcastTransaction)
mediator ! Publish(Topic.Transaction, t3)
RepLogger.trace(RepLogger.System_Logger,this.getLogMsgPrefix(s"########################create transaction id =${t3.id}"))
} catch {
case e: RuntimeException => throw e

View File

@ -1,31 +1,72 @@
package rep.network.consensus.cfrd.vote
import akka.actor.Props
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import rep.log.RepLogger
import rep.log.httplog.AlertInfo
import rep.network.autotransaction.Topic
import rep.network.consensus.byzantium.ConsensusCondition
import rep.network.consensus.common.vote.IVoter
import rep.network.module.cfrd.CFRDActorType
import rep.network.consensus.cfrd.MsgOfCFRD.{CreateBlock, ForceVoteInfo, SpecifyVoteHeight, VoteOfBlocker, VoteOfForce, VoteOfReset}
import rep.network.consensus.cfrd.vote.VoterOfCFRD.{CheckZero, RequestWithZeroTransaction, ZeroTransactionRequests}
import rep.network.util.NodeHelp
import rep.network.consensus.common.algorithm.IRandomAlgorithmOfVote
import scala.collection.mutable.ArrayBuffer
/**
* Created by jiangbuyun on 2020/03/17.
* 实现CFRD抽签actor
*/
object VoterOfCFRD{
def props(name: String): Props = Props(classOf[VoterOfCFRD],name)
object VoterOfCFRD {
def props(name: String): Props = Props(classOf[VoterOfCFRD], name)
case object CheckZero
case class RequestWithZeroTransaction(height: Long, systemName: String)
case class ZeroTransactionRequests(height: Long, nodes: ArrayBuffer[String])
}
class VoterOfCFRD(moduleName: String) extends IVoter(moduleName: String) {
import scala.concurrent.duration._
import context.dispatcher
override def preStart(): Unit = {
//注册接收交易为空的广播
if (pe.getRepChainContext.getConfig.getVoteNodeList.contains(pe.getSysTag)) {
//共识节点可以订阅交易为空的广播事件
SubscribeTopic(mediator, self, selfAddr, Topic.MessageWithZeroTransaction, true)
}
RepLogger.info(RepLogger.Consensus_Logger, this.getLogMsgPrefix("VoterOfCFRD module start"))
super.preStart()
}
this.algorithmInVoted = new IRandomAlgorithmOfVote
override def preStart(): Unit = {
RepLogger.info(RepLogger.Vote_Logger, this.getLogMsgPrefix("VoterOfCFRD module start"))
//////////////交易数为零导致无法进行抽签广播交易数为零请求接收到该请求的节点
// 获得了大于1/2的节点请求之后如果本节点有未出块的交易广播一条交易
// 如果当前节点与大于1/2节点的高度不一致发送同步命令进行同步
private var schedulerOfZero: akka.actor.Cancellable = null
private def checkZeroScheduler: Unit = {
if (!checkTranNum) {
if (schedulerOfZero == null) {
this.schedulerOfZero = scheduler.scheduleOnce(15.second, self, VoterOfCFRD.CheckZero)
}
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag}," +
s"startup scheduler" + "~" + selfAddr))
} else {
if (schedulerOfZero != null) schedulerOfZero.cancel()
this.schedulerOfZero = null
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag}," +
s"delete scheduler" + "~" + selfAddr))
}
}
override protected def NoticeBlockerMsg: Unit = {
@ -36,7 +77,7 @@ class VoterOfCFRD(moduleName: String) extends IVoter(moduleName: String) {
}
override protected def DelayVote: Unit = {
if(voteCount >= 50)
if (voteCount >= 50)
this.voteCount = 1
else
this.voteCount += 1
@ -46,7 +87,8 @@ class VoterOfCFRD(moduleName: String) extends IVoter(moduleName: String) {
}
override protected def vote(isForce: Boolean,forceInfo:ForceVoteInfo): Unit = {
override protected def vote(isForce: Boolean, forceInfo: ForceVoteInfo): Unit = {
checkZeroScheduler
if (checkTranNum || isForce) {
val currentblockhash = pe.getCurrentBlockHash
val currentheight = pe.getCurrentHeight
@ -112,14 +154,49 @@ class VoterOfCFRD(moduleName: String) extends IVoter(moduleName: String) {
override def receive: Receive = {
case VoteOfBlocker =>
if (NodeHelp.isCandidateNow(pe.getSysTag, pe.getRepChainContext.getSystemCertList.getVoteList)) {
voteMsgHandler(false,null)
voteMsgHandler(false, null)
}
case VoteOfForce=>
voteMsgHandler(true,null)
case VoteOfReset=>
case VoteOfForce =>
voteMsgHandler(true, null)
case VoteOfReset =>
cleanVoteInfo
voteMsgHandler(true,null)
case SpecifyVoteHeight(voteinfo)=>
voteMsgHandler(true,voteinfo)
voteMsgHandler(true, null)
case SpecifyVoteHeight(voteinfo) =>
voteMsgHandler(true, voteinfo)
case CheckZero =>
if (new ConsensusCondition(pe.getRepChainContext).CheckWorkConditionOfSystem(pe.getRepChainContext.getNodeMgr.getStableNodes.size)) {
if (!pe.isSynching) {
if (checkTranNum) {
//如果产生交易了删除定时器什么也不做
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag}," +
s"CheckZero stop" + "~" + selfAddr))
if (schedulerOfZero != null) schedulerOfZero.cancel()
this.schedulerOfZero = null
} else {
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag}," +
s"CheckZero ,broadcast RequestWithZeroTransaction " + "~" + selfAddr))
mediator ! Publish(Topic.MessageWithZeroTransaction, RequestWithZeroTransaction(pe.getCurrentHeight, pe.getSysTag))
}
}
}
case RequestWithZeroTransaction(h, sn) =>
recvZeroTransactionHandle(VoterOfCFRD.RequestWithZeroTransaction(h, sn))
}
private def recvZeroTransactionHandle(zt: RequestWithZeroTransaction): Unit = {
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag}," +
s"recv RequestWithZeroTransaction,height=${zt.height},systemName=${zt.systemName}" + "~" + selfAddr))
if (new ConsensusCondition(pe.getRepChainContext).CheckWorkConditionOfSystem(pe.getRepChainContext.getNodeMgr.getStableNodes.size)) {
if (!pe.isSynching && checkTranNum) {
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag}," +
s"recv RequestWithZeroTransaction,broadcast transaction" + "~" + selfAddr))
val t = pe.getRepChainContext.getTransactionPool.getRandomTransaction
if (t != null) {
RepLogger.trace(RepLogger.Vote_Logger, this.getLogMsgPrefix(s"sysname=${pe.getSysTag}," +
s"recv RequestWithZeroTransaction,get transaction ,broadcast transaction" + "~" + selfAddr))
mediator ! Publish(Topic.Transaction, t)
}
}
}
}
}

View File

@ -3,13 +3,16 @@ package rep.network.tools.transpool
import java.util
import java.util.concurrent.atomic.{AtomicLong, LongAdder}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import rep.app.system.RepChainSystemContext
import rep.proto.rc2.Transaction
import rep.storage.chain.block.BlockSearcher
import rep.storage.db.common.ITransactionCallback
import rep.storage.db.factory.DBFactory
import rep.utils.SerializeUtils
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.Breaks.{break, breakable}
/**
@ -243,6 +246,14 @@ class PoolOfTransaction(ctx:RepChainSystemContext) {
this.transactionCaches.get(tid)
}
def getRandomTransaction:Transaction={
val keys = this.transactionCaches.keySet().toArray
val num = Random.nextInt() % keys.length
val id : String = keys(num).asInstanceOf[String]
val t = getTransaction(id)
t
}
/**
* @author jiangbuyun
* @version 2.0