refactor log to logback's sifting appender to simplify Task's logging logic (#751)

* refactor to use SiftingAppender to write Tasks' log

* refactor to use shared CONSTS for TaskLogInfo

* fix refactor bug
This commit is contained in:
Baoqi 2019-08-30 19:19:57 +08:00 committed by 乔占卫
parent e71fdac539
commit f00ab67c05
12 changed files with 116 additions and 446 deletions

View File

@ -9,19 +9,26 @@
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="TASKLOGFILE" class="cn.escheduler.server.worker.log.TaskLogAppender">
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="cn.escheduler.server.worker.log.TaskLogFilter"></filter>
<file>${log.base}/{processDefinitionId}/{processInstanceId}/{taskInstanceId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
<Discriminator class="cn.escheduler.server.worker.log.TaskLogDiscriminator">
<key>taskAppId</key>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>
<appender name="WORKERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">

View File

@ -9,19 +9,26 @@
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="TASKLOGFILE" class="cn.escheduler.server.worker.log.TaskLogAppender">
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="cn.escheduler.server.worker.log.TaskLogFilter"></filter>
<file>${log.base}/{processDefinitionId}/{processInstanceId}/{taskInstanceId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
<Discriminator class="cn.escheduler.server.worker.log.TaskLogDiscriminator">
<key>taskAppId</key>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>
<appender name="COMBINEDLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">

View File

@ -34,6 +34,11 @@ public class LoggerUtils {
*/
private static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX);
/**
* Task Logger's prefix
*/
public static final String TASK_LOGGER_INFO_PREFIX = "TaskLogInfo";
/**
* build job id
* @param affix
@ -46,7 +51,7 @@ public class LoggerUtils {
int processDefId,
int processInstId,
int taskId){
return String.format("%s_%s_%s_%s",affix,
return String.format("%s-%s/%s/%s",affix,
processDefId,
processInstId,
taskId);

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.server.worker.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.FileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* task log appender
*/
public class TaskLogAppender extends FileAppender<ILoggingEvent> {
private String currentlyActiveFile;
@Override
protected void append(ILoggingEvent event) {
if (currentlyActiveFile == null){
currentlyActiveFile = getFile();
}
String activeFile = currentlyActiveFile;
// thread name taskThreadName-processDefineId_processInstanceId_taskInstanceId
String threadName = event.getThreadName();
String[] threadNameArr = threadName.split("-");
// logId = processDefineId_processInstanceId_taskInstanceId
String logId = threadNameArr[1];
// split logId
threadNameArr = logId.split("_");
String processDefineId = threadNameArr[0];
String processInstanceId = threadNameArr[1];
String taskInstanceId = threadNameArr[2];
activeFile = activeFile.replace("{processDefinitionId}",processDefineId);
activeFile = activeFile.replace("{processInstanceId}",processInstanceId);
activeFile = activeFile.replace("{taskInstanceId}",taskInstanceId);
setFile(activeFile);
start();
super.subAppend(event);
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.server.worker.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.sift.AbstractDiscriminator;
import cn.escheduler.server.utils.LoggerUtils;
public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
private String key;
/**
* logger name should be like:
* Task Logger name should be like: TaskLogInfo-{processDefinitionId}/{processInstanceId}/{taskInstanceId}
*/
public String getDiscriminatingValue(ILoggingEvent event) {
String loggerName = event.getLoggerName();
String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-";
if (loggerName.startsWith(prefix)) {
return loggerName.substring(prefix.length());
} else {
return "unknown_task";
}
}
@Override
public void start() {
started = true;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
}

View File

@ -19,6 +19,7 @@ package cn.escheduler.server.worker.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;
import cn.escheduler.server.utils.LoggerUtils;
/**
* task log filter
@ -27,7 +28,7 @@ public class TaskLogFilter extends Filter<ILoggingEvent> {
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith("TaskLogInfo-")){
if (event.getLoggerName().startsWith(LoggerUtils.TASK_LOGGER_INFO_PREFIX)) {
return FilterReply.ACCEPT;
}
return FilterReply.DENY;

View File

@ -1,345 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.server.worker.log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
/**
* custom task logger
*/
public class TaskLogger implements Logger {
private static Logger logger = LoggerFactory.getLogger(TaskLogger.class);
private String taskAppId;
public TaskLogger(String taskAppId) {
this.taskAppId = taskAppId;
}
private String addJobId(String msg) {
return String.format("[taskAppId=%s] %s", taskAppId, msg);
}
@Override
public String getName() {
return logger.getName();
}
@Override
public boolean isTraceEnabled() {
return logger.isTraceEnabled();
}
@Override
public void trace(String msg) {
logger.trace(addJobId(msg));
}
@Override
public void trace(String format, Object arg) {
logger.trace(addJobId(format), arg);
}
@Override
public void trace(String format, Object arg1, Object arg2) {
logger.trace(addJobId(format), arg1, arg2);
}
@Override
public void trace(String format, Object... arguments) {
logger.trace(addJobId(format), arguments);
}
@Override
public void trace(String msg, Throwable t) {
logger.trace(addJobId(msg), t);
}
@Override
public boolean isTraceEnabled(Marker marker) {
return logger.isTraceEnabled(marker);
}
@Override
public void trace(Marker marker, String msg) {
logger.trace(marker, addJobId(msg));
}
@Override
public void trace(Marker marker, String format, Object arg) {
logger.trace(marker, addJobId(format), arg);
}
@Override
public void trace(Marker marker, String format, Object arg1, Object arg2) {
logger.trace(marker, addJobId(format), arg1, arg2);
}
@Override
public void trace(Marker marker, String format, Object... argArray) {
logger.trace(marker, addJobId(format), argArray);
}
@Override
public void trace(Marker marker, String msg, Throwable t) {
logger.trace(marker, addJobId(msg), t);
}
@Override
public boolean isDebugEnabled() {
return logger.isDebugEnabled();
}
@Override
public void debug(String msg) {
logger.debug(addJobId(msg));
}
@Override
public void debug(String format, Object arg) {
logger.debug(addJobId(format), arg);
}
@Override
public void debug(String format, Object arg1, Object arg2) {
logger.debug(addJobId(format), arg1, arg2);
}
@Override
public void debug(String format, Object... arguments) {
logger.debug(addJobId(format), arguments);
}
@Override
public void debug(String msg, Throwable t) {
logger.debug(addJobId(msg), t);
}
@Override
public boolean isDebugEnabled(Marker marker) {
return logger.isDebugEnabled();
}
@Override
public void debug(Marker marker, String msg) {
logger.debug(marker, addJobId(msg));
}
@Override
public void debug(Marker marker, String format, Object arg) {
logger.debug(marker, addJobId(format), arg);
}
@Override
public void debug(Marker marker, String format, Object arg1, Object arg2) {
logger.debug(marker, addJobId(format), arg1, arg2);
}
@Override
public void debug(Marker marker, String format, Object... arguments) {
logger.debug(marker, addJobId(format), arguments);
}
@Override
public void debug(Marker marker, String msg, Throwable t) {
logger.debug(marker, addJobId(msg), t);
}
@Override
public boolean isInfoEnabled() {
return logger.isInfoEnabled();
}
@Override
public void info(String msg) {
logger.info(addJobId(msg));
}
@Override
public void info(String format, Object arg) {
logger.info(addJobId(format), arg);
}
@Override
public void info(String format, Object arg1, Object arg2) {
logger.info(addJobId(format), arg1, arg2);
}
@Override
public void info(String format, Object... arguments) {
logger.info(addJobId(format), arguments);
}
@Override
public void info(String msg, Throwable t) {
logger.info(addJobId(msg), t);
}
@Override
public boolean isInfoEnabled(Marker marker) {
return logger.isInfoEnabled();
}
@Override
public void info(Marker marker, String msg) {
logger.info(marker, addJobId(msg));
}
@Override
public void info(Marker marker, String format, Object arg) {
logger.info(marker, addJobId(format), arg);
}
@Override
public void info(Marker marker, String format, Object arg1, Object arg2) {
logger.info(marker, addJobId(format), arg1, arg2);
}
@Override
public void info(Marker marker, String format, Object... arguments) {
logger.info(marker, addJobId(format), arguments);
}
@Override
public void info(Marker marker, String msg, Throwable t) {
logger.info(marker, addJobId(msg), t);
}
@Override
public boolean isWarnEnabled() {
return logger.isWarnEnabled();
}
@Override
public void warn(String msg) {
logger.warn(addJobId(msg));
}
@Override
public void warn(String format, Object arg) {
logger.warn(addJobId(format), arg);
}
@Override
public void warn(String format, Object arg1, Object arg2) {
logger.warn(addJobId(format), arg1, arg2);
}
@Override
public void warn(String format, Object... arguments) {
logger.warn(addJobId(format), arguments);
}
@Override
public void warn(String msg, Throwable t) {
logger.warn(addJobId(msg), t);
}
@Override
public boolean isWarnEnabled(Marker marker) {
return logger.isWarnEnabled();
}
@Override
public void warn(Marker marker, String msg) {
logger.warn(marker, addJobId(msg));
}
@Override
public void warn(Marker marker, String format, Object arg) {
logger.warn(marker, addJobId(format), arg);
}
@Override
public void warn(Marker marker, String format, Object arg1, Object arg2) {
logger.warn(marker, addJobId(format), arg1, arg2);
}
@Override
public void warn(Marker marker, String format, Object... arguments) {
logger.warn(marker, addJobId(format), arguments);
}
@Override
public void warn(Marker marker, String msg, Throwable t) {
logger.warn(marker, addJobId(msg), t);
}
@Override
public boolean isErrorEnabled() {
return logger.isErrorEnabled();
}
@Override
public void error(String msg) {
logger.error(addJobId(msg));
}
@Override
public void error(String format, Object arg) {
logger.error(addJobId(format), arg);
}
@Override
public void error(String format, Object arg1, Object arg2) {
logger.error(addJobId(format), arg1, arg2);
}
@Override
public void error(String format, Object... arguments) {
logger.error(addJobId(format), arguments);
}
@Override
public void error(String msg, Throwable t) {
logger.error(addJobId(msg), t);
}
@Override
public boolean isErrorEnabled(Marker marker) {
return logger.isErrorEnabled();
}
@Override
public void error(Marker marker, String msg) {
logger.error(marker, addJobId(msg));
}
@Override
public void error(Marker marker, String format, Object arg) {
logger.error(marker, addJobId(format), arg);
}
@Override
public void error(Marker marker, String format, Object arg1, Object arg2) {
logger.error(marker, addJobId(format), arg1, arg2);
}
@Override
public void error(Marker marker, String format, Object... arguments) {
logger.error(marker, addJobId(format), arguments);
}
@Override
public void error(Marker marker, String msg, Throwable t) {
logger.error(marker, addJobId(msg), t);
}
}

View File

@ -40,7 +40,6 @@ import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.Tenant;
import cn.escheduler.server.utils.LoggerUtils;
import cn.escheduler.server.utils.ParamUtils;
import cn.escheduler.server.worker.log.TaskLogger;
import cn.escheduler.server.worker.task.AbstractTask;
import cn.escheduler.server.worker.task.TaskManager;
import cn.escheduler.server.worker.task.TaskProps;
@ -66,11 +65,6 @@ public class TaskScheduleThread implements Runnable {
*/
private final Logger logger = LoggerFactory.getLogger(TaskScheduleThread.class);
/**
* task prefix
*/
private static final String TASK_PREFIX = "TASK";
/**
* task instance
*/
@ -147,7 +141,7 @@ public class TaskScheduleThread implements Runnable {
taskInstance.getId()));
// custom logger
TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX,
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));

View File

@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
@ -347,7 +348,8 @@ public abstract class AbstractCommandExecutor {
*/
private void parseProcessOutput(Process process) {
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskAppId);
ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName).submit(new Runnable(){
ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
parseProcessOutputExecutorService.submit(new Runnable(){
@Override
public void run() {
BufferedReader inReader = null;
@ -373,7 +375,7 @@ public abstract class AbstractCommandExecutor {
}
}
});
parseProcessOutputExecutorService.shutdown();
}
public int getPid() {

View File

@ -9,19 +9,26 @@
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="TASKLOGFILE" class="cn.escheduler.server.worker.log.TaskLogAppender">
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<filter class="cn.escheduler.server.worker.log.TaskLogFilter"></filter>
<file>${log.base}/{processDefinitionId}/{processInstanceId}/{taskInstanceId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
<Discriminator class="cn.escheduler.server.worker.log.TaskLogDiscriminator">
<key>taskAppId</key>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>
<appender name="WORKERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">

View File

@ -23,7 +23,6 @@ import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.utils.LoggerUtils;
import cn.escheduler.server.worker.log.TaskLogger;
import cn.escheduler.server.worker.task.AbstractTask;
import cn.escheduler.server.worker.task.TaskManager;
import cn.escheduler.server.worker.task.TaskProps;
@ -43,7 +42,6 @@ import java.util.Date;
public class ShellCommandExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(ShellCommandExecutorTest.class);
private static final String TASK_PREFIX = "TASK";
private ProcessDao processDao = null;
@ -75,7 +73,7 @@ public class ShellCommandExecutorTest {
// custom logger
TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX,
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));

View File

@ -23,7 +23,6 @@ import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.utils.LoggerUtils;
import cn.escheduler.server.worker.log.TaskLogger;
import cn.escheduler.server.worker.task.AbstractTask;
import cn.escheduler.server.worker.task.TaskManager;
import cn.escheduler.server.worker.task.TaskProps;
@ -43,7 +42,6 @@ import java.util.Date;
public class SqlExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(SqlExecutorTest.class);
private static final String TASK_PREFIX = "TASK";
private ProcessDao processDao = null;
@ -118,7 +116,7 @@ public class SqlExecutorTest {
// custom logger
TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX,
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));