//xservices/trunk/src/main/java/net/brutex/xservices/util/EventEmitter.java |
---|
51,10 → 51,11 |
* Move event table data to snapshot |
*/ |
Connection con = null; |
Connection fcon = null; |
try { |
con = pool.getConnection(); |
try( Connection con = pool.getConnection(); |
Connection fcon = fpool.getConnection(); |
) { |
con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); |
con.setAutoCommit(false); |
Statement stmt = con.createStatement(); |
65,7 → 66,7 |
con.commit(); //all events moved from tbl_events to tbl_events_snap at this point |
fcon = fpool.getConnection(); |
PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL); |
PreparedStatement del = con.prepareStatement(deleteSQL); |
83,7 → 84,8 |
long event_ts = rs.getLong(6); |
boolean bretry = false; |
SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream())); |
//SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getAsciiStream()c.getCharacterStream())); |
SimpleSoap ss = new SimpleSoap(url, id, c.getSubString(1L, (int) c.length())); |
int retry = 0; |
Reader response = null; |
String rsp = ""; |
152,16 → 154,8 |
} catch (IOException e) { |
log.error("Exception in SQL execution: {}", e.getMessage()); |
throw new RuntimeException(e); |
} finally { |
try { |
if(fcon!=null) fcon.close(); |
if(con!=null) con.close(); |
} catch (SQLException e) { |
log.error("Error closing the database connections: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
} |
} |
/** |
//xservices/trunk/src/main/java/net/brutex/xservices/util/EventLogCleanerJob.java |
---|
0,0 → 1,83 |
package net.brutex.xservices.util; |
import lombok.extern.slf4j.Slf4j; |
import org.apache.commons.io.IOUtils; |
import org.h2.jdbcx.JdbcConnectionPool; |
import org.quartz.*; |
import java.io.IOException; |
import java.io.Reader; |
import java.io.StringReader; |
import java.sql.*; |
import java.time.Instant; |
import java.util.concurrent.atomic.AtomicBoolean; |
import java.util.concurrent.atomic.AtomicLong; |
@Slf4j |
@DisallowConcurrentExecution |
public class EventLogCleanerJob implements Job, InterruptableJob { |
private final AtomicBoolean isInterrupted = new AtomicBoolean(false); |
@Override |
public void execute(JobExecutionContext context) throws JobExecutionException { |
final Instant d = Instant.now(); |
final long ts = d.toEpochMilli(); |
log.info("EventLogCleaner is executing now."); |
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap() |
.get(EventmanagerConfiguration.KEY); |
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection"); |
final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection"); |
final String moveSQL = "INSERT INTO brutex.tbl_events_all DIRECT SELECT * FROM MEM_ALL_EVENTS " + |
"where btx_timestamp < " + (ts-5000) + " "; |
final String deleteTable = "DELETE FROM MEM_ALL_EVENTS where btx_timestamp < " + (ts-5000); |
final String deleteMemTable = "DELETE FROM brutex.tbl_events_all where btx_timestamp < " + (ts-5000); |
/** |
* Move event table data to all events log |
*/ |
try (Connection fcon = fpool.getConnection(); |
Connection mcon = mpool.getConnection()){ |
fcon.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); |
fcon.setAutoCommit(false); |
Statement stmt = fcon.createStatement(); |
stmt.execute(moveSQL); |
int count = stmt.getUpdateCount(); |
fcon.commit(); |
Statement mstm = mcon.createStatement(); |
mstm.execute(deleteMemTable); |
int count2 = mstm.getUpdateCount(); |
mcon.commit(); |
log.info("EventLogCleaner moved '{}/ deleted {}' events into the persistence space.", count, count2); |
} catch (SQLException e) { |
log.error("Exception in SQL execution: {}", e.getMessage()); |
throw new JobExecutionException(e); |
} |
} |
/** |
* <p> |
* Called by the <code>{@link Scheduler}</code> when a user |
* interrupts the <code>Job</code>. |
* </p> |
* |
* @throws UnableToInterruptJobException if there is an exception while interrupting the job. |
*/ |
@Override |
public synchronized void interrupt() throws UnableToInterruptJobException { |
isInterrupted.set(true); |
log.warn("EventLogCleaner received and interrupt."); |
Thread.currentThread().interrupt(); |
} |
} |
//xservices/trunk/src/main/java/net/brutex/xservices/util/EventmanagerConfiguration.java |
---|
8,6 → 8,8 |
import org.apache.commons.configuration2.PropertiesConfiguration; |
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; |
import org.apache.commons.configuration2.builder.PropertiesBuilderParametersImpl; |
import org.apache.commons.configuration2.event.ConfigurationEvent; |
import org.apache.commons.configuration2.event.EventListener; |
import org.apache.commons.configuration2.ex.ConfigurationException; |
import javax.servlet.ServletContext; |
39,6 → 41,8 |
private int interval; |
private String jdbc_memdb; |
private String jdbc_filedb; |
private boolean isEmitterActive = true; |
private int cleaner_interval; |
public synchronized EventmanagerConfiguration refreshConfig() { |
45,7 → 49,7 |
log.trace("Reading EventmanagerConfiguration from file eventmanager.properties."); |
FileBasedConfigurationBuilder<FileBasedConfiguration> builder = |
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class) |
.configure(new PropertiesBuilderParametersImpl().setFileName("eventmanager.properties")); |
.configure(new PropertiesBuilderParametersImpl().setFileName("../eventmanager.properties")); |
try { |
Configuration config = builder.getConfiguration(); |
55,6 → 59,8 |
this.interval = config.getInt("interval", 10); |
this.jdbc_memdb = config.getString("memdb", "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;"); |
this.jdbc_filedb = config.getString("fdb", "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;"); |
this.isEmitterActive = config.getBoolean("emitter_active", true); |
this.cleaner_interval = config.getInt("cleaner_interval", 5); |
} catch (ConfigurationException e) { |
//xservices/trunk/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java |
---|
15,6 → 15,8 |
import java.sql.SQLException; |
import java.sql.Statement; |
import java.time.Instant; |
import java.time.temporal.ChronoUnit; |
import java.util.Date; |
import java.util.concurrent.atomic.AtomicLong; |
import static org.quartz.TriggerBuilder.newTrigger; |
48,12 → 50,12 |
*/ |
private static JdbcConnectionPool getDbPool(String dbConnectString) { |
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", ""); |
p.setMaxConnections(16); |
p.setLoginTimeout(5); |
try { |
Connection c = p.getConnection(); |
p.setMaxConnections(256); |
p.setLoginTimeout(20); |
try (Connection c = p.getConnection();){ |
Statement s = c.createStatement(); |
s.execute(dbinit); |
c.commit(); |
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit); |
c.close(); |
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString); |
72,6 → 74,7 |
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs()); |
JobKey key = JobKey.jobKey("ALFEmitter"); |
JobKey cleanerkey = JobKey.jobKey("EventLogCleaner"); |
synchronized (scheduler) { |
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) { |
scheduler.interrupt(key); |
78,6 → 81,11 |
scheduler.deleteJob(key); |
log.info("Gracefully stopped the ALFEventEmitter job."); |
} |
if (!scheduler.isShutdown() && scheduler.checkExists(cleanerkey) ) { |
scheduler.interrupt(cleanerkey); |
scheduler.deleteJob(cleanerkey); |
log.info("Gracefully stopped the ALFEventEmitter job."); |
} |
if (!scheduler.isShutdown()) { |
scheduler.shutdown(true); |
} |
98,12 → 106,19 |
Connection con = fdbpool.getConnection(); |
Statement s = con.createStatement(); |
final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;"; |
final String insert = "INSERT INTO brutex.tbl_events SELECT * from MEM_INBOUND UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from MEM_OUTBOUND;"; |
s.execute(insert); |
int count = s.getUpdateCount(); |
log.info("Persisted {} rows in file-based database.", count); |
log.info("Persisted {} active event rows in file-based database.", count); |
final String save_all = "INSERT INTO brutex.tbl_events_all SELECT * from MEM_ALL_EVENTS;"; |
s.execute(save_all); |
count = s.getUpdateCount(); |
log.info("Persisted {} event rows from all_events log in file-based database", count); |
log.info("Shutting down in-memory database. Closing file-based database. Please wait ..."); |
s.execute("SHUTDOWN;"); |
con.close(); |
log.info("Shutting down databases complete."); |
} catch (SQLException e) { |
141,9 → 156,8 |
//Load events from file based database into in-memory database |
try { |
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database."); |
final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " + |
"CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');"; |
final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;"; |
final String link = getLinkSQL(); |
final String recoverSQL = "INSERT INTO MEM_INBOUND DIRECT SELECT * FROM brutex.tbl_events;"; |
final String truncate = "TRUNCATE TABLE brutex.tbl_events;"; |
int count = 0; |
Connection con = fdbpool.getConnection(); |
163,10 → 177,26 |
throw new RuntimeException(e); |
} |
//Start initial run of the emitter |
if(configuration.isEmitterActive()) { |
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler")); |
} |
private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) { |
//Start mem db log cleaner |
if(configuration.getCleaner_interval()>0) { |
startEventLogCleaner((Scheduler) context.getAttribute("scheduler")); |
} |
} |
private String getLinkSQL() { |
final String dbDriverClass = "org.h2.Driver"; |
final String link = "CREATE LINKED TABLE IF NOT EXISTS MEM_INBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " + |
"CREATE LINKED TABLE IF NOT EXISTS MEM_OUTBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap'); " + |
"CREATE LINKED TABLE IF NOT EXISTS MEM_ALL_EVENTS('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_all') AUTOCOMMIT OFF; " + |
""; |
return link; |
} |
private synchronized void startEmitterImmediate(AtomicLong egres, Scheduler scheduler) { |
try { |
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) { |
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build(); |
173,7 → 203,7 |
job2.getJobDataMap().put("mdbConnection", mempool); |
job2.getJobDataMap().put("fdbConnection", fdbpool); |
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli()); |
job2.getJobDataMap().put("egres_counter", egres_counter); |
job2.getJobDataMap().put("egres_counter", egres); |
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance()); |
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build(); |
scheduler.scheduleJob(job2, t); |
183,6 → 213,27 |
} |
} |
private void startEventLogCleaner(Scheduler scheduler) { |
try { |
if (!scheduler.checkExists(JobKey.jobKey("EventLogCleaner"))) { |
JobDetail job2 = JobBuilder.newJob(EventLogCleanerJob.class).withIdentity("EventLogCleaner").build(); |
job2.getJobDataMap().put("mdbConnection", mempool); |
job2.getJobDataMap().put("fdbConnection", fdbpool); |
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli()); |
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance()); |
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("EventLogCleaner") |
.withSchedule(SimpleScheduleBuilder.simpleSchedule() |
.withIntervalInMinutes(configuration.getCleaner_interval()) |
.repeatForever()) |
.startAt(Date.from(Instant.now().plus(configuration.getCleaner_interval(), ChronoUnit.MINUTES))) |
.build(); |
scheduler.scheduleJob(job2, t); |
} |
} catch (SchedulerException ex) { |
log.error("Could not start EventLogCleaner to process existing queue directly after startup: {}", ex.getMessage()); |
} |
} |
private void readConfiguration(ServletContext ctx) { |
/* Configure ServletContext attributes using configuration object*/ |
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig(); |
//xservices/trunk/src/main/java/net/brutex/xservices/util/SimpleSoap.java |
---|
22,11 → 22,14 |
import org.apache.http.client.entity.EntityBuilder; |
import org.apache.http.client.fluent.Request; |
import org.apache.http.client.fluent.Response; |
import org.apache.http.entity.ContentType; |
import java.io.BufferedReader; |
import java.io.IOException; |
import java.io.InputStreamReader; |
import java.io.Reader; |
import java.nio.charset.Charset; |
import java.nio.charset.StandardCharsets; |
import java.util.concurrent.atomic.AtomicBoolean; |
69,11 → 72,13 |
public Reader sendSoap(boolean isDropResponse) { |
Reader response = null; |
long start = System.currentTimeMillis(); |
EntityBuilder entitybuilder = EntityBuilder.create(); |
entitybuilder.setContentEncoding("UTF-8"); |
entitybuilder.setText(soapBody); |
HttpEntity entity = entitybuilder.build(); |
HttpEntity entity = EntityBuilder.create() |
.setText(soapBody) |
.setContentType(ContentType.create("text/xml", StandardCharsets.UTF_8)) |
.setContentEncoding("UTF-8") |
.build(); |
log.trace("Sending event '{}' to target ALF Event Manager.", id); |
if(isInterrupted.get()) return null; |
81,7 → 86,7 |
try { |
Response resp = Request.Post(url) |
.addHeader("Accept", "text/xml") |
.addHeader("Content-Type", "text/xml; charset=utf-8") |
//.addHeader("Content-Type", "text/xml; charset=utf-8") |
.addHeader("SOAPAction", "") |
.body(entity).execute(); |