diff --git a/src/main/scala/rep/log/EventActor.scala b/src/main/scala/rep/log/EventActor.scala index 2fc6bf97..c4ba3b7f 100644 --- a/src/main/scala/rep/log/EventActor.scala +++ b/src/main/scala/rep/log/EventActor.scala @@ -59,7 +59,7 @@ class EventActor extends ActorPublisher[Event] { mediator ! Subscribe(Topic.Event, self) //发送当前出块人 val pe = PeerExtension(context.system) - self ! new Event( pe.getBlocker.toString, "", Event.Action.CANDIDATOR) + self ! new Event( pe.getBlocker.blocker.toString, "", Event.Action.CANDIDATOR) val ref = context.actorSelection("/user/modulemanager/memberlistener") if(ref != null) ref ! cluster.state } diff --git a/src/main/scala/rep/log/EventActor4Stage.scala b/src/main/scala/rep/log/EventActor4Stage.scala new file mode 100644 index 00000000..4fbd736d --- /dev/null +++ b/src/main/scala/rep/log/EventActor4Stage.scala @@ -0,0 +1,49 @@ +package rep.log + +import akka.actor.{ActorRef,Props} +import akka.stream.{Attributes, FlowShape, Inlet, Outlet} +import rep.protos.peer._ +import scala.collection.mutable +import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} +import akka.stream.SourceShape +import rep.log.RecvEventActor.Register +import rep.protos.peer._ +import akka.actor.ActorSystem +import rep.utils.IdTool + +class EventActor4Stage(eventactor: ActorRef) extends GraphStage[SourceShape[Event]]{ +//class EventActor4Stage(system: ActorSystem) extends GraphStage[SourceShape[Event]]{ + import scala.concurrent.duration._ + + //val evtactor = system.actorOf(Props[RecvEventActor],"RecvEventActor_"+IdTool.getUUID) + + val out: Outlet[Event] = Outlet("EventActor4Stage") + override def shape: SourceShape[Event] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + implicit def self = stageActor.ref + + override def preStart(): Unit = { + val thisStageActor = getStageActor(messageHandler).ref + //evtactor ! Register(thisStageActor) + eventactor ! Register(thisStageActor) + } + + setHandler(out,new OutHandler{ + override def onPull():Unit={ + //此处被messageHandler取代 + } + }) + + private def messageHandler(receive: (ActorRef, Any)): Unit = { + receive match { + case (_, evt:Event) => { + if(this.isAvailable(out) && !this.isClosed(out) ) + push(out,evt) + } + case(_,_) => + } + } + } + +} \ No newline at end of file diff --git a/src/main/scala/rep/log/RecvEventActor.scala b/src/main/scala/rep/log/RecvEventActor.scala new file mode 100644 index 00000000..2766b23a --- /dev/null +++ b/src/main/scala/rep/log/RecvEventActor.scala @@ -0,0 +1,73 @@ +package rep.log + +import rep.network.Topic +import rep.protos.peer._ +import akka.stream.actor._ +import akka.actor.{Props,Address,Actor,ActorRef,Terminated} +import akka.cluster.pubsub.DistributedPubSub +import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe} +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.cluster.MemberStatus +import rep.ui.web.EventServer +import rep.network.tools.PeerExtension +import rep.storage._ +import akka.stream.Graph +import scala.collection.mutable.{HashSet,Set} +import rep.log.RecvEventActor.{Register} +import rep.network.util.NodeHelp + +object RecvEventActor { + def props: Props = Props[RecvEventActor] + + final case class Register(actorRef: ActorRef) + + +} + +class RecvEventActor extends Actor { + var stageActor : ActorRef = null + var stageActors : HashSet[ActorRef] = HashSet[ActorRef]() + val cluster = Cluster(context.system) + + + override def preStart(): Unit ={ + val mediator = DistributedPubSub(context.system).mediator + mediator ! Subscribe(Topic.Event, self) + } + + private def clusterInfo(stageActor:ActorRef)={ + cluster.state.members.foreach(m=>{ + if (m.status == MemberStatus.Up){ + if(NodeHelp.isCandidatorNode(m.roles)){ + stageActor ! new Event(NodeHelp.getNodeName(m.roles), Topic.Event, Event.Action.MEMBER_UP) + } + } + }) + } + + override def receive = { + case Register(actorRef) => { + //this.stageActor = actorRef + this.stageActors.add(actorRef) + context.watch(actorRef) + clusterInfo(actorRef) + val pe = PeerExtension(context.system) + self ! new Event( pe.getBlocker.blocker.toString, "", Event.Action.CANDIDATOR) + } + + case Terminated(actorRef) => { + //this.stageActor = null + this.stageActors.remove(actorRef) + context.unwatch(actorRef) + + //context.stop(self) + } + + case evt: Event => { + //if(this.stageActor != null) this.stageActor ! evt + this.stageActors.foreach(f=>f ! evt) + } + } +} + diff --git a/src/main/scala/rep/network/cluster/MemberListener.scala b/src/main/scala/rep/network/cluster/MemberListener.scala index f2787cd8..93bbdbf7 100644 --- a/src/main/scala/rep/network/cluster/MemberListener.scala +++ b/src/main/scala/rep/network/cluster/MemberListener.scala @@ -35,6 +35,7 @@ import scala.util.control.Breaks._ import scala.collection.mutable.ArrayBuffer import rep.log.RepLogger import rep.protos.peer.Event +import rep.network.util.NodeHelp /** * Cluster节点状态监听模块 @@ -102,31 +103,9 @@ class MemberListener(MoudleName:String) extends ModuleBase(MoudleName) with Clus var nodes = Set.empty[ Address ] - private def isCandidatorNode(roles: Set[String]):Boolean = { - var r = false - breakable( - roles.foreach(f=>{ - if(f.startsWith("CRFD-Node")){ - r = true - break - } - }) - ) - r - } - private def getNodeName(roles: Set[String]):String = { - var r = "" - breakable( - roles.foreach(f=>{ - if(f.startsWith("CRFD-Node")){ - r = f.substring(f.indexOf("CRFD-Node")+10) - break - } - }) - ) - r - } + + def receive = { @@ -140,9 +119,9 @@ class MemberListener(MoudleName:String) extends ModuleBase(MoudleName) with Clus state.members.foreach(m=>{ if (m.status == MemberStatus.Up){ nodes += m.address - if(this.isCandidatorNode(m.roles)){ - snodes.append((m.address,this.getNodeName(m.roles))) - sendEvent(EventType.PUBLISH_INFO, mediator, this.getNodeName(m.roles), Topic.Event, Event.Action.MEMBER_UP) + if(NodeHelp.isCandidatorNode(m.roles)){ + snodes.append((m.address,NodeHelp.getNodeName(m.roles))) + sendEvent(EventType.PUBLISH_INFO, mediator, NodeHelp.getNodeName(m.roles), Topic.Event, Event.Action.MEMBER_UP) } } }) @@ -154,9 +133,9 @@ class MemberListener(MoudleName:String) extends ModuleBase(MoudleName) with Clus nodes += member.address RepLogger.info(RepLogger.System_Logger, this.getLogMsgPrefix("Member is Up: {}. {} nodes in cluster"+"~"+member.address+"~"+nodes.size)) pe.getNodeMgr.putNode(member.address) - if(member.roles != null && !member.roles.isEmpty && this.isCandidatorNode(member.roles)){ - preloadNodesMap.put(member.address, (TimeUtils.getCurrentTime(),this.getNodeName(member.roles))) - sendEvent(EventType.PUBLISH_INFO, mediator, this.getNodeName(member.roles), Topic.Event, Event.Action.MEMBER_UP) + if(member.roles != null && !member.roles.isEmpty && NodeHelp.isCandidatorNode(member.roles)){ + preloadNodesMap.put(member.address, (TimeUtils.getCurrentTime(),NodeHelp.getNodeName(member.roles))) + sendEvent(EventType.PUBLISH_INFO, mediator, NodeHelp.getNodeName(member.roles), Topic.Event, Event.Action.MEMBER_UP) } scheduler.scheduleOnce(TimePolicy.getSysNodeStableDelay millis, @@ -171,7 +150,7 @@ class MemberListener(MoudleName:String) extends ModuleBase(MoudleName) with Clus preloadNodesMap.remove(member.address) pe.getNodeMgr.removeNode(member.address) pe.getNodeMgr.removeStableNode(member.address) - sendEvent(EventType.PUBLISH_INFO, mediator, this.getNodeName(member.roles), Topic.Event, Event.Action.MEMBER_DOWN) + sendEvent(EventType.PUBLISH_INFO, mediator, NodeHelp.getNodeName(member.roles), Topic.Event, Event.Action.MEMBER_DOWN) diff --git a/src/main/scala/rep/network/util/NodeHelp.scala b/src/main/scala/rep/network/util/NodeHelp.scala index bc25ffce..7777953c 100644 --- a/src/main/scala/rep/network/util/NodeHelp.scala +++ b/src/main/scala/rep/network/util/NodeHelp.scala @@ -2,6 +2,7 @@ package rep.network.util import akka.actor.{ ActorRef, Props } import rep.app.conf.{ SystemProfile } +import scala.util.control.Breaks._ object NodeHelp { def isSameNodeForRef(srcRef: ActorRef, destRef: ActorRef): Boolean = { @@ -55,4 +56,31 @@ object NodeHelp { def isSeedNode(nodeName:String):Boolean={ SystemProfile.getGenesisNodeName.equals(nodeName) } + + def isCandidatorNode(roles: Set[String]):Boolean = { + var r = false + breakable( + roles.foreach(f=>{ + if(f.startsWith("CRFD-Node")){ + r = true + break + } + }) + ) + r + } + + def getNodeName(roles: Set[String]):String = { + var r = "" + breakable( + roles.foreach(f=>{ + if(f.startsWith("CRFD-Node")){ + r = f.substring(f.indexOf("CRFD-Node")+10) + break + } + }) + ) + r + } + } \ No newline at end of file diff --git a/src/main/scala/rep/ui/web/EventServer.scala b/src/main/scala/rep/ui/web/EventServer.scala index e03f978a..d2103f6c 100644 --- a/src/main/scala/rep/ui/web/EventServer.scala +++ b/src/main/scala/rep/ui/web/EventServer.scala @@ -46,6 +46,12 @@ import rep.sc.Sandbox.SandboxException import rep.log.RepLogger import rep.app.conf.SystemProfile +import rep.log.RecvEventActor +import rep.log.EventActor4Stage +import akka.stream.Graph +import akka.stream.SourceShape +import akka.NotUsed + /** Event服务伴生对象 * @author c4w */ @@ -74,6 +80,10 @@ object EventServer { implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher + val evtactor = system.actorOf(Props[RecvEventActor],"RecvEventActor") + + + //提供静态文件的web访问服务 val route_evt = //提供swagger UI服务 @@ -86,10 +96,18 @@ object EventServer { path("event") { get { //must ref to the same actor - val source = Source.actorPublisher[Event](Props[EventActor]).map(evt => BinaryMessage(ByteString(evt.toByteArray))) + //val source = Source.actorPublisher[Event](Props[EventActor]).map(evt => BinaryMessage(ByteString(evt.toByteArray))) + //val sourceGraph: Graph[SourceShape[Event], NotUsed] = new EventActor4Stage(system) + val sourceGraph: Graph[SourceShape[Event], NotUsed] = new EventActor4Stage(evtactor) + val source: Source[Event, NotUsed] = Source.fromGraph(sourceGraph) + + extractUpgradeToWebSocket { upgrade => - complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, source)) + complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, source.map(evt => BinaryMessage(ByteString(evt.toByteArray))))) } + /*extractUpgradeToWebSocket { upgrade => + complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, source)) + }*/ } } diff --git a/src/main/scala/rep/utils/IdTool.scala b/src/main/scala/rep/utils/IdTool.scala index bef3362a..f828e704 100644 --- a/src/main/scala/rep/utils/IdTool.scala +++ b/src/main/scala/rep/utils/IdTool.scala @@ -5,6 +5,7 @@ import rep.protos.peer.CertId import java.util.UUID import com.gilt.timeuuid.TimeUuid + object IdTool { def getUUID: String = { @@ -35,7 +36,8 @@ object IdTool { }else{ null } - } + + } \ No newline at end of file