mirror of
https://gitee.com/BTAJL/repchain.git
synced 2024-11-30 10:48:26 +08:00
性能优化3
This commit is contained in:
parent
874544f382
commit
93e6faf971
@ -52,7 +52,7 @@
|
||||
#canonical.hostname = "192.168.10.155"
|
||||
#canonical.hostname = "192.168.31.155"
|
||||
canonical.hostname = "127.0.0.1"
|
||||
canonical.port = 22526
|
||||
canonical.port = 22522
|
||||
}
|
||||
}
|
||||
|
||||
@ -96,7 +96,7 @@ system {
|
||||
statistic_enable = 1 # 0,unable;able
|
||||
#实时图的事件是否发送,如果不发送,前端实时图将收不到任何消息。
|
||||
realtimegraph_enable = 1#0 unable;1 enable; default 1
|
||||
httpServicePort = 9085#http服务的端口号,默认为8081
|
||||
httpServicePort = 9081#http服务的端口号,默认为8081
|
||||
checkCertValidate = 0#设置是否检查证书的有效性,默认为0 0=不校验,1=校验
|
||||
contractOperationMode = 0#设置合约的运行方式,0=debug方式,1=deploy,默认为debug方式,如果发布部署,必须使用deploy方式。
|
||||
|
||||
@ -112,7 +112,7 @@ system {
|
||||
#块内交易的最大数量
|
||||
trans_num_limit = 2000
|
||||
#块内交易标准最小数量
|
||||
trans_num_min = 500
|
||||
trans_num_min = 1
|
||||
#交易数量不足,重试次数
|
||||
retry_time = 10
|
||||
#区块的最大长度,不能大于传输的消息的最大长度,单位是字节
|
||||
@ -145,7 +145,7 @@ system {
|
||||
#辅助自动创建交易的间隔
|
||||
tran_create_dur = 50 #millis
|
||||
#最大交易缓存量
|
||||
max_cache_num = 60000
|
||||
max_cache_num = 100000
|
||||
}
|
||||
|
||||
cluster {
|
||||
|
@ -279,8 +279,6 @@ class ClusterSystem(sysTag: String, initType: Int, sysStart: Boolean) {
|
||||
|
||||
SystemProfile.initConfigSystem(this.sysConf,this.sysTag )
|
||||
|
||||
|
||||
|
||||
if (!hasDiskSpace) {
|
||||
Cluster(sysActor).down(clusterAddr)
|
||||
throw new Exception("not enough disk space")
|
||||
@ -298,6 +296,7 @@ class ClusterSystem(sysTag: String, initType: Int, sysStart: Boolean) {
|
||||
}
|
||||
|
||||
|
||||
|
||||
RepLogger.trace(RepLogger.System_Logger, sysTag + "~" + "System" + " ~ " + s"ClusterSystem ${sysTag} start" + " ~ ")
|
||||
}
|
||||
|
||||
|
@ -66,7 +66,7 @@ class TransactionChecker (moduleName: String) extends ModuleBase(moduleName){
|
||||
private def addTransToCache(t: Transaction) = {
|
||||
if(!pe.getTransPoolMgr.findTrans(t.id)){
|
||||
//交易池中不存在的交易才检查
|
||||
val checkedTransactionResult = checkTransaction(t, dataaccess)
|
||||
//val checkedTransactionResult = checkTransaction(t, dataaccess)
|
||||
//签名验证成功
|
||||
val poolIsEmpty = pe.getTransPoolMgr.isEmpty
|
||||
//if((checkedTransactionResult.result) && (SystemProfile.getMaxCacheTransNum == 0 || pe.getTransPoolMgr.getTransLength() < SystemProfile.getMaxCacheTransNum) ){
|
||||
@ -86,8 +86,11 @@ class TransactionChecker (moduleName: String) extends ModuleBase(moduleName){
|
||||
case t: Transaction =>
|
||||
//保存交易到本地
|
||||
sendEvent(EventType.RECEIVE_INFO, mediator, pe.getSysTag, Topic.Transaction, Event.Action.TRANSACTION)
|
||||
if(!NodeHelp.isSameNodeForString(this.selfAddr,NodeHelp.getNodePath(sender())))
|
||||
if(!NodeHelp.isSameNodeForString(this.selfAddr,NodeHelp.getNodePath(sender()))) {
|
||||
addTransToCache(t)
|
||||
}else{
|
||||
System.err.println(s"recv tx from local,system=${pe.getSysTag}")
|
||||
}
|
||||
case _ => //ignore
|
||||
}
|
||||
|
||||
|
@ -56,12 +56,12 @@ class BlockerOfCFRDInStream(moduleName: String) extends IBlocker(moduleName) {
|
||||
breakable(
|
||||
op = for (i <- 1 to blcCount) {
|
||||
RepTimeTracer.setStartTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, 0)
|
||||
val tmpList = pe.getTransPoolMgr.getTransListClone(start, SystemProfile.getLimitBlockTransNum, pe.getSysTag).reverse
|
||||
val tmpList = pe.getTransPoolMgr.getTransListClone(SystemProfile.getLimitBlockTransNum, pe.getSysTag).reverse
|
||||
RepTimeTracer.setEndTime(pe.getSysTag, "collectTransToBlock", System.currentTimeMillis(), pe.getBlocker.VoteHeight + 1, tmpList.size)
|
||||
var isLast = false
|
||||
if (tmpList.length >= SystemProfile.getLimitBlockTransNum && i < blcCount) {
|
||||
isLast = false
|
||||
val tmpList1 = pe.getTransPoolMgr.getTransListClone(start, SystemProfile.getLimitBlockTransNum, pe.getSysTag)
|
||||
val tmpList1 = pe.getTransPoolMgr.getTransListClone(SystemProfile.getLimitBlockTransNum, pe.getSysTag)
|
||||
if (tmpList1.length <= 0) {
|
||||
isLast = true
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ abstract class IBlocker(moduleName: String) extends ModuleBase(moduleName) {
|
||||
protected def CollectedTransOfBlock(start: Int, num: Int, limitsize: Int): Seq[Transaction] = {
|
||||
//var result = ArrayBuffer.empty[Transaction]
|
||||
try {
|
||||
val tmplist = pe.getTransPoolMgr.getTransListClone(start, num, pe.getSysTag)
|
||||
val tmplist = pe.getTransPoolMgr.getTransListClone( num, pe.getSysTag)
|
||||
//if (tmplist.size > 0) {
|
||||
val currenttime = System.currentTimeMillis() / 1000
|
||||
/*var transsize = 0
|
||||
|
@ -70,7 +70,7 @@ class BlockerOfPBFT(moduleName: String) extends ModuleBase(moduleName) {
|
||||
private def CollectedTransOfBlock(start: Int, num: Int, limitsize: Int): ArrayBuffer[Transaction] = {
|
||||
var result = ArrayBuffer.empty[Transaction]
|
||||
try {
|
||||
val tmplist = pe.getTransPoolMgr.getTransListClone(start, num, pe.getSysTag)
|
||||
val tmplist = pe.getTransPoolMgr.getTransListClone(num, pe.getSysTag)
|
||||
if (tmplist.size > 0) {
|
||||
val currenttime = System.currentTimeMillis() / 1000
|
||||
var transsize = 0
|
||||
|
@ -68,7 +68,7 @@ class IModuleManager(moduleName: String, sysTag: String, enableStatistic: Boolea
|
||||
pe.setSysTag(sysTag)
|
||||
val confHeler = new ConfigerHelper(conf, sysTag, pe.getSysTag)
|
||||
confHeler.init()
|
||||
|
||||
pe.getTransPoolMgr.startupSchedule(sysTag)
|
||||
}
|
||||
|
||||
private def loadCommonActor:Unit = {
|
||||
|
@ -1,63 +1,80 @@
|
||||
/*
|
||||
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BA SIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package rep.network.tools.transpool
|
||||
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ConcurrentSkipListMap, Executors, TimeUnit}
|
||||
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
|
||||
|
||||
import rep.protos.peer.Transaction
|
||||
import java.util.concurrent.locks._
|
||||
|
||||
import rep.log.RepLogger
|
||||
//import scala.jdk.CollectionConverters._
|
||||
import scala.collection.JavaConverters._
|
||||
import java.util.concurrent.ConcurrentSkipListMap
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import rep.app.conf.{ TimePolicy }
|
||||
import rep.app.conf.TimePolicy
|
||||
import rep.log.RepLogger
|
||||
import rep.protos.peer.Transaction
|
||||
import rep.storage.ImpDataAccess
|
||||
import scala.util.control.Breaks._
|
||||
|
||||
import scala.util.control.Breaks.{break, breakable}
|
||||
|
||||
class TransactionPoolMgr {
|
||||
private val transLock : Lock = new ReentrantLock();
|
||||
|
||||
private implicit var transactions = new ConcurrentSkipListMap[Long,Transaction]() asScala
|
||||
private implicit var transQueue = new ConcurrentLinkedQueue[Transaction]()
|
||||
private implicit var transKeys = new ConcurrentHashMap[String,Long]() asScala
|
||||
case class TransactionInfo(transaction: Transaction,entryTime:Long)
|
||||
|
||||
private implicit var transQueueOfTxid = new ConcurrentLinkedQueue[String]()
|
||||
private implicit var transKeys = new ConcurrentHashMap[String,TransactionInfo]() asScala
|
||||
private implicit var transNumber = new AtomicInteger(0)
|
||||
|
||||
/*def getTransListClone(num: Int,sysName:String): Seq[Transaction] = {
|
||||
private var scheduledExecutorService = Executors.newSingleThreadScheduledExecutor
|
||||
private var isStarup = new AtomicBoolean(false)
|
||||
|
||||
def startupSchedule(sysName:String)={
|
||||
if(this.isStarup.get() == false){
|
||||
this.isStarup.set((true))
|
||||
this.scheduledExecutorService.scheduleWithFixedDelay(
|
||||
new cleanCache(sysName),10,10, TimeUnit.SECONDS
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class cleanCache(sysName:String) extends Runnable{
|
||||
override def run(){
|
||||
var translist = scala.collection.mutable.ArrayBuffer[String]()
|
||||
val currenttime = System.currentTimeMillis()
|
||||
val sr: ImpDataAccess = ImpDataAccess.GetDataAccess(sysName)
|
||||
try{
|
||||
System.err.println(s"entry Clean Cache,system=${sysName}")
|
||||
transKeys.values.foreach(ti=>{
|
||||
if((currenttime - ti.entryTime)/1000 > TimePolicy.getTranscationWaiting || sr.isExistTrans4Txid(ti.transaction.id) ){
|
||||
translist += ti.transaction.id
|
||||
}
|
||||
})
|
||||
|
||||
System.err.println(s"waiting delete trans,,system=${sysName},list:"+translist.mkString(","))
|
||||
translist.foreach(txid=>{
|
||||
transKeys.remove(txid)
|
||||
if(transQueueOfTxid.remove(txid)){
|
||||
transNumber.decrementAndGet()
|
||||
}
|
||||
})
|
||||
System.err.println(s"entry Clean Cache finish,system=${sysName}")
|
||||
}catch{
|
||||
case e:Exception=>e.printStackTrace()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def getTransListClone(num: Int,sysName:String): Seq[Transaction] = {
|
||||
var translist = scala.collection.mutable.ArrayBuffer[Transaction]()
|
||||
val currenttime = System.currentTimeMillis()
|
||||
try{
|
||||
val sr: ImpDataAccess = ImpDataAccess.GetDataAccess(sysName)
|
||||
var count = 0
|
||||
|
||||
breakable(
|
||||
for(i<-0 to num-1){
|
||||
val t = this.transQueue.poll()
|
||||
if(t != null){
|
||||
val txid = this.transQueueOfTxid.poll()
|
||||
if(txid != null){
|
||||
this.transNumber.decrementAndGet()
|
||||
val l = this.transKeys.get(t.id)
|
||||
if(l != null){
|
||||
if(currenttime - l.get._1 > TimePolicy.getTranscationWaiting) { //|| sr.isExistTrans4Txid(txid) ){
|
||||
val l = this.transKeys.get(txid)
|
||||
if(l != None){
|
||||
if((currenttime - l.get.entryTime)/1000 > TimePolicy.getTranscationWaiting || sr.isExistTrans4Txid(txid) ){
|
||||
//超时或者重复 删除
|
||||
this.transKeys.remove(t.id)
|
||||
this.transKeys.remove(txid)
|
||||
}else{
|
||||
translist += t
|
||||
translist += l.get.transaction
|
||||
}
|
||||
}
|
||||
}else{
|
||||
@ -72,71 +89,24 @@ class TransactionPoolMgr {
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},transNumber=${transNumber},getTransListClone spent time=${end-currenttime}")
|
||||
|
||||
translist.toSeq
|
||||
}*/
|
||||
def getTransListClone(start:Int,num: Int,sysName:String): Seq[Transaction] = {
|
||||
var translist = scala.collection.mutable.ArrayBuffer[Transaction]()
|
||||
transLock.lock()
|
||||
val starttime = System.currentTimeMillis()
|
||||
try{
|
||||
var deltrans4id = scala.collection.mutable.ArrayBuffer[String]()
|
||||
val sr: ImpDataAccess = ImpDataAccess.GetDataAccess(sysName)
|
||||
val currenttime = System.nanoTime() / 1000000000
|
||||
var count = 0
|
||||
var pos = 0
|
||||
breakable(
|
||||
transactions.foreach(f=>{
|
||||
if(count <= num){
|
||||
val txid = f._2.id
|
||||
if ((currenttime - f._1/1000000000) > TimePolicy.getTranscationWaiting || sr.isExistTrans4Txid(txid) ){
|
||||
deltrans4id += txid
|
||||
}else{
|
||||
if(pos < start){
|
||||
pos += 1
|
||||
}else{
|
||||
translist += f._2
|
||||
count += 1
|
||||
}
|
||||
}
|
||||
}else{
|
||||
break
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
if(deltrans4id.length > 0){
|
||||
deltrans4id.foreach(f=>{
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},remove trans from pool,trans timeout or exist in block,${f}")
|
||||
removeTranscation4Txid(f,sysName)
|
||||
})
|
||||
}
|
||||
deltrans4id.clear()
|
||||
}finally{
|
||||
transLock.unlock()
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},transNumber=${transNumber},getTransListClone spent time=${end-starttime}")
|
||||
|
||||
translist.toSeq
|
||||
}
|
||||
|
||||
|
||||
def putTran(tran: Transaction,sysName:String): Unit = {
|
||||
transLock.lock()
|
||||
|
||||
val start = System.currentTimeMillis()
|
||||
try{
|
||||
val time = System.nanoTime()
|
||||
val time = System.currentTimeMillis()
|
||||
val txid = tran.id
|
||||
if(transKeys.contains(txid)){
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},trans entry pool,${tran.id} exists in cache")
|
||||
}else{
|
||||
transactions.put(time, tran)
|
||||
transKeys.put(txid, time)
|
||||
transKeys.put(txid, TransactionInfo(tran,time))
|
||||
this.transQueueOfTxid.add(txid)
|
||||
transNumber.incrementAndGet()
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},transNumber=${transNumber},trans entry pool,${tran.id},entry time = ${time}")
|
||||
}
|
||||
}finally {
|
||||
transLock.unlock()
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},putTran spent time=${end-start}")
|
||||
@ -154,45 +124,30 @@ class TransactionPoolMgr {
|
||||
}
|
||||
|
||||
def getTransaction(txid:String):Transaction={
|
||||
transactions.getOrElse(txid,null)
|
||||
var t : Transaction = null
|
||||
val d = this.transKeys.getOrElse(txid,null)
|
||||
if(d != None){
|
||||
t = d.transaction
|
||||
}
|
||||
t
|
||||
}
|
||||
|
||||
def removeTrans(trans: Seq[ Transaction ],sysName:String): Unit = {
|
||||
transLock.lock()
|
||||
try{
|
||||
trans.foreach(f=>{
|
||||
removeTranscation(f,sysName)
|
||||
})
|
||||
}finally{
|
||||
transLock.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
def removeTranscation(tran:Transaction,sysName:String):Unit={
|
||||
transLock.lock()
|
||||
try{
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},remove trans from pool,trans entry block,${tran.id}")
|
||||
removeTranscation4Txid(tran.id,sysName)
|
||||
this.transKeys.remove(tran.id)
|
||||
}finally{
|
||||
transLock.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
def removeTranscation4Txid(txid:String,sysName:String):Unit={
|
||||
transLock.lock()
|
||||
val start = System.currentTimeMillis()
|
||||
try{
|
||||
if(transKeys.contains(txid)){
|
||||
transactions.remove(transKeys(txid))
|
||||
transKeys.remove(txid)
|
||||
transNumber.decrementAndGet()
|
||||
}
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},remove trans from pool,${txid}")
|
||||
}finally{
|
||||
transLock.unlock()
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},removeTranscation4Txid spent time=${end-start}")
|
||||
}
|
||||
|
||||
def getTransLength() : Int = {
|
||||
@ -200,6 +155,6 @@ class TransactionPoolMgr {
|
||||
}
|
||||
|
||||
def isEmpty:Boolean={
|
||||
transactions.isEmpty
|
||||
this.transQueueOfTxid.isEmpty
|
||||
}
|
||||
}
|
@ -0,0 +1,205 @@
|
||||
/*
|
||||
* Copyright 2019 Blockchain Technology and Application Joint Lab, Linkel Technology Co., Ltd, Beijing, Fintech Research Center of ISCAS.
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BA SIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package rep.network.tools.transpool
|
||||
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
|
||||
import rep.protos.peer.Transaction
|
||||
import java.util.concurrent.locks._
|
||||
|
||||
import rep.log.RepLogger
|
||||
//import scala.jdk.CollectionConverters._
|
||||
import scala.collection.JavaConverters._
|
||||
import java.util.concurrent.ConcurrentSkipListMap
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import rep.app.conf.{ TimePolicy }
|
||||
import rep.storage.ImpDataAccess
|
||||
import scala.util.control.Breaks._
|
||||
|
||||
class TransactionPoolMgr_1 {
|
||||
private val transLock : Lock = new ReentrantLock();
|
||||
|
||||
private implicit var transactions = new ConcurrentSkipListMap[Long,Transaction]() asScala
|
||||
private implicit var transQueue = new ConcurrentLinkedQueue[Transaction]()
|
||||
private implicit var transKeys = new ConcurrentHashMap[String,Long]() asScala
|
||||
private implicit var transNumber = new AtomicInteger(0)
|
||||
|
||||
/*def getTransListClone(num: Int,sysName:String): Seq[Transaction] = {
|
||||
var translist = scala.collection.mutable.ArrayBuffer[Transaction]()
|
||||
val currenttime = System.currentTimeMillis()
|
||||
try{
|
||||
val sr: ImpDataAccess = ImpDataAccess.GetDataAccess(sysName)
|
||||
var count = 0
|
||||
|
||||
breakable(
|
||||
for(i<-0 to num-1){
|
||||
val t = this.transQueue.poll()
|
||||
if(t != null){
|
||||
this.transNumber.decrementAndGet()
|
||||
val l = this.transKeys.get(t.id)
|
||||
if(l != null){
|
||||
if(currenttime - l.get._1 > TimePolicy.getTranscationWaiting) { //|| sr.isExistTrans4Txid(txid) ){
|
||||
//超时或者重复 删除
|
||||
this.transKeys.remove(t.id)
|
||||
}else{
|
||||
translist += t
|
||||
}
|
||||
}
|
||||
}else{
|
||||
//队列为空,打包结束
|
||||
break
|
||||
}
|
||||
})
|
||||
}catch{
|
||||
case e:Exception =>
|
||||
RepLogger.error(RepLogger.OutputTime_Logger, s"systemname=${sysName},transNumber=${transNumber},getTransListClone error, info=${e.getMessage}")
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},transNumber=${transNumber},getTransListClone spent time=${end-currenttime}")
|
||||
|
||||
translist.toSeq
|
||||
}*/
|
||||
def getTransListClone(start:Int,num: Int,sysName:String): Seq[Transaction] = {
|
||||
var translist = scala.collection.mutable.ArrayBuffer[Transaction]()
|
||||
transLock.lock()
|
||||
val starttime = System.currentTimeMillis()
|
||||
try{
|
||||
var deltrans4id = scala.collection.mutable.ArrayBuffer[String]()
|
||||
val sr: ImpDataAccess = ImpDataAccess.GetDataAccess(sysName)
|
||||
val currenttime = System.nanoTime() / 1000000000
|
||||
var count = 0
|
||||
var pos = 0
|
||||
breakable(
|
||||
transactions.foreach(f=>{
|
||||
if(count <= num){
|
||||
val txid = f._2.id
|
||||
if ((currenttime - f._1/1000000000) > TimePolicy.getTranscationWaiting || sr.isExistTrans4Txid(txid) ){
|
||||
deltrans4id += txid
|
||||
}else{
|
||||
if(pos < start){
|
||||
pos += 1
|
||||
}else{
|
||||
translist += f._2
|
||||
count += 1
|
||||
}
|
||||
}
|
||||
}else{
|
||||
break
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
if(deltrans4id.length > 0){
|
||||
deltrans4id.foreach(f=>{
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},remove trans from pool,trans timeout or exist in block,${f}")
|
||||
removeTranscation4Txid(f,sysName)
|
||||
})
|
||||
}
|
||||
deltrans4id.clear()
|
||||
}finally{
|
||||
transLock.unlock()
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},transNumber=${transNumber},getTransListClone spent time=${end-starttime}")
|
||||
|
||||
translist.toSeq
|
||||
}
|
||||
|
||||
|
||||
def putTran(tran: Transaction,sysName:String): Unit = {
|
||||
transLock.lock()
|
||||
val start = System.currentTimeMillis()
|
||||
try{
|
||||
val time = System.nanoTime()
|
||||
val txid = tran.id
|
||||
if(transKeys.contains(txid)){
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},trans entry pool,${tran.id} exists in cache")
|
||||
}else{
|
||||
transactions.put(time, tran)
|
||||
transKeys.put(txid, time)
|
||||
transNumber.incrementAndGet()
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},transNumber=${transNumber},trans entry pool,${tran.id},entry time = ${time}")
|
||||
}
|
||||
}finally {
|
||||
transLock.unlock()
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},putTran spent time=${end-start}")
|
||||
}
|
||||
|
||||
def findTrans(txid:String):Boolean = {
|
||||
var b :Boolean = false
|
||||
val start = System.currentTimeMillis()
|
||||
if(transKeys.contains(txid)){
|
||||
b = true
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"findTrans spent time=${end-start}")
|
||||
b
|
||||
}
|
||||
|
||||
def getTransaction(txid:String):Transaction={
|
||||
transactions.getOrElse(txid,null)
|
||||
}
|
||||
|
||||
def removeTrans(trans: Seq[ Transaction ],sysName:String): Unit = {
|
||||
transLock.lock()
|
||||
try{
|
||||
trans.foreach(f=>{
|
||||
removeTranscation(f,sysName)
|
||||
})
|
||||
}finally{
|
||||
transLock.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
def removeTranscation(tran:Transaction,sysName:String):Unit={
|
||||
transLock.lock()
|
||||
try{
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},remove trans from pool,trans entry block,${tran.id}")
|
||||
removeTranscation4Txid(tran.id,sysName)
|
||||
}finally{
|
||||
transLock.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
def removeTranscation4Txid(txid:String,sysName:String):Unit={
|
||||
transLock.lock()
|
||||
val start = System.currentTimeMillis()
|
||||
try{
|
||||
if(transKeys.contains(txid)){
|
||||
transactions.remove(transKeys(txid))
|
||||
transKeys.remove(txid)
|
||||
transNumber.decrementAndGet()
|
||||
}
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},remove trans from pool,${txid}")
|
||||
}finally{
|
||||
transLock.unlock()
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},removeTranscation4Txid spent time=${end-start}")
|
||||
}
|
||||
|
||||
def getTransLength() : Int = {
|
||||
this.transNumber.get
|
||||
}
|
||||
|
||||
def isEmpty:Boolean={
|
||||
transactions.isEmpty
|
||||
}
|
||||
}
|
@ -1,106 +0,0 @@
|
||||
package rep.network.tools.transpool
|
||||
|
||||
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ConcurrentSkipListMap}
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import scala.collection.JavaConverters._
|
||||
import rep.app.conf.TimePolicy
|
||||
import rep.log.RepLogger
|
||||
import rep.protos.peer.Transaction
|
||||
import rep.storage.ImpDataAccess
|
||||
|
||||
import scala.util.control.Breaks.{break, breakable}
|
||||
|
||||
class TransactionPoolOfQueueMgr {
|
||||
|
||||
private implicit var transQueue = new ConcurrentLinkedQueue[Transaction]()
|
||||
private implicit var transKeys = new ConcurrentHashMap[String,(Long,Transaction)]() asScala
|
||||
private implicit var transNumber = new AtomicInteger(0)
|
||||
|
||||
def getTransListClone(number: Int,sysName:String): Seq[Transaction] = {
|
||||
var transList = scala.collection.mutable.ArrayBuffer[Transaction]()
|
||||
val currentTime = System.currentTimeMillis()
|
||||
val len = number - 1
|
||||
|
||||
breakable(
|
||||
for(i<-0 to len){
|
||||
val t = this.transQueue.poll()
|
||||
if(t == null){
|
||||
break
|
||||
}else{
|
||||
var tk = this.transKeys.getOrElse(t.id,null)
|
||||
var time = 0l
|
||||
if(tk != null){
|
||||
time = tk._1
|
||||
}
|
||||
if((time - currentTime > TimePolicy.getTranscationWaiting)){//|| sr.isExistTrans4Txid(txid) ){
|
||||
this.transKeys.remove(t.id)
|
||||
}else{
|
||||
transList += t
|
||||
this.transKeys.remove(t.id)
|
||||
this.transNumber.decrementAndGet()
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},transNumber=${transList.length},getTransListClone spent time=${end-currentTime}")
|
||||
|
||||
transList.toSeq
|
||||
}
|
||||
|
||||
|
||||
def putTran(tran: Transaction,sysName:String): Unit = {
|
||||
val start = System.currentTimeMillis()
|
||||
|
||||
if(this.transKeys.contains(tran.id)){
|
||||
RepLogger.info(RepLogger.TransLifeCycle_Logger, s"systemname=${sysName},trans entry pool,${tran.id} exists in cache")
|
||||
}else{
|
||||
this.transQueue.add(tran)
|
||||
this.transKeys.put(tran.id,(System.currentTimeMillis(),tran))
|
||||
this.transNumber.incrementAndGet()
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},putTran spent time=${end-start}")
|
||||
}
|
||||
|
||||
def putTrans(trans:Seq[Transaction],sysName:String): Unit ={
|
||||
val start = System.currentTimeMillis()
|
||||
trans.foreach(t=>{
|
||||
this.putTran(t,sysName)
|
||||
})
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"systemname=${sysName},putTran spent time=${end-start}")
|
||||
}
|
||||
|
||||
def findTrans(txid:String):Boolean = {
|
||||
var b :Boolean = false
|
||||
val start = System.currentTimeMillis()
|
||||
if(transKeys.contains(txid)){
|
||||
b = true
|
||||
}
|
||||
val end = System.currentTimeMillis()
|
||||
RepLogger.trace(RepLogger.OutputTime_Logger, s"findTrans spent time=${end-start}")
|
||||
b
|
||||
}
|
||||
|
||||
def getTransaction(txid:String):Transaction={
|
||||
val t = this.transKeys.getOrElse(txid,null)
|
||||
if(t != null){
|
||||
t._2
|
||||
}else null
|
||||
}
|
||||
|
||||
def removeTrans(trans: Seq[ Transaction ],sysName:String): Unit = {
|
||||
trans.foreach(f=>{
|
||||
this.transKeys.remove(f.id)
|
||||
})
|
||||
}
|
||||
|
||||
def getTransLength() : Int = {
|
||||
this.transNumber.get
|
||||
}
|
||||
|
||||
def isEmpty:Boolean={
|
||||
this.transQueue.isEmpty
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user