Rev 201 | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed
package net.brutex.xservices.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.sql.*;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@DisallowConcurrentExecution
public class EventLogCleanerJob implements Job, InterruptableJob {
private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
final Instant d = Instant.now();
final long ts = d.toEpochMilli();
log.info("EventLogCleaner is executing now.");
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
.get(EventmanagerConfiguration.KEY);
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
final String moveSQL = "INSERT INTO brutex.tbl_events_all DIRECT SELECT * FROM MEM_ALL_EVENTS " +
"where btx_timestamp < " + (ts-5000) + " ";
final String deleteTable = "DELETE FROM MEM_ALL_EVENTS where btx_timestamp < " + (ts-5000);
final String deleteMemTable = "DELETE FROM brutex.tbl_events_all where btx_timestamp < " + (ts-5000);
/**
* Move event table data to all events log
*/
try (Connection fcon = fpool.getConnection();
Connection mcon = mpool.getConnection()){
fcon.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
fcon.setAutoCommit(false);
Statement stmt = fcon.createStatement();
stmt.execute(moveSQL);
int count = stmt.getUpdateCount();
fcon.commit();
Statement mstm = mcon.createStatement();
mstm.execute(deleteMemTable);
int count2 = mstm.getUpdateCount();
mcon.commit();
log.info("EventLogCleaner moved '{}/ deleted {}' events into the persistence space.", count, count2);
} catch (SQLException e) {
log.error("Exception in SQL execution: {}", e.getMessage());
throw new JobExecutionException(e);
}
}
/**
* <p>
* Called by the <code>{@link Scheduler}</code> when a user
* interrupts the <code>Job</code>.
* </p>
*
* @throws UnableToInterruptJobException if there is an exception while interrupting the job.
*/
@Override
public synchronized void interrupt() throws UnableToInterruptJobException {
isInterrupted.set(true);
log.warn("EventLogCleaner received and interrupt.");
Thread.currentThread().interrupt();
}
}