package de.hottis.measurementDatabaseEngine;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import de.hottis.common.ITriggerable;
import de.hottis.common.MyQueue;
import de.hottis.common.TriggerTimer;
import de.hottis.smarthomelib.ADataObject;

/**
 * Worker thread that, each time it is triggered, opens a database connection,
 * dequeues {@link ADataObject}s until the queue returns {@code null}, and
 * executes one INSERT per object.
 *
 * <p>Triggers are delivered through {@link #trigger()}; {@link #init()} sets up
 * a {@link Timer} that schedules a {@link TriggerTimer} every {@code db.period}
 * seconds.
 *
 * <p>Coordination between the triggering thread, the shutdown requester and the
 * worker uses a private lock object instead of the thread's own monitor,
 * because {@link Thread#join()} waits on the thread's monitor as well and could
 * otherwise consume a notification meant for the worker.
 */
public class DatabaseEngine extends Thread implements ITriggerable {
    static final String DATABASE_ENGINE_PERIOD_PROP = "db.period";
    static final String DATABASE_URL_PROP = "db.url";
    static final String DATABASE_USER_PROP = "db.user";
    static final String DATABASE_PASSWORD_PROP = "db.password";
    static final String DATABASE_DRIVER_PROP = "db.driver";

    protected final Logger logger = LogManager.getRootLogger();

    /** Guards {@link #triggerFlag} and {@link #timer}; also the wait/notify monitor. */
    private final Object lock = new Object();

    private final Properties config;
    // Kept as the raw type used in the constructor signature; the declaration
    // of MyQueue is not visible from this file.
    private final MyQueue queue;
    private final int period;
    private final String dbUrl;
    private final String dbUsername;
    private final String dbPassword;

    /** Written by the shutdown requester, read by the worker outside the lock. */
    private volatile boolean stop;
    private boolean triggerFlag;
    private Timer timer;

    /**
     * Creates the engine and reads its settings from {@code config}.
     *
     * @param config properties providing {@code db.period}, {@code db.url},
     *               {@code db.user}, {@code db.password} and (for {@link #init()})
     *               {@code db.driver}
     * @param queue  queue the data objects are dequeued from
     * @throws NumberFormatException if {@code db.period} is missing or not an integer
     */
    public DatabaseEngine(Properties config, MyQueue queue) {
        super("MeasurementDatabaseEngine.DatabaseEngine");
        this.config = config;
        this.queue = queue;
        this.stop = false;
        this.triggerFlag = false;
        this.period = Integer.parseInt(config.getProperty(DATABASE_ENGINE_PERIOD_PROP));
        this.dbUrl = config.getProperty(DATABASE_URL_PROP);
        this.dbUsername = config.getProperty(DATABASE_USER_PROP);
        this.dbPassword = config.getProperty(DATABASE_PASSWORD_PROP);
    }

    /**
     * Asks the worker to stop, cancels the trigger timer and blocks until the
     * worker thread has terminated. A worker that is waiting for a trigger is
     * woken up immediately and flushes the queue one last time before exiting.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is
     * abandoned and the caller's interrupt flag is restored.
     */
    public void requestShutdown() {
        logger.info("Shutdown of database engine requested");
        synchronized (lock) {
            stop = true;
            cancelTimerLocked();
            // Wake the worker; without this it would only notice the stop
            // request on the next timer tick.
            lock.notifyAll();
        }
        try {
            this.join();
            logger.info("Database engine is down");
        } catch (InterruptedException e) {
            logger.error("Waiting for shutdown of database engine interrupted");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Signals the worker that it should process the queue.
     */
    public void trigger() {
        logger.debug("DatabaseEngine triggered");
        synchronized (lock) {
            triggerFlag = true;
            lock.notifyAll();
        }
    }

    /**
     * Loads the JDBC driver class named by {@code db.driver} and starts the
     * periodic trigger timer.
     *
     * @throws MeasurementDatabaseEngineException if the driver class cannot be found
     */
    public void init() throws MeasurementDatabaseEngineException {
        // Load the driver first so a failure does not leave a running timer behind.
        try {
            Class.forName(config.getProperty(DATABASE_DRIVER_PROP));
        } catch (ClassNotFoundException e) {
            logger.error("Database driver class not found", e);
            throw new MeasurementDatabaseEngineException("Database driver class not found", e);
        }

        synchronized (lock) {
            cancelTimerLocked(); // a repeated init() must not leak the previous timer
            timer = new Timer("DatabaseEngineTrigger");
            // 1000L: do the seconds -> milliseconds conversion in long arithmetic.
            timer.schedule(new TriggerTimer(this), 0, period * 1000L);
        }
    }

    /** Cancels the trigger timer if one is running. Caller must hold {@link #lock}. */
    private void cancelTimerLocked() {
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
    }

    /**
     * Builds the parameterized INSERT statement for a data object: columns
     * {@code name}, {@code ts} and one column per key of {@code ado.getValues()},
     * each bound through a {@code ?} placeholder.
     *
     * <p>NOTE(review): the table name and column names are concatenated into the
     * SQL text because identifiers cannot be bound as parameters; this presumes
     * they are trusted, program-defined identifiers - confirm against the
     * ADataObject implementations.
     */
    private String createStatementFromDataObject(ADataObject ado) {
        StringBuilder columns = new StringBuilder("INSERT INTO ")
                .append(ado.getTableName())
                .append("(name,ts");
        StringBuilder marks = new StringBuilder("VALUES(?,?");
        // Appending ",<col>" / ",?" per entry avoids a dangling comma when the
        // object carries no values.
        for (Object column : ado.getValues().keySet()) {
            columns.append(',').append(column);
            marks.append(",?");
        }
        return columns.append(") ").append(marks).append(')').toString();
    }

    /**
     * Binds {@code o} to parameter {@code pos} using the setter matching its
     * runtime type.
     *
     * @throws SQLException if {@code o} is {@code null} or of an unsupported type,
     *                      or if the driver rejects the value
     */
    private void bindValue(PreparedStatement pstmt, int pos, Object o) throws SQLException {
        if (o == null) {
            // Reported as SQLException so the caller skips this item instead of
            // aborting the whole batch with a NullPointerException.
            throw new SQLException("illegal parameter: null at position " + pos);
        }
        if (o instanceof Integer) {
            pstmt.setInt(pos, (Integer) o);
        } else if (o instanceof Long) {
            pstmt.setLong(pos, (Long) o);
        } else if (o instanceof Short) {
            pstmt.setShort(pos, (Short) o);
        } else if (o instanceof Byte) {
            pstmt.setByte(pos, (Byte) o);
        } else if (o instanceof Double) {
            pstmt.setDouble(pos, (Double) o);
        } else if (o instanceof Float) {
            pstmt.setFloat(pos, (Float) o);
        } else if (o instanceof Boolean) {
            pstmt.setBoolean(pos, (Boolean) o);
        } else if (o instanceof String) {
            pstmt.setString(pos, (String) o);
        } else if (o instanceof LocalDateTime) {
            pstmt.setTimestamp(pos, Timestamp.valueOf((LocalDateTime) o));
        } else {
            throw new SQLException("illegal parameter type: " + o.getClass().getName());
        }
    }

    /**
     * Blocks until a trigger arrives or a stop is requested, then consumes the
     * trigger flag.
     *
     * @return {@code false} if the thread was interrupted while waiting (treated
     *         as a stop request), {@code true} otherwise
     */
    private boolean awaitTrigger() {
        synchronized (lock) {
            while (!triggerFlag && !stop) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    logger.warn("DatabaseEngine interrupted while waiting, shutting down");
                    stop = true;
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            triggerFlag = false;
            return true;
        }
    }

    /**
     * Opens a connection, inserts every object currently obtainable from the
     * queue and closes all statements and the connection again. A failing item
     * is logged and skipped; any other failure ends this run.
     */
    private void drainQueueToDatabase() {
        Connection dbCon = null;
        // Keyed by SQL text so that a cached statement always has exactly the
        // placeholders of the object it is reused for.
        Map<String, PreparedStatement> preparedStatements = new HashMap<>();
        try {
            int itemCnt = 0;
            dbCon = DriverManager.getConnection(dbUrl, dbUsername, dbPassword);
            while (true) {
                try {
                    // Explicit cast: compiles whether or not MyQueue is generic.
                    ADataObject ado = (ADataObject) queue.dequeue();
                    if (ado == null) {
                        logger.warn("DatabaseEngine found no data");
                        break;
                    }

                    String sql = createStatementFromDataObject(ado);
                    PreparedStatement pstmt = preparedStatements.get(sql);
                    if (pstmt == null) {
                        pstmt = dbCon.prepareStatement(sql);
                        preparedStatements.put(sql, pstmt);
                    }

                    bindValue(pstmt, 1, ado.getName());
                    bindValue(pstmt, 2, ado.getTimestamp());
                    int pos = 3;
                    for (Object o : ado.getValues().values()) {
                        bindValue(pstmt, pos, o);
                        pos++;
                    }

                    itemCnt++;
                    logger.info("DatabaseEngine received ({}) {}", itemCnt, ado);
                    logger.info("Statement is {}", pstmt);
                    pstmt.execute();
                    logger.info("Database insert executed");
                    pstmt.clearParameters();
                } catch (SQLException e) {
                    logger.error("SQLException in inner database engine loop", e);
                }
            }
        } catch (Exception e) {
            logger.error("Exception in outer database engine loop", e);
        } finally {
            for (PreparedStatement p : preparedStatements.values()) {
                try {
                    p.close();
                } catch (SQLException e) {
                    logger.warn("SQLException when closing prepared statement, nothing will be done");
                }
            }
            preparedStatements.clear();
            if (dbCon != null) {
                try {
                    dbCon.close();
                } catch (SQLException e) {
                    logger.warn("SQLException when closing connection, nothing will be done");
                }
            }
        }
    }

    /**
     * Main loop: wait for a trigger, write the queued objects to the database,
     * repeat until a stop is requested. A stop request that arrives while the
     * worker is waiting causes one last flush before the thread terminates.
     */
    @Override
    public void run() {
        try {
            while (!stop) {
                logger.debug("DatabaseEngine is about to wait for (regularly) {}s", period);
                if (!awaitTrigger()) {
                    break;
                }
                if (stop) {
                    logger.debug("DatabaseEngine is flushing the queue before shutdown");
                } else {
                    logger.debug("DatabaseEngine has received trigger");
                }
                // The lock is not held here, so trigger() never blocks on database I/O.
                drainQueueToDatabase();
            }
        } finally {
            // Once the worker is gone nobody consumes triggers; stop producing them.
            synchronized (lock) {
                cancelTimerLocked();
            }
        }
        logger.info("Database engine is terminating");
    }
}