repchain2.0-update88:修改交易接收逻辑,检查交易大小、检查交易是否重复、检查交易签名、根据配置项确定交易是否预执行,最后根据配置确定交易是否广播,不广播直接加入自身交易池。

This commit is contained in:
jiangbuyun 2022-07-24 15:55:53 +08:00
parent df9d7714ea
commit 4d0f502c0d
2 changed files with 66 additions and 15 deletions

View File

@ -17,6 +17,7 @@
package rep.api.rest
import akka.util.Timeout
import scala.concurrent.duration._
import rep.crypto._
import org.json4s._
@ -25,12 +26,19 @@ import akka.actor.Props
import rep.log.RepLogger
import rep.network.base.ModuleBase
import rep.network.consensus.byzantium.ConsensusCondition
import rep.proto.rc2.{ActionResult, ChaincodeId, Event, Transaction}
import rep.network.module.ModuleActorType
import rep.proto.rc2.{ActionResult, ChaincodeId, Event, Transaction, TransactionResult}
import rep.sc.Sandbox.DoTransactionResult
import rep.sc.TypeOfSender
import rep.storage.chain.block.BlockSearcher
import rep.storage.db.factory.DBFactory
import rep.utils.GlobalUtils.EventType
import rep.utils.{MessageToJson, SerializeUtils}
import scala.concurrent.Await
import akka.pattern.ask
import rep.sc.SandboxDispatcher.DoTransaction
/**
* RestActor伴生object包含可接受的传入消息定义以及处理的返回结果定义
* 以及用于建立Tranaction检索Tranaction的静态方法
@ -184,6 +192,24 @@ class RestActor(moduleName: String) extends ModuleBase(moduleName) {
}
}
private def sendTransaction(t:Transaction):Unit={
//预执行正常,提交并广播交易
if (config.isBroadcastTransaction) {
mediator ! Publish(Topic.Transaction, t)
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Transaction, Event.Action.TRANSACTION)
sender ! PostResult(t.id, None, None)
}else{
val pool = pe.getRepChainContext.getTransactionPool
if(!pool.hasOverflowed){
pool.addTransactionToCache(t)
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Transaction, Event.Action.TRANSACTION)
sender ! PostResult(t.id, None, None)
}else{
sender ! PostResult(t.id, None, Option("code=901,reason=交易池已经满了"))
}
}
}
// 先检查交易大小然后再检查交易是否已存在再去验证签名如果没有问题则广播
def preTransaction(t: Transaction): Unit = {
val tranLimitSize = config.getBlockMaxLength / 3
@ -191,23 +217,33 @@ class RestActor(moduleName: String) extends ModuleBase(moduleName) {
sender ! PostResult(t.id, None, Option(s"交易大小超出限制: ${tranLimitSize},请重新检查"))
} else if (!this.consensusCondition.CheckWorkConditionOfSystem(pe.getRepChainContext.getNodeMgr.getStableNodes.size)) {
sender ! PostResult(t.id, None, Option("共识节点数目太少,暂时无法处理交易"))
} else if (pe.getRepChainContext.getTransactionPool.isExistInCache(t.id) || sr.isExistTransactionByTxId(t.id)) {
sender ! PostResult(t.id, None, Option(s"交易ID重复, ID为${t.id}"))
} else {
try {
//if (pe.getTransPoolMgr.getTransLength() < SystemProfile.getMaxCacheTransNum) {
//pe.getTransPoolMgr.putTran(t,pe.getSysTag)
if (config.isBroadcastTransaction) {
mediator ! Publish(Topic.Transaction, t)
val sig = t.signature.get.signature.toByteArray
val tOutSig = t.clearSignature
val certId = t.signature.get.certId.get
if (pe.getRepChainContext.getSignTool.verify(sig, tOutSig.toByteArray, certId)) {
RepLogger.info(RepLogger.Business_Logger, s"验证签名成功txid: ${t.id},creditCode: ${t.signature.get.getCertId.creditCode}, certName: ${t.signature.get.getCertId.certName}")
if (pe.getRepChainContext.getConfig.hasPreloadOfApi) {
val future = pe.getActorRef(ModuleActorType.ActorType.transactiondispatcher) ? DoTransaction(Array(t).toSeq, "api_" + t.id, TypeOfSender.FromAPI)
val result = Await.result(future, timeout.duration).asInstanceOf[Seq[TransactionResult]]
val rv = result(0).err
rv.get.code match {
case 0 =>
sendTransaction(t)
case _ =>
//预执行异常,废弃交易向api调用者发送异常
sender ! PostResult(t.id, None, Option("code="+rv.get.code+",reason="+rv.get.reason))
}
} else {
sendTransaction(t)
}
} else {
RepLogger.info(RepLogger.Business_Logger, s"验证签名出错txid: ${t.id},creditCode: ${t.signature.get.getCertId.creditCode}, certName: ${t.signature.get.getCertId.certName}")
sender ! PostResult(t.id, None, Option("验证签名出错"))
}
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Transaction, Event.Action.TRANSACTION)
sender ! PostResult(t.id, None, None)
/*} else {
// 交易缓存池已满不可继续提交交易
sender ! PostResult(t.id, None, Option(s"交易缓存池已满,容量为${pe.getTransPoolMgr.getTransLength()},不可继续提交交易"))
}*/
} catch {
case e: Exception =>
sender ! PostResult(t.id, None, Option(e.getMessage))

View File

@ -142,6 +142,21 @@ class PoolOfTransaction(ctx:RepChainSystemContext) {
r
}
/**
* @author jiangbuyun
* @version 2.0
* @since 2022-04-15
* @category 根据交易Id检查交易是否已经出块
* @param tid:String 交易Id
* @return 交易已经出块返回true否则false
* */
def isExistInCache(tid:String):Boolean={
var r = false
if(this.transactionCaches.containsKey(tid)){
r = true
}
r
}
/**
* @author jiangbuyun
* @version 2.0