Subversion Repositories XServices

Rev

Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | Download | RSS feed

package net.brutex.xservices.util;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.h2.jdbcx.JdbcConnectionPool;
import org.quartz.*;

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.sql.*;
import java.time.Instant;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.quartz.TriggerBuilder.newTrigger;

@Slf4j
public class EventEmitter implements Job, InterruptableJob {

    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        final Instant d = Instant.now();
        final long ts = d.toEpochMilli();
        final EventmanagerConfiguration conf = (EventmanagerConfiguration) context.getMergedJobDataMap()
                .get(EventmanagerConfiguration.KEY);

        final String url = conf.getTargeturl();

        final JdbcConnectionPool pool = (JdbcConnectionPool) context.getMergedJobDataMap().get("mdbConnection");
        final JdbcConnectionPool fpool = (JdbcConnectionPool) context.getMergedJobDataMap().get("fdbConnection");
        final long run_key = context.getMergedJobDataMap().getLong("run_key");
        final AtomicLong egres_counter = (AtomicLong) context.getMergedJobDataMap().get("egres_counter");

        final String querySQL = "SELECT btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_type, btx_timestamp FROM brutex.tbl_events_snap ORDER BY btx_timestamp asc FOR UPDATE;";
        final String deleteSQL = "DELETE FROM brutex.tbl_events_snap where btx_id=?";
        final String deleteTable = "TRUNCATE TABLE brutex.tbl_events;";

        final String moveSQL = "INSERT INTO brutex.tbl_events_snap DIRECT SELECT " +
                                " btx_event_type, btx_id, btx_obj_type, btx_obj_id, btx_timestamp, ?, btx_event FROM brutex.tbl_events; ";


        final String moveErrorSQL = "MERGE INTO brutex.tbl_events_errors " +
                "KEY (btx_event_type, btx_obj_type, btx_obj_id) " +
                "VALUES (?,?,?,?,?,?,?,?);";

        /**
         * Move event table data to snapshot
         */

        Connection con = null;
        Connection fcon = null;
        try {
            con = pool.getConnection();
            con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
            con.setAutoCommit(false);
            Statement stmt = con.createStatement();
            PreparedStatement moveprep= con.prepareStatement(moveSQL);
            moveprep.setLong(1, run_key);
            moveprep.execute();
            stmt.execute(deleteTable);
            con.commit(); //all events moved from tbl_events to tbl_events_snap at this point


            fcon = fpool.getConnection();
            PreparedStatement errorPrepSql = fcon.prepareStatement(moveErrorSQL);

            PreparedStatement del = con.prepareStatement(deleteSQL);

            ResultSet rs = stmt.executeQuery(querySQL);


            while(rs.next() && !isInterrupted.get()) {
                /* btx_id, btx_event, btx_obj_id, btx_event_type, btx_obj_typ */
                String id = rs.getString(1);
                Clob c = rs.getClob(2);
                String obj_id = rs.getString(3);
                String event_type = rs.getString(4);
                String obj_type = rs.getString(5);
                long event_ts = rs.getLong(6);
                boolean bretry = false;

                SimpleSoap ss = new SimpleSoap( url, id, IOUtils.toString(c.getCharacterStream()));
                int retry = 0;
                Reader response = null;
                String rsp = "";
                boolean succeeded = false;
                while(retry < 3 && !succeeded && !isInterrupted.get()) {
                    retry++;
                        response = ss.sendSoap(false);
                        succeeded = true;
                        if(response!=null) {
                            rsp = IOUtils.toString(response);
                        }

                        if (rsp.contains("<soap:Fault") || rsp.contains("<soapenv:Fault")) { succeeded=false; bretry=false;};
                        if (! rsp.contains(":Envelope ")) { succeeded=false; bretry=true;};

                        if (succeeded) {
                            // Successfully send
                            del.setString(1, id);
                            del.execute();
                            con.commit();
                            egres_counter.incrementAndGet();
                            log.debug("Successfully sent event '{}' to target ALF Event Manager.", id);
                        } else {
                            // Error during sending
                            log.warn("Unable to send ALF Event '{}' to event manager. Will retry in 2 seconds. This is the {}. time.", id, retry);
                            try {
                                Thread.sleep(2000);
                            } catch (InterruptedException e) {
                                log.error("Interrupted while waiting to retry: {}", e.getMessage());
                            }
                        }
                }

                if(! succeeded) {
                    log.error("Failed to send ALF Event '{}' to the event manager. Giving up. " +
                            "Moving event back to the queue unless there is a superseding event already queued.", id);


                    try {
                        //this is in file-based db
                        errorPrepSql.setString(1, event_type);
                        errorPrepSql.setString(2, id);
                        errorPrepSql.setString(3, obj_type);
                        errorPrepSql.setString(4, obj_id);
                        errorPrepSql.setLong(5, event_ts);
                        errorPrepSql.setBoolean(6, bretry);
                        errorPrepSql.setClob(7, new StringReader(rsp) );
                        errorPrepSql.setClob(8, c);
                        errorPrepSql.execute();
                        fcon.commit();

                        //this is in-memory
                        del.setString(1, id);
                        del.execute();
                        con.commit();
                    }  catch (SQLException e) {
                        log.error("Exception in SQL execution during writing error events: {}", e.getMessage());
                    }
                }
            }


        } catch (SQLException e) {
            log.error("Exception in SQL execution: {}", e.getMessage());
            throw new JobExecutionException(e);
        } catch (IOException e) {
            log.error("Exception in SQL execution: {}", e.getMessage());
            throw new RuntimeException(e);
        } finally {
            try {
                if(fcon!=null) fcon.close();
                if(con!=null)  con.close();
            } catch (SQLException e) {
                    log.error("Error closing the database connections: {}", e.getMessage());
                    throw new RuntimeException(e);
            }
        }
    }


    /**
     * <p>
     * Called by the <code>{@link Scheduler}</code> when a user
     * interrupts the <code>Job</code>.
     * </p>
     *
     * @throws UnableToInterruptJobException if there is an exception while interrupting the job.
     */
    @Override
    public synchronized void interrupt() throws UnableToInterruptJobException {
        isInterrupted.set(true);
        log.warn("ALFEmitter received and interrupt.");
    }
}