PL-7117 ClusterManager per-class usage performance statistics

This commit is contained in:
Andrey Subbotin 2016-05-30 11:59:16 +04:00
parent f403e9bd51
commit d6f3faa692
4 changed files with 260 additions and 35 deletions

View File

@ -24,6 +24,8 @@ import org.apache.commons.io.IOUtils;
import org.jgroups.*;
import org.jgroups.conf.XmlConfigurator;
import org.jgroups.jmx.JmxConfigurator;
import org.perf4j.StopWatch;
import org.perf4j.log4j.Log4JStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@ -37,10 +39,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
/**
* Standard implementation of middleware clustering based on JGroups.
*
*/
@Component(ClusterManagerAPI.NAME)
public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
@ -66,6 +68,8 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
protected ThreadLocal<Boolean> forceSyncSending = new ThreadLocal<>();
protected Map<String, MessageStat> messagesStat = new ConcurrentHashMap<>();
protected static final String STATE_MAGIC = "CUBA_STATE";
public ClusterManager() {
@ -81,7 +85,7 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
SendMessageRunnable sendMessageRunnable = (SendMessageRunnable) r;
log.info("Queue capacity is exceeded. Message {}, {}", sendMessageRunnable.message, sendMessageRunnable.message.getClass());
log.info("Queue capacity is exceeded. Message: {}: {}", sendMessageRunnable.message.getClass(), sendMessageRunnable.message);
}
});
}
@ -95,7 +99,7 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
if (sync != null && sync) {
internalSend(message, true);
} else {
log.trace("Submitting message {}, {} to send asynchronously", message, message.getClass());
log.trace("Submitting message: {}: {} to send asynchronously", message.getClass(), message);
executor.execute(new SendMessageRunnable(message));
}
}
@ -109,16 +113,25 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
}
protected void internalSend(Serializable message, boolean sync) {
log.debug("Sending message " + message.getClass() + ": " + message);
byte[] bytes = SerializationSupport.serialize(message);
Message msg = new Message(null, null, bytes);
if (sync) {
msg.setFlag(Message.Flag.RSVP);
}
StopWatch sw = new Log4JStopWatch(String.format("sendClusterMessage(%s)", message.getClass().getSimpleName()));
try {
channel.send(msg);
} catch (Exception e) {
log.error("Error sending message", e);
byte[] bytes = SerializationSupport.serialize(message);
log.debug("Sending message: {}: {} ({} bytes)", message.getClass(), message, bytes.length);
MessageStat stat = messagesStat.get(message.getClass().getName());
if (stat != null) {
stat.updateSent(bytes.length);
}
Message msg = new Message(null, null, bytes);
if (sync) {
msg.setFlag(Message.Flag.RSVP);
}
try {
channel.send(msg);
} catch (Exception e) {
log.error("Error sending message", e);
}
} finally {
sw.stop();
}
}
@ -138,12 +151,16 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
@Override
public synchronized void addListener(Class messageClass, ClusterListener listener) {
this.listeners.put(messageClass.getName(), listener);
String className = messageClass.getName();
listeners.put(className, listener);
messagesStat.put(className, new MessageStat());
}
@Override
public synchronized void removeListener(Class messageClass, ClusterListener listener) {
listeners.remove(messageClass.getName());
String className = messageClass.getName();
listeners.remove(className);
messagesStat.remove(className);
}
@Override
@ -258,6 +275,74 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
return currentView == null ? "" : currentView.toString();
}
@Override
public String printClusterStatesStat() {
StringBuilder clusterStateStat = new StringBuilder();
for (Map.Entry<String, ClusterListener> entry : listeners.entrySet()) {
byte[] data = null;
StopWatch sw = new StopWatch();
try {
data = entry.getValue().getState();
} finally {
sw.stop();
}
clusterStateStat
.append(String.format("State: %s, size: %s bytes, serialize time: %s ms\n",
entry.getKey(), data != null ? data.length : -1, sw.getElapsedTime()));
}
return clusterStateStat.toString();
}
public String printMessagesStat() {
StringBuilder messagesStats = new StringBuilder();
for (Map.Entry<String, MessageStat> entry : messagesStat.entrySet()) {
MessageStat stat = entry.getValue();
if (stat != null) {
messagesStats
.append(String.format("Class: %s; received: %s, %s bytes; sent: %s, %s bytes\n",
entry.getKey(), stat.getReceivedMessages(), stat.getReceivedBytes(),
stat.getSentMessages(), stat.getSentBytes()));
}
}
return messagesStats.toString();
}
@Override
public long getSentMessages(String className) {
MessageStat stat = messagesStat.get(className);
if (stat != null) {
return stat.getSentMessages();
}
return 0;
}
@Override
public long getSentBytes(String className) {
MessageStat stat = messagesStat.get(className);
if (stat != null) {
return stat.getSentBytes();
}
return 0;
}
@Override
public long getReceivedMessages(String className) {
MessageStat stat = messagesStat.get(className);
if (stat != null) {
return stat.getReceivedMessages();
}
return 0;
}
@Override
public long getReceivedBytes(String className) {
MessageStat stat = messagesStat.get(className);
if (stat != null) {
return stat.getReceivedBytes();
}
return 0;
}
protected class ClusterReceiver implements Receiver {
@Override
@ -267,17 +352,29 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
log.debug("Null buffer received");
return;
}
Serializable data = (Serializable) SerializationSupport.deserialize(bytes);
log.debug("Received message " + data.getClass() + ": " + data);
ClusterListener listener = listeners.get(data.getClass().getName());
if (listener != null)
listener.receive(data);
StopWatch sw = new Log4JStopWatch();
String simpleClassName = null;
try {
Serializable data = (Serializable) SerializationSupport.deserialize(bytes);
String className = data.getClass().getName();
simpleClassName = data.getClass().getSimpleName();
log.debug("Received message: {}: {} ({} bytes)", data.getClass(), data, bytes.length);
MessageStat stat = messagesStat.get(className);
if (stat != null) {
stat.updateReceived(bytes.length);
}
ClusterListener listener = listeners.get(className);
if (listener != null) {
listener.receive(data);
}
} finally {
sw.stop(String.format("receiveClusterMessage(%s)", simpleClassName));
}
}
@Override
public void viewAccepted(View new_view) {
log.info("New cluster view: " + new_view);
log.info("New cluster view: {}", new_view);
currentView = new_view;
}
@ -287,7 +384,13 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
try (DataOutputStream out = new DataOutputStream(output)) {
Map<String, byte[]> state = new HashMap<>();
for (Map.Entry<String, ClusterListener> entry : listeners.entrySet()) {
byte[] data = entry.getValue().getState();
byte[] data;
StopWatch sw = new Log4JStopWatch(String.format("getClusterState(%s)", entry.getKey()));
try {
data = entry.getValue().getState();
} finally {
sw.stop();
}
if (data != null && data.length > 0) {
state.put(entry.getKey(), data);
}
@ -297,7 +400,7 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
out.writeUTF(STATE_MAGIC);
out.writeInt(state.size());
for (Map.Entry<String, byte[]> entry : state.entrySet()) {
log.debug("Sending state: " + entry.getKey() + " (" + entry.getValue().length + " bytes)");
log.debug("Sending state: {} ({} bytes)", entry.getKey(), entry.getValue().length);
out.writeUTF(entry.getKey());
out.writeInt(entry.getValue().length);
out.write(entry.getValue());
@ -310,7 +413,7 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
@Override
public void suspect(Address suspected_mbr) {
log.info("Suspected member: " + suspected_mbr);
log.info("Suspected member: {}", suspected_mbr);
}
@Override
@ -330,18 +433,24 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
for (int i = 0; i < count; i++) {
String name = in.readUTF();
int len = in.readInt();
log.debug("Receiving state: " + name + " (" + len + " bytes)");
byte[] data = new byte[len];
int c = in.read(data);
if (c != len) {
log.error("Error receiving state: invalid data length");
return;
StopWatch sw = new Log4JStopWatch(String.format("setClusterState(%s)", name));
try {
log.debug("Receiving state: {} ({} bytes)", name, len);
byte[] data = new byte[len];
int c = in.read(data);
if (c != len) {
log.error("Error receiving state: invalid data length");
return;
}
ClusterListener listener = listeners.get(name);
if (listener != null) {
listener.setState(data);
}
} finally {
sw.stop();
}
ClusterListener listener = listeners.get(name);
if (listener != null)
listener.setState(data);
}
log.debug("State received");
} catch (Exception e) {
log.error("Error receiving state", e);
}
@ -368,4 +477,37 @@ public class ClusterManager implements ClusterManagerAPI, AppContext.Listener {
internalSend(message, false);
}
}
protected class MessageStat {
protected LongAdder sentBytes = new LongAdder();
protected LongAdder receivedBytes = new LongAdder();
protected LongAdder receivedMessages = new LongAdder();
protected LongAdder sentMessages = new LongAdder();
public void updateReceived(int bytes) {
receivedMessages.increment();
receivedBytes.add(bytes);
}
public void updateSent(int bytes) {
sentMessages.increment();
sentBytes.add(bytes);
}
public long getSentBytes() {
return sentBytes.longValue();
}
public long getSentMessages() {
return sentMessages.longValue();
}
public long getReceivedBytes() {
return receivedBytes.longValue();
}
public long getReceivedMessages() {
return receivedMessages.longValue();
}
}
}

View File

@ -103,4 +103,40 @@ public interface ClusterManagerAPI {
* @return cluster messages count queued to send
*/
int getMessagesCount();
/**
* Serialize cluster state and get state statistics
* @return statistic
*/
String printClusterStatesStat();
/**
* Sent/received messages statistic
* @return statistic
*/
String printMessagesStat();
/**
* Get sent messages count statistic for specified {@param className}
* @return messages count
*/
long getSentMessages(String className);
/**
* Get sent bytes statistic for specified {@param className}
* @return size in bytes
*/
long getSentBytes(String className);
/**
* Get received messages count statistic for specified {@param className}
* @return messages count
*/
long getReceivedMessages(String className);
/**
* Get received bytes statistic for specified {@param className}
* @return size in bytes
*/
long getReceivedBytes(String className);
}

View File

@ -81,4 +81,34 @@ public class ClusterManager implements ClusterManagerMBean {
public int getMessagesCount() {
return clusterManager.getMessagesCount();
}
@Override
public String printClusterStatesStat() {
return clusterManager.printClusterStatesStat();
}
@Override
public String printMessagesStat() {
return clusterManager.printMessagesStat();
}
@Override
public long getSentMessages(String className) {
return clusterManager.getSentMessages(className);
}
@Override
public long getSentBytes(String className) {
return clusterManager.getSentBytes(className);
}
@Override
public long getReceivedMessages(String className) {
return clusterManager.getReceivedMessages(className);
}
@Override
public long getReceivedBytes(String className) {
return clusterManager.getReceivedBytes(className);
}
}

View File

@ -16,7 +16,6 @@
*/
package com.haulmont.cuba.core.jmx;
import com.haulmont.cuba.core.app.ClusterManagerAPI;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
@ -61,4 +60,22 @@ public interface ClusterManagerMBean {
* @see com.haulmont.cuba.core.app.ClusterManagerAPI#getMessagesCount()
*/
int getMessagesCount();
@ManagedOperation(description = "Serialize cluster state and get state statistics")
String printClusterStatesStat();
@ManagedOperation(description = "Sent/received messages statistic")
String printMessagesStat();
@ManagedOperation(description = "Get sent messages count statistic for specified class")
long getSentMessages(String className);
@ManagedOperation(description = "Get sent bytes statistic for specified class")
long getSentBytes(String className);
@ManagedOperation(description = "Get received messages count statistic for specified class")
long getReceivedMessages(String className);
@ManagedOperation(description = "Get received bytes statistic for specified class")
long getReceivedBytes(String className);
}