Line 13... |
Line 13... |
13 |
import java.sql.Connection;
|
13 |
import java.sql.Connection;
|
14 |
import java.sql.ResultSet;
|
14 |
import java.sql.ResultSet;
|
15 |
import java.sql.SQLException;
|
15 |
import java.sql.SQLException;
|
16 |
import java.sql.Statement;
|
16 |
import java.sql.Statement;
|
17 |
import java.time.Instant;
|
17 |
import java.time.Instant;
|
- |
|
18 |
import java.time.temporal.ChronoUnit;
|
- |
|
19 |
import java.util.Date;
|
18 |
import java.util.concurrent.atomic.AtomicLong;
|
20 |
import java.util.concurrent.atomic.AtomicLong;
|
Line 19... |
Line 21... |
19 |
|
21 |
|
Line 20... |
Line 22... |
20 |
import static org.quartz.TriggerBuilder.newTrigger;
|
22 |
import static org.quartz.TriggerBuilder.newTrigger;
|
Line 46... |
Line 48... |
46 |
*
|
48 |
*
|
47 |
* @return connection pool
|
49 |
* @return connection pool
|
48 |
*/
|
50 |
*/
|
49 |
private static JdbcConnectionPool getDbPool(String dbConnectString) {
|
51 |
private static JdbcConnectionPool getDbPool(String dbConnectString) {
|
50 |
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
|
52 |
JdbcConnectionPool p = JdbcConnectionPool.create(dbConnectString, "", "");
|
51 |
p.setMaxConnections(16);
|
53 |
p.setMaxConnections(256);
|
52 |
p.setLoginTimeout(5);
|
54 |
p.setLoginTimeout(20);
|
53 |
try {
|
- |
|
54 |
Connection c = p.getConnection();
|
55 |
try (Connection c = p.getConnection();){
|
55 |
Statement s = c.createStatement();
|
56 |
Statement s = c.createStatement();
|
56 |
s.execute(dbinit);
|
57 |
s.execute(dbinit);
|
- |
|
58 |
c.commit();
|
57 |
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
|
59 |
log.trace("Running SQL against database '{}': '{}'", dbConnectString, dbinit);
|
58 |
c.close();
|
60 |
c.close();
|
59 |
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
|
61 |
log.debug("Successfully created schema for database 'Brutex' at '{}'.", dbConnectString);
|
60 |
} catch (SQLException e) {
|
62 |
} catch (SQLException e) {
|
61 |
log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
|
63 |
log.error("Error creating the schema for database 'Brutex' using '{}': {}", dbConnectString, e.getMessage());
|
Line 70... |
Line 72... |
70 |
try {
|
72 |
try {
|
71 |
Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
|
73 |
Scheduler scheduler = (Scheduler) arg0.getServletContext().getAttribute("scheduler");
|
72 |
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
|
74 |
log.debug("Active jobs to be terminated: {}", scheduler.getCurrentlyExecutingJobs());
|
Line 73... |
Line 75... |
73 |
|
75 |
|
- |
|
76 |
JobKey key = JobKey.jobKey("ALFEmitter");
|
74 |
JobKey key = JobKey.jobKey("ALFEmitter");
|
77 |
JobKey cleanerkey = JobKey.jobKey("EventLogCleaner");
|
75 |
synchronized (scheduler) {
|
78 |
synchronized (scheduler) {
|
76 |
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
|
79 |
if (!scheduler.isShutdown() && scheduler.checkExists(key) ) {
|
77 |
scheduler.interrupt(key);
|
80 |
scheduler.interrupt(key);
|
78 |
scheduler.deleteJob(key);
|
81 |
scheduler.deleteJob(key);
|
79 |
log.info("Gracefully stopped the ALFEventEmitter job.");
|
82 |
log.info("Gracefully stopped the ALFEventEmitter job.");
|
- |
|
83 |
}
|
- |
|
84 |
if (!scheduler.isShutdown() && scheduler.checkExists(cleanerkey) ) {
|
- |
|
85 |
scheduler.interrupt(cleanerkey);
|
- |
|
86 |
scheduler.deleteJob(cleanerkey);
|
- |
|
87 |
log.info("Gracefully stopped the ALFEventEmitter job.");
|
80 |
}
|
88 |
}
|
81 |
if (!scheduler.isShutdown()) {
|
89 |
if (!scheduler.isShutdown()) {
|
82 |
scheduler.shutdown(true);
|
90 |
scheduler.shutdown(true);
|
83 |
}
|
91 |
}
|
84 |
}
|
92 |
}
|
Line 96... |
Line 104... |
96 |
try {
|
104 |
try {
|
97 |
log.info("Create/Re-open file based database to persist memory database.");
|
105 |
log.info("Create/Re-open file based database to persist memory database.");
|
98 |
Connection con = fdbpool.getConnection();
|
106 |
Connection con = fdbpool.getConnection();
|
99 |
Statement s = con.createStatement();
|
107 |
Statement s = con.createStatement();
|
Line 100... |
Line 108... |
100 |
|
108 |
|
101 |
final String insert = "INSERT INTO brutex.tbl_events SELECT * from LINK UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from LINK2;";
|
109 |
final String insert = "INSERT INTO brutex.tbl_events SELECT * from MEM_INBOUND UNION SELECT " + "btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, btx_event from MEM_OUTBOUND;";
|
102 |
s.execute(insert);
|
110 |
s.execute(insert);
|
103 |
int count = s.getUpdateCount();
|
111 |
int count = s.getUpdateCount();
|
- |
|
112 |
log.info("Persisted {} active event rows in file-based database.", count);
|
- |
|
113 |
|
- |
|
114 |
final String save_all = "INSERT INTO brutex.tbl_events_all SELECT * from MEM_ALL_EVENTS;";
|
- |
|
115 |
s.execute(save_all);
|
- |
|
116 |
count = s.getUpdateCount();
|
- |
|
117 |
log.info("Persisted {} event rows from all_events log in file-based database", count);
|
104 |
log.info("Persisted {} rows in file-based database.", count);
|
118 |
|
105 |
log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
|
119 |
log.info("Shutting down in-memory database. Closing file-based database. Please wait ...");
|
- |
|
120 |
s.execute("SHUTDOWN;");
|
106 |
s.execute("SHUTDOWN;");
|
121 |
|
107 |
con.close();
|
122 |
con.close();
|
108 |
log.info("Shutting down databases complete.");
|
123 |
log.info("Shutting down databases complete.");
|
109 |
} catch (SQLException e) {
|
124 |
} catch (SQLException e) {
|
110 |
log.error("An error occurred during database persistence: {}", e.getMessage());
|
125 |
log.error("An error occurred during database persistence: {}", e.getMessage());
|
Line 139... |
Line 154... |
139 |
}
|
154 |
}
|
Line 140... |
Line 155... |
140 |
|
155 |
|
141 |
//Load events from file based database into in-memory database
|
156 |
//Load events from file based database into in-memory database
|
142 |
try {
|
157 |
try {
|
143 |
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
|
158 |
log.info("Start recovery of previously unsend alf events. Trying to load them into in-memory database.");
|
144 |
final String link = "CREATE LINKED TABLE IF NOT EXISTS LINK('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
|
- |
|
145 |
"CREATE LINKED TABLE IF NOT EXISTS LINK2('org.h2.Driver', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap');";
|
159 |
final String link = getLinkSQL();
|
146 |
final String recoverSQL = "INSERT INTO LINK DIRECT SELECT * FROM brutex.tbl_events;";
|
160 |
final String recoverSQL = "INSERT INTO MEM_INBOUND DIRECT SELECT * FROM brutex.tbl_events;";
|
147 |
final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
|
161 |
final String truncate = "TRUNCATE TABLE brutex.tbl_events;";
|
148 |
int count = 0;
|
162 |
int count = 0;
|
149 |
Connection con = fdbpool.getConnection();
|
163 |
Connection con = fdbpool.getConnection();
|
150 |
con.setAutoCommit(false);
|
164 |
con.setAutoCommit(false);
|
Line 161... |
Line 175... |
161 |
} catch (SQLException e) {
|
175 |
} catch (SQLException e) {
|
162 |
log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
|
176 |
log.error("Exception during recovery of events from previous runs: {}", e.getMessage());
|
163 |
throw new RuntimeException(e);
|
177 |
throw new RuntimeException(e);
|
164 |
}
|
178 |
}
|
165 |
//Start initial run of the emitter
|
179 |
//Start initial run of the emitter
|
- |
|
180 |
if(configuration.isEmitterActive()) {
|
166 |
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
|
181 |
startEmitterImmediate(egres, (Scheduler) context.getAttribute("scheduler"));
|
- |
|
182 |
}
|
- |
|
183 |
|
- |
|
184 |
//Start mem db log cleaner
|
- |
|
185 |
if(configuration.getCleaner_interval()>0) {
|
- |
|
186 |
startEventLogCleaner((Scheduler) context.getAttribute("scheduler"));
|
- |
|
187 |
}
|
- |
|
188 |
}
|
- |
|
189 |
|
- |
|
190 |
private String getLinkSQL() {
|
- |
|
191 |
final String dbDriverClass = "org.h2.Driver";
|
- |
|
192 |
final String link = "CREATE LINKED TABLE IF NOT EXISTS MEM_INBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events'); " +
|
- |
|
193 |
"CREATE LINKED TABLE IF NOT EXISTS MEM_OUTBOUND('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_snap'); " +
|
- |
|
194 |
"CREATE LINKED TABLE IF NOT EXISTS MEM_ALL_EVENTS('"+dbDriverClass+"', '"+configuration.getJdbc_memdb()+"', '', '', 'brutex.tbl_events_all') AUTOCOMMIT OFF; " +
|
- |
|
195 |
"";
|
- |
|
196 |
return link;
|
167 |
}
|
197 |
}
|
Line 168... |
Line 198... |
168 |
|
198 |
|
169 |
private synchronized void startEmitterImmediate(AtomicLong egres_counter, Scheduler scheduler) {
|
199 |
private synchronized void startEmitterImmediate(AtomicLong egres, Scheduler scheduler) {
|
170 |
try {
|
200 |
try {
|
171 |
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
|
201 |
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) {
|
172 |
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
|
202 |
JobDetail job2 = JobBuilder.newJob(EventEmitter.class).withIdentity("ALFEmitter").build();
|
173 |
job2.getJobDataMap().put("mdbConnection", mempool);
|
203 |
job2.getJobDataMap().put("mdbConnection", mempool);
|
174 |
job2.getJobDataMap().put("fdbConnection", fdbpool);
|
204 |
job2.getJobDataMap().put("fdbConnection", fdbpool);
|
175 |
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
|
205 |
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
|
176 |
job2.getJobDataMap().put("egres_counter", egres_counter);
|
206 |
job2.getJobDataMap().put("egres_counter", egres);
|
177 |
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
|
207 |
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
|
178 |
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
|
208 |
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("ALFEmitter").startNow().build();
|
179 |
scheduler.scheduleJob(job2, t);
|
209 |
scheduler.scheduleJob(job2, t);
|
180 |
}
|
210 |
}
|
181 |
} catch (SchedulerException ex) {
|
211 |
} catch (SchedulerException ex) {
|
182 |
log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
|
212 |
log.error("Could not start EventEmitter to process existing queue directly after startup: {}", ex.getMessage());
|
183 |
}
|
213 |
}
|
Line -... |
Line 214... |
- |
|
214 |
}
|
- |
|
215 |
|
- |
|
216 |
private void startEventLogCleaner(Scheduler scheduler) {
|
- |
|
217 |
try {
|
- |
|
218 |
if (!scheduler.checkExists(JobKey.jobKey("EventLogCleaner"))) {
|
- |
|
219 |
JobDetail job2 = JobBuilder.newJob(EventLogCleanerJob.class).withIdentity("EventLogCleaner").build();
|
- |
|
220 |
job2.getJobDataMap().put("mdbConnection", mempool);
|
- |
|
221 |
job2.getJobDataMap().put("fdbConnection", fdbpool);
|
- |
|
222 |
job2.getJobDataMap().put("run_key", Instant.now().toEpochMilli());
|
- |
|
223 |
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, EventmanagerConfiguration.getInstance());
|
- |
|
224 |
SimpleTrigger t = (SimpleTrigger) newTrigger().withIdentity("EventLogCleaner")
|
- |
|
225 |
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
|
- |
|
226 |
.withIntervalInMinutes(configuration.getCleaner_interval())
|
- |
|
227 |
.repeatForever())
|
- |
|
228 |
.startAt(Date.from(Instant.now().plus(configuration.getCleaner_interval(), ChronoUnit.MINUTES)))
|
- |
|
229 |
.build();
|
- |
|
230 |
scheduler.scheduleJob(job2, t);
|
- |
|
231 |
}
|
- |
|
232 |
} catch (SchedulerException ex) {
|
- |
|
233 |
log.error("Could not start EventLogCleaner to process existing queue directly after startup: {}", ex.getMessage());
|
- |
|
234 |
}
|
184 |
}
|
235 |
}
|
185 |
|
236 |
|
186 |
private void readConfiguration(ServletContext ctx) {
|
237 |
private void readConfiguration(ServletContext ctx) {
|
187 |
/* Configure ServletContext attributes using configuration object*/
|
238 |
/* Configure ServletContext attributes using configuration object*/
|
188 |
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
|
239 |
EventmanagerConfiguration c = EventmanagerConfiguration.getInstance().refreshConfig();
|