package net.brutex.xservices.util; |
|
|
import lombok.extern.slf4j.Slf4j; |
import org.h2.jdbcx.JdbcConnectionPool; |
import org.quartz.*; |
import org.quartz.impl.StdSchedulerFactory; |
|
import javax.servlet.ServletContext; |
import javax.servlet.ServletContextEvent; |
import javax.servlet.ServletContextListener; |
import javax.servlet.annotation.WebListener; |
import java.sql.Connection; |
import java.sql.ResultSet; |
import java.sql.SQLException; |
import java.sql.Statement; |
import java.time.Instant; |
import java.util.concurrent.atomic.AtomicLong; |
|
import static org.quartz.TriggerBuilder.newTrigger; |
|
//For Servlet container 3.x, you can annotate the listener with @WebListener, no need to declares in web.xml. |
|
/** |
* Handle servlet lifecycle actions for the MiscService servlet, such as |
* initializing in-memory database, persist on shutdown etc. |
*/ |
|
|
@WebListener |
@Slf4j |
public class MiscServiceServletContextListener implements ServletContextListener { |
|
/** |
* SQL initialization for in-memory database |
* INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'" |
*/ |
private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';"; |
|
private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig(); |
private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb()); |
private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb()); |
|
/** |
* Create DB connection pool and initialize the in-memory database with schema. |
* |
* @return connection pool |
*/ |
private static JdbcConnectionPool getDbPool(String dbConnectString) { |
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", ""); |
p.setMaxConnections(16); |
p.setLoginTimeout(5); |
try { |
Connection c = p.getConnection(); |
Statement s = c.createStatement(); |
s.execute(dbinit); |
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit); |
c.close(); |
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString); |
} catch (SQLException e) { |
log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage()); |
throw new RuntimeException(e); |
} |
return p; |
} |
|
@Override |
public void contextDestroyed(ServletContextEvent arg0) { |
log.trace("contextDestroyed called."); |
try { |
Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler"); |
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs()); |
|
JobKey key = JobKey.jobKey("ALFEmitter"); |
synchronized (scheduler) { |
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) { |
scheduler.interrupt(key); |
scheduler.deleteJob(key); |
log.info("Gracefully stopped the ALFEventEmitter job."); |
} |
if (!scheduler.isShutdown()) { |
scheduler.shutdown(true); |
} |
} |
} catch (SchedulerException e) { |
log.error("Failed to stop the ALFEmitter job: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
|
log.info("ServletContextListener destroyed. Saving in-memory database to file based database."); |
int act_i = mempool.getActiveConnections(); |
if (act_i > 0) { |
log.warn("There are still {} connections to the XServices in-memory database active.", act_i); |
} |
|
try { |
log.info("Create/Re-open file based database to persist memory database."); |
Connection con = fdbpool.getConnection(); |
Statement s = con.createStatement(); |
|
final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;"; |
s.execute(insert); |
int count = s.getUpdateCount(); |
log.info("Persisted {} rows in file-based database.", count); |
log.info("Shutting down in-memory database. Closing file-based database. Please wait ..."); |
s.execute("SHUTDOWN;"); |
con.close(); |
log.info("Shutting down databases complete."); |
} catch (SQLException e) { |
log.error("An error occurred during database persistence: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter")); |
log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter")); |
} |
|
//Run this before web application is started |
@Override |
public void contextInitialized(ServletContextEvent arg0) { |
log.debug("ServletContextListener started"); |
ServletContext context = arg0.getServletContext(); |
readConfiguration(context); |
|
context.setAttribute("mdbConnection", mempool); |
context.setAttribute("fdbConnection", fdbpool); |
context.setAttribute("ingres_counter", 0); |
AtomicLong egres = new AtomicLong(0); |
context.setAttribute("egres_counter", egres); |
context.setAttribute("ingres_counter", new AtomicLong(0)); |
try { |
StdSchedulerFactory fact = new StdSchedulerFactory(); |
fact.initialize("MiscServicesScheduler-quartz.properties"); |
Scheduler scheduler = fact.getScheduler(); |
scheduler.start(); |
context.setAttribute("scheduler", scheduler); |
} catch (SchedulerException e) { |
log.error("Error creating scheduler within ServletContext: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
|
//Load events from file based database into in-memory database |
try { |
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database."); |
final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " + |
"CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');"; |
final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;"; |
final String truncate = "TRUNCATE TABLE brutex.tbl_events;"; |
int count = 0; |
Connection con = fdbpool.getConnection(); |
con.setAutoCommit(false); |
Statement statement = con.createStatement(); |
statement.execute(link); |
con.commit(); |
ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events"); |
if(rs.next()) count = rs.getInt(1); |
statement.execute(recoverSQL); |
log.info("Recovered {} events and loaded them into in-memory database.", count); |
statement.execute(truncate); |
con.commit(); |
con.close(); |
} catch (SQLException e) { |
log.error("Exception during recovery of events from previous runs: {}", e.getMessage()); |
throw new RuntimeException(e); |
} |
//Start initial run of the emitter |
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler")); |
} |
|
private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) { |
try { |
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) { |
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build(); |
job2.getJobDataMap().put("mdbConnection", mempool); |
job2.getJobDataMap().put("fdbConnection", fdbpool); |
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli()); |
job2.getJobDataMap().put("egres_counter", egres_counter); |
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance()); |
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build(); |
scheduler.scheduleJob(job2, t); |
} |
} catch (SchedulerException ex) { |
log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage()); |
} |
} |
|
private void readConfiguration(ServletContext ctx) { |
/* Configure ServletContext attributes using configuration object*/ |
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig(); |
ctx.setAttribute(EventmanagerConfiguration.KEY, c); |
} |
|
} |