PL-10578 Sequential Queries feature is enabled by default but sys_query_result cleaning is not (make deleteForInactiveSessions method more scalable)

This commit is contained in:
Konstantin Krivopustov 2018-04-01 14:43:40 +04:00
parent fe2e820aa3
commit 0a7e89e1ad
2 changed files with 141 additions and 22 deletions

View File

@ -18,10 +18,7 @@
package com.haulmont.cuba.core.app.queryresults;
import com.haulmont.bali.db.QueryRunner;
import com.haulmont.cuba.core.EntityManager;
import com.haulmont.cuba.core.Persistence;
import com.haulmont.cuba.core.Query;
import com.haulmont.cuba.core.Transaction;
import com.haulmont.cuba.core.*;
import com.haulmont.cuba.core.app.ClusterManagerAPI;
import com.haulmont.cuba.core.app.DataServiceQueryBuilder;
import com.haulmont.cuba.core.global.*;
@ -65,7 +62,11 @@ public class QueryResultsManager implements QueryResultsManagerAPI {
@Inject
protected Metadata metadata;
private static final int BATCH_SIZE = 100;
protected static final int BATCH_SIZE = 100;
protected static final int DELETE_BATCH_SIZE = 100;
protected static final int INACTIVE_DELETION_MAX = 100000;
@Override
public void savePreviousQueryResults(LoadContext loadContext) {
@ -119,7 +120,7 @@ public class QueryResultsManager implements QueryResultsManagerAPI {
insert(queryKey, idList);
}
private boolean resultsAlreadySaved(Integer queryKey, LoadContext.Query query) {
protected boolean resultsAlreadySaved(Integer queryKey, LoadContext.Query query) {
LinkedHashMap<Integer, QueryHolder> recentQueries =
userSessionSource.getUserSession().getAttribute("_recentQueries");
if (recentQueries == null) {
@ -234,28 +235,49 @@ public class QueryResultsManager implements QueryResultsManagerAPI {
|| !configuration.getConfig(GlobalConfig.class).getAllowQueryFromSelected())
return;
internalDeleteForInactiveSessions();
}
public void internalDeleteForInactiveSessions() {
log.debug("Delete query results for inactive user sessions");
StringBuilder sb = new StringBuilder("delete from SYS_QUERY_RESULT");
Collection<UserSession> userSessionEntities = userSessions.getUserSessionsStream().collect(Collectors.toList());
DbTypeConverter converter = persistence.getDbTypeConverter();
if (!userSessionEntities.isEmpty()) {
sb.append(" where SESSION_ID not in (");
for (Iterator<UserSession> it = userSessionEntities.iterator(); it.hasNext(); ) {
UserSession userSession = it.next();
UUID userSessionId = userSession.getId();
String userSessionIdStr = converter.getSqlObject(userSessionId).toString();
sb.append("'").append(userSessionIdStr).append("'");
if (it.hasNext())
sb.append(",");
}
sb.append(")");
List<Object[]> rows;
try (Transaction tx = persistence.createTransaction()) {
TypedQuery<Object[]> query = persistence.getEntityManager().createQuery(
"select e.id, e.sessionId from sys$QueryResult e", Object[].class);
query.setMaxResults(INACTIVE_DELETION_MAX);
rows = query.getResultList();
}
if (rows.size() == INACTIVE_DELETION_MAX) {
log.debug("Processing " + INACTIVE_DELETION_MAX + " records, run again for the rest");
}
Set<UUID> sessionIds = userSessions.getUserSessionsStream().map(UserSession::getId).collect(Collectors.toSet());
List<Long> ids = new ArrayList<>();
int i = 0;
for (Object[] row : rows) {
if (!sessionIds.contains((UUID) row[1])) {
ids.add((Long) row[0]);
}
i++;
if (i % DELETE_BATCH_SIZE == 0) {
delete(ids);
ids.clear();
}
}
if (!ids.isEmpty())
delete(ids);
}
protected void delete(List<Long> ids) {
log.debug("Deleting " + ids.size() + " records");
String str = ids.stream().map(String::valueOf).collect(Collectors.joining(","));
QueryRunner runner = new QueryRunner(persistence.getDataSource());
try {
runner.update(sb.toString());
runner.update("delete from SYS_QUERY_RESULT where ID in (" + str + ")");
} catch (SQLException e) {
throw new RuntimeException(e);
throw new RuntimeException("Error deleting query result records", e);
}
}
}

View File

@ -0,0 +1,97 @@
/*
* Copyright (c) 2008-2018 Haulmont.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spec.cuba.core.query_results
import com.haulmont.bali.db.QueryRunner
import com.haulmont.cuba.core.Persistence
import com.haulmont.cuba.core.app.queryresults.QueryResultsManager
import com.haulmont.cuba.core.entity.QueryResult
import com.haulmont.cuba.core.global.AppBeans
import com.haulmont.cuba.core.global.Metadata
import com.haulmont.cuba.core.global.UserSessionSource
import com.haulmont.cuba.security.app.UserSessions
import com.haulmont.cuba.security.global.UserSession
import com.haulmont.cuba.testsupport.TestContainer
import org.junit.ClassRule
import spock.lang.Shared
import spock.lang.Specification
class QueryResultsManagerTest extends Specification {
@Shared @ClassRule
public TestContainer cont = TestContainer.Common.INSTANCE
private Persistence persistence
private Metadata metadata
private QueryResultsManager queryResultsManager
void setup() {
persistence = cont.persistence()
metadata = cont.metadata()
queryResultsManager = AppBeans.get(QueryResultsManager)
}
void cleanup() {
new QueryRunner(persistence.getDataSource()).update("delete from SYS_QUERY_RESULT")
}
def "test deleteForInactiveSessions - empty table"() {
when:
queryResultsManager.internalDeleteForInactiveSessions()
then:
def list = persistence.callInTransaction { em -> em.createQuery('select e from sys$QueryResult e').resultList }
list.isEmpty()
}
def "test deleteForInactiveSessions - table with values"() {
def userSessionSource = AppBeans.get(UserSessionSource)
def userSessions = AppBeans.get(UserSessions)
def user = userSessionSource.userSession.user
def session1 = new UserSession(UUID.randomUUID(), user, Collections.emptyList(), Locale.ENGLISH, false)
userSessions.add(session1)
def session2 = new UserSession(UUID.randomUUID(), user, Collections.emptyList(), Locale.ENGLISH, false)
userSessions.add(session2)
persistence.callInTransaction { em ->
for (i in 0..255) {
def entity = metadata.create(QueryResult)
entity.setSessionId(UUID.randomUUID())
entity.setQueryKey(i)
em.persist(entity)
}
def entity = metadata.create(QueryResult)
entity.setSessionId(session1.id)
entity.setQueryKey(1000)
em.persist(entity)
}
when:
queryResultsManager.internalDeleteForInactiveSessions()
then:
def list = persistence.callInTransaction { em -> em.createQuery('select e from sys$QueryResult e').resultList }
list.size() == 1
list[0].sessionId == session1.id
}
}