Subversion Repositories XServices

Compare Revisions

Ignore whitespace Rev 201 → Rev 203

/xservices/trunk/src/main/java/net/brutex/xservices/util/EventEmitter.java
51,10 → 51,11
* Move event table data to snapshot
*/
 
Connection con = null;
Connection fcon = null;
try {
con = pool.getConnection();
 
try( Connection con = pool.getConnection();
Connection fcon = fpool.getConnection();
) {
 
con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
con.setAutoCommit(false);
Statement stmt = con.createStatement();
65,7 → 66,7
con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
 
 
fcon = fpool.getConnection();
 
PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
 
PreparedStatement del = con.prepareStatement(deleteSQL);
83,7 → 84,8
long event_ts = rs.getLong(6);
boolean bretry = false;
 
SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream()));
//SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getAsciiStream()c.getCharacterStream()));
SimpleSoap ss = new SimpleSoap(url, id, c.getSubString(1L, (int) c.length()));
int retry = 0;
Reader response = null;
String rsp = "";
152,14 → 154,6
} catch (IOException e) {
log.error("Exception in SQL execution: {}", e.getMessage());
throw new RuntimeException(e);
} finally {
try {
if(fcon!=null) fcon.close();
if(con!=null) con.close();
} catch (SQLException e) {
log.error("Error closing the database connections: {}", e.getMessage());
throw new RuntimeException(e);
}
}
}
 
/xservices/trunk/src/main/java/net/brutex/xservices/util/EventLogCleanerJob.java
0,0 → 1,83
package net.brutex.xservices.util;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;
 
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.sql.*;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
 
@Slf4j
@DisallowConcurrentExecution
public class EventLogCleanerJob implements Job, InterruptableJob {
 
private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
 
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
final Instant d = Instant.now();
final long ts = d.toEpochMilli();
 
log.info("EventLogCleaner is executing now.");
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
.get(EventmanagerConfiguration.KEY);
 
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
final JdbcConnectionPool mpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
 
final String moveSQL = "INSERT INTO brutex.tbl_events_all DIRECT SELECT * FROM MEM_ALL_EVENTS " +
"where btx_timestamp < " + (ts-5000) + " ";
final String deleteTable = "DELETE FROM MEM_ALL_EVENTS where btx_timestamp < " + (ts-5000);
final String deleteMemTable = "DELETE FROM brutex.tbl_events_all where btx_timestamp < " + (ts-5000);
 
 
/**
* Move event table data to all events log
*/
 
 
try (Connection fcon = fpool.getConnection();
Connection mcon = mpool.getConnection()){
fcon.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
fcon.setAutoCommit(false);
Statement stmt = fcon.createStatement();
stmt.execute(moveSQL);
int count = stmt.getUpdateCount();
fcon.commit();
Statement mstm = mcon.createStatement();
 
 
mstm.execute(deleteMemTable);
int count2 = mstm.getUpdateCount();
mcon.commit();
 
log.info("EventLogCleaner moved '{}/ deleted {}' events into the persistence space.", count, count2);
 
} catch (SQLException e) {
log.error("Exception in SQL execution: {}", e.getMessage());
throw new JobExecutionException(e);
}
}
 
 
/**
* <p>
* Called by the <code>{@link Scheduler}</code> when a user
* interrupts the <code>Job</code>.
* </p>
*
* @throws UnableToInterruptJobException if there is an exception while interrupting the job.
*/
@Override
public synchronized void interrupt() throws UnableToInterruptJobException {
isInterrupted.set(true);
log.warn("EventLogCleaner received and interrupt.");
Thread.currentThread().interrupt();
}
}
/xservices/trunk/src/main/java/net/brutex/xservices/util/EventmanagerConfiguration.java
8,6 → 8,8
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.PropertiesBuilderParametersImpl;
import org.apache.commons.configuration2.event.ConfigurationEvent;
import org.apache.commons.configuration2.event.EventListener;
import org.apache.commons.configuration2.ex.ConfigurationException;
 
import javax.servlet.ServletContext;
39,6 → 41,8
private int interval;
private String jdbc_memdb;
private String jdbc_filedb;
private boolean isEmitterActive = true;
private int cleaner_interval;
 
 
public synchronized EventmanagerConfiguration refreshConfig() {
45,7 → 49,7
log.trace("Reading EventmanagerConfiguration from file eventmanager.properties.");
FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
.configure(new PropertiesBuilderParametersImpl().setFileName("eventmanager.properties"));
.configure(new PropertiesBuilderParametersImpl().setFileName("../eventmanager.properties"));
 
try {
Configuration config = builder.getConfiguration();
55,6 → 59,8
this.interval = config.getInt("interval", 10);
this.jdbc_memdb = config.getString("memdb", "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;");
this.jdbc_filedb = config.getString("fdb", "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;");
this.isEmitterActive = config.getBoolean("emitter_active", true);
this.cleaner_interval = config.getInt("cleaner_interval", 5);
 
 
} catch (ConfigurationException e) {
/xservices/trunk/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java
15,6 → 15,8
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
 
import static org.quartz.TriggerBuilder.newTrigger;
48,12 → 50,12
*/
private static JdbcConnectionPool getDbPool(String dbConnectString) {
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
p.setMaxConnections(16);
p.setLoginTimeout(5);
try {
Connection c = p.getConnection();
p.setMaxConnections(256);
p.setLoginTimeout(20);
try (Connection c = p.getConnection();){
Statement s = c.createStatement();
s.execute(dbinit);
c.commit();
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
c.close();
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
72,6 → 74,7
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
 
JobKey key = JobKey.jobKey("ALFEmitter");
JobKey cleanerkey = JobKey.jobKey("EventLogCleaner");
synchronized (scheduler) {
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
scheduler.interrupt(key);
78,6 → 81,11
scheduler.deleteJob(key);
log.info("Gracefully stopped the ALFEventEmitter job.");
}
if (!scheduler.isShutdown() && scheduler.checkExists(cleanerkey) ) {
scheduler.interrupt(cleanerkey);
scheduler.deleteJob(cleanerkey);
log.info("Gracefully stopped the ALFEventEmitter job.");
}
if (!scheduler.isShutdown()) {
scheduler.shutdown(true);
}
98,12 → 106,19
Connection con = fdbpool.getConnection();
Statement s = con.createStatement();
 
final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;";
final String insert = "INSERT INTO brutex.tbl_events SELECT * from MEM_INBOUND UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from MEM_OUTBOUND;";
s.execute(insert);
int count = s.getUpdateCount();
log.info("Persisted {} rows in file-based database.", count);
log.info("Persisted {} active event rows in file-based database.", count);
 
final String save_all = "INSERT INTO brutex.tbl_events_all SELECT * from MEM_ALL_EVENTS;";
s.execute(save_all);
count = s.getUpdateCount();
log.info("Persisted {} event rows from all_events log in file-based database", count);
 
log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
s.execute("SHUTDOWN;");
 
con.close();
log.info("Shutting down databases complete.");
} catch (SQLException e) {
141,9 → 156,8
//Load events from file based database into in-memory database
try {
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
"CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');";
final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;";
final String link = getLinkSQL();
final String recoverSQL = "INSERT INTO MEM_INBOUND DIRECT SELECT * FROM brutex.tbl_events;";
final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
int count = 0;
Connection con = fdbpool.getConnection();
163,10 → 177,26
throw new RuntimeException(e);
}
//Start initial run of the emitter
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
if(configuration.isEmitterActive()) {
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
}
 
//Start mem db log cleaner
if(configuration.getCleaner_interval()>0) {
startEventLogCleaner((Scheduler) context.getAttribute("scheduler"));
}
}
 
private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) {
private String getLinkSQL() {
final String dbDriverClass = "org.h2.Driver";
final String link = "CREATE LINKED TABLE IF NOT EXISTS MEM_INBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
"CREATE LINKED TABLE IF NOT EXISTS MEM_OUTBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap'); " +
"CREATE LINKED TABLE IF NOT EXISTS MEM_ALL_EVENTS('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_all') AUTOCOMMIT OFF; " +
"";
return link;
}
 
private synchronized void startEmitterImmediate(AtomicLong egres, Scheduler scheduler) {
try {
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
173,7 → 203,7
job2.getJobDataMap().put("mdbConnection", mempool);
job2.getJobDataMap().put("fdbConnection", fdbpool);
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
job2.getJobDataMap().put("egres_counter", egres_counter);
job2.getJobDataMap().put("egres_counter", egres);
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
scheduler.scheduleJob(job2, t);
183,6 → 213,27
}
}
 
private void startEventLogCleaner(Scheduler scheduler) {
try {
if (!scheduler.checkExists(JobKey.jobKey("EventLogCleaner"))) {
JobDetail job2 = JobBuilder.newJob(EventLogCleanerJob.class).withIdentity("EventLogCleaner").build();
job2.getJobDataMap().put("mdbConnection", mempool);
job2.getJobDataMap().put("fdbConnection", fdbpool);
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("EventLogCleaner")
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInMinutes(configuration.getCleaner_interval())
.repeatForever())
.startAt(Date.from(Instant.now().plus(configuration.getCleaner_interval(), ChronoUnit.MINUTES)))
.build();
scheduler.scheduleJob(job2, t);
}
} catch (SchedulerException ex) {
log.error("Could not start EventLogCleaner to process existing queue directly after startup: {}", ex.getMessage());
}
}
 
private void readConfiguration(ServletContext ctx) {
/* Configure ServletContext attributes using configuration object*/
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
/xservices/trunk/src/main/java/net/brutex/xservices/util/SimpleSoap.java
22,11 → 22,14
import org.apache.http.client.entity.EntityBuilder;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.entity.ContentType;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
 
 
69,11 → 72,13
public Reader sendSoap(boolean isDropResponse) {
Reader response = null;
long start = System.currentTimeMillis();
EntityBuilder entitybuilder = EntityBuilder.create();
entitybuilder.setContentEncoding("UTF-8");
entitybuilder.setText(soapBody);
HttpEntity entity = entitybuilder.build();
 
HttpEntity entity = EntityBuilder.create()
.setText(soapBody)
.setContentType(ContentType.create("text/xml", StandardCharsets.UTF_8))
.setContentEncoding("UTF-8")
.build();
 
log.trace("Sending event '{}' to target ALF Event Manager.", id);
 
if(isInterrupted.get()) return null;
81,7 → 86,7
try {
Response resp = Request.Post(url)
.addHeader("Accept", "text/xml")
.addHeader("Content-Type", "text/xml; charset=utf-8")
//.addHeader("Content-Type", "text/xml; charset=utf-8")
.addHeader("SOAPAction", "")
.body(entity).execute();