201 |
brianR |
1 |
package net.brutex.xservices.util;
|
|
|
2 |
|
|
|
3 |
import lombok.extern.slf4j.Slf4j;
|
|
|
4 |
import org.apache.commons.io.IOUtils;
|
|
|
5 |
import org.h2.jdbcx.JdbcConnectionPool;
|
|
|
6 |
import org.quartz.*;
|
|
|
7 |
|
|
|
8 |
import java.io.IOException;
|
|
|
9 |
import java.io.Reader;
|
|
|
10 |
import java.io.StringReader;
|
|
|
11 |
import java.sql.*;
|
|
|
12 |
import java.time.Instant;
|
|
|
13 |
import java.util.Date;
|
|
|
14 |
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
15 |
import java.util.concurrent.atomic.AtomicLong;
|
|
|
16 |
|
|
|
17 |
import static org.quartz.TriggerBuilder.newTrigger;
|
|
|
18 |
|
|
|
19 |
@Slf4j
|
|
|
20 |
public class EventEmitter implements Job, InterruptableJob {
|
|
|
21 |
|
|
|
22 |
private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
|
|
|
23 |
|
|
|
24 |
@Override
|
|
|
25 |
public void execute(JobExecutionContext context) throws JobExecutionException {
|
|
|
26 |
final Instant d = Instant.now();
|
|
|
27 |
final long ts = d.toEpochMilli();
|
|
|
28 |
final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
|
|
|
29 |
.get(EventmanagerConfiguration.KEY);
|
|
|
30 |
|
|
|
31 |
final String url = conf.getTargeturl();
|
|
|
32 |
|
|
|
33 |
final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
|
|
|
34 |
final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
|
|
|
35 |
final long run_key = context.getMergedJobDataMap().getLong("run_key");
|
|
|
36 |
final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");
|
|
|
37 |
|
|
|
38 |
final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
|
|
|
39 |
final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
|
|
|
40 |
final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";
|
|
|
41 |
|
|
|
42 |
final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
|
|
|
43 |
" btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";
|
|
|
44 |
|
|
|
45 |
|
|
|
46 |
final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
|
|
|
47 |
"KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
|
|
|
48 |
"VALUES (?,?,?,?,?,?,?,?);";
|
|
|
49 |
|
|
|
50 |
/**
|
|
|
51 |
* Move event table data to snapshot
|
|
|
52 |
*/
|
|
|
53 |
|
|
|
54 |
Connection con = null;
|
|
|
55 |
Connection fcon = null;
|
|
|
56 |
try {
|
|
|
57 |
con = pool.getConnection();
|
|
|
58 |
con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
|
|
|
59 |
con.setAutoCommit(false);
|
|
|
60 |
Statement stmt = con.createStatement();
|
|
|
61 |
PreparedStatement moveprep= con.prepareStatement(moveSQL);
|
|
|
62 |
moveprep.setLong(1, run_key);
|
|
|
63 |
moveprep.execute();
|
|
|
64 |
stmt.execute(deleteTable);
|
|
|
65 |
con.commit(); //all events moved from tbl_events to tbl_events_snap at this point
|
|
|
66 |
|
|
|
67 |
|
|
|
68 |
fcon = fpool.getConnection();
|
|
|
69 |
PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);
|
|
|
70 |
|
|
|
71 |
PreparedStatement del = con.prepareStatement(deleteSQL);
|
|
|
72 |
|
|
|
73 |
ResultSet rs = stmt.executeQuery(querySQL);
|
|
|
74 |
|
|
|
75 |
|
|
|
76 |
while(rs.next() && !isInterrupted.get()) {
|
|
|
77 |
/* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */
|
|
|
78 |
String id = rs.getString(1);
|
|
|
79 |
Clob c = rs.getClob(2);
|
|
|
80 |
String obj_id = rs.getString(3);
|
|
|
81 |
String event_type = rs.getString(4);
|
|
|
82 |
String obj_type = rs.getString(5);
|
|
|
83 |
long event_ts = rs.getLong(6);
|
|
|
84 |
boolean bretry = false;
|
|
|
85 |
|
|
|
86 |
SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream()));
|
|
|
87 |
int retry = 0;
|
|
|
88 |
Reader response = null;
|
|
|
89 |
String rsp = "";
|
|
|
90 |
boolean succeeded = false;
|
|
|
91 |
while(retry < 3 && !succeeded && !isInterrupted.get()) {
|
|
|
92 |
retry++;
|
|
|
93 |
response = ss.sendSoap(false);
|
|
|
94 |
succeeded = true;
|
|
|
95 |
if(response!=null) {
|
|
|
96 |
rsp = IOUtils.toString(response);
|
|
|
97 |
}
|
|
|
98 |
|
|
|
99 |
if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
|
|
|
100 |
if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};
|
|
|
101 |
|
|
|
102 |
if (succeeded) {
|
|
|
103 |
// Successfully send
|
|
|
104 |
del.setString(1, id);
|
|
|
105 |
del.execute();
|
|
|
106 |
con.commit();
|
|
|
107 |
egres_counter.incrementAndGet();
|
|
|
108 |
log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
|
|
|
109 |
} else {
|
|
|
110 |
// Error during sending
|
|
|
111 |
log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
|
|
|
112 |
try {
|
|
|
113 |
Thread.sleep(2000);
|
|
|
114 |
} catch (InterruptedException e) {
|
|
|
115 |
log.error("Interrupted while waiting to retry: {}", e.getMessage());
|
|
|
116 |
}
|
|
|
117 |
}
|
|
|
118 |
}
|
|
|
119 |
|
|
|
120 |
if(! succeeded) {
|
|
|
121 |
log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
|
|
|
122 |
"Moving event back to the queue unless there is a superseding event already queued.", id);
|
|
|
123 |
|
|
|
124 |
|
|
|
125 |
try {
|
|
|
126 |
//this is in file-based db
|
|
|
127 |
errorPrepSql.setString(1, event_type);
|
|
|
128 |
errorPrepSql.setString(2, id);
|
|
|
129 |
errorPrepSql.setString(3, obj_type);
|
|
|
130 |
errorPrepSql.setString(4, obj_id);
|
|
|
131 |
errorPrepSql.setLong(5, event_ts);
|
|
|
132 |
errorPrepSql.setBoolean(6, bretry);
|
|
|
133 |
errorPrepSql.setClob(7, new StringReader(rsp) );
|
|
|
134 |
errorPrepSql.setClob(8, c);
|
|
|
135 |
errorPrepSql.execute();
|
|
|
136 |
fcon.commit();
|
|
|
137 |
|
|
|
138 |
//this is in-memory
|
|
|
139 |
del.setString(1, id);
|
|
|
140 |
del.execute();
|
|
|
141 |
con.commit();
|
|
|
142 |
} catch (SQLException e) {
|
|
|
143 |
log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
|
|
|
144 |
}
|
|
|
145 |
}
|
|
|
146 |
}
|
|
|
147 |
|
|
|
148 |
|
|
|
149 |
} catch (SQLException e) {
|
|
|
150 |
log.error("Exception in SQL execution: {}", e.getMessage());
|
|
|
151 |
throw new JobExecutionException(e);
|
|
|
152 |
} catch (IOException e) {
|
|
|
153 |
log.error("Exception in SQL execution: {}", e.getMessage());
|
|
|
154 |
throw new RuntimeException(e);
|
|
|
155 |
} finally {
|
|
|
156 |
try {
|
|
|
157 |
if(fcon!=null) fcon.close();
|
|
|
158 |
if(con!=null) con.close();
|
|
|
159 |
} catch (SQLException e) {
|
|
|
160 |
log.error("Error closing the database connections: {}", e.getMessage());
|
|
|
161 |
throw new RuntimeException(e);
|
|
|
162 |
}
|
|
|
163 |
}
|
|
|
164 |
}
|
|
|
165 |
|
|
|
166 |
|
|
|
167 |
/**
|
|
|
168 |
* <p>
|
|
|
169 |
* Called by the <code>{@link Scheduler}</code> when a user
|
|
|
170 |
* interrupts the <code>Job</code>.
|
|
|
171 |
* </p>
|
|
|
172 |
*
|
|
|
173 |
* @throws UnableToInterruptJobException if there is an exception while interrupting the job.
|
|
|
174 |
*/
|
|
|
175 |
@Override
|
|
|
176 |
public synchronized void interrupt() throws UnableToInterruptJobException {
|
|
|
177 |
isInterrupted.set(true);
|
|
|
178 |
log.warn("ALFEmitter received and interrupt.");
|
|
|
179 |
}
|
|
|
180 |
}
|