0,0 → 1,83 |
package net.brutex.xservices.util; |
|
import lombok.extern.slf4j.Slf4j; |
import org.apache.commons.io.IOUtils; |
import org.h2.jdbcx.JdbcConnectionPool; |
import org.quartz.*; |
|
import java.io.IOException; |
import java.io.Reader; |
import java.io.StringReader; |
import java.sql.*; |
import java.time.Instant; |
import java.util.concurrent.atomic.AtomicBoolean; |
import java.util.concurrent.atomic.AtomicLong; |
|
@Slf4j |
@DisallowConcurrentExecution |
public class EventLogCleanerJob implements Job, InterruptableJob { |
|
private final AtomicBoolean isInterrupted = new AtomicBoolean(false); |
|
@Override |
public void execute(JobExecutionContext context) throws JobExecutionException { |
final Instant d = Instant.now(); |
final long ts = d.toEpochMilli(); |
|
log.info("EventLogCleaner is executing now."); |
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap() |
.get(EventmanagerConfiguration.KEY); |
|
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection"); |
final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection"); |
|
final String moveSQL = "INSERT INTO brutex.tbl_events_all DIRECT SELECT * FROM MEM_ALL_EVENTS " + |
"where btx_timestamp < " + (ts-5000) + " "; |
final String deleteTable = "DELETE FROM MEM_ALL_EVENTS where btx_timestamp < " + (ts-5000); |
final String deleteMemTable = "DELETE FROM brutex.tbl_events_all where btx_timestamp < " + (ts-5000); |
|
|
/** |
* Move event table data to all events log |
*/ |
|
|
try (Connection fcon = fpool.getConnection(); |
Connection mcon = mpool.getConnection()){ |
fcon.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); |
fcon.setAutoCommit(false); |
Statement stmt = fcon.createStatement(); |
stmt.execute(moveSQL); |
int count = stmt.getUpdateCount(); |
fcon.commit(); |
Statement mstm = mcon.createStatement(); |
|
|
mstm.execute(deleteMemTable); |
int count2 = mstm.getUpdateCount(); |
mcon.commit(); |
|
log.info("EventLogCleaner moved '{}/ deleted {}' events into the persistence space.", count, count2); |
|
} catch (SQLException e) { |
log.error("Exception in SQL execution: {}", e.getMessage()); |
throw new JobExecutionException(e); |
} |
} |
|
|
/** |
* <p> |
* Called by the <code>{@link Scheduler}</code> when a user |
* interrupts the <code>Job</code>. |
* </p> |
* |
* @throws UnableToInterruptJobException if there is an exception while interrupting the job. |
*/ |
@Override |
public synchronized void interrupt() throws UnableToInterruptJobException { |
isInterrupted.set(true); |
log.warn("EventLogCleaner received and interrupt."); |
Thread.currentThread().interrupt(); |
} |
} |