//xservices/trunk/src/main/java/net/brutex/xservices/util/EventEmitter.java |
---|
0,0 → 1,180 |
package net.brutex.xservices.util; |
import lombok.extern.slf4j.Slf4j; |
import org.apache.commons.io.IOUtils; |
import org.h2.jdbcx.JdbcConnectionPool; |
import org.quartz.*; |
import java.io.IOException; |
import java.io.Reader; |
import java.io.StringReader; |
import java.sql.*; |
import java.time.Instant; |
import java.util.Date; |
import java.util.concurrent.atomic.AtomicBoolean; |
import java.util.concurrent.atomic.AtomicLong; |
import static org.quartz.TriggerBuilder.newTrigger; |
@Slf4j |
public class EventEmitter implements Job, InterruptableJob { |
private final AtomicBoolean isInterrupted = new AtomicBoolean(false); |
@Override |
public void execute(JobExecutionContext context) throws JobExecutionException { |
final Instant d = Instant.now(); |
final long ts = d.toEpochMilli(); |
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap() |
.get(EventmanagerConfiguration.KEY); |
final String url = conf.getTargeturl(); |
final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection"); |
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection"); |
final long run_key = context.getMergedJobDataMap().getLong("run_key"); |
final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter"); |
final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;"; |
final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?"; |
final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;"; |
final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " + |
" btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; "; |
final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " + |
"KEY (btx_event_type, btx_obj_type, btx_obj_id) " + |
"VALUES (?,?,?,?,?,?,?,?);"; |
/** |
* Move event table data to snapshot |
*/ |
Connection con = null; |
Connection fcon = null; |
try { |
con = pool.getConnection(); |
con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); |
con.setAutoCommit(false); |
Statement stmt = con.createStatement(); |
PreparedStatement moveprep= con.prepareStatement(moveSQL); |
moveprep.setLong(1, run_key); |
moveprep.execute(); |
stmt.execute(deleteTable); |
con.commit(); //all events moved from tbl_events to tbl_events_snap at this point |
fcon = fpool.getConnection(); |
PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL); |
PreparedStatement del = con.prepareStatement(deleteSQL); |
ResultSet rs = stmt.executeQuery(querySQL); |
while(rs.next() && !isInterrupted.get()) { |
/* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */ |
String id = rs.getString(1); |
Clob c = rs.getClob(2); |
String obj_id = rs.getString(3); |
String event_type = rs.getString(4); |
String obj_type = rs.getString(5); |
long event_ts = rs.getLong(6); |
boolean bretry = false; |
SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream())); |
int retry = 0; |
Reader response = null; |
String rsp = ""; |
boolean succeeded = false; |
while(retry < 3 && !succeeded && !isInterrupted.get()) { |
retry++; |
response = ss.sendSoap(false); |
succeeded = true; |
if(response!=null) { |
rsp = IOUtils.toString(response); |
} |
if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;}; |
if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;}; |
if (succeeded) { |
// Successfully send |
del.setString(1, id); |
del.execute(); |
con.commit(); |
egres_counter.incrementAndGet(); |
log.debug("Successfully sent event '{}' to target ALF Event Manager.", id); |
} else { |
// Error during sending |
log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry); |
try { |
Thread.sleep(2000); |
} catch (InterruptedException e) { |
log.error("Interrupted while waiting to retry: {}", e.getMessage()); |
} |
} |
} |
if(! succeeded) { |
log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " + |
"Moving event back to the queue unless there is a superseding event already queued.", id); |
try { |
//this is in file-based db |
errorPrepSql.setString(1, event_type); |
errorPrepSql.setString(2, id); |
errorPrepSql.setString(3, obj_type); |
errorPrepSql.setString(4, obj_id); |
errorPrepSql.setLong(5, event_ts); |
errorPrepSql.setBoolean(6, bretry); |
errorPrepSql.setClob(7, new StringReader(rsp) ); |
errorPrepSql.setClob(8, c); |
errorPrepSql.execute(); |
fcon.commit(); |
//this is in-memory |
del.setString(1, id); |
del.execute(); |
con.commit(); |
} catch (SQLException e) { |
log.error("Exception in SQL execution during writing error events: {}", e.getMessage()); |
} |
} |
} |
} catch (SQLException e) { |
log.error("Exception in SQL execution: {}", e.getMessage()); |
throw new JobExecutionException(e); |
} catch (IOException e) { |
log.error("Exception in SQL execution: {}", e.getMessage()); |
throw new RuntimeException(e); |
} finally { |
try { |
if(fcon!=null) fcon.close(); |
if(con!=null) con.close(); |
} catch (SQLException e) { |
log.error("Error closing the database connections: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
} |
} |
/** |
* <p> |
* Called by the <code>{@link Scheduler}</code> when a user |
* interrupts the <code>Job</code>. |
* </p> |
* |
* @throws UnableToInterruptJobException if there is an exception while interrupting the job. |
*/ |
@Override |
public synchronized void interrupt() throws UnableToInterruptJobException { |
isInterrupted.set(true); |
log.warn("ALFEmitter received and interrupt."); |
} |
} |
//xservices/trunk/src/main/java/net/brutex/xservices/util/EventmanagerConfiguration.java |
---|
0,0 → 1,68 |
package net.brutex.xservices.util; |
import lombok.Data; |
import lombok.Singular; |
import lombok.extern.slf4j.Slf4j; |
import org.apache.commons.configuration2.Configuration; |
import org.apache.commons.configuration2.FileBasedConfiguration; |
import org.apache.commons.configuration2.PropertiesConfiguration; |
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; |
import org.apache.commons.configuration2.builder.PropertiesBuilderParametersImpl; |
import org.apache.commons.configuration2.ex.ConfigurationException; |
import javax.servlet.ServletContext; |
/** |
* A configuration object for the MiscService -> Eventmanager. Implemented as singleton. |
* @author Brian Rosenberger, bru@brutex.de |
**/ |
@Data |
@Slf4j |
public class EventmanagerConfiguration { |
public static final String KEY = "net.brutex.xservices.EventmanagerConfiguration"; |
private static class InstanceHolder { |
public static final EventmanagerConfiguration instance = new EventmanagerConfiguration(); |
} |
private EventmanagerConfiguration() { |
refreshConfig(); |
} |
public static EventmanagerConfiguration getInstance() { |
return InstanceHolder.instance; |
} |
private String targeturl; |
private int interval; |
private String jdbc_memdb; |
private String jdbc_filedb; |
public synchronized EventmanagerConfiguration refreshConfig() { |
log.trace("Reading EventmanagerConfiguration from file eventmanager.properties."); |
FileBasedConfigurationBuilder<FileBasedConfiguration> builder = |
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class) |
.configure(new PropertiesBuilderParametersImpl().setFileName("eventmanager.properties")); |
try { |
Configuration config = builder.getConfiguration(); |
/* Read from eventmanager.properties file */ |
this.targeturl = config.getString("target.url"); |
this.interval = config.getInt("interval", 10); |
this.jdbc_memdb = config.getString("memdb", "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;"); |
this.jdbc_filedb = config.getString("fdb", "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;"); |
} catch (ConfigurationException e) { |
log.error("Error loading configuration for event manager in XServices MiscServices: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
return this; |
} |
} |
//xservices/trunk/src/main/java/net/brutex/xservices/util/MiscServiceServletContextListener.java |
---|
0,0 → 1,192 |
package net.brutex.xservices.util; |
import lombok.extern.slf4j.Slf4j; |
import org.h2.jdbcx.JdbcConnectionPool; |
import org.quartz.*; |
import org.quartz.impl.StdSchedulerFactory; |
import javax.servlet.ServletContext; |
import javax.servlet.ServletContextEvent; |
import javax.servlet.ServletContextListener; |
import javax.servlet.annotation.WebListener; |
import java.sql.Connection; |
import java.sql.ResultSet; |
import java.sql.SQLException; |
import java.sql.Statement; |
import java.time.Instant; |
import java.util.concurrent.atomic.AtomicLong; |
import static org.quartz.TriggerBuilder.newTrigger; |
//For Servlet container 3.x, you can annotate the listener with @WebListener, no need to declares in web.xml. |
/** |
* Handle servlet lifecycle actions for the MiscService servlet, such as |
* initializing in-memory database, persist on shutdown etc. |
*/ |
@WebListener |
@Slf4j |
public class MiscServiceServletContextListener implements ServletContextListener { |
/** |
* SQL initialization for in-memory database |
* INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'" |
*/ |
private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';"; |
private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig(); |
private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb()); |
private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb()); |
/** |
* Create DB connection pool and initialize the in-memory database with schema. |
* |
* @return connection pool |
*/ |
private static JdbcConnectionPool getDbPool(String dbConnectString) { |
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", ""); |
p.setMaxConnections(16); |
p.setLoginTimeout(5); |
try { |
Connection c = p.getConnection(); |
Statement s = c.createStatement(); |
s.execute(dbinit); |
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit); |
c.close(); |
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString); |
} catch (SQLException e) { |
log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage()); |
throw new RuntimeException(e); |
} |
return p; |
} |
@Override |
public void contextDestroyed(ServletContextEvent arg0) { |
log.trace("contextDestroyed called."); |
try { |
Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler"); |
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs()); |
JobKey key = JobKey.jobKey("ALFEmitter"); |
synchronized (scheduler) { |
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) { |
scheduler.interrupt(key); |
scheduler.deleteJob(key); |
log.info("Gracefully stopped the ALFEventEmitter job."); |
} |
if (!scheduler.isShutdown()) { |
scheduler.shutdown(true); |
} |
} |
} catch (SchedulerException e) { |
log.error("Failed to stop the ALFEmitter job: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
log.info("ServletContextListener destroyed. Saving in-memory database to file based database."); |
int act_i = mempool.getActiveConnections(); |
if (act_i > 0) { |
log.warn("There are still {} connections to the XServices in-memory database active.", act_i); |
} |
try { |
log.info("Create/Re-open file based database to persist memory database."); |
Connection con = fdbpool.getConnection(); |
Statement s = con.createStatement(); |
final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;"; |
s.execute(insert); |
int count = s.getUpdateCount(); |
log.info("Persisted {} rows in file-based database.", count); |
log.info("Shutting down in-memory database. Closing file-based database. Please wait ..."); |
s.execute("SHUTDOWN;"); |
con.close(); |
log.info("Shutting down databases complete."); |
} catch (SQLException e) { |
log.error("An error occurred during database persistence: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter")); |
log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter")); |
} |
//Run this before web application is started |
@Override |
public void contextInitialized(ServletContextEvent arg0) { |
log.debug("ServletContextListener started"); |
ServletContext context = arg0.getServletContext(); |
readConfiguration(context); |
context.setAttribute("mdbConnection", mempool); |
context.setAttribute("fdbConnection", fdbpool); |
context.setAttribute("ingres_counter", 0); |
AtomicLong egres = new AtomicLong(0); |
context.setAttribute("egres_counter", egres); |
context.setAttribute("ingres_counter", new AtomicLong(0)); |
try { |
StdSchedulerFactory fact = new StdSchedulerFactory(); |
fact.initialize("MiscServicesScheduler-quartz.properties"); |
Scheduler scheduler = fact.getScheduler(); |
scheduler.start(); |
context.setAttribute("scheduler", scheduler); |
} catch (SchedulerException e) { |
log.error("Error creating scheduler within ServletContext: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
//Load events from file based database into in-memory database |
try { |
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database."); |
final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " + |
"CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');"; |
final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;"; |
final String truncate = "TRUNCATE TABLE brutex.tbl_events;"; |
int count = 0; |
Connection con = fdbpool.getConnection(); |
con.setAutoCommit(false); |
Statement statement = con.createStatement(); |
statement.execute(link); |
con.commit(); |
ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events"); |
if(rs.next()) count = rs.getInt(1); |
statement.execute(recoverSQL); |
log.info("Recovered {} events and loaded them into in-memory database.", count); |
statement.execute(truncate); |
con.commit(); |
con.close(); |
} catch (SQLException e) { |
log.error("Exception during recovery of events from previous runs: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
//Start initial run of the emitter |
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler")); |
} |
private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) { |
try { |
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) { |
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build(); |
job2.getJobDataMap().put("mdbConnection", mempool); |
job2.getJobDataMap().put("fdbConnection", fdbpool); |
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli()); |
job2.getJobDataMap().put("egres_counter", egres_counter); |
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance()); |
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build(); |
scheduler.scheduleJob(job2, t); |
} |
} catch (SchedulerException ex) { |
log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage()); |
} |
} |
private void readConfiguration(ServletContext ctx) { |
/* Configure ServletContext attributes using configuration object*/ |
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig(); |
ctx.setAttribute(EventmanagerConfiguration.KEY, c); |
} |
} |
//xservices/trunk/src/main/java/net/brutex/xservices/util/SimpleSoap.java |
---|
0,0 → 1,117 |
/* |
* Copyright 2013 Brian Rosenberger (Brutex Network) |
* |
* Licensed under the Apache License, Version 2.0 (the "License"); |
* you may not use this file except in compliance with the License. |
* You may obtain a copy of the License at |
* |
* http://www.apache.org/licenses/LICENSE-2.0 |
* |
* Unless required by applicable law or agreed to in writing, software |
* distributed under the License is distributed on an "AS IS" BASIS, |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
* See the License for the specific language governing permissions and |
* limitations under the License. |
*/ |
package net.brutex.xservices.util; |
import lombok.extern.slf4j.Slf4j; |
import org.apache.http.HttpEntity; |
import org.apache.http.client.ClientProtocolException; |
import org.apache.http.client.entity.EntityBuilder; |
import org.apache.http.client.fluent.Request; |
import org.apache.http.client.fluent.Response; |
import java.io.BufferedReader; |
import java.io.IOException; |
import java.io.InputStreamReader; |
import java.io.Reader; |
import java.util.concurrent.atomic.AtomicBoolean; |
/** |
* Construct a HTTP POST and send it. |
* |
* @author Brian Rosenberger, bru(at)brutex.de |
* @since 0.1 |
*/ |
@Slf4j |
public class SimpleSoap { |
private final String url; |
private final String soapBody; |
private final String id; |
private long duration = 0; |
final AtomicBoolean isInterrupted = new AtomicBoolean(false); |
/** |
* Instantiates a new simple http event. |
* |
* @param url the url |
* @param soapBody the soap body |
*/ |
public SimpleSoap(String url, String id, String soapBody) { |
this.url = url; |
this.id = id; |
this.soapBody = soapBody; |
} |
/** |
* Send soap. |
* |
* @param isDropResponse show interest in response or not |
* @throws ClientProtocolException the client protocol exception |
* @throws IOException Signals that an I/O exception has occurred. |
*/ |
public Reader sendSoap(boolean isDropResponse) { |
Reader response = null; |
long start = System.currentTimeMillis(); |
EntityBuilder entitybuilder = EntityBuilder.create(); |
entitybuilder.setContentEncoding("UTF-8"); |
entitybuilder.setText(soapBody); |
HttpEntity entity = entitybuilder.build(); |
log.trace("Sending event '{}' to target ALF Event Manager.", id); |
if(isInterrupted.get()) return null; |
try { |
Response resp = Request.Post(url) |
.addHeader("Accept", "text/xml") |
.addHeader("Content-Type", "text/xml; charset=utf-8") |
.addHeader("SOAPAction", "") |
.body(entity).execute(); |
if (!isDropResponse) { |
HttpEntity e = resp.returnResponse().getEntity(); |
response = new BufferedReader(new InputStreamReader(e.getContent())); |
/* |
StringBuilder sb = new StringBuilder(); |
BufferedReader in = new BufferedReader(new InputStreamReader(e.getContent())); |
String s; |
while ((s = in.readLine()) != null) { |
sb.append(s); |
} |
log.trace("Response: \n {}", sb.toString()); |
if (sb.toString().contains("<soap:Fault>")) { return false;}; |
if (! sb.toString().contains(":Envelope ")) { return false;}; |
*/ |
} else { |
log.debug("Response intentionally ignored."); |
} |
} catch (IOException e) { |
log.error("Error sending ALF Event '{}'. Got IOException: {}", id, e.getMessage()); |
} |
duration = System.currentTimeMillis() - start; |
return response; |
} |
public void interrupt() { |
this.isInterrupted.set(true); |
} |
} |
Property changes: |
Added: svn:mime-type |
+text/plain |
\ No newline at end of property |