add http api handler

This commit is contained in:
gongdewei 2020-05-21 21:58:33 +08:00
parent c654abafed
commit 78316f340d
13 changed files with 1381 additions and 44 deletions

View File

@ -0,0 +1,54 @@
package com.taobao.arthas.core.command.model;
import com.taobao.arthas.core.shell.term.impl.http.api.ApiState;
/**
* Command async exec process result, not the command exec result
* @author gongdewei 2020/4/2
*/
public class CommandRequestModel extends ResultModel {
private ApiState state;
private String command;
private String message;
public CommandRequestModel(String command, ApiState state) {
this.command = command;
this.state = state;
}
public CommandRequestModel(String command, ApiState state, String message) {
this.state = state;
this.command = command;
this.message = message;
}
public String getCommand() {
return command;
}
public void setCommand(String command) {
this.command = command;
}
public ApiState getState() {
return state;
}
public void setState(ApiState state) {
this.state = state;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String getType() {
return "command";
}
}

View File

@ -0,0 +1,22 @@
package com.taobao.arthas.core.command.model;
/**
* Command input status for webui
* @author gongdewei 2020/4/14
*/
public enum InputStatus {
/**
* Allow input new commands
*/
ALLOW_INPUT,
/**
* Allow interrupt running job
*/
ALLOW_INTERRUPT,
/**
* Disable input and interrupt
*/
DISABLED
}

View File

@ -0,0 +1,28 @@
package com.taobao.arthas.core.command.model;
/**
* Input status for webui
* @author gongdewei 2020/4/14
*/
public class InputStatusModel extends ResultModel {
private InputStatus inputStatus;
public InputStatusModel(InputStatus inputStatus) {
this.inputStatus = inputStatus;
}
public InputStatus getInputStatus() {
return inputStatus;
}
public void setInputStatus(InputStatus inputStatus) {
this.inputStatus = inputStatus;
}
@Override
public String getType() {
return "input_status";
}
}

View File

@ -0,0 +1,61 @@
package com.taobao.arthas.core.command.model;
/**
* @author gongdewei 2020/4/20
*/
public class WelcomeModel extends ResultModel {
private String pid;
private String time;
private String version;
private String wiki;
private String tutorials;
public WelcomeModel() {
}
@Override
public String getType() {
return "welcome";
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getWiki() {
return wiki;
}
public void setWiki(String wiki) {
this.wiki = wiki;
}
public String getTutorials() {
return tutorials;
}
public void setTutorials(String tutorials) {
this.tutorials = tutorials;
}
}

View File

@ -1,6 +1,7 @@
package com.taobao.arthas.core.shell.term.impl.http;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
@ -8,6 +9,7 @@ import java.net.URL;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.common.IOUtils;
import com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler;
import com.taobao.arthas.core.shell.term.impl.httptelnet.HttpTelnetTermServer;
import io.netty.channel.ChannelFuture;
@ -15,7 +17,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
@ -27,9 +28,13 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.termd.core.http.HttpTtyConnection;
import io.termd.core.util.Logging;
import static com.taobao.arthas.core.util.HttpUtils.createRedirectResponse;
import static com.taobao.arthas.core.util.HttpUtils.createResponse;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
* @author hengyunabc 2019-11-06
* @author gongdewei 2020-03-18
*/
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger logger = LoggerFactory.getLogger(HttpTelnetTermServer.class);
@ -38,10 +43,13 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequ
private File dir;
private HttpApiHandler httpApiHandler;
public HttpRequestHandler(String wsUri, File dir) {
this.wsUri = wsUri;
this.dir = dir;
dir.mkdirs();
this.httpApiHandler = HttpApiHandler.getInstance();
}
@Override
@ -53,26 +61,74 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequ
send100Continue(ctx);
}
HttpResponse response = new DefaultHttpResponse(request.protocolVersion(),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
HttpResponse response = null;
String path = new URI(request.uri()).getPath();
if ("/".equals(path)) {
path = "/index.html";
}
boolean isHttpApiResponse = false;
try {
//handle http restful api
if ("/api".equals(path)) {
response = httpApiHandler.handle(request);
isHttpApiResponse = true;
}
//handle webui requests
if (path.equals("/ui")){
response = createRedirectResponse(request, "/ui/");
}
if (path.equals("/ui/")) {
path += "index.html";
}
//try classpath resource first
if (response == null){
response = readFileFromResource(request, path);
}
//try output dir later, avoid overlay classpath resources files
if (response == null){
response = DirectoryBrowser.view(dir, path, request.protocolVersion());
}
//not found
if (response == null){
response = createResponse(request, HttpResponseStatus.NOT_FOUND, "Not found");
}
} catch (Throwable e) {
logger.error("arthas process http request error: " + request.uri(), e);
} finally {
//If it is null, an error may occur
if (response == null){
response = createResponse(request, HttpResponseStatus.INTERNAL_SERVER_ERROR, "Server error");
}
ctx.write(response);
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
future.addListener(ChannelFutureListener.CLOSE);
//reuse http api response buf
if (isHttpApiResponse && response instanceof DefaultFullHttpResponse) {
final HttpResponse finalResponse = response;
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
httpApiHandler.onCompleted((DefaultFullHttpResponse) finalResponse);
}
});
}
}
}
}
private FullHttpResponse readFileFromResource(FullHttpRequest request, String path) throws IOException {
DefaultFullHttpResponse fullResp = null;
InputStream in = null;
try {
DefaultFullHttpResponse fileViewResult = DirectoryBrowser.view(dir, path, request.protocolVersion());
if (fileViewResult != null) {
response = fileViewResult;
} else {
URL res = HttpTtyConnection.class.getResource("/com/taobao/arthas/core/http" + path);
if (res != null) {
DefaultFullHttpResponse fullResp = new DefaultFullHttpResponse(request.protocolVersion(),
fullResp = new DefaultFullHttpResponse(request.protocolVersion(),
HttpResponseStatus.OK);
in = res.openStream();
byte[] tmp = new byte[256];
@ -97,21 +153,11 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequ
fullResp.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType);
}
}
response = fullResp;
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
}
}
} catch (Throwable e) {
logger.error("arthas process http request error: " + request.uri(), e);
} finally {
ctx.write(response);
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
future.addListener(ChannelFutureListener.CLOSE);
IOUtils.close(in);
}
}
return fullResp;
}
private static void send100Continue(ChannelHandlerContext ctx) {

View File

@ -0,0 +1,48 @@
package com.taobao.arthas.core.shell.term.impl.http.api;
/**
* Http api action enums
*
* @author gongdewei 2020-03-25
*/
public enum ApiAction {
/**
* Execute command synchronized
*/
EXEC,
/**
* Execute command async
*/
ASYNC_EXEC,
/**
* Interrupt executing job
*/
INTERRUPT_JOB,
/**
* Pull the results from result queue of the session
*/
PULL_RESULTS,
/**
* Create a new session
*/
INIT_SESSION,
/**
* Join a exist session
*/
JOIN_SESSION,
/**
* Terminate the session
*/
CLOSE_SESSION,
/**
* Get session info
*/
SESSION_INFO
}

View File

@ -0,0 +1,16 @@
package com.taobao.arthas.core.shell.term.impl.http.api;
/**
* Http Api exception
* @author gongdewei 2020-03-19
*/
public class ApiException extends Exception {
public ApiException(String message) {
super(message);
}
public ApiException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,87 @@
package com.taobao.arthas.core.shell.term.impl.http.api;
import java.util.Map;
/**
* Http Api request
*
* @author gongdewei 2020-03-19
*/
public class ApiRequest {
private String action;
private String command;
private String requestId;
private String sessionId;
private String consumerId;
private Integer timeout;
private Map<String, Object> options;
@Override
public String toString() {
return "ApiRequest{" +
"action='" + action + '\'' +
", command='" + command + '\'' +
", requestId='" + requestId + '\'' +
", sessionId='" + sessionId + '\'' +
", consumerId='" + consumerId + '\'' +
", timeout=" + timeout +
", options=" + options +
'}';
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getCommand() {
return command;
}
public void setCommand(String command) {
this.command = command;
}
public Map<String, Object> getOptions() {
return options;
}
public void setOptions(Map<String, Object> options) {
this.options = options;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
public String getConsumerId() {
return consumerId;
}
public void setConsumerId(String consumerId) {
this.consumerId = consumerId;
}
public Integer getTimeout() {
return timeout;
}
public void setTimeout(Integer timeout) {
this.timeout = timeout;
}
}

View File

@ -0,0 +1,79 @@
package com.taobao.arthas.core.shell.term.impl.http.api;
/**
* Http Api exception
* @author gongdewei 2020-03-19
*/
public class ApiResponse<T> {
private String requestId;
private ApiState state;
private String message;
private String sessionId;
private String consumerId;
private String jobId;
private T body;
public String getRequestId() {
return requestId;
}
public ApiResponse<T> setRequestId(String requestId) {
this.requestId = requestId;
return this;
}
public ApiState getState() {
return state;
}
public ApiResponse<T> setState(ApiState state) {
this.state = state;
return this;
}
public String getMessage() {
return message;
}
public ApiResponse<T> setMessage(String message) {
this.message = message;
return this;
}
public String getSessionId() {
return sessionId;
}
public ApiResponse<T> setSessionId(String sessionId) {
this.sessionId = sessionId;
return this;
}
public String getConsumerId() {
return consumerId;
}
public ApiResponse<T> setConsumerId(String consumerId) {
this.consumerId = consumerId;
return this;
}
public String getJobId() {
return jobId;
}
public ApiResponse<T> setJobId(String jobId) {
this.jobId = jobId;
return this;
}
public T getBody() {
return body;
}
public ApiResponse<T> setBody(T body) {
this.body = body;
return this;
}
}

View File

@ -0,0 +1,35 @@
package com.taobao.arthas.core.shell.term.impl.http.api;
/**
* Http API response state
*
* @author gongdewei 2020-03-19
*/
public enum ApiState {
/**
* Scheduled async exec job
*/
SCHEDULED,
// RUNNING,
/**
* Request processed successfully
*/
SUCCEEDED,
/**
* Request processing interrupt
*/
INTERRUPTED,
/**
* Request processing failed
*/
FAILED,
/**
* Request is refused
*/
REFUSED
}

View File

@ -0,0 +1,708 @@
package com.taobao.arthas.core.shell.term.impl.http.api;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.taobao.arthas.common.PidUtils;
import com.taobao.arthas.core.command.model.*;
import com.taobao.arthas.core.distribution.PackingResultDistributor;
import com.taobao.arthas.core.distribution.ResultConsumer;
import com.taobao.arthas.core.distribution.ResultDistributor;
import com.taobao.arthas.core.distribution.impl.PackingResultDistributorImpl;
import com.taobao.arthas.core.distribution.impl.ResultConsumerImpl;
import com.taobao.arthas.core.server.ArthasBootstrap;
import com.taobao.arthas.core.shell.cli.CliToken;
import com.taobao.arthas.core.shell.cli.CliTokens;
import com.taobao.arthas.core.shell.cli.Completion;
import com.taobao.arthas.core.shell.handlers.Handler;
import com.taobao.arthas.core.shell.history.HistoryManager;
import com.taobao.arthas.core.shell.history.impl.HistoryManagerImpl;
import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.session.SessionManager;
import com.taobao.arthas.core.shell.system.Job;
import com.taobao.arthas.core.shell.system.JobListener;
import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
import com.taobao.arthas.core.shell.system.impl.JobControllerImpl;
import com.taobao.arthas.core.shell.term.SignalHandler;
import com.taobao.arthas.core.shell.term.Term;
import com.taobao.arthas.core.util.ArthasBanner;
import com.taobao.arthas.core.util.DateUtils;
import com.taobao.arthas.core.util.JsonUtils;
import com.taobao.arthas.core.util.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import io.termd.core.function.Function;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Http Restful Api Handler
*
* @author gongdewei 2020-03-18
*/
public class HttpApiHandler {
private static final Logger logger = LoggerFactory.getLogger(HttpApiHandler.class);
public static final int DEFAULT_EXEC_TIMEOUT = 30000;
private final SessionManager sessionManager;
private final AtomicInteger requestIdGenerator = new AtomicInteger(0);
private static HttpApiHandler instance;
private final InternalCommandManager commandManager;
private final JobControllerImpl jobController;
private final HistoryManager historyManager;
private int jsonBufferSize = 1024 * 256;
private int poolSize = 8;
private ArrayBlockingQueue<ByteBuf> byteBufPool = new ArrayBlockingQueue<ByteBuf>(poolSize);
private ArrayBlockingQueue<char[]> charsBufPool = new ArrayBlockingQueue<char[]>(poolSize);
private ArrayBlockingQueue<byte[]> bytesPool = new ArrayBlockingQueue<byte[]>(poolSize);
public static HttpApiHandler getInstance() {
if (instance == null) {
synchronized (HttpApiHandler.class) {
instance = new HttpApiHandler();
}
}
return instance;
}
private HttpApiHandler() {
sessionManager = ArthasBootstrap.getInstance().getSessionManager();
commandManager = sessionManager.getCommandManager();
jobController = sessionManager.getJobController();
historyManager = HistoryManagerImpl.getInstance();
//init buf pool
JsonUtils.setSerializeWriterBufferThreshold(jsonBufferSize);
for (int i = 0; i < poolSize; i++) {
byteBufPool.offer(Unpooled.buffer(jsonBufferSize));
charsBufPool.offer(new char[jsonBufferSize]);
bytesPool.offer(new byte[jsonBufferSize]);
}
}
public HttpResponse handle(FullHttpRequest request) throws Exception {
ApiResponse result;
String requestBody = null;
String requestId = "req_" + requestIdGenerator.addAndGet(1);
try {
HttpMethod method = request.method();
if (HttpMethod.POST.equals(method)) {
requestBody = getBody(request);
ApiRequest apiRequest = parseRequest(requestBody);
apiRequest.setRequestId(requestId);
result = processRequest(apiRequest);
} else {
result = createResponse(ApiState.REFUSED, "Unsupported http method: " + method.name());
}
} catch (Throwable e) {
result = createResponse(ApiState.FAILED, "Process request error: " + e.getMessage());
logger.error("arthas process http api request error: " + request.uri() + ", request body: " + requestBody, e);
}
if (result == null) {
result = createResponse(ApiState.FAILED, "The request was not processed");
}
result.setRequestId(requestId);
//http response content
ByteBuf content = null;
//fastjson buf
char[] charsBuf = null;
byte[] bytesBuf = null;
try {
//apply response content buf first
content = byteBufPool.poll(2000, TimeUnit.MILLISECONDS);
if (content == null) {
throw new ApiException("get response content buf failure");
}
//apply fastjson buf from pool
charsBuf = charsBufPool.poll();
bytesBuf = bytesPool.poll();
if (charsBuf == null || bytesBuf == null) {
throw new ApiException("get json buf failure");
}
JsonUtils.setSerializeWriterBufThreadLocal(charsBuf, bytesBuf);
//create http response
DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(),
HttpResponseStatus.OK, content.retain());
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8");
writeResult(response, result);
return response;
} catch (Exception e) {
//response is discarded
if (content != null) {
content.release();
byteBufPool.offer(content);
}
throw e;
} finally {
//give back json buf to pool
JsonUtils.setSerializeWriterBufThreadLocal(null, null);
if (charsBuf != null) {
charsBufPool.offer(charsBuf);
}
if (bytesBuf != null) {
bytesPool.offer(bytesBuf);
}
}
}
public void onCompleted(DefaultFullHttpResponse httpResponse) {
ByteBuf content = httpResponse.content();
content.clear();
if (content.capacity() == jsonBufferSize) {
if (!byteBufPool.offer(content)) {
content.release();
}
} else {
//replace content ByteBuf
content.release();
if (byteBufPool.remainingCapacity() > 0) {
byteBufPool.offer(Unpooled.buffer(jsonBufferSize));
}
}
}
private void writeResult(DefaultFullHttpResponse response, Object result) throws IOException {
ByteBufOutputStream out = new ByteBufOutputStream(response.content());
try {
JSON.writeJSONString(out, result);
} catch (IOException e) {
logger.error("write json to response failed", e);
throw e;
}
}
private ApiRequest parseRequest(String requestBody) throws ApiException {
if (StringUtils.isBlank(requestBody)) {
throw new ApiException("parse request failed: request body is empty");
}
try {
//ObjectMapper objectMapper = new ObjectMapper();
//return objectMapper.readValue(requestBody, ApiRequest.class);
return JSON.parseObject(requestBody, ApiRequest.class);
} catch (Exception e) {
throw new ApiException("parse request failed: " + e.getMessage(), e);
}
}
private ApiResponse processRequest(ApiRequest apiRequest) {
String actionStr = apiRequest.getAction();
try {
if (StringUtils.isBlank(actionStr)) {
throw new ApiException("'action' is required");
}
ApiAction action;
try {
action = ApiAction.valueOf(actionStr.trim().toUpperCase());
} catch (IllegalArgumentException e) {
throw new ApiException("unknown action: " + actionStr);
}
//no session required
if (ApiAction.INIT_SESSION.equals(action)) {
return processInitSessionRequest(apiRequest);
}
//required session
String sessionId = apiRequest.getSessionId();
if (StringUtils.isBlank(sessionId)) {
throw new ApiException("'sessionId' is required");
}
Session session = sessionManager.getSession(sessionId);
if (session == null) {
throw new ApiException("session not found: " + sessionId);
}
sessionManager.updateAccessTime(session);
//dispatch requests
ApiResponse response = dispatchRequest(action, apiRequest, session);
if (response != null) {
return response;
}
} catch (ApiException e) {
logger.info("process http api request failed: {}", e.getMessage());
return createResponse(ApiState.FAILED, e.getMessage());
} catch (Throwable e) {
logger.error("process http api request failed: " + e.getMessage(), e);
return createResponse(ApiState.FAILED, "process http api request failed: " + e.getMessage());
}
return createResponse(ApiState.REFUSED, "Unsupported action: " + actionStr);
}
private ApiResponse dispatchRequest(ApiAction action, ApiRequest apiRequest, Session session) throws ApiException {
switch (action) {
case EXEC:
return processExecRequest(apiRequest, session);
case ASYNC_EXEC:
return processAsyncExecRequest(apiRequest, session);
case INTERRUPT_JOB:
return processInterruptJob(apiRequest, session);
case PULL_RESULTS:
return processPullResultsRequest(apiRequest, session);
case SESSION_INFO:
return processSessionInfoRequest(apiRequest, session);
case JOIN_SESSION:
return processJoinSessionRequest(apiRequest, session);
case CLOSE_SESSION:
return processCloseSessionRequest(apiRequest, session);
case INIT_SESSION:
break;
}
return null;
}
private ApiResponse processInitSessionRequest(ApiRequest apiRequest) throws ApiException {
ApiResponse response = new ApiResponse();
//create session
Session session = sessionManager.createSession();
if (session != null) {
//create consumer
ResultConsumer resultConsumer = new ResultConsumerImpl();
session.getResultDistributor().addConsumer(resultConsumer);
session.getResultDistributor().appendResult(new MessageModel("Welcome to arthas!"));
//welcome message
WelcomeModel welcomeModel = new WelcomeModel();
welcomeModel.setVersion(ArthasBanner.version());
welcomeModel.setWiki(ArthasBanner.wiki());
welcomeModel.setTutorials(ArthasBanner.tutorials());
welcomeModel.setPid(PidUtils.currentPid());
welcomeModel.setTime(DateUtils.getCurrentDate());
session.getResultDistributor().appendResult(welcomeModel);
//allow input
updateSessionInputStatus(session, InputStatus.ALLOW_INPUT);
response.setSessionId(session.getSessionId())
.setConsumerId(resultConsumer.getConsumerId())
.setState(ApiState.SUCCEEDED);
} else {
throw new ApiException("create api session failed");
}
return response;
}
/**
* Update session input status for all consumer
*
* @param session
* @param inputStatus
*/
private void updateSessionInputStatus(Session session, InputStatus inputStatus) {
session.getResultDistributor().appendResult(new InputStatusModel(inputStatus));
}
private ApiResponse processJoinSessionRequest(ApiRequest apiRequest, Session session) {
//create consumer
ResultConsumer resultConsumer = new ResultConsumerImpl();
//disable input and interrupt
resultConsumer.appendResult(new InputStatusModel(InputStatus.DISABLED));
session.getResultDistributor().addConsumer(resultConsumer);
ApiResponse response = new ApiResponse();
response.setSessionId(session.getSessionId())
.setConsumerId(resultConsumer.getConsumerId())
.setState(ApiState.SUCCEEDED);
return response;
}
private ApiResponse processSessionInfoRequest(ApiRequest apiRequest, Session session) {
ApiResponse response = new ApiResponse();
Map<String, Object> body = new TreeMap<String, Object>();
body.put("pid", session.getPid());
body.put("createTime", session.getCreateTime());
body.put("lastAccessTime", session.getLastAccessTime());
response.setState(ApiState.SUCCEEDED)
.setSessionId(session.getSessionId())
//.setConsumerId(consumerId)
.setBody(body);
return response;
}
private ApiResponse processCloseSessionRequest(ApiRequest apiRequest, Session session) {
sessionManager.removeSession(session.getSessionId());
ApiResponse response = new ApiResponse();
response.setState(ApiState.SUCCEEDED);
return response;
}
/**
* Execute command sync, wait for job finish or timeout, sending results immediately
*
* @param apiRequest
* @param session
* @return
*/
private ApiResponse processExecRequest(ApiRequest apiRequest, Session session) {
String commandLine = apiRequest.getCommand();
Map<String, Object> body = new TreeMap<String, Object>();
body.put("command", commandLine);
ApiResponse response = new ApiResponse();
response.setSessionId(session.getSessionId())
.setBody(body);
if (!session.tryLock()) {
response.setState(ApiState.REFUSED)
.setMessage("Another command is executing.");
return response;
}
int lock = session.getLock();
PackingResultDistributor packingResultDistributor = null;
Job job = null;
try {
Job foregroundJob = session.getForegroundJob();
if (foregroundJob != null) {
response.setState(ApiState.REFUSED)
.setMessage("Another job is running.");
logger.info("Another job is running, jobId: {}", foregroundJob.id());
return response;
}
//distribute result message both to origin session channel and request channel by CompositeResultDistributor
packingResultDistributor = new PackingResultDistributorImpl(session);
//ResultDistributor resultDistributor = new CompositeResultDistributorImpl(packingResultDistributor, session.getResultDistributor());
job = this.createJob(commandLine, session, packingResultDistributor);
session.setForegroundJob(job);
updateSessionInputStatus(session, InputStatus.ALLOW_INTERRUPT);
job.run();
} catch (Throwable e) {
logger.error("Exec command failed:" + e.getMessage() + ", command:" + commandLine, e);
response.setState(ApiState.FAILED).setMessage("Exec command failed:" + e.getMessage());
return response;
} finally {
if (session.getLock() == lock) {
session.unLock();
}
}
//wait for job completed or timeout
Integer timeout = apiRequest.getTimeout();
if (timeout == null || timeout <= 0) {
timeout = DEFAULT_EXEC_TIMEOUT;
}
boolean timeExpired = !waitForJob(job, timeout);
if (timeExpired) {
logger.warn("Job is exceeded time limit, force interrupt it, jobId: {}", job.id());
job.interrupt();
response.setState(ApiState.INTERRUPTED).setMessage("The job is exceeded time limit, force interrupt");
} else {
response.setState(ApiState.SUCCEEDED);
}
//packing results
body.put("jobId", job.id());
body.put("jobStatus", job.status());
body.put("timeExpired", timeExpired);
if (timeExpired) {
body.put("timeout", timeout);
}
body.put("results", packingResultDistributor.getResults());
response.setSessionId(session.getSessionId())
//.setConsumerId(consumerId)
.setBody(body);
return response;
}
/**
* Execute command async, create and schedule the job running, but no wait for the results.
*
* @param apiRequest
* @param session
* @return
*/
private ApiResponse processAsyncExecRequest(ApiRequest apiRequest, Session session) {
String commandLine = apiRequest.getCommand();
Map<String, Object> body = new TreeMap<String, Object>();
body.put("command", commandLine);
ApiResponse response = new ApiResponse();
response.setSessionId(session.getSessionId())
.setBody(body);
if (!session.tryLock()) {
response.setState(ApiState.REFUSED)
.setMessage("Another command is executing.");
return response;
}
int lock = session.getLock();
try {
Job foregroundJob = session.getForegroundJob();
if (foregroundJob != null) {
response.setState(ApiState.REFUSED)
.setMessage("Another job is running.");
logger.info("Another job is running, jobId: {}", foregroundJob.id());
return response;
}
//create job
Job job = this.createJob(commandLine, session, session.getResultDistributor());
body.put("jobId", job.id());
body.put("jobStatus", job.status());
response.setState(ApiState.SCHEDULED);
//add command before exec job
CommandRequestModel commandRequestModel = new CommandRequestModel(commandLine, response.getState());
commandRequestModel.setJobId(job.id());
session.getResultDistributor().appendResult(commandRequestModel);
session.setForegroundJob(job);
updateSessionInputStatus(session, InputStatus.ALLOW_INTERRUPT);
//run job
job.run();
return response;
} catch (Throwable e) {
logger.error("Async exec command failed:" + e.getMessage() + ", command:" + commandLine, e);
response.setState(ApiState.FAILED).setMessage("Async exec command failed:" + e.getMessage());
CommandRequestModel commandRequestModel = new CommandRequestModel(commandLine, response.getState(), response.getMessage());
session.getResultDistributor().appendResult(commandRequestModel);
return response;
} finally {
if (session.getLock() == lock) {
session.unLock();
}
}
}
private ApiResponse processInterruptJob(ApiRequest apiRequest, Session session) {
Job job = session.getForegroundJob();
if (job == null) {
return new ApiResponse().setState(ApiState.FAILED).setMessage("no foreground job is running");
}
job.interrupt();
Map<String, Object> body = new TreeMap<String, Object>();
body.put("jobId", job.id());
body.put("jobStatus", job.status());
return new ApiResponse()
.setState(ApiState.SUCCEEDED)
.setBody(body);
}
/**
* Pull results from result queue
*
* @param apiRequest
* @param session
* @return
*/
private ApiResponse processPullResultsRequest(ApiRequest apiRequest, Session session) throws ApiException {
String consumerId = apiRequest.getConsumerId();
if (StringUtils.isBlank(consumerId)) {
throw new ApiException("'consumerId' is required");
}
ResultConsumer consumer = session.getResultDistributor().getConsumer(consumerId);
if (consumer == null) {
throw new ApiException("consumer not found: " + consumerId);
}
List<ResultModel> results = consumer.pollResults();
Map<String, Object> body = new TreeMap<String, Object>();
body.put("results", results);
ApiResponse response = new ApiResponse();
response.setState(ApiState.SUCCEEDED)
.setSessionId(session.getSessionId())
.setConsumerId(consumerId)
.setBody(body);
return response;
}
private boolean waitForJob(Job job, int timeout) {
long startTime = System.currentTimeMillis();
while (true) {
switch (job.status()) {
case STOPPED:
case TERMINATED:
return true;
}
if (System.currentTimeMillis() - startTime > timeout) {
return false;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
private synchronized Job createJob(List<CliToken> args, Session session, ResultDistributor resultDistributor) {
Job job = jobController.createJob(commandManager, args, session, new ApiJobHandler(session), new ApiTerm(session), resultDistributor);
return job;
}
private Job createJob(String line, Session session, ResultDistributor resultDistributor) {
historyManager.addHistory(line);
historyManager.saveHistory();
return createJob(CliTokens.tokenize(line), session, resultDistributor);
}
private ApiResponse createResponse(ApiState apiState, String message) {
ApiResponse apiResponse = new ApiResponse();
apiResponse.setState(apiState);
apiResponse.setMessage(message);
return apiResponse;
}
private String getBody(FullHttpRequest request) {
ByteBuf buf = request.content();
return buf.toString(CharsetUtil.UTF_8);
}
private class ApiJobHandler implements JobListener {
private Session session;
public ApiJobHandler(Session session) {
this.session = session;
}
@Override
public void onForeground(Job job) {
session.setForegroundJob(job);
}
@Override
public void onBackground(Job job) {
if (session.getForegroundJob() == job) {
session.setForegroundJob(null);
updateSessionInputStatus(session, InputStatus.ALLOW_INPUT);
}
}
@Override
public void onTerminated(Job job) {
if (session.getForegroundJob() == job) {
session.setForegroundJob(null);
updateSessionInputStatus(session, InputStatus.ALLOW_INPUT);
}
}
@Override
public void onSuspend(Job job) {
if (session.getForegroundJob() == job) {
session.setForegroundJob(null);
updateSessionInputStatus(session, InputStatus.ALLOW_INPUT);
}
}
}
private class ApiTerm implements Term {
private Session session;
public ApiTerm(Session session) {
this.session = session;
}
@Override
public Term resizehandler(Handler<Void> handler) {
return this;
}
@Override
public String type() {
return "web";
}
@Override
public int width() {
return 1000;
}
@Override
public int height() {
return 200;
}
@Override
public Term stdinHandler(Handler<String> handler) {
return this;
}
@Override
public Term stdoutHandler(Function<String, String> handler) {
return this;
}
@Override
public Term write(String data) {
return this;
}
@Override
public long lastAccessedTime() {
return session.getLastAccessTime();
}
@Override
public Term echo(String text) {
return this;
}
@Override
public Term setSession(Session session) {
return this;
}
@Override
public Term interruptHandler(SignalHandler handler) {
return this;
}
@Override
public Term suspendHandler(SignalHandler handler) {
return this;
}
@Override
public void readline(String prompt, Handler<String> lineHandler) {
}
@Override
public void readline(String prompt, Handler<String> lineHandler, Handler<Completion> completionHandler) {
}
@Override
public Term closeHandler(Handler<Void> handler) {
return this;
}
@Override
public void close() {
}
}
}

View File

@ -0,0 +1,60 @@
package com.taobao.arthas.core.util;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import java.io.UnsupportedEncodingException;
import java.util.Set;
/**
* @author gongdewei 2020/3/31
*/
public class HttpUtils {
/**
* Get cookie value by name
* @param cookies request cookies
* @param cookieName the cookie name
*/
public static String getCookieValue(Set<Cookie> cookies, String cookieName) {
for (Cookie cookie : cookies) {
if(cookie.name().equals(cookieName)){
return cookie.value();
}
}
return null;
}
/**
*
* @param response
* @param name
* @param value
*/
public static void setCookie(DefaultFullHttpResponse response, String name, String value) {
response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode(name, value));
}
/**
* Create http response with status code and content
* @param request request
* @param status response status code
* @param content response content
*/
public static DefaultHttpResponse createResponse(FullHttpRequest request, HttpResponseStatus status, String content) {
DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), status);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=utf-8");
try {
response.content().writeBytes(content.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
}
return response;
}
public static HttpResponse createRedirectResponse(FullHttpRequest request, String url) {
DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FOUND);
response.headers().set(HttpHeaderNames.LOCATION, url);
return response;
}
}

View File

@ -0,0 +1,93 @@
package com.taobao.arthas.core.util;
import com.alibaba.fastjson.serializer.SerializeWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
/**
* @author gongdewei 2020/5/15
*/
public class JsonUtils {
private static final Logger logger = LoggerFactory.getLogger(JsonUtils.class);
private static Field serializeWriterBufLocalField;
private static Field serializeWriterBytesBufLocal;
private static Field serializeWriterBufferThreshold;
/**
* Set Fastjson SerializeWriter Buffer Threshold
* @param value
*/
public static void setSerializeWriterBufferThreshold(int value) {
Class<SerializeWriter> clazz = SerializeWriter.class;
try {
if (serializeWriterBufferThreshold == null) {
serializeWriterBufferThreshold = clazz.getDeclaredField("BUFFER_THRESHOLD");
}
serializeWriterBufferThreshold.setAccessible(true);
serializeWriterBufferThreshold.set(null, value);
} catch (Exception e) {
logger.error("update SerializeWriter.BUFFER_THRESHOLD value failed", e);
}
}
/**
* Set Fastjson SerializeWriter ThreadLocal value
* @param bufSize
*/
public static void setSerializeWriterBufThreadLocal(int bufSize) {
Class<SerializeWriter> clazz = SerializeWriter.class;
try {
//set threadLocal value
if (serializeWriterBufLocalField == null) {
serializeWriterBufLocalField = clazz.getDeclaredField("bufLocal");
}
serializeWriterBufLocalField.setAccessible(true);
ThreadLocal<char[]> bufLocal = (ThreadLocal<char[]>) serializeWriterBufLocalField.get(null);
char[] charsLocal = bufLocal.get();
if (charsLocal == null || charsLocal.length < bufSize) {
bufLocal.set(new char[bufSize]);
}
if (serializeWriterBytesBufLocal == null) {
serializeWriterBytesBufLocal = clazz.getDeclaredField("bytesBufLocal");
}
serializeWriterBytesBufLocal.setAccessible(true);
ThreadLocal<byte[]> bytesBufLocal = (ThreadLocal<byte[]>) serializeWriterBytesBufLocal.get(null);
byte[] bytesLocal = bytesBufLocal.get();
if (bytesLocal == null || bytesLocal.length < bufSize) {
bytesBufLocal.set(new byte[bufSize]);
}
} catch (Exception e) {
logger.error("update SerializeWriter.BUFFER_THRESHOLD value failed", e);
}
}
/**
* Set Fastjson SerializeWriter ThreadLocal value
*/
public static void setSerializeWriterBufThreadLocal(char[] charsBuf, byte[] bytesBuf) {
Class<SerializeWriter> clazz = SerializeWriter.class;
try {
//set threadLocal value
if (serializeWriterBufLocalField == null) {
serializeWriterBufLocalField = clazz.getDeclaredField("bufLocal");
}
serializeWriterBufLocalField.setAccessible(true);
ThreadLocal<char[]> bufLocal = (ThreadLocal<char[]>) serializeWriterBufLocalField.get(null);
bufLocal.set(charsBuf);
if (serializeWriterBytesBufLocal == null) {
serializeWriterBytesBufLocal = clazz.getDeclaredField("bytesBufLocal");
}
serializeWriterBytesBufLocal.setAccessible(true);
ThreadLocal<byte[]> bytesBufLocal = (ThreadLocal<byte[]>) serializeWriterBytesBufLocal.get(null);
bytesBufLocal.set(bytesBuf);
} catch (Exception e) {
logger.error("update SerializeWriter.BUFFER_THRESHOLD value failed", e);
}
}
}