201 |
brianR |
1 |
package net.brutex.xservices.util;
|
|
|
2 |
|
|
|
3 |
import lombok.extern.slf4j.Slf4j;
|
|
|
4 |
import org.apache.commons.io.IOUtils;
|
|
|
5 |
import org.h2.jdbcx.JdbcConnectionPool;
|
|
|
6 |
import org.quartz.*;
|
|
|
7 |
|
|
|
8 |
import java.io.IOException;
|
|
|
9 |
import java.io.Reader;
|
|
|
10 |
import java.io.StringReader;
|
|
|
11 |
import java.sql.*;
|
|
|
12 |
import java.time.Instant;
|
|
|
13 |
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
14 |
import java.util.concurrent.atomic.AtomicLong;
|
|
|
15 |
|
|
|
16 |
@Slf4j
|
203 |
brianR |
17 |
@DisallowConcurrentExecution
|
|
|
18 |
public class EventLogCleanerJob implements Job, InterruptableJob {
|
201 |
brianR |
19 |
|
|
|
20 |
private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
|
|
|
21 |
|
|
|
22 |
@Override
|
|
|
23 |
public void execute(JobExecutionContext context) throws JobExecutionException {
|
|
|
24 |
final Instant d = Instant.now();
|
|
|
25 |
final long ts = d.toEpochMilli();
|
203 |
brianR |
26 |
|
|
|
27 |
log.info("EventLogCleaner is executing now.");
|
201 |
brianR |
28 |
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
|
|
|
29 |
.get(EventmanagerConfiguration.KEY);
|
|
|
30 |
|
|
|
31 |
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
|
203 |
brianR |
32 |
final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
|
201 |
brianR |
33 |
|
203 |
brianR |
34 |
final String moveSQL = "INSERT INTO brutex.tbl_events_all DIRECT SELECT * FROM MEM_ALL_EVENTS " +
|
|
|
35 |
"where btx_timestamp < " + (ts-5000) + " ";
|
|
|
36 |
final String deleteTable = "DELETE FROM MEM_ALL_EVENTS where btx_timestamp < " + (ts-5000);
|
|
|
37 |
final String deleteMemTable = "DELETE FROM brutex.tbl_events_all where btx_timestamp < " + (ts-5000);
|
201 |
brianR |
38 |
|
|
|
39 |
|
|
|
40 |
/**
|
203 |
brianR |
41 |
* Move event table data to all events log
|
201 |
brianR |
42 |
*/
|
|
|
43 |
|
|
|
44 |
|
203 |
brianR |
45 |
try (Connection fcon = fpool.getConnection();
|
|
|
46 |
Connection mcon = mpool.getConnection()){
|
|
|
47 |
fcon.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
|
|
|
48 |
fcon.setAutoCommit(false);
|
|
|
49 |
Statement stmt = fcon.createStatement();
|
|
|
50 |
stmt.execute(moveSQL);
|
|
|
51 |
int count = stmt.getUpdateCount();
|
|
|
52 |
fcon.commit();
|
|
|
53 |
Statement mstm = mcon.createStatement();
|
201 |
brianR |
54 |
|
|
|
55 |
|
203 |
brianR |
56 |
mstm.execute(deleteMemTable);
|
|
|
57 |
int count2 = mstm.getUpdateCount();
|
|
|
58 |
mcon.commit();
|
201 |
brianR |
59 |
|
203 |
brianR |
60 |
log.info("EventLogCleaner moved '{}/ deleted {}' events into the persistence space.", count, count2);
|
201 |
brianR |
61 |
|
|
|
62 |
} catch (SQLException e) {
|
|
|
63 |
log.error("Exception in SQL execution: {}", e.getMessage());
|
|
|
64 |
throw new JobExecutionException(e);
|
|
|
65 |
}
|
|
|
66 |
}
|
|
|
67 |
|
|
|
68 |
|
203 |
brianR |
69 |
/**
|
201 |
brianR |
70 |
* <p>
|
|
|
71 |
* Called by the <code>{@link Scheduler}</code> when a user
|
|
|
72 |
* interrupts the <code>Job</code>.
|
|
|
73 |
* </p>
|
|
|
74 |
*
|
|
|
75 |
* @throws UnableToInterruptJobException if there is an exception while interrupting the job.
|
|
|
76 |
*/
|
|
|
77 |
@Override
|
|
|
78 |
public synchronized void interrupt() throws UnableToInterruptJobException {
|
|
|
79 |
isInterrupted.set(true);
|
203 |
brianR |
80 |
log.warn("EventLogCleaner received and interrupt.");
|
|
|
81 |
Thread.currentThread().interrupt();
|
201 |
brianR |
82 |
}
|
|
|
83 |
}
|