Merge remote-tracking branch 'origin/dev_jby_preview' into yf_preview

This commit is contained in:
brightestboy 2019-09-22 11:28:24 +08:00
commit 1ef4cb5c28
4 changed files with 161 additions and 68 deletions

View File

@ -36,6 +36,7 @@ import rep.utils.{ ActorUtils, GlobalUtils }
import rep.utils.GlobalUtils.EventType
import rep.utils.SerializeUtils
import rep.log.RepLogger
import rep.network.util.NodeHelp
/**
* 交易缓冲池伴生对象
@ -60,6 +61,8 @@ object TransactionPool {
class TransactionPool(moduleName: String) extends ModuleBase(moduleName) {
import akka.actor.ActorSelection
private val transPoolActorName = "/user/modulemanager/transactionpool"
private var addr4NonUser = ""
val dataaccess: ImpDataAccess = ImpDataAccess.GetDataAccess(pe.getSysTag)
override def preStart(): Unit = {
@ -97,9 +100,10 @@ class TransactionPool(moduleName: String) extends ModuleBase(moduleName) {
val siginfo = sig.signature.toByteArray()
if (SignTool.verify(siginfo, tOutSig.toByteArray, cert, pe.getSysTag)) {
dataAccess.isExistTrans4Txid(t.id) match {
case false => result = true
case true => resultMsg = s"The transaction(${t.id}) is duplicated with txid"
if (dataAccess.isExistTrans4Txid(t.id)) {
resultMsg = s"The transaction(${t.id}) is duplicated with txid"
} else {
result = true
}
} else {
resultMsg = s"The transaction(${t.id}) is not completed"
@ -111,41 +115,48 @@ class TransactionPool(moduleName: String) extends ModuleBase(moduleName) {
CheckedTransactionResult(result, resultMsg)
}
private def addTransToCache(t: Transaction) = {
val checkedTransactionResult = checkTransaction(t, dataaccess)
if (checkedTransactionResult.result) {
//签名验证成功
if (pe.getTransPoolMgr.getTransLength() < 100)
RepLogger.trace(RepLogger.System_Logger, this.getLogMsgPrefix(s"<<<<<<<<<<<<<>>>>>>>>>transaction=${pe.getTransPoolMgr.getTransLength()}"))
if (SystemProfile.getMaxCacheTransNum == 0 || pe.getTransPoolMgr.getTransLength() < SystemProfile.getMaxCacheTransNum) {
pe.getTransPoolMgr.putTran(t, pe.getSysTag)
//广播接收交易事件
if (pe.getTransPoolMgr.getTransLength() >= SystemProfile.getMinBlockTransNum)
pe.getActorRef(GlobalUtils.ActorType.voter) ! VoteOfBlocker
}
}
}
private def publishTrans(t: Transaction) = {
if (this.addr4NonUser == "" && this.selfAddr.indexOf("/user") > 0) {
this.selfAddr.substring(0, this.selfAddr.indexOf("/user"))
}
pe.getNodeMgr.getStableNodes.foreach(f => {
if (this.addr4NonUser != "" && !NodeHelp.isSameNode(f.toString, this.addr4NonUser)) {
visitStoreService(f, this.transPoolActorName, t)
}
})
}
override def receive = {
//处理接收的交易
case t: Transaction =>
//我们在这里并不缓存该Transaction在接收到同级别的广播时再进行缓存
//保存交易到本地
sendEvent(EventType.RECEIVE_INFO, mediator, pe.getSysTag, Topic.Transaction, Event.Action.TRANSACTION)
addTransToCache(t)
// mediator ! Publish(Topic.Transaction, t)
//pe.getStableNodes.foreach(sn=>{
// visitStoreService(sn , "/user/moduleManager/transactionPool",t)
//})
if (ActorUtils.isHelper(sender().path.toString) ||
ActorUtils.isAPI(sender().path.toString)) {
//广播交易到其他共识节点
if (ActorUtils.isHelper(sender().path.toString) || ActorUtils.isAPI(sender().path.toString)) {
//广播交易
mediator ! Publish(Topic.Transaction, t)
publishTrans(t)
//广播发送交易事件
sendEvent(EventType.PUBLISH_INFO, mediator, pe.getSysTag, Topic.Transaction, Event.Action.TRANSACTION)
} else {
//交易缓存,这里默认各个节点的交易缓存都是一致的但是因为入网时间不一致所有可能会有区别最终以出块人的交易为准
//TODO kami 验证交易签名和证书
//val checkedTransactionResult = CheckedTransactionResult(true, "")
sendEvent(EventType.RECEIVE_INFO, mediator, pe.getSysTag, Topic.Transaction, Event.Action.TRANSACTION)
val checkedTransactionResult = checkTransaction(t, dataaccess)
if (checkedTransactionResult.result) {
//签名验证成功
if (pe.getTransPoolMgr.getTransLength() < 100)
RepLogger.trace(RepLogger.System_Logger, this.getLogMsgPrefix(s"<<<<<<<<<<<<<>>>>>>>>>transaction=${pe.getTransPoolMgr.getTransLength()}"))
if (SystemProfile.getMaxCacheTransNum == 0 || pe.getTransPoolMgr.getTransLength() < SystemProfile.getMaxCacheTransNum) {
pe.getTransPoolMgr.putTran(t,pe.getSysTag)
//广播接收交易事件
if (pe.getTransPoolMgr.getTransLength() >= SystemProfile.getMinBlockTransNum)
pe.getActorRef(GlobalUtils.ActorType.voter) ! VoteOfBlocker
}
}
}
}
case _ => //ignore
}
}

View File

@ -37,69 +37,71 @@ object NodeHelp {
}
b
}
def isSameNode(srcStr: String, destStr: String): Boolean = {
destStr.equalsIgnoreCase(srcStr)
}
//获取Actor的地址akka.ssl.tcp://Repchain@192.168.10.155:54310
def getNodeAddress(actref: ActorRef):String={
def getNodeAddress(actref: ActorRef): String = {
val path = getNodePath(actref)
if (path.indexOf("/user") > 0) path.substring(0, path.indexOf("/user")) else ""
}
//获取Actor的路径akka.ssl.tcp://Repchain@192.168.10.155:54310/user/modulemanager/synchresponser#-1500748370
def getNodePath(actref: ActorRef):String={
if(actref == null) ""
def getNodePath(actref: ActorRef): String = {
if (actref == null) ""
akka.serialization.Serialization.serializedActorPath(actref)
}
def ConsensusConditionChecked(inputNumber: Int, nodeNumber: Int): Boolean = {
(inputNumber - 1) >= Math.floor(((nodeNumber)*1.0) / 2)
(inputNumber - 1) >= Math.floor(((nodeNumber) * 1.0) / 2)
}
def isCandidateNow(Systemname: String, candidates: Set[ String ]): Boolean = {
def isCandidateNow(Systemname: String, candidates: Set[String]): Boolean = {
val list = candidates.toList
list.contains(Systemname)
}
def isBlocker(blockerOfInput: String, blockername: String): Boolean = {
blockerOfInput == blockername
}
def checkBlocker(myaddress:String,sendaddress:String):Boolean = {
var b :Boolean = false
if(myaddress.indexOf("/user")>0){
def checkBlocker(myaddress: String, sendaddress: String): Boolean = {
var b: Boolean = false
if (myaddress.indexOf("/user") > 0) {
val addr = myaddress.substring(0, myaddress.indexOf("/user"))
b = sendaddress.indexOf(addr) != -1
}
b
}
def isSeedNode(nodeName:String):Boolean={
def isSeedNode(nodeName: String): Boolean = {
SystemProfile.getGenesisNodeName.equals(nodeName)
}
def isCandidatorNode(roles: Set[String]):Boolean = {
def isCandidatorNode(roles: Set[String]): Boolean = {
var r = false
breakable(
roles.foreach(f=>{
if(f.startsWith("CRFD-Node")){
r = true
break
}
})
)
roles.foreach(f => {
if (f.startsWith("CRFD-Node")) {
r = true
break
}
}))
r
}
def getNodeName(roles: Set[String]):String = {
def getNodeName(roles: Set[String]): String = {
var r = ""
breakable(
roles.foreach(f=>{
if(f.startsWith("CRFD-Node")){
r = f.substring(f.indexOf("CRFD-Node")+10)
break
}
})
)
roles.foreach(f => {
if (f.startsWith("CRFD-Node")) {
r = f.substring(f.indexOf("CRFD-Node") + 10)
break
}
}))
r
}
}

View File

@ -0,0 +1,70 @@
package rep.storage
import rep.protos.peer.BlockchainInfo
import _root_.com.google.protobuf.ByteString
class ChainInfoInCache(mda:ImpDataAccess) {
private var currentheight = 0l
private var currenttxnumber = 0l
private var bhash : String = null
private var bprevhash : String = null
private var statehash : String= null
initChainInfo
def initChainInfo={
currentheight = mda.getBlockHeight()
currenttxnumber = mda.getBlockAllTxNumber()
val bidx = mda.getBlockIdxByHeight(currentheight)
if (bidx != null) {
bhash = bidx.getBlockHash()
bprevhash = bidx.getBlockPrevHash()
statehash = bidx.getStateHash()
}
}
def getBlockChainInfo(): BlockchainInfo = {
var rbc = new BlockchainInfo()
if (bhash != null && !bhash.equalsIgnoreCase("")) {
rbc = rbc.withCurrentBlockHash(ByteString.copyFromUtf8(bhash))
} else {
rbc = rbc.withCurrentBlockHash(ByteString.EMPTY)
}
if (bprevhash != null && !bprevhash.equalsIgnoreCase("")) {
rbc = rbc.withPreviousBlockHash(ByteString.copyFromUtf8(bprevhash))
} else {
rbc = rbc.withPreviousBlockHash(ByteString.EMPTY)
}
if (statehash != null && !statehash.equalsIgnoreCase("")) {
rbc = rbc.withCurrentStateHash(ByteString.copyFromUtf8(statehash))
} else {
rbc = rbc.withCurrentStateHash(ByteString.EMPTY)
}
rbc = rbc.withHeight(currentheight)
rbc = rbc.withTotalTransactions(currenttxnumber)
rbc
}
def setHeight(h:Long)={
this.currentheight = h
}
def setTXNumber(n:Long)={
this.currenttxnumber = n
}
def setBlockHash(hash:String)={
this.bhash = hash
}
def setPrevBlockHash(phash:String)={
this.bprevhash = phash
}
def setBlockStateHash(state:String)={
this.statehash = state
}
}

View File

@ -59,6 +59,7 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
filemgr = new BlockFileMgr(this.SystemName)
private var chainInfoCache : ChainInfoInCache = new ChainInfoInCache(this)
/**
* @author jiangbuyun
* @version 0.7
@ -439,7 +440,7 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
* @return 返回链码信息 BlockchainInfo
*/
override def getBlockChainInfo(): BlockchainInfo = {
var rbc = new BlockchainInfo()
/*var rbc = new BlockchainInfo()
val currentheight = this.getBlockHeight()
val currenttxnumber = this.getBlockAllTxNumber()
val bidx = this.getBlockIdxByHeight(currentheight)
@ -470,7 +471,8 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
rbc = rbc.withHeight(currentheight)
rbc = rbc.withTotalTransactions(currenttxnumber)
rbc
rbc*/
chainInfoCache.getBlockChainInfo()
}
/**
@ -614,7 +616,9 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
override def rollbackToheight(toHeight: Long): Boolean = {
val rs = new Rollback4Storager(this, filemgr)
rs.rollbackToheight(toHeight)
val b = rs.rollbackToheight(toHeight)
chainInfoCache.initChainInfo
b
}
/**
@ -652,6 +656,11 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
if (this.commitAndAddBlock(block, oldh, oldno, oldtxnumber)) {
RepTimeTracer.setEndTime(this.SystemName, "storage-save-commit", System.currentTimeMillis(), block.height, block.transactions.size)
this.CommitTrans
chainInfoCache.setHeight(block.height)
chainInfoCache.setTXNumber(oldtxnumber + block.transactions.length)
chainInfoCache.setBlockHash(block.hashOfBlock.toStringUtf8())
chainInfoCache.setPrevBlockHash(block.previousBlockHash.toStringUtf8())
chainInfoCache.setBlockStateHash(block.stateHash.toStringUtf8())
(true, block.height, oldtxnumber + block.transactions.length, block.hashOfBlock.toStringUtf8(), block.previousBlockHash.toStringUtf8(), block.stateHash.toStringUtf8())
} else {
this.RollbackTrans
@ -741,6 +750,7 @@ class ImpDataAccess private (SystemName: String) extends IDataAccess(SystemName)
filemgr.writeBlock(bidx.getBlockFileNo(), bidx.getBlockFilePos() - 8, pathUtil.longToByte(blenght) ++ rbb)
RepTimeTracer.setEndTime(this.SystemName, "storage-save-write-file", System.currentTimeMillis(), block.height, block.transactions.size)
b = true
RepLogger.trace(
RepLogger.Storager_Logger,
"system_name=" + this.SystemName + "\t blockhash=" + bidx.getBlockHash() + "\tcommited success")