201 |
brianR |
1 |
package net.brutex.xservices.util;
|
|
|
2 |
|
|
|
3 |
|
|
|
4 |
import lombok.extern.slf4j.Slf4j;
|
|
|
5 |
import org.h2.jdbcx.JdbcConnectionPool;
|
|
|
6 |
import org.quartz.*;
|
|
|
7 |
import org.quartz.impl.StdSchedulerFactory;
|
|
|
8 |
|
|
|
9 |
import javax.servlet.ServletContext;
|
|
|
10 |
import javax.servlet.ServletContextEvent;
|
|
|
11 |
import javax.servlet.ServletContextListener;
|
|
|
12 |
import javax.servlet.annotation.WebListener;
|
|
|
13 |
import java.sql.Connection;
|
|
|
14 |
import java.sql.ResultSet;
|
|
|
15 |
import java.sql.SQLException;
|
|
|
16 |
import java.sql.Statement;
|
|
|
17 |
import java.time.Instant;
|
|
|
18 |
import java.util.concurrent.atomic.AtomicLong;
|
|
|
19 |
|
|
|
20 |
import static org.quartz.TriggerBuilder.newTrigger;
|
|
|
21 |
|
|
|
22 |
//For Servlet container 3.x, you can annotate the listener with @WebListener, no need to declares in web.xml.
|
|
|
23 |
|
|
|
24 |
/**
|
|
|
25 |
* Handle servlet lifecycle actions for the MiscService servlet, such as
|
|
|
26 |
* initializing in-memory database, persist on shutdown etc.
|
|
|
27 |
*/
|
|
|
28 |
|
|
|
29 |
|
|
|
30 |
@WebListener
|
|
|
31 |
@Slf4j
|
|
|
32 |
public class MiscServiceServletContextListener implements ServletContextListener {
|
|
|
33 |
|
|
|
34 |
/**
|
|
|
35 |
* SQL initialization for in-memory database
|
|
|
36 |
* INIT=RUNSCRIPT FROM 'classpath:scripts/create.sql'"
|
|
|
37 |
*/
|
|
|
38 |
private final static String dbinit = "RUNSCRIPT FROM 'classpath:ddl/BRTX_schema.ddl';";
|
|
|
39 |
|
|
|
40 |
private final EventmanagerConfiguration configuration = EventmanagerConfiguration.getInstance().refreshConfig();
|
|
|
41 |
private final JdbcConnectionPool mempool = getDbPool(configuration.getJdbc_memdb());
|
|
|
42 |
private final JdbcConnectionPool fdbpool = getDbPool(configuration.getJdbc_filedb());
|
|
|
43 |
|
|
|
44 |
/**
|
|
|
45 |
* Create DB connection pool and initialize the in-memory database with schema.
|
|
|
46 |
*
|
|
|
47 |
* @return connection pool
|
|
|
48 |
*/
|
|
|
49 |
private static JdbcConnectionPool getDbPool(String dbConnectString) {
|
|
|
50 |
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
|
|
|
51 |
p.setMaxConnections(16);
|
|
|
52 |
p.setLoginTimeout(5);
|
|
|
53 |
try {
|
|
|
54 |
Connection c = p.getConnection();
|
|
|
55 |
Statement s = c.createStatement();
|
|
|
56 |
s.execute(dbinit);
|
|
|
57 |
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
|
|
|
58 |
c.close();
|
|
|
59 |
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
|
|
|
60 |
} catch (SQLException e) {
|
|
|
61 |
log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
|
|
|
62 |
throw new RuntimeException(e);
|
|
|
63 |
}
|
|
|
64 |
return p;
|
|
|
65 |
}
|
|
|
66 |
|
|
|
67 |
@Override
|
|
|
68 |
public void contextDestroyed(ServletContextEvent arg0) {
|
|
|
69 |
log.trace("contextDestroyed called.");
|
|
|
70 |
try {
|
|
|
71 |
Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
|
|
|
72 |
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
|
|
|
73 |
|
|
|
74 |
JobKey key = JobKey.jobKey("ALFEmitter");
|
|
|
75 |
synchronized (scheduler) {
|
|
|
76 |
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
|
|
|
77 |
scheduler.interrupt(key);
|
|
|
78 |
scheduler.deleteJob(key);
|
|
|
79 |
log.info("Gracefully stopped the ALFEventEmitter job.");
|
|
|
80 |
}
|
|
|
81 |
if (!scheduler.isShutdown()) {
|
|
|
82 |
scheduler.shutdown(true);
|
|
|
83 |
}
|
|
|
84 |
}
|
|
|
85 |
} catch (SchedulerException e) {
|
|
|
86 |
log.error("Failed to stop the ALFEmitter job: {}", e.getMessage());
|
|
|
87 |
throw new RuntimeException(e);
|
|
|
88 |
}
|
|
|
89 |
|
|
|
90 |
log.info("ServletContextListener destroyed. Saving in-memory database to file based database.");
|
|
|
91 |
int act_i = mempool.getActiveConnections();
|
|
|
92 |
if (act_i > 0) {
|
|
|
93 |
log.warn("There are still {} connections to the XServices in-memory database active.", act_i);
|
|
|
94 |
}
|
|
|
95 |
|
|
|
96 |
try {
|
|
|
97 |
log.info("Create/Re-open file based database to persist memory database.");
|
|
|
98 |
Connection con = fdbpool.getConnection();
|
|
|
99 |
Statement s = con.createStatement();
|
|
|
100 |
|
|
|
101 |
final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;";
|
|
|
102 |
s.execute(insert);
|
|
|
103 |
int count = s.getUpdateCount();
|
|
|
104 |
log.info("Persisted {} rows in file-based database.", count);
|
|
|
105 |
log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
|
|
|
106 |
s.execute("SHUTDOWN;");
|
|
|
107 |
con.close();
|
|
|
108 |
log.info("Shutting down databases complete.");
|
|
|
109 |
} catch (SQLException e) {
|
|
|
110 |
log.error("An error occurred during database persistence: {}", e.getMessage());
|
|
|
111 |
throw new RuntimeException(e);
|
|
|
112 |
}
|
|
|
113 |
log.debug("Handled {} egress events.", arg0.getServletContext().getAttribute("egres_counter"));
|
|
|
114 |
log.debug("Handled {} ingress events.", arg0.getServletContext().getAttribute("ingres_counter"));
|
|
|
115 |
}
|
|
|
116 |
|
|
|
117 |
//Run this before web application is started
|
|
|
118 |
@Override
|
|
|
119 |
public void contextInitialized(ServletContextEvent arg0) {
|
|
|
120 |
log.debug("ServletContextListener started");
|
|
|
121 |
ServletContext context = arg0.getServletContext();
|
|
|
122 |
readConfiguration(context);
|
|
|
123 |
|
|
|
124 |
context.setAttribute("mdbConnection", mempool);
|
|
|
125 |
context.setAttribute("fdbConnection", fdbpool);
|
|
|
126 |
context.setAttribute("ingres_counter", 0);
|
|
|
127 |
AtomicLong egres = new AtomicLong(0);
|
|
|
128 |
context.setAttribute("egres_counter", egres);
|
|
|
129 |
context.setAttribute("ingres_counter", new AtomicLong(0));
|
|
|
130 |
try {
|
|
|
131 |
StdSchedulerFactory fact = new StdSchedulerFactory();
|
|
|
132 |
fact.initialize("MiscServicesScheduler-quartz.properties");
|
|
|
133 |
Scheduler scheduler = fact.getScheduler();
|
|
|
134 |
scheduler.start();
|
|
|
135 |
context.setAttribute("scheduler", scheduler);
|
|
|
136 |
} catch (SchedulerException e) {
|
|
|
137 |
log.error("Error creating scheduler within ServletContext: {}", e.getMessage());
|
|
|
138 |
throw new RuntimeException(e);
|
|
|
139 |
}
|
|
|
140 |
|
|
|
141 |
//Load events from file based database into in-memory database
|
|
|
142 |
try {
|
|
|
143 |
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
|
|
|
144 |
final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
|
|
|
145 |
"CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');";
|
|
|
146 |
final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;";
|
|
|
147 |
final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
|
|
|
148 |
int count = 0;
|
|
|
149 |
Connection con = fdbpool.getConnection();
|
|
|
150 |
con.setAutoCommit(false);
|
|
|
151 |
Statement statement = con.createStatement();
|
|
|
152 |
statement.execute(link);
|
|
|
153 |
con.commit();
|
|
|
154 |
ResultSet rs = statement.executeQuery("SELECT COUNT(1) FROM brutex.tbl_events");
|
|
|
155 |
if(rs.next()) count = rs.getInt(1);
|
|
|
156 |
statement.execute(recoverSQL);
|
|
|
157 |
log.info("Recovered {} events and loaded them into in-memory database.", count);
|
|
|
158 |
statement.execute(truncate);
|
|
|
159 |
con.commit();
|
|
|
160 |
con.close();
|
|
|
161 |
} catch (SQLException e) {
|
|
|
162 |
log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
|
|
|
163 |
throw new RuntimeException(e);
|
|
|
164 |
}
|
|
|
165 |
//Start initial run of the emitter
|
|
|
166 |
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
|
|
|
167 |
}
|
|
|
168 |
|
|
|
169 |
private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) {
|
|
|
170 |
try {
|
|
|
171 |
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
|
|
|
172 |
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
|
|
|
173 |
job2.getJobDataMap().put("mdbConnection", mempool);
|
|
|
174 |
job2.getJobDataMap().put("fdbConnection", fdbpool);
|
|
|
175 |
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
|
|
|
176 |
job2.getJobDataMap().put("egres_counter", egres_counter);
|
|
|
177 |
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
|
|
|
178 |
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
|
|
|
179 |
scheduler.scheduleJob(job2, t);
|
|
|
180 |
}
|
|
|
181 |
} catch (SchedulerException ex) {
|
|
|
182 |
log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
|
|
|
183 |
}
|
|
|
184 |
}
|
|
|
185 |
|
|
|
186 |
private void readConfiguration(ServletContext ctx) {
|
|
|
187 |
/* Configure ServletContext attributes using configuration object*/
|
|
|
188 |
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
|
|
|
189 |
ctx.setAttribute(EventmanagerConfiguration.KEY, c);
|
|
|
190 |
}
|
|
|
191 |
|
|
|
192 |
}
|