在system.conf文件中增加配置项,

vote {
    //最低投票人数量
    vote_note_min = 4
    //参与共识的节点别名
    vote_node_list = ["1","2","3","4"]
  }
  vote_node_list这个项是必须的配置,配置内容是可以参与的共识节点的别名,没有配置的节点
  不是共识节点。
This commit is contained in:
wuwei1972 2018-10-31 16:37:56 +08:00
parent 19078a4d01
commit ea19df8d91
12 changed files with 133 additions and 52 deletions

View File

@ -94,6 +94,8 @@ system {
vote {
//最低投票人数量
vote_note_min = 4
//参与共识的节点别名
vote_node_list = ["1","2","3","4"]
}
diskspaceManager{
@ -104,7 +106,7 @@ system {
//辅助自动创建交易的间隔
tran_create_dur = 50 //millis
//最大交易缓存量
max_cache_num = 10000
max_cache_num = 60000
}
cluster {

View File

@ -37,10 +37,10 @@ object Repchain {
val cluster = sys1.getActorSys//获取内部系统SystemActor实例
val node_min = 4
val node_min = 5
//如果node_max>node_min 将启动node反复离网和入网的仿真但是由于system离网后无法复用并重新加入
//运行一定时间会内存溢出
val node_max = 4
val node_max = 5
var node_add = true
var nodes = Set.empty[ClusterSystem]

View File

@ -16,6 +16,8 @@
package rep.app.conf
import java.io._
import rep.crypto.ECDSASign
/**
* @author jiangbuyun
@ -25,30 +27,29 @@ import java.io._
object SystemCertList {
private var mySystemCertList:Set[String] = (new scala.collection.mutable.ArrayBuffer[String]()).toSet[String]
//private def InitSystemCertList:Set[String] = {
var a = new scala.collection.mutable.ArrayBuffer[String]()
val fis = new File("jks")
if(fis.isDirectory()){
val fs = fis.listFiles()
for(fn<-fs){
if(fn.isFile()){
val fname = fn.getName
val pos = fname.indexOf("mykeystore_")
val suffixpos = fname.indexOf(".jks")
if(pos >= 0 && suffixpos>0){
a += fname.substring(pos+11, suffixpos)
}
private def loadVoteNodeListForCert = {
synchronized{
if(this.mySystemCertList.isEmpty){
val list = SystemProfile.getVoteNodeList
val clist = ECDSASign.getAliasOfTrustkey
var rlist : scala.collection.mutable.ArrayBuffer[String] = new scala.collection.mutable.ArrayBuffer[String]()
var i = 0
for( i <- 1 to clist.size()-1){
val alias = clist.get(i)
if(list.contains(alias)){
rlist += alias
}
}
this.mySystemCertList = rlist.toSet[String]
}
}
mySystemCertList = a.toSet[String]
//}
}
def getSystemCertList:Set[String] = {
//if(this.mySystemCertList.size <=0 ){
// mySystemCertList = InitSystemCertList:Set[String]
//}
mySystemCertList
if(this.mySystemCertList.isEmpty){
loadVoteNodeListForCert
}
this.mySystemCertList
}
}

View File

@ -16,7 +16,10 @@
package rep.app.conf
import com.typesafe.config.Config
//import collection.JavaConversions._
//import scala.collection.immutable._
import java.util.List
import java.util.ArrayList
/**
* 系统配置信息缓存对象
@ -45,6 +48,7 @@ object SystemProfile {
private[this] var _SERVERPORT:Int=8081//http服务的端口默认为8081
private[this] var _CHECKCERTVALIDATE:Int=0//是否检查证书的有效性0不检查1检查
private[this] var _CONTRACTOPERATIONMODE = 0//设置合约的运行方式0=debug方式1=deploy默认为debug方式如果发布部署必须使用deploy方式
private[this] var _VOTENODELIST : List[String] = new ArrayList[String]
private def SERVERPORT :Int = _SERVERPORT
@ -52,6 +56,12 @@ object SystemProfile {
private def DISKSPACE_ALARM_NUM :Long = _DISKSPACE_ALARM_NUM
private def CONTRACTOPERATIONMODE:Int=_CONTRACTOPERATIONMODE
private def VOTENODELIST : List[String] = _VOTENODELIST
private def VOTENODELIST_=(value: List[String]): Unit = {
_VOTENODELIST = value
}
private def SERVERPORT_=(value: Int): Unit = {
_SERVERPORT = value
@ -123,6 +133,7 @@ object SystemProfile {
MIN_BLOCK_TRANS_NUM_=(config.getInt("system.block.trans_num_min"))
RETRY_TIME_=(config.getInt("system.block.retry_time"))
VOTE_NOTE_MIN_=(config.getInt("system.vote.vote_note_min"))
VOTENODELIST_=(config.getStringList("system.vote.vote_node_list"))
TRAN_CREATE_DUR_=(config.getInt("system.transaction.tran_create_dur"))
MAX_CATCH_TRANS_NUM_=(config.getInt("system.transaction.max_cache_num"))
TRANS_CREATE_TYPE_=(config.getInt("system.trans_create_type"))
@ -131,8 +142,6 @@ object SystemProfile {
CHECKCERTVALIDATE_=(config.getInt("system.checkCertValidate"))
CONTRACTOPERATIONMODE_=(config.getInt("system.contractOperationMode"))
}
def getLimitBlockTransNum = LIMIT_BLOCK_TRANS_NUM
@ -155,4 +164,6 @@ object SystemProfile {
def getCheckCertValidate = CHECKCERTVALIDATE
def getContractOperationMode = CONTRACTOPERATIONMODE
def getVoteNodeList = VOTENODELIST
}

View File

@ -34,6 +34,9 @@ import rep.storage.cfg._
import java.io.File
import scala.collection.mutable
import rep.app.conf.SystemProfile
import com.typesafe.config.ConfigValueFactory
import java.util.List
import java.util.ArrayList
/**
* System创建伴生对象
@ -89,7 +92,7 @@ class ClusterSystem(sysTag: String, initType: Int, sysStart:Boolean) extends Rep
private var enableStatistic = false
private val sysConf: Config = initSystem(sysTag)
private var sysConf: Config = initSystem(sysTag)
private var sysActor:ActorSystem = null
@ -190,6 +193,7 @@ class ClusterSystem(sysTag: String, initType: Int, sysStart:Boolean) extends Rep
* 初始化
*/
def init = {
initConsensusNodeOfConfig
sysStart match {
case true =>
sysActor = ActorSystem(SystemConf.SYSTEM_NAME, sysConf)
@ -200,14 +204,25 @@ class ClusterSystem(sysTag: String, initType: Int, sysStart:Boolean) extends Rep
logMsg(LOG_TYPE.INFO, moduleName, s"System(${sysTag}) init successfully", "s")
}
private def initConsensusNodeOfConfig={
val nodelist = sysConf.getStringList("system.vote.vote_node_list")
if(nodelist.contains(this.sysTag)){
//val roles = Array("CRFD-Node")
var roles :List[String] = new ArrayList[String]
roles.add("CRFD-Node")
sysConf = sysConf.withValue("akka.cluster.roles", ConfigValueFactory.fromAnyRef(roles))
}
}
/**
* 启动系统
*/
def start = {
if(enableStatistic) statistics = sysActor.actorOf(Props[StatisticCollection],"statistic")
moduleManager = sysActor.actorOf(ModuleManager.props("moduleManager", sysTag),"moduleManager")
ModuleBase.registerActorRef(sysTag, ActorType.MODULE_MANAGER, moduleManager)
SystemProfile.initConfigSystem(sysActor.settings.config)
moduleManager = sysActor.actorOf(ModuleManager.props("moduleManager", sysTag),"moduleManager")
ModuleBase.registerActorRef(sysTag, ActorType.MODULE_MANAGER, moduleManager)
if(!hasDiskSpace){
Cluster(sysActor).down(clusterAddr)
throw new Exception("not enough disk space")

View File

@ -15,7 +15,7 @@
package rep.network
import akka.actor.{Actor, Props}
import akka.actor.{Actor, Address, Props}
import com.google.protobuf.ByteString
import com.google.protobuf.timestamp.Timestamp
import rep.app.conf.SystemProfile
@ -27,7 +27,7 @@ import rep.network.tools.PeerExtension
import rep.protos.peer._
import rep.utils.GlobalUtils.ActorType
import rep.utils.{IdUtils, RepLogging, TimeUtils}
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import scala.concurrent.forkjoin.ThreadLocalRandom
import java.text.SimpleDateFormat
@ -166,12 +166,17 @@ class PeerHelper(name: String) extends ModuleBase(name) {
var chaincode = ""
override def preStart(): Unit =
{
override def preStart(): Unit = {
//注册接收交易的广播
SubscribeTopic(mediator, self, selfAddr, Topic.Transaction, true)
logMsg(LOG_TYPE.INFO,name,"Transaction Creator Start",selfAddr)
scheduler.scheduleOnce(5.seconds, self, Tick)
}
// override postRestart so we don't call preStart and schedule a new Tick
override def postRestart(reason: Throwable): Unit = ()
@ -196,14 +201,14 @@ class PeerHelper(name: String) extends ModuleBase(name) {
//invoke
// val cname = t.payload.get.chaincodeID.get.name
try{
createTransForLoop //在做tps测试到时候执行该函数并且注释其他代码
//createTransForLoop //在做tps测试到时候执行该函数并且注释其他代码
//val start = System.currentTimeMillis()
//val t3 = transactionCreator(pe.getSysTag,rep.protos.peer.Transaction.Type.CHAINCODE_INVOKE,
// "", "transfer" ,Seq(li2),"", Option(chaincode),rep.protos.peer.ChaincodeSpec.CodeType.CODE_JAVASCRIPT)
//getActorRef(ActorType.TRANSACTION_POOL) ! t3
val t3 = transactionCreator(pe.getSysTag,rep.protos.peer.Transaction.Type.CHAINCODE_INVOKE,
"", "transfer" ,Seq(li2),"", Option(chaincode),rep.protos.peer.ChaincodeSpec.CodeType.CODE_JAVASCRIPT)
getActorRef(ActorType.TRANSACTION_POOL) ! t3
//val end = System.currentTimeMillis()
//println(s"!!!!!!!!!!!!!!!!!!!!auto create trans time=${end-start}")
//scheduler.scheduleOnce(SystemProfile.getTranCreateDur.millis, self, TickInvoke)
scheduler.scheduleOnce(SystemProfile.getTranCreateDur.millis, self, TickInvoke)
}catch{
case e:RuntimeException => throw e
}
@ -213,16 +218,20 @@ class PeerHelper(name: String) extends ModuleBase(name) {
//自动循环不间断提交交易到系统用于压力测试或者tps测试时使用
def createTransForLoop={
var count : Int = 0;
if(pe.getSysTag == "1" || pe.getSysTag == "2" || pe.getSysTag=="3" || pe.getSysTag=="4")
if(pe.getSysTag == "1" )//|| pe.getSysTag == "2" || pe.getSysTag=="3" || pe.getSysTag=="4")
while(true){
try{
val start=System.currentTimeMillis()
//val start = System.currentTimeMillis()
val t3 = transactionCreator(pe.getSysTag,rep.protos.peer.Transaction.Type.CHAINCODE_INVOKE,
"", "transfer" ,Seq(li2),"", Option(chaincode),rep.protos.peer.ChaincodeSpec.CodeType.CODE_JAVASCRIPT)
getActorRef(ActorType.TRANSACTION_POOL) ! t3
//mediator ! Publish(Topic.Transaction, t3)
count += 1
if(count > 6000){
Thread.sleep(5000)
if(count > 1000){
val end = System.currentTimeMillis()
println("send 1000 trans spent = "+(end-start))
Thread.sleep(2000)
count = 0
}
//val end = System.currentTimeMillis()

View File

@ -19,7 +19,7 @@ import java.security.cert.Certificate
import com.google.protobuf.ByteString
import akka.actor.Props
import akka.actor.{Actor, Address, Props}
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import rep.app.conf.SystemProfile
import rep.crypto.ECDSASign
@ -56,13 +56,28 @@ object TransactionPool {
*/
class TransactionPool(moduleName: String) extends ModuleBase(moduleName) {
import akka.actor.ActorSelection
val dataaccess: ImpDataAccess = ImpDataAccess.GetDataAccess(pe.getSysTag)
override def preStart(): Unit = {
//注册接收交易的广播
SubscribeTopic(mediator, self, selfAddr, Topic.Transaction, true)
}
def toAkkaUrl(sn : Address, actorName:String):String = {
return sn.toString + "/" + actorName;
}
def visitStoreService(sn : Address, actorName:String,t1:Transaction) = {
try {
val selection : ActorSelection = context.actorSelection(toAkkaUrl(sn , actorName));
selection ! t1
} catch {
case e: Exception => e.printStackTrace()
}
}
/**
* 检查交易是否符合规则
* @param t
@ -101,10 +116,19 @@ class TransactionPool(moduleName: String) extends ModuleBase(moduleName) {
//处理接收的交易
case t: Transaction =>
//我们在这里并不缓存该Transaction在接收到同级别的广播时再进行缓存
// mediator ! Publish(Topic.Transaction, t)
//pe.getStableNodes.foreach(sn=>{
// visitStoreService(sn , "/user/moduleManager/transactionPool",t)
//})
if (ActorUtils.isHelper(sender().path.toString) ||
ActorUtils.isAPI(sender().path.toString)) {
//广播交易
mediator ! Publish(Topic.Transaction, t)
//广播发送交易事件
sendEvent(EventType.PUBLISH_INFO, mediator, selfAddr, Topic.Transaction, Event.Action.TRANSACTION)
} else {

View File

@ -510,8 +510,8 @@ class BlockModule(moduleName: String) extends ModuleBase(moduleName) {
blockTimeMgr.writeTime(pe.getSysTag,pe.getCurrentBlockHash,pe.getCacheHeight(),timeType.sendblock_end,System.currentTimeMillis())
blockTimeMgr.writeTime(pe.getSysTag,pe.getCurrentBlockHash,pe.getCacheHeight(),timeType.store_start,System.currentTimeMillis())
getActorRef(pe.getSysTag, ActorType.PERSISTENCE_MODULE) ! BlockRestore(blk, height, BlockSrc.CONFIRMED_BLOCK, actRef)
getActorRef(pe.getSysTag, ActorType.PERSISTENCE_MODULE) ! PersistenceModule.LastBlock(BlockHelper.getBlkHash(blk), 0,
BlockSrc.CONFIRMED_BLOCK, self)
//getActorRef(pe.getSysTag, ActorType.PERSISTENCE_MODULE) ! PersistenceModule.LastBlock(BlockHelper.getBlkHash(blk), 0,
// BlockSrc.CONFIRMED_BLOCK, self)
if (isThisAddr(selfAddr, pe.getBlocker.toString)) {
//Thread.sleep(500)
logMsg(LOG_TYPE.INFO, "block store finish,start new vote ...")

View File

@ -56,6 +56,15 @@ class Endorse4Blocker(moduleName: String) extends ModuleBase(moduleName) {
}
}
def visitStoreService(sn : Address, actorName:String,cb:ConfirmedBlock) = {
try {
val selection : ActorSelection = context.actorSelection(toAkkaUrl(sn , actorName));
selection ! cb
} catch {
case e: Exception => e.printStackTrace()
}
}
def addEndoserNode(endaddr:String,actorName:String)={
if(endaddr.indexOf(actorName)>0){
val addr = endaddr.substring(0, endaddr.indexOf(actorName))
@ -240,8 +249,18 @@ class Endorse4Blocker(moduleName: String) extends ModuleBase(moduleName) {
logMsg(LOG_TYPE.INFO, s"new block,nodename=${pe.getSysTag},transaction size=${blc.transactions.size},identifier=${this.blkidentifier_str},${blkidentifier},current height=${dataaccess.getBlockChainInfo().height},previoushash=${blc.previousBlockHash.toStringUtf8()}")
blockTimeMgr.writeTime(pe.getSysTag,pe.getCurrentBlockHash,pe.getCacheHeight(),timeType.endorse_end,System.currentTimeMillis())
blockTimeMgr.writeTime(pe.getSysTag,pe.getCurrentBlockHash,pe.getCacheHeight(),timeType.sendblock_start,System.currentTimeMillis())
mediator ! Publish(Topic.Block, new ConfirmedBlock(blc, dataaccess.getBlockChainInfo().height + 1,
sender))
// mediator ! Publish(Topic.Block, new ConfirmedBlock(blc, dataaccess.getBlockChainInfo().height + 1,
// sender))
pe.getStableNodes.foreach(sn=>{
visitStoreService(sn , "/user/moduleManager/consensusManager/consensus-CRFD/blocker",new ConfirmedBlock(blc, dataaccess.getBlockChainInfo().height + 1,
sender))
})
logMsg(LOG_TYPE.INFO, s"java sort spent time=${javaend-javastart}")
logMsg(LOG_TYPE.INFO, s"recv newBlock msg,node number=${pe.getSysTag},new block height=${dataaccess.getBlockChainInfo().height + 1}")

View File

@ -234,10 +234,10 @@ class EndorsementModule(moduleName: String) extends ModuleBase(moduleName) {
/*schedulerLink = scheduler.scheduleOnce(1 seconds, self, EndorsementModule.VerifySignTimeout )
dispatchTransSignVerify(blk.transactions.toArray[Transaction])*/
//使用多线程来验证签名
var findflag = findTransPoolTx(blk.transactions.toArray[Transaction])
val b = verifyobject.StartVerify(blk.transactions.toArray[Transaction],findflag)
//var findflag = findTransPoolTx(blk.transactions.toArray[Transaction])
//val b = verifyobject.StartVerify(blk.transactions.toArray[Transaction],findflag)
//使用对象内的方法来验证签名
//val b = verifyTransSign(blk.transactions.toArray[Transaction])
val b = verifyTransSign(blk.transactions.toArray[Transaction])
if(b){
val checksignresulttime = System.currentTimeMillis()
logMsg(LOG_TYPE.WARN, s"Block endorse success,current height=${pe.getCacheHeight()},identifier=${blkidentifier}")

View File

@ -285,8 +285,8 @@ class SyncModule(moduleName: String) extends ModuleBase(moduleName) {
rec ! ChainDataResSingleBlk(blk, height)
height += 1
})
rec ! PersistenceModule.LastBlock(BlockHelper.getBlkHash(data(data.length - 1)), local.height,
BlockSrc.SYNC_START_BLOCK, self)
// rec ! PersistenceModule.LastBlock(BlockHelper.getBlkHash(data(data.length - 1)), local.height,
// BlockSrc.SYNC_START_BLOCK, self)
case BlockSrc.CONFIRMED_BLOCK =>
//ignore
rec ! ChainDataResSingleBlk(data(0), infoH + 1)

View File

@ -6,8 +6,8 @@ import org.slf4j.LoggerFactory
object timeAnalysis {
protected def log = LoggerFactory.getLogger(this.getClass)
val count = 600
val outType = 0 //1=print 0=log
val count = 50
val outType = 1 //1=print 0=log
var q : ArrayBuffer[blocktime] = new ArrayBuffer[blocktime]()
var BlockerStart : Long = 0l
var BlockerEnd :Long = 0l