16,6 → 16,7 |
|
package net.brutex.xservices.ws.impl; |
|
import lombok.extern.slf4j.Slf4j; |
import net.brutex.xservices.types.*; |
import net.brutex.xservices.types.alfevent.ALFEventResponseType; |
import net.brutex.xservices.types.alfevent.ALFEventType; |
22,6 → 23,7 |
import net.brutex.xservices.types.alfevent.ObjectFactory; |
import net.brutex.xservices.types.ant.FileSetResource; |
import net.brutex.xservices.util.EventEmitter; |
import net.brutex.xservices.util.EventmanagerConfiguration; |
import net.brutex.xservices.util.RunTask; |
import net.brutex.xservices.ws.MiscService; |
import net.brutex.xservices.ws.XServicesFault; |
46,11 → 48,11 |
import java.math.BigInteger; |
import java.sql.*; |
import java.time.Instant; |
import java.time.temporal.ChronoUnit; |
import java.util.Date; |
import java.util.Enumeration; |
import java.util.Properties; |
import java.util.UUID; |
import java.util.concurrent.atomic.AtomicLong; |
|
import static org.quartz.TriggerBuilder.newTrigger; |
|
60,6 → 62,7 |
* |
* @author Brian Rosenberger, bru@brutex.de |
*/ |
@Slf4j |
@WebService(targetNamespace="http://ws.xservices.brutex.net", endpointInterface="net.brutex.xservices.ws.MiscService", serviceName="MiscService") |
public class MiscServiceImpl |
implements MiscService { |
66,24 → 69,6 |
|
@Resource |
private WebServiceContext context; |
|
// Grab the Scheduler instance from the Factory |
private final Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); |
private final static String conStr = "jdbc:h2:mem:lockdb;DB_CLOSE_DELAY=-1;"; |
|
private final static String dbinit = "" + |
"CREATE SCHEMA IF NOT EXISTS brutex;" + |
"CREATE TABLE IF NOT EXISTS brutex.tbl_events (" + |
"btx_event_type VARCHAR(128) NOT NULL," + |
"btx_id VARCHAR(32) NOT NULL, " + |
"btx_obj_type VARCHAR(32) NOT NULL, " + |
"btx_obj_id VARCHAR(32) NOT NULL, " + |
"btx_timestamp BIGINT NOT NULL," + |
"btx_event CLOB" + |
");" + |
"CREATE INDEX IF NOT EXISTS brutex.btx_idx ON brutex.tbl_events (btx_obj_id, btx_obj_type, btx_event_type);" + |
"CREATE INDEX IF NOT EXISTS brutex.IDX_TO_DESC ON brutex.tbl_events (btx_timestamp ASC);"; |
|
public MiscServiceImpl() throws SchedulerException { |
} |
|
241,11 → 226,21 |
public ALFEventResponseType mergeALFEvent(ALFEventType event) throws XServicesFault { |
final Instant d = Instant.now(); |
final long ts = d.toEpochMilli(); |
//System.out.println("Step 1: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
|
//Get Parameters from the Servlet Context |
final ServletContext servletContext = |
(ServletContext) context.getMessageContext().get(MessageContext.SERVLET_CONTEXT); |
final JdbcConnectionPool pool = (JdbcConnectionPool) servletContext.getAttribute("dbConnection"); |
final EventmanagerConfiguration conf = (EventmanagerConfiguration) servletContext |
.getAttribute(EventmanagerConfiguration.KEY); |
|
final JdbcConnectionPool pool = (JdbcConnectionPool) servletContext.getAttribute("mdbConnection"); |
final JdbcConnectionPool fpool = (JdbcConnectionPool) servletContext.getAttribute("fdbConnection"); |
final AtomicLong egres_counter = (AtomicLong) servletContext.getAttribute("egres_counter"); |
final AtomicLong ingres_counter = (AtomicLong) servletContext.getAttribute("ingres_counter"); |
final Scheduler scheduler = (Scheduler) servletContext.getAttribute("scheduler"); |
|
log.trace("Read dbConnection from servlet context: {}", pool); |
|
//System.out.println("Step 2: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
final ObjectFactory of = new ObjectFactory(); |
|
253,16 → 248,15 |
final String eventId = event.getBase().getEventId(); |
final String objectType = event.getBase().getObjectType(); |
final String eventType = event.getBase().getEventType(); |
log.debug("Event id '{}', type '{}' received for object '{}' with object_id '{}'.", |
eventId, eventType, objectType, objectId); |
|
final String mergeStatememt = "MERGE INTO brutex.tbl_events " + |
"KEY (btx_event_type, btx_obj_type, btx_obj_id) " + |
"VALUES (?,?,?,?,?,?) " + |
""; |
"VALUES (?,?,?,?,?,?);"; |
|
long rows = 0L; |
//System.out.println("Step 3: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
|
try { |
//System.out.println("Step 4: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
Marshaller m = JAXBContext.newInstance(ALFEventType.class).createMarshaller(); |
m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); |
m.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); |
270,7 → 264,8 |
StringWriter sw = new StringWriter(); |
m.marshal(e, sw); |
StringBuilder sb = new StringBuilder(); |
sb.append(" <soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:ns=\"http://www.eclipse.org/alf/schema/EventBase/1\">\n"); |
sb.append(" <soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" " + |
"xmlns:ns=\"http://www.eclipse.org/alf/schema/EventBase/1\">\n"); |
sb.append("<soapenv:Body>\n"); |
sb.append("<ns:EventNotice>\n"); |
sb.append(sw); |
277,9 → 272,8 |
sb.append("</ns:EventNotice>\n"); |
sb.append("</soapenv:Body>"); |
sb.append("</soapenv:Envelope>"); |
//System.out.println("Step 5: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
|
Connection con = pool.getConnection(); |
//System.out.println("Step 6: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
PreparedStatement prep = con.prepareStatement(mergeStatememt); |
prep.setString(1, eventType); |
prep.setString(2, eventId); |
287,40 → 281,34 |
prep.setString(4, objectId); |
prep.setLong(5, ts); |
prep.setClob(6, new StringReader(sb.toString())); |
//prep.setLong(7, ts-20000); |
//System.out.println("Step 7 SQL START: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
prep.execute(); |
con.commit(); |
con.close(); |
//System.out.println("Step 8 SQL END: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
//SimpleSoap ss = new SimpleSoap("http://localhost:8099/ALFEventManager/services/ALFEventManagerSOAP", sb.toString()); |
//ss.sendSoap(false); |
|
ingres_counter.incrementAndGet(); |
|
// and start it off |
|
if (!scheduler.isStarted()) |
scheduler.start(); |
if (scheduler.isInStandbyMode()) |
scheduler.resumeAll(); |
//System.out.println("Step 9: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
synchronized (scheduler) { |
if (!scheduler.checkExists(JobKey.jobKey("ALFEmitter"))) { |
JobDetail job2 = JobBuilder.newJob(EventEmitter.class) |
.withIdentity("ALFEmitter").build(); |
//job2.getJobDataMap().put("script", job.getScript()); |
//job2.getJobDataMap().put("description", job.getDescription()); |
//job2.getJobDataMap().put("date", job.getDate()); |
.withIdentity("ALFEmitter") |
.build(); |
job2.getJobDataMap().put("mdbConnection", pool); |
job2.getJobDataMap().put("fdbConnection", fpool); |
job2.getJobDataMap().put("run_key", ts); |
job2.getJobDataMap().put("egres_counter", egres_counter); |
job2.getJobDataMap().put("ingres_counter", ingres_counter); |
|
job2.getJobDataMap().put(EventmanagerConfiguration.KEY, conf); |
|
SimpleTrigger t = (SimpleTrigger) newTrigger() |
.withIdentity("ALFEmitter").startAt(Date.from(d.plusSeconds(20))) |
.withIdentity("ALFEmitter").startAt(Date.from(d.plusSeconds(conf.getInterval()))) |
.build(); |
|
scheduler.scheduleJob(job2, t); |
} |
} |
//System.out.println("Step 10: " + ChronoUnit.MILLIS.between(Instant.now(), d)); |
} catch (JAXBException | SQLException | SchedulerException e) { |
log.error(e.getMessage()); |
throw new XServicesFault(e); |
} |
return of.createALFEventResponseType(); |