10,14 → 10,12 |
import java.io.StringReader; |
import java.sql.*; |
import java.time.Instant; |
import java.util.Date; |
import java.util.concurrent.atomic.AtomicBoolean; |
import java.util.concurrent.atomic.AtomicLong; |
|
import static org.quartz.TriggerBuilder.newTrigger; |
|
@Slf4j |
public class EventEmitter implements Job, InterruptableJob { |
@DisallowConcurrentExecution |
public class EventLogCleanerJob implements Job, InterruptableJob { |
|
private final AtomicBoolean isInterrupted = new AtomicBoolean(false); |
|
25,143 → 23,47 |
public void execute(JobExecutionContext context) throws JobExecutionException { |
final Instant d = Instant.now(); |
final long ts = d.toEpochMilli(); |
|
log.info("EventLogCleaner is executing now."); |
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap() |
.get(EventmanagerConfiguration.KEY); |
|
final String url = conf.getTargeturl(); |
|
final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection"); |
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection"); |
final long run_key = context.getMergedJobDataMap().getLong("run_key"); |
final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter"); |
final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection"); |
|
final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;"; |
final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?"; |
final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;"; |
final String moveSQL = "INSERT INTO brutex.tbl_events_all DIRECT SELECT * FROM MEM_ALL_EVENTS " + |
"where btx_timestamp < " + (ts-5000) + " "; |
final String deleteTable = "DELETE FROM MEM_ALL_EVENTS where btx_timestamp < " + (ts-5000); |
final String deleteMemTable = "DELETE FROM brutex.tbl_events_all where btx_timestamp < " + (ts-5000); |
|
final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " + |
" btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; "; |
|
|
final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " + |
"KEY (btx_event_type, btx_obj_type, btx_obj_id) " + |
"VALUES (?,?,?,?,?,?,?,?);"; |
|
/** |
* Move event table data to snapshot |
* Move event table data to all events log |
*/ |
|
Connection con = null; |
Connection fcon = null; |
try { |
con = pool.getConnection(); |
con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); |
con.setAutoCommit(false); |
Statement stmt = con.createStatement(); |
PreparedStatement moveprep= con.prepareStatement(moveSQL); |
moveprep.setLong(1, run_key); |
moveprep.execute(); |
stmt.execute(deleteTable); |
con.commit(); //all events moved from tbl_events to tbl_events_snap at this point |
|
try (Connection fcon = fpool.getConnection(); |
Connection mcon = mpool.getConnection()){ |
fcon.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); |
fcon.setAutoCommit(false); |
Statement stmt = fcon.createStatement(); |
stmt.execute(moveSQL); |
int count = stmt.getUpdateCount(); |
fcon.commit(); |
Statement mstm = mcon.createStatement(); |
|
fcon = fpool.getConnection(); |
PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL); |
|
PreparedStatement del = con.prepareStatement(deleteSQL); |
mstm.execute(deleteMemTable); |
int count2 = mstm.getUpdateCount(); |
mcon.commit(); |
|
ResultSet rs = stmt.executeQuery(querySQL); |
log.info("EventLogCleaner moved '{}/ deleted {}' events into the persistence space.", count, count2); |
|
|
while(rs.next() && !isInterrupted.get()) { |
/* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */ |
String id = rs.getString(1); |
Clob c = rs.getClob(2); |
String obj_id = rs.getString(3); |
String event_type = rs.getString(4); |
String obj_type = rs.getString(5); |
long event_ts = rs.getLong(6); |
boolean bretry = false; |
|
SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream())); |
int retry = 0; |
Reader response = null; |
String rsp = ""; |
boolean succeeded = false; |
while(retry < 3 && !succeeded && !isInterrupted.get()) { |
retry++; |
response = ss.sendSoap(false); |
succeeded = true; |
if(response!=null) { |
rsp = IOUtils.toString(response); |
} |
|
if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;}; |
if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;}; |
|
if (succeeded) { |
// Successfully send |
del.setString(1, id); |
del.execute(); |
con.commit(); |
egres_counter.incrementAndGet(); |
log.debug("Successfully sent event '{}' to target ALF Event Manager.", id); |
} else { |
// Error during sending |
log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry); |
try { |
Thread.sleep(2000); |
} catch (InterruptedException e) { |
log.error("Interrupted while waiting to retry: {}", e.getMessage()); |
} |
} |
} |
|
if(! succeeded) { |
log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " + |
"Moving event back to the queue unless there is a superseding event already queued.", id); |
|
|
try { |
//this is in file-based db |
errorPrepSql.setString(1, event_type); |
errorPrepSql.setString(2, id); |
errorPrepSql.setString(3, obj_type); |
errorPrepSql.setString(4, obj_id); |
errorPrepSql.setLong(5, event_ts); |
errorPrepSql.setBoolean(6, bretry); |
errorPrepSql.setClob(7, new StringReader(rsp) ); |
errorPrepSql.setClob(8, c); |
errorPrepSql.execute(); |
fcon.commit(); |
|
//this is in-memory |
del.setString(1, id); |
del.execute(); |
con.commit(); |
} catch (SQLException e) { |
log.error("Exception in SQL execution during writing error events: {}", e.getMessage()); |
} |
} |
} |
|
|
} catch (SQLException e) { |
log.error("Exception in SQL execution: {}", e.getMessage()); |
throw new JobExecutionException(e); |
} catch (IOException e) { |
log.error("Exception in SQL execution: {}", e.getMessage()); |
throw new RuntimeException(e); |
} finally { |
try { |
if(fcon!=null) fcon.close(); |
if(con!=null) con.close(); |
} catch (SQLException e) { |
log.error("Error closing the database connections: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
} |
} |
|
|
/** |
175,6 → 77,7 |
@Override |
public synchronized void interrupt() throws UnableToInterruptJobException { |
isInterrupted.set(true); |
log.warn("ALFEmitter received and interrupt."); |
log.warn("EventLogCleaner received and interrupt."); |
Thread.currentThread().interrupt(); |
} |
} |