重构代码--36修改bug,blockerInfo结构修改导致前端类型识别错误;修改实时图推送的后台代码,采用GraphStage方式推送事件信息。

This commit is contained in:
wuwei1972 2019-04-23 18:04:11 +08:00
parent 9813c3905a
commit 3c566fa380
7 changed files with 184 additions and 35 deletions

View File

@ -59,7 +59,7 @@ class EventActor extends ActorPublisher[Event] {
mediator ! Subscribe(Topic.Event, self)
//发送当前出块人
val pe = PeerExtension(context.system)
self ! new Event( pe.getBlocker.toString, "", Event.Action.CANDIDATOR)
self ! new Event( pe.getBlocker.blocker.toString, "", Event.Action.CANDIDATOR)
val ref = context.actorSelection("/user/modulemanager/memberlistener")
if(ref != null) ref ! cluster.state
}

View File

@ -0,0 +1,49 @@
package rep.log
import akka.actor.{ActorRef,Props}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import rep.protos.peer._
import scala.collection.mutable
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.SourceShape
import rep.log.RecvEventActor.Register
import rep.protos.peer._
import akka.actor.ActorSystem
import rep.utils.IdTool
class EventActor4Stage(eventactor: ActorRef) extends GraphStage[SourceShape[Event]]{
//class EventActor4Stage(system: ActorSystem) extends GraphStage[SourceShape[Event]]{
import scala.concurrent.duration._
//val evtactor = system.actorOf(Props[RecvEventActor],"RecvEventActor_"+IdTool.getUUID)
val out: Outlet[Event] = Outlet("EventActor4Stage")
override def shape: SourceShape[Event] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
implicit def self = stageActor.ref
override def preStart(): Unit = {
val thisStageActor = getStageActor(messageHandler).ref
//evtactor ! Register(thisStageActor)
eventactor ! Register(thisStageActor)
}
setHandler(out,new OutHandler{
override def onPull():Unit={
//此处被messageHandler取代
}
})
private def messageHandler(receive: (ActorRef, Any)): Unit = {
receive match {
case (_, evt:Event) => {
if(this.isAvailable(out) && !this.isClosed(out) )
push(out,evt)
}
case(_,_) =>
}
}
}
}

View File

@ -0,0 +1,73 @@
package rep.log
import rep.network.Topic
import rep.protos.peer._
import akka.stream.actor._
import akka.actor.{Props,Address,Actor,ActorRef,Terminated}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus
import rep.ui.web.EventServer
import rep.network.tools.PeerExtension
import rep.storage._
import akka.stream.Graph
import scala.collection.mutable.{HashSet,Set}
import rep.log.RecvEventActor.{Register}
import rep.network.util.NodeHelp
object RecvEventActor {
def props: Props = Props[RecvEventActor]
final case class Register(actorRef: ActorRef)
}
class RecvEventActor extends Actor {
var stageActor : ActorRef = null
var stageActors : HashSet[ActorRef] = HashSet[ActorRef]()
val cluster = Cluster(context.system)
override def preStart(): Unit ={
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe(Topic.Event, self)
}
private def clusterInfo(stageActor:ActorRef)={
cluster.state.members.foreach(m=>{
if (m.status == MemberStatus.Up){
if(NodeHelp.isCandidatorNode(m.roles)){
stageActor ! new Event(NodeHelp.getNodeName(m.roles), Topic.Event, Event.Action.MEMBER_UP)
}
}
})
}
override def receive = {
case Register(actorRef) => {
//this.stageActor = actorRef
this.stageActors.add(actorRef)
context.watch(actorRef)
clusterInfo(actorRef)
val pe = PeerExtension(context.system)
self ! new Event( pe.getBlocker.blocker.toString, "", Event.Action.CANDIDATOR)
}
case Terminated(actorRef) => {
//this.stageActor = null
this.stageActors.remove(actorRef)
context.unwatch(actorRef)
//context.stop(self)
}
case evt: Event => {
//if(this.stageActor != null) this.stageActor ! evt
this.stageActors.foreach(f=>f ! evt)
}
}
}

View File

@ -35,6 +35,7 @@ import scala.util.control.Breaks._
import scala.collection.mutable.ArrayBuffer
import rep.log.RepLogger
import rep.protos.peer.Event
import rep.network.util.NodeHelp
/**
* Cluster节点状态监听模块
@ -102,31 +103,9 @@ class MemberListener(MoudleName:String) extends ModuleBase(MoudleName) with Clus
var nodes = Set.empty[ Address ]
private def isCandidatorNode(roles: Set[String]):Boolean = {
var r = false
breakable(
roles.foreach(f=>{
if(f.startsWith("CRFD-Node")){
r = true
break
}
})
)
r
}
private def getNodeName(roles: Set[String]):String = {
var r = ""
breakable(
roles.foreach(f=>{
if(f.startsWith("CRFD-Node")){
r = f.substring(f.indexOf("CRFD-Node")+10)
break
}
})
)
r
}
def receive = {
@ -140,9 +119,9 @@ class MemberListener(MoudleName:String) extends ModuleBase(MoudleName) with Clus
state.members.foreach(m=>{
if (m.status == MemberStatus.Up){
nodes += m.address
if(this.isCandidatorNode(m.roles)){
snodes.append((m.address,this.getNodeName(m.roles)))
sendEvent(EventType.PUBLISH_INFO, mediator, this.getNodeName(m.roles), Topic.Event, Event.Action.MEMBER_UP)
if(NodeHelp.isCandidatorNode(m.roles)){
snodes.append((m.address,NodeHelp.getNodeName(m.roles)))
sendEvent(EventType.PUBLISH_INFO, mediator, NodeHelp.getNodeName(m.roles), Topic.Event, Event.Action.MEMBER_UP)
}
}
})
@ -154,9 +133,9 @@ class MemberListener(MoudleName:String) extends ModuleBase(MoudleName) with Clus
nodes += member.address
RepLogger.info(RepLogger.System_Logger, this.getLogMsgPrefix("Member is Up: {}. {} nodes in cluster"+"~"+member.address+"~"+nodes.size))
pe.getNodeMgr.putNode(member.address)
if(member.roles != null && !member.roles.isEmpty && this.isCandidatorNode(member.roles)){
preloadNodesMap.put(member.address, (TimeUtils.getCurrentTime(),this.getNodeName(member.roles)))
sendEvent(EventType.PUBLISH_INFO, mediator, this.getNodeName(member.roles), Topic.Event, Event.Action.MEMBER_UP)
if(member.roles != null && !member.roles.isEmpty && NodeHelp.isCandidatorNode(member.roles)){
preloadNodesMap.put(member.address, (TimeUtils.getCurrentTime(),NodeHelp.getNodeName(member.roles)))
sendEvent(EventType.PUBLISH_INFO, mediator, NodeHelp.getNodeName(member.roles), Topic.Event, Event.Action.MEMBER_UP)
}
scheduler.scheduleOnce(TimePolicy.getSysNodeStableDelay millis,
@ -171,7 +150,7 @@ class MemberListener(MoudleName:String) extends ModuleBase(MoudleName) with Clus
preloadNodesMap.remove(member.address)
pe.getNodeMgr.removeNode(member.address)
pe.getNodeMgr.removeStableNode(member.address)
sendEvent(EventType.PUBLISH_INFO, mediator, this.getNodeName(member.roles), Topic.Event, Event.Action.MEMBER_DOWN)
sendEvent(EventType.PUBLISH_INFO, mediator, NodeHelp.getNodeName(member.roles), Topic.Event, Event.Action.MEMBER_DOWN)

View File

@ -2,6 +2,7 @@ package rep.network.util
import akka.actor.{ ActorRef, Props }
import rep.app.conf.{ SystemProfile }
import scala.util.control.Breaks._
object NodeHelp {
def isSameNodeForRef(srcRef: ActorRef, destRef: ActorRef): Boolean = {
@ -55,4 +56,31 @@ object NodeHelp {
def isSeedNode(nodeName:String):Boolean={
SystemProfile.getGenesisNodeName.equals(nodeName)
}
def isCandidatorNode(roles: Set[String]):Boolean = {
var r = false
breakable(
roles.foreach(f=>{
if(f.startsWith("CRFD-Node")){
r = true
break
}
})
)
r
}
def getNodeName(roles: Set[String]):String = {
var r = ""
breakable(
roles.foreach(f=>{
if(f.startsWith("CRFD-Node")){
r = f.substring(f.indexOf("CRFD-Node")+10)
break
}
})
)
r
}
}

View File

@ -46,6 +46,12 @@ import rep.sc.Sandbox.SandboxException
import rep.log.RepLogger
import rep.app.conf.SystemProfile
import rep.log.RecvEventActor
import rep.log.EventActor4Stage
import akka.stream.Graph
import akka.stream.SourceShape
import akka.NotUsed
/** Event服务伴生对象
* @author c4w
*/
@ -74,6 +80,10 @@ object EventServer {
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val evtactor = system.actorOf(Props[RecvEventActor],"RecvEventActor")
//提供静态文件的web访问服务
val route_evt =
//提供swagger UI服务
@ -86,10 +96,18 @@ object EventServer {
path("event") {
get {
//must ref to the same actor
val source = Source.actorPublisher[Event](Props[EventActor]).map(evt => BinaryMessage(ByteString(evt.toByteArray)))
//val source = Source.actorPublisher[Event](Props[EventActor]).map(evt => BinaryMessage(ByteString(evt.toByteArray)))
//val sourceGraph: Graph[SourceShape[Event], NotUsed] = new EventActor4Stage(system)
val sourceGraph: Graph[SourceShape[Event], NotUsed] = new EventActor4Stage(evtactor)
val source: Source[Event, NotUsed] = Source.fromGraph(sourceGraph)
extractUpgradeToWebSocket { upgrade =>
complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, source))
complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, source.map(evt => BinaryMessage(ByteString(evt.toByteArray)))))
}
/*extractUpgradeToWebSocket { upgrade =>
complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, source))
}*/
}
}

View File

@ -5,6 +5,7 @@ import rep.protos.peer.CertId
import java.util.UUID
import com.gilt.timeuuid.TimeUuid
object IdTool {
def getUUID: String = {
@ -35,7 +36,8 @@ object IdTool {
}else{
null
}
}
}