repchain2.0-update86:整理代码:删除API、合约、共识部分的专门线程池的设置。

This commit is contained in:
jiangbuyun 2022-07-24 12:52:19 +08:00
parent 13fb4c5071
commit 12af1ef720
9 changed files with 37 additions and 646 deletions

View File

@ -101,73 +101,9 @@ akka {
}
}
//创世块
genesisblock {
creationBlockTime = 1495451252086
}
}
contract-dispatcher {
#executor = “thread-pool-executor”
#type = Dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) . . . ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 3
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
http-dispatcher {
#executor = “thread-pool-executor”
#type = Dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) . . . ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 2
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
consensus-dispatcher {
#executor = “thread-pool-executor”
#type = Dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) . . . ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 2
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}

View File

@ -1,253 +0,0 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
# Options: ERROR, WARNING, INFO, DEBUG
loglevel = "INFO"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
cluster {
akka.cluster.log-info = on
log-info-verbose = on
#jmx.enabled = on
jmx.multi-mbeans-in-same-jvm = on
#min-nr-of-members = 4
}
actor {
serialize-messages = off
serialize-creators = off
enable-additional-serialization-bindings = on
allow-java-serialization = off
provider = "akka.cluster.ClusterActorRefProvider"
#provider = "cluster"
serializers {
kryo = "com.twitter.chill.akka.AkkaSerializer"
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
"java.lang.String" = java
"java.io.Serializable" = kryo
"com.google.protobuf.Message" = proto
"scalapb.GeneratedMessage" = proto
}
}
remote {
classic {
netty.tcp {
transport-class = "akka.remote.transport.netty.NettyTransport"
#mesage传输块大小配置
send-buffer-size = 2048000b
receive-buffer-size = 2048000b
maximum-frame-size = 1024000b
enable-ssl = true
ssl-engine-provider = akka.remote.transport.netty.ConfigSSLEngineProvider
security {
key-store = "jks/121000005l35120456.node1.jks"
trust-store = "jks/mytruststore.jks" // 加载信任列表证书用的_tls1.3
#trust-store = "jks/mytrust" // ssl用的_ca
trust-store-mm = "jks/mytruststore.jks" // 加载信任列表证书用的_origin
key-store-password = "123"
key-password = "123"
trust-store-password = "changeme" // 加载信任列表证书用的_tls1.3
#trust-store-password = "changeit" // ssl用的_ca
trust-store-password-mm = "changeme" // 加载信任列表证书用的_origin
protocol = "TLSv1.3"
enabled-algorithms = [TLS_AES_128_GCM_SHA256]
require-mutual-authentication = on
random-number-generator = "SecureRandom"
}
}
netty.ssl.security {
key-store = "jks/121000005l35120456.node1.jks"
trust-store = "jks/mytruststore.jks" // 加载信任列表证书用的_tls1.3
#trust-store = "jks/mytrust" // ssl用的_ca
trust-store-mm = "jks/mytruststore.jks" // 加载信任列表证书用的_origin
key-store-password = "123"
key-password = "123"
trust-store-password = "changeme" // 加载信任列表证书用的_tls1.3
#trust-store-password = "changeit" // ssl用的_ca
trust-store-password-mm = "changeme" // 加载信任列表证书用的_origin
protocol = "TLSv1.3"
enabled-algorithms = [TLS_AES_128_GCM_SHA256]
require-mutual-authentication = on
random-number-generator = "SecureRandom"
}
}
artery {
# Disable artery with this flag
enabled = on
# Select the underlying transport implementation.
# Possible values: aeron-udp, tcp, tls-tcp
transport = tls-tcp
#log-received-messages = on
#log-sent-messages = on
advanced {
# Maximum serialized message size, including header data.
maximum-frame-size = 5120 KiB
# Direct byte buffers are reused in a pool with this maximum size.
# Each buffer has the size of 'maximum-frame-size'.
# This is not a hard upper limit on number of created buffers. Additional
# buffers will be created if needed, e.g. when using many outbound
# associations at the same time. Such additional buffers will be garbage
# collected, which is not as efficient as reusing buffers in the pool.
buffer-pool-size = 256
# Maximum serialized message size for the large messages, including header data.
# It is currently restricted to 1/8th the size of a term buffer that can be
# configured by setting the 'aeron.term.buffer.length' system property.
# See 'large-message-destinations'.
maximum-large-frame-size = 5 MiB
# Direct byte buffers for the large messages are reused in a pool with this maximum size.
# Each buffer has the size of 'maximum-large-frame-size'.
# See 'large-message-destinations'.
# This is not a hard upper limit on number of created buffers. Additional
# buffers will be created if needed, e.g. when using many outbound
# associations at the same time. Such additional buffers will be garbage
# collected, which is not as efficient as reusing buffers in the pool.
large-buffer-pool-size = 64
outbound-message-queue-size = 30720
#inbound-lanes = 1
#outbound-lanes = 1
}
# SSL configuration that is used when transport=tls-tcp.
ssl {
# Factory of SSLEngine.
# Must implement akka.remote.artery.tcp.SSLEngineProvider and have a public
# constructor with an ActorSystem parameter.
# The default ConfigSSLEngineProvider is configured by properties in section
# akka.remote.artery.ssl.config-ssl-engine
#ssl-engine-provider = akka.remote.artery.tcp.ConfigSSLEngineProvider
#ssl-engine-provider = akka.remote.artery.tcp.ConfigSSLEngineProvider
ssl-engine-provider = rep.crypto.GMSSLEngineProvider
# Config of akka.remote.artery.tcp.ConfigSSLEngineProvider
config-ssl-engine {
base-path = "pfx/"
#base-path = "jks/"
key-store = "pfx/sm2.test_node1.both.pfx"
#key-store = "jks/121000005l35120456.node1.jks"
trust-store = "pfx/mytruststore.pfx" // 加载信任列表证书用的_tls1.3
#trust-store = "jks/mytruststore.jks" // 加载信任列表证书用的_tls1.3
#trust-store = "jks/mytrust" // ssl用的_ca
trust-store-mm = "pfx/mytruststore.pfx" // 加载信任列表证书用的_origin
#trust-store-mm = "jks/mytruststore.jks" // 加载信任列表证书用的_origin
key-store-password = "12345678"
key-password = "12345678"
trust-store-password = "changeme" // 加载信任列表证书用的_tls1.3
#trust-store-password = "changeit" // ssl用的_ca
trust-store-password-mm = "changeme" // 加载信任列表证书用的_origin
protocol = "GMSSLv1.1"
#protocol = "TLSv1.2"
#enabled-algorithms = [TLS_AES_128_GCM_SHA256]
enabled-algorithms = [GMSSL_ECC_SM4_SM3]
#enabled-algorithms = [TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256]
#require-mutual-authentication = on
random-number-generator = "SecureRandom"
# Require mutual authentication between TLS peers
#
# Without mutual authentication only the peer that actively establishes a connection (TLS client side)
# checks if the passive side (TLS server side) sends over a trusted certificate. With the flag turned on,
# the passive side will also request and verify a certificate from the connecting peer.
#
# To prevent man-in-the-middle attacks this setting is enabled by default.
require-mutual-authentication = on
# Set this to `on` to verify hostnames with sun.security.util.HostnameChecker
hostname-verification = off
}
}
}
}
//创世块
genesisblock {
creationBlockTime = 1495451252086
}
}
contract-dispatcher {
#executor = “thread-pool-executor”
#type = Dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) . . . ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 3
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
http-dispatcher {
#executor = “thread-pool-executor”
#type = Dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) . . . ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 2
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
consensus-dispatcher {
#executor = “thread-pool-executor”
#type = Dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) . . . ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 2
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}

View File

@ -1,247 +0,0 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
# Options: ERROR, WARNING, INFO, DEBUG
loglevel = "INFO"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
cluster {
akka.cluster.log-info = on
log-info-verbose = on
#jmx.enabled = on
jmx.multi-mbeans-in-same-jvm = on
#min-nr-of-members = 4
}
actor {
serialize-messages = off
serialize-creators = off
enable-additional-serialization-bindings = on
allow-java-serialization = off
provider = "akka.cluster.ClusterActorRefProvider"
#provider = "cluster"
serializers {
kryo = "com.twitter.chill.akka.AkkaSerializer"
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
"java.lang.String" = java
"java.io.Serializable" = kryo
"com.google.protobuf.Message" = proto
"scalapb.GeneratedMessage" = proto
}
}
remote {
classic {
netty.tcp {
transport-class = "akka.remote.transport.netty.NettyTransport"
#mesage传输块大小配置
send-buffer-size = 2048000b
receive-buffer-size = 2048000b
maximum-frame-size = 1024000b
enable-ssl = true
ssl-engine-provider = akka.remote.transport.netty.ConfigSSLEngineProvider
security {
key-store = "jks/121000005l35120456.node1.jks"
trust-store = "jks/mytruststore.jks" // 加载信任列表证书用的_tls1.3
#trust-store = "jks/mytrust" // ssl用的_ca
trust-store-mm = "jks/mytruststore.jks" // 加载信任列表证书用的_origin
key-store-password = "123"
key-password = "123"
trust-store-password = "changeme" // 加载信任列表证书用的_tls1.3
#trust-store-password = "changeit" // ssl用的_ca
trust-store-password-mm = "changeme" // 加载信任列表证书用的_origin
protocol = "TLSv1.3"
enabled-algorithms = [TLS_AES_128_GCM_SHA256]
require-mutual-authentication = on
random-number-generator = "SecureRandom"
}
}
netty.ssl.security {
key-store = "jks/121000005l35120456.node1.jks"
trust-store = "jks/mytruststore.jks" // 加载信任列表证书用的_tls1.3
#trust-store = "jks/mytrust" // ssl用的_ca
trust-store-mm = "jks/mytruststore.jks" // 加载信任列表证书用的_origin
key-store-password = "123"
key-password = "123"
trust-store-password = "changeme" // 加载信任列表证书用的_tls1.3
#trust-store-password = "changeit" // ssl用的_ca
trust-store-password-mm = "changeme" // 加载信任列表证书用的_origin
protocol = "TLSv1.3"
enabled-algorithms = [TLS_AES_128_GCM_SHA256]
require-mutual-authentication = on
random-number-generator = "SecureRandom"
}
}
artery {
# Disable artery with this flag
enabled = on
# Select the underlying transport implementation.
# Possible values: aeron-udp, tcp, tls-tcp
transport = tls-tcp
#log-received-messages = on
#log-sent-messages = on
advanced {
# Maximum serialized message size, including header data.
maximum-frame-size = 5120 KiB
# Direct byte buffers are reused in a pool with this maximum size.
# Each buffer has the size of 'maximum-frame-size'.
# This is not a hard upper limit on number of created buffers. Additional
# buffers will be created if needed, e.g. when using many outbound
# associations at the same time. Such additional buffers will be garbage
# collected, which is not as efficient as reusing buffers in the pool.
buffer-pool-size = 256
# Maximum serialized message size for the large messages, including header data.
# It is currently restricted to 1/8th the size of a term buffer that can be
# configured by setting the 'aeron.term.buffer.length' system property.
# See 'large-message-destinations'.
maximum-large-frame-size = 5 MiB
# Direct byte buffers for the large messages are reused in a pool with this maximum size.
# Each buffer has the size of 'maximum-large-frame-size'.
# See 'large-message-destinations'.
# This is not a hard upper limit on number of created buffers. Additional
# buffers will be created if needed, e.g. when using many outbound
# associations at the same time. Such additional buffers will be garbage
# collected, which is not as efficient as reusing buffers in the pool.
large-buffer-pool-size = 64
outbound-message-queue-size = 30720
#inbound-lanes = 1
#outbound-lanes = 1
}
# SSL configuration that is used when transport=tls-tcp.
ssl {
# Factory of SSLEngine.
# Must implement akka.remote.artery.tcp.SSLEngineProvider and have a public
# constructor with an ActorSystem parameter.
# The default ConfigSSLEngineProvider is configured by properties in section
# akka.remote.artery.ssl.config-ssl-engine
#ssl-engine-provider = akka.remote.artery.tcp.ConfigSSLEngineProvider
ssl-engine-provider = akka.remote.artery.tcp.ConfigSSLEngineProvider
#ssl-engine-provider = rep.crypto.GMSSLEngineProvider
# Config of akka.remote.artery.tcp.ConfigSSLEngineProvider
config-ssl-engine {
base-path = "jks/"
key-store = "jks/121000005l35120456.node1.jks"
trust-store = "jks/mytruststore.jks" // 加载信任列表证书用的_tls1.3
#trust-store = "jks/mytrust" // ssl用的_ca
trust-store-mm = "jks/mytruststore.jks" // 加载信任列表证书用的_origin
key-store-password = "123"
key-password = "123"
trust-store-password = "changeme" // 加载信任列表证书用的_tls1.3
#trust-store-password = "changeit" // ssl用的_ca
trust-store-password-mm = "changeme" // 加载信任列表证书用的_origin
protocol = "TLSv1.2"
#enabled-algorithms = [TLS_AES_128_GCM_SHA256]
enabled-algorithms = [TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256]
#require-mutual-authentication = on
random-number-generator = "SecureRandom"
# Require mutual authentication between TLS peers
#
# Without mutual authentication only the peer that actively establishes a connection (TLS client side)
# checks if the passive side (TLS server side) sends over a trusted certificate. With the flag turned on,
# the passive side will also request and verify a certificate from the connecting peer.
#
# To prevent man-in-the-middle attacks this setting is enabled by default.
require-mutual-authentication = on
# Set this to `on` to verify hostnames with sun.security.util.HostnameChecker
hostname-verification = off
}
}
}
}
//创世块
genesisblock {
creationBlockTime = 1495451252086
}
}
contract-dispatcher {
#executor = “thread-pool-executor”
#type = Dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) . . . ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 3
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
http-dispatcher {
#executor = “thread-pool-executor”
#type = Dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) . . . ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 2
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
consensus-dispatcher {
#executor = “thread-pool-executor”
#type = Dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) . . . ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 2
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}

View File

@ -1,42 +0,0 @@
package rep.api.rest
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{ActorRef, ActorSystem}
class RestRouter(ActorNumber:Int,system: ActorSystem) {
private var ras: Array[ActorRef] = new Array[ActorRef](ActorNumber)
//private var ras4Transaction: Array[ActorRef] = new Array[ActorRef](ActorNumber/2)
private val nextActor : AtomicLong = new AtomicLong(0)
CreateActor
//CreateActor4Transaction
private def CreateActor={
for (i <- 0 to ActorNumber - 1) {
var ra = system.actorOf(RestActor.props("api_" + i).withDispatcher("http-dispatcher"), "api_"+i)
ras(i) = ra
}
}
/*private def CreateActor4Transaction={
for (i <- 0 to ActorNumber/2 - 1) {
var ra = system.actorOf(AcceptTransactionActor.props("api_trans_" + i).withDispatcher("http-dispatcher"), "trans_"+i)
ras4Transaction(i) = ra
}
}*/
def getRestActor:ActorRef={
val size = ras.length
val index = (nextActor.getAndIncrement % size).asInstanceOf[Int]
ras(if (index < 0) size + index else index)
}
/*def getRestActor4Transaction:ActorRef={
val size = ras.length
val index = (nextActor.getAndIncrement % size).asInstanceOf[Int]
ras4Transaction(if (index < 0) size + index else index)
}*/
}

View File

@ -17,7 +17,7 @@
package rep.api.rest
import java.io.File
import akka.actor.ActorRef
import scala.util.{Failure, Success}
import scala.concurrent.{ExecutionContext, Future}
import akka.util.Timeout
@ -58,7 +58,7 @@ import rep.log.RepLogger
*/
@Tag(name = "chaininfo", description = "获得当前区块链信息")
@Path("/chaininfo")
class ChainService(ra: RestRouter)(implicit executionContext: ExecutionContext)
class ChainService(ra: ActorRef)(implicit executionContext: ExecutionContext)
extends Directives {
import akka.pattern.ask
@ -81,7 +81,7 @@ class ChainService(ra: RestRouter)(implicit executionContext: ExecutionContext)
extractClientIP { ip =>
RepLogger.debug(RepLogger.APIAccess_Logger, s"remoteAddr=${ip} get chaininfo")
complete {
(ra.getRestActor ? ChainInfo).mapTo[QueryResult]
(ra ? ChainInfo).mapTo[QueryResult]
}
}
}
@ -99,7 +99,7 @@ class ChainService(ra: RestRouter)(implicit executionContext: ExecutionContext)
extractClientIP { ip =>
RepLogger.debug(RepLogger.APIAccess_Logger, s"remoteAddr=${ip} get node number")
complete {
(ra.getRestActor ? NodeNumber).mapTo[QueryResult]
(ra ? NodeNumber).mapTo[QueryResult]
}
}
}
@ -118,7 +118,7 @@ class ChainService(ra: RestRouter)(implicit executionContext: ExecutionContext)
extractClientIP { ip =>
RepLogger.debug(RepLogger.APIAccess_Logger, s"remoteAddr=${ip} get number of cache")
complete {
(ra.getRestActor ? TransNumber).mapTo[QueryResult]
(ra ? TransNumber).mapTo[QueryResult]
}
}
}
@ -136,7 +136,7 @@ class ChainService(ra: RestRouter)(implicit executionContext: ExecutionContext)
extractClientIP { ip =>
RepLogger.debug(RepLogger.APIAccess_Logger, s"remoteAddr=${ip} get number of accepted")
complete {
(ra.getRestActor ? AcceptedTransNumber).mapTo[QueryResult]
(ra ? AcceptedTransNumber).mapTo[QueryResult]
}
}
}
@ -151,7 +151,7 @@ class ChainService(ra: RestRouter)(implicit executionContext: ExecutionContext)
@Tag(name = "block", description = "获得区块数据")
@Path("/block")
class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckClientPermission: Boolean)(implicit executionContext: ExecutionContext)
class BlockService(ra: ActorRef, repContext: RepChainSystemContext, isCheckClientPermission: Boolean)(implicit executionContext: ExecutionContext)
extends Directives {
import akka.pattern.ask
@ -184,7 +184,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
try {
if (cert != null && repContext.getPermissionVerify.CheckPermissionOfX509Certificate(cert, "block.hash", null)) {
complete {
(ra.getRestActor ? BlockId(blockId)).mapTo[QueryResult]
(ra ? BlockId(blockId)).mapTo[QueryResult]
}
} else {
complete(QueryResult(Option(JsonMethods.parse(string2JsonInput(PermissionVerify.errorInfo_None_Permission)))))
@ -196,7 +196,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
}
} else {
complete {
(ra.getRestActor ? BlockId(blockId)).mapTo[QueryResult]
(ra ? BlockId(blockId)).mapTo[QueryResult]
}
}
}
@ -222,7 +222,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
try {
if (cert != null && repContext.getPermissionVerify.CheckPermissionOfX509Certificate(cert, "block.blockHeight", null)) {
complete {
(ra.getRestActor ? BlockHeight(blockHeight.toInt)).mapTo[QueryResult]
(ra ? BlockHeight(blockHeight.toInt)).mapTo[QueryResult]
}
} else {
complete(QueryResult(Option(JsonMethods.parse(string2JsonInput(PermissionVerify.errorInfo_None_Permission)))))
@ -234,7 +234,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
}
} else {
complete {
(ra.getRestActor ? BlockHeight(blockHeight.toInt)).mapTo[QueryResult]
(ra ? BlockHeight(blockHeight.toInt)).mapTo[QueryResult]
}
}
}
@ -257,7 +257,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
post {
entity(as[Map[String, Long]]) { blockQuery =>
complete {
(ra.getRestActor ? TransNumberOfBlock(blockQuery("height"))).mapTo[QueryResult]
(ra ? TransNumberOfBlock(blockQuery("height"))).mapTo[QueryResult]
}
}
}
@ -277,7 +277,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
extractClientIP { ip =>
RepLogger.debug(RepLogger.APIAccess_Logger, s"remoteAddr=${ip} get block time for Height,block height=${blockHeight}")
complete {
(ra.getRestActor ? BlockTimeForHeight(blockHeight.toLong)).mapTo[QueryResult]
(ra ? BlockTimeForHeight(blockHeight.toLong)).mapTo[QueryResult]
}
}
}
@ -297,7 +297,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
extractClientIP { ip =>
RepLogger.debug(RepLogger.APIAccess_Logger, s"remoteAddr=${ip} get block time for txid,txid=${transid}")
complete {
(ra.getRestActor ? BlockTimeForTxid(transid)).mapTo[QueryResult]
(ra ? BlockTimeForTxid(transid)).mapTo[QueryResult]
}
}
}
@ -321,7 +321,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
val cert = RepChainConfigFilePathMgr.getCert(sessionInfo)
try {
if (cert != null && repContext.getPermissionVerify.CheckPermissionOfX509Certificate(cert, "block.stream", null)) {
complete((ra.getRestActor ? BlockHeightStream(blockHeight.toInt)).mapTo[HttpResponse])
complete((ra ? BlockHeightStream(blockHeight.toInt)).mapTo[HttpResponse])
} else {
complete(QueryResult(Option(JsonMethods.parse(string2JsonInput(PermissionVerify.errorInfo_None_Permission)))))
}
@ -331,7 +331,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
}
}
} else {
complete((ra.getRestActor ? BlockHeightStream(blockHeight.toInt)).mapTo[HttpResponse])
complete((ra ? BlockHeightStream(blockHeight.toInt)).mapTo[HttpResponse])
}
}
}
@ -347,7 +347,7 @@ class BlockService(ra: RestRouter, repContext: RepChainSystemContext, isCheckCli
@Consumes(Array(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML, MediaType.MULTIPART_FORM_DATA))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/transaction")
class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCheckClientPermission: Boolean)(implicit executionContext: ExecutionContext)
class TransactionService(ra: ActorRef, repContext: RepChainSystemContext, isCheckClientPermission: Boolean)(implicit executionContext: ExecutionContext)
extends Directives {
import akka.pattern.ask
@ -416,7 +416,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
try {
if (cert != null && repContext.getPermissionVerify.CheckPermissionOfX509Certificate(cert, "transaction", null)) {
complete {
(ra.getRestActor ? TransactionId(transactionId)).mapTo[QueryResult]
(ra ? TransactionId(transactionId)).mapTo[QueryResult]
}
} else {
complete(QueryResult(Option(JsonMethods.parse(string2JsonInput(PermissionVerify.errorInfo_None_Permission)))))
@ -428,7 +428,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
}
} else {
complete {
(ra.getRestActor ? TransactionId(transactionId)).mapTo[QueryResult]
(ra ? TransactionId(transactionId)).mapTo[QueryResult]
}
}
}
@ -454,7 +454,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
val cert = RepChainConfigFilePathMgr.getCert(sessionInfo)
try {
if (cert != null && repContext.getPermissionVerify.CheckPermissionOfX509Certificate(cert, "transaction.stream", null)) {
complete((ra.getRestActor ? TransactionStreamId(transactionId)).mapTo[HttpResponse])
complete((ra ? TransactionStreamId(transactionId)).mapTo[HttpResponse])
} else {
complete(QueryResult(Option(JsonMethods.parse(string2JsonInput(PermissionVerify.errorInfo_None_Permission)))))
}
@ -464,7 +464,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
}
}
} else {
complete((ra.getRestActor ? TransactionStreamId(transactionId)).mapTo[HttpResponse])
complete((ra ? TransactionStreamId(transactionId)).mapTo[HttpResponse])
}
}
}
@ -489,7 +489,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
val cert = RepChainConfigFilePathMgr.getCert(sessionInfo)
try {
if (cert != null && repContext.getPermissionVerify.CheckPermissionOfX509Certificate(cert, "transaction.tranInfoAndHeight", null)) {
complete((ra.getRestActor ? TranInfoAndHeightId(transactionId)).mapTo[QueryResult])
complete((ra ? TranInfoAndHeightId(transactionId)).mapTo[QueryResult])
} else {
complete(QueryResult(Option(JsonMethods.parse(string2JsonInput(PermissionVerify.errorInfo_None_Permission)))))
}
@ -499,7 +499,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
}
}
} else {
complete((ra.getRestActor ? TranInfoAndHeightId(transactionId)).mapTo[QueryResult])
complete((ra ? TranInfoAndHeightId(transactionId)).mapTo[QueryResult])
}
@ -524,7 +524,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
post {
entity(as[String]) { trans =>
complete {
(ra.getRestActor ? tranSign(trans)).mapTo[PostResult]
(ra ? tranSign(trans)).mapTo[PostResult]
}
// complete { (StatusCodes.Accepted, PostResult("hahhaha",None, Some("处理存在异常"))) }
}
@ -559,7 +559,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
onComplete(tranFuture) {
case Success(tranByteString) =>
complete {
(ra.getRestActor ? Transaction.parseFrom(tranByteString.toArray)).mapTo[PostResult]
(ra ? Transaction.parseFrom(tranByteString.toArray)).mapTo[PostResult]
}
case Failure(ex) =>
complete(StatusCodes.InternalServerError, ex.getMessage)
@ -589,7 +589,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
entity(as[CSpec]) { request =>
complete {
(ra.getRestActor ? request).mapTo[PostResult]
(ra ? request).mapTo[PostResult]
}
}
}
@ -603,7 +603,7 @@ class TransactionService(ra: RestRouter, repContext: RepChainSystemContext, isCh
*/
@Tag(name = "db", description = "查询合约存储在DB中的数据")
@Path("/db")
class DbService(ra: RestRouter, repContext: RepChainSystemContext, isCheckClientPermission: Boolean)(implicit executionContext: ExecutionContext)
class DbService(ra: ActorRef, repContext: RepChainSystemContext, isCheckClientPermission: Boolean)(implicit executionContext: ExecutionContext)
extends Directives {
import akka.pattern.ask
@ -634,7 +634,7 @@ class DbService(ra: RestRouter, repContext: RepChainSystemContext, isCheckClient
if (cert != null && repContext.getPermissionVerify.CheckPermissionOfX509Certificate(cert, "db.query", null)) {
entity(as[QueryDB]) { query: QueryDB =>
complete {
(ra.getRestActor ? query).mapTo[QueryResult]
(ra ? query).mapTo[QueryResult]
}
}
} else {
@ -648,7 +648,7 @@ class DbService(ra: RestRouter, repContext: RepChainSystemContext, isCheckClient
} else {
entity(as[QueryDB]) { query: QueryDB =>
complete {
(ra.getRestActor ? query).mapTo[QueryResult]
(ra ? query).mapTo[QueryResult]
}
}
}

View File

@ -76,9 +76,9 @@ class IModuleManager(moduleName: String, isStartup: Boolean) extends ModuleBase(
private def loadTransModule:Any={
//if (this.isStartup) {
pe.register(ModuleActorType.ActorType.transactiondispatcher, context.actorOf(TransactionDispatcher.props("transactiondispatcher").withDispatcher("contract-dispatcher"), "transactiondispatcher"))
pe.register(ModuleActorType.ActorType.transactiondispatcher, context.actorOf(TransactionDispatcher.props("transactiondispatcher"), "transactiondispatcher"))
//}
pe.register(ModuleActorType.ActorType.dispatchofpreload, context.actorOf(DispatchOfPreload.props("dispatchofpreload").withDispatcher("contract-dispatcher"), "dispatchofpreload"))
pe.register(ModuleActorType.ActorType.dispatchofpreload, context.actorOf(DispatchOfPreload.props("dispatchofpreload"), "dispatchofpreload"))
}
private def loadClusterModule = {

View File

@ -232,7 +232,7 @@ class SandboxDispatcher(moduleName: String, cid: String) extends ModuleBase(modu
case ChaincodeDeploy.CodeType.CODE_JAVASCRIPT =>
null
case ChaincodeDeploy.CodeType.CODE_SCALA =>
context.actorOf(Props(new SandboxScala(cid)).withDispatcher("contract-dispatcher"), sandboxName)
context.actorOf(Props(new SandboxScala(cid)), sandboxName)
case ChaincodeDeploy.CodeType.CODE_VCL_DLL =>
null
case ChaincodeDeploy.CodeType.CODE_VCL_EXE =>

View File

@ -29,7 +29,7 @@ class TransactionDispatcher(moduleName: String) extends ModuleBase(moduleName) {
RepLogger.debug(RepLogger.Sandbox_Logger, s"transaction dispatcher for ${cid} is exist.")
this.TransActors(cid)
} else {
val sd = context.actorOf(SandboxDispatcher.props("sandbox_dispatcher_" + cid, cid).withDispatcher("contract-dispatcher"), "sandbox_dispatcher_" + cid)
val sd = context.actorOf(SandboxDispatcher.props("sandbox_dispatcher_" + cid, cid), "sandbox_dispatcher_" + cid)
this.TransActors += cid -> sd
RepLogger.debug(RepLogger.Sandbox_Logger, s"create transaction dispatcher for ${cid} .")
sd

View File

@ -84,11 +84,9 @@ object EventServer {
def start(sys:ActorSystem ,repContext: RepChainSystemContext) {
implicit val system =sys
implicit val materializer = ActorMaterializer()
//implicit val executionContext = system.dispatcher
implicit val executionContext = system.dispatcher
implicit val executionContext = system.dispatchers.lookup("http-dispatcher")
val evtactor = system.actorOf(Props[RecvEventActor].withDispatcher("http-dispatcher"),"RecvEventActor")
val evtActor = system.actorOf(Props[RecvEventActor],"RecvEventActor")
val port = repContext.getConfig.getHttpServicePort
val actorNumber = repContext.getConfig.getHttpServiceActorNumber
@ -104,7 +102,7 @@ object EventServer {
}~ //提供Event的WebSocket订阅服务
path("event") {
get {
val sourceGraph: Graph[SourceShape[Event], NotUsed] = new EventActor4Stage(evtactor)
val sourceGraph: Graph[SourceShape[Event], NotUsed] = new EventActor4Stage(evtActor)
val source: Source[Event, NotUsed] = Source.fromGraph(sourceGraph)
extractWebSocketUpgrade
@ -118,8 +116,7 @@ object EventServer {
}
}
//val ra = sys.actorOf(RestActor.props("api"), "api")
val ra = new RestRouter(actorNumber,sys)
val ra = sys.actorOf(RestActor.props("api"), "api")
//允许跨域访问,以支持在应用中发起请求
//val httpServer = Http()