error handling
This commit is contained in:
parent
dd9871578f
commit
fd0e0e9c71
@ -17,7 +17,6 @@ class AbstractSinkHandler(threading.Thread):
|
|||||||
self.inQueue = inQueue
|
self.inQueue = inQueue
|
||||||
self.experiment = experiment
|
self.experiment = experiment
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
logger.debug("loop started")
|
logger.debug("loop started")
|
||||||
|
|
||||||
@ -32,8 +31,6 @@ class AbstractSinkHandler(threading.Thread):
|
|||||||
self.sinkAction(dataObject)
|
self.sinkAction(dataObject)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"unexpected error, items dropped: {type(e)}, {e}")
|
logger.error(f"unexpected error, items dropped: {type(e)}, {e}")
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
self.deinit()
|
self.deinit()
|
||||||
|
|
||||||
|
@ -1,14 +1,25 @@
|
|||||||
from io import StringIO
|
from io import StringIO
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
import psycopg2.extensions
|
||||||
from AbstractSinkHandler import AbstractSinkHandler
|
from AbstractSinkHandler import AbstractSinkHandler
|
||||||
from PgDbHandle import PgDbHandle
|
from PgDbHandle import PgDbHandle, PgDbHandleException
|
||||||
|
|
||||||
|
|
||||||
class TsDbSinkException(Exception):
|
class TsDbSinkException(Exception):
|
||||||
def __init__(self, code, msg):
|
def __init__(self, pgDbHandleException):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.code = code
|
self.errType = pgDbHandleException.errType
|
||||||
self.msg = msg
|
self.code = pgDbHandleException.code
|
||||||
|
self.msg = pgDbHandleException.msg
|
||||||
|
|
||||||
|
def _copy(cursor, table, columns, data):
|
||||||
|
stringIO = StringIO(data)
|
||||||
|
cursor.copy_from(stringIO, table, columns=columns)
|
||||||
|
|
||||||
|
def _insert(cursor, table, columns, data):
|
||||||
|
stmt = f"insert into {table} ({','.join(columns)}) values ({','.join(['%s'] * len(columns))})"
|
||||||
|
cursor.execute(stmt, data)
|
||||||
|
|
||||||
class AbstractTsDbSinkHandler(AbstractSinkHandler):
|
class AbstractTsDbSinkHandler(AbstractSinkHandler):
|
||||||
def __init__(self, config, name, inQueue, experiment):
|
def __init__(self, config, name, inQueue, experiment):
|
||||||
@ -18,28 +29,20 @@ class AbstractTsDbSinkHandler(AbstractSinkHandler):
|
|||||||
|
|
||||||
def init(self):
|
def init(self):
|
||||||
logger.info("init")
|
logger.info("init")
|
||||||
self.dbh.connectDb()
|
self.dbh.connect()
|
||||||
|
|
||||||
def deinit(self):
|
def deinit(self):
|
||||||
logger.info("deinit")
|
logger.info("deinit")
|
||||||
self.dbh.disconnectDb()
|
self.dbh.disconnect()
|
||||||
|
|
||||||
def copy(self, table, columns, data):
|
def copy(self, table, columns, data):
|
||||||
try:
|
try:
|
||||||
stringIO = StringIO(data)
|
self.dbh.execute(_copy, table, columns, data)
|
||||||
with self.dbh.connection as conn:
|
except PgDbHandleException as e:
|
||||||
with conn.cursor() as cursor:
|
raise TsDbSinkException(e)
|
||||||
cursor.copy_from(stringIO, table, columns=columns)
|
|
||||||
except psycopg2.Error as e:
|
|
||||||
raise TsDbSinkException(e.pgcode, str(e))
|
|
||||||
|
|
||||||
def insert(self, table, columns, data):
|
def insert(self, table, columns, data):
|
||||||
placeholders = ['%s'] * len(columns)
|
|
||||||
stmt = f"insert into {table} ({','.join(columns)}) values ({','.join(placeholders)})"
|
|
||||||
logger.debug(f"{stmt=}")
|
|
||||||
try:
|
try:
|
||||||
with self.dbh.connection as conn:
|
self.dbh.execute(_insert, table, columns, data)
|
||||||
with conn.cursor() as cursor:
|
except PgDbHandleException as e:
|
||||||
cursor.execute(stmt, data)
|
raise TsDbSinkException(e)
|
||||||
except psycopg2.Error as e:
|
|
||||||
raise TsDbSinkException(e.pgcode, str(e))
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
from loguru import logger
|
from loguru import logger
|
||||||
from AbstractTsDbSinkHandler import TsDbSinkException, AbstractTsDbSinkHandler
|
from AbstractTsDbSinkHandler import AbstractTsDbSinkHandler, TsDbSinkException
|
||||||
import iso8601
|
import iso8601
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
@ -35,5 +35,5 @@ class EGM1mdTsDbSinkHandler(AbstractTsDbSinkHandler):
|
|||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
logger.error(f"missing attribute in payload, items dropped: {e}")
|
logger.error(f"missing attribute in payload, items dropped: {e}")
|
||||||
except TsDbSinkException as e:
|
except TsDbSinkException as e:
|
||||||
logger.error(f"error {e.code} when talking to database, items dropped: {e.msg}")
|
logger.error(f"error {e.errType}, {e.code} when talking to database, items dropped: {e.msg}")
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
from loguru import logger
|
from loguru import logger
|
||||||
from AbstractTsDbSinkHandler import TsDbSinkException, AbstractTsDbSinkHandler
|
from AbstractTsDbSinkHandler import AbstractTsDbSinkHandler, TsDbSinkException
|
||||||
import iso8601
|
import iso8601
|
||||||
|
|
||||||
|
|
||||||
@ -28,6 +28,6 @@ class FlowRigCmdTsDbSinkHandler(AbstractTsDbSinkHandler):
|
|||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
logger.error(f"missing attribute in payload, items dropped: {e}")
|
logger.error(f"missing attribute in payload, items dropped: {e}")
|
||||||
except TsDbSinkException as e:
|
except TsDbSinkException as e:
|
||||||
logger.error(f"error {e.code} when talking to database, items dropped: {e.msg}")
|
logger.error(f"error {e.errType}, {e.code} when talking to database, items dropped: {e.msg}")
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
from loguru import logger
|
from loguru import logger
|
||||||
from AbstractTsDbSinkHandler import TsDbSinkException, AbstractTsDbSinkHandler
|
from AbstractTsDbSinkHandler import AbstractTsDbSinkHandler, TsDbSinkException
|
||||||
import iso8601
|
import iso8601
|
||||||
|
|
||||||
|
|
||||||
@ -28,6 +28,6 @@ class FlowRigLogTsDbSinkHandler(AbstractTsDbSinkHandler):
|
|||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
logger.error(f"missing attribute in payload, items dropped: {e}")
|
logger.error(f"missing attribute in payload, items dropped: {e}")
|
||||||
except TsDbSinkException as e:
|
except TsDbSinkException as e:
|
||||||
logger.error(f"error {e.code} when talking to database, items dropped: {e.msg}")
|
logger.error(f"error {e.errType}, {e.code} when talking to database, items dropped: {e.msg}")
|
||||||
|
|
||||||
|
|
||||||
|
@ -3,13 +3,23 @@ import psycopg2
|
|||||||
import psycopg2.extras
|
import psycopg2.extras
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
MAX_RETRY_CNT = 3
|
||||||
|
|
||||||
|
class PgDbHandleException(Exception):
|
||||||
|
def __init__(self, errType, code, msg):
|
||||||
|
super().__init__()
|
||||||
|
self.errType = errType
|
||||||
|
self.code = code
|
||||||
|
self.msg = msg
|
||||||
|
|
||||||
class PgDbHandle(object):
|
class PgDbHandle(object):
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
logger.info("constructor called")
|
logger.info("constructor called")
|
||||||
self.config = config
|
self.config = config
|
||||||
self.connection = None
|
self.connection = None
|
||||||
|
|
||||||
def connectDb(self):
|
def connect(self):
|
||||||
dbHost = self.config["database"]["host"]
|
dbHost = self.config["database"]["host"]
|
||||||
dbUser = self.config["database"]["user"]
|
dbUser = self.config["database"]["user"]
|
||||||
dbPassword = self.config["database"]["password"]
|
dbPassword = self.config["database"]["password"]
|
||||||
@ -18,17 +28,41 @@ class PgDbHandle(object):
|
|||||||
dbName = self.config["database"]["name"]
|
dbName = self.config["database"]["name"]
|
||||||
|
|
||||||
logger.info(f"Connect to database")
|
logger.info(f"Connect to database")
|
||||||
connInfo = f"user={dbUser} host={dbHost} dbname={dbName} sslmode=require"
|
self.connInfo = f"user={dbUser} host={dbHost} dbname={dbName} sslmode=require"
|
||||||
if dbPassword != "$NONE$":
|
if dbPassword != "$NONE$":
|
||||||
connInfo += f" password={dbPassword}"
|
self.connInfo += f" password={dbPassword}"
|
||||||
self.connection = psycopg2.connect(connInfo)
|
self.connection = psycopg2.connect(self.connInfo)
|
||||||
self.connection.autocommit = False
|
self.connection.autocommit = False
|
||||||
logger.info("Connection to database established")
|
logger.info("Connection to database established")
|
||||||
|
|
||||||
def disconnectDb(self):
|
def disconnect(self):
|
||||||
logger.info(f"Disonnect from database")
|
logger.info(f"Disonnect from database")
|
||||||
if self.connection:
|
if self.connection:
|
||||||
self.connection.close()
|
self.connection.close()
|
||||||
|
|
||||||
|
def execute(self, func, *args):
|
||||||
|
try:
|
||||||
|
retryCnt = 0
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
if self.connection.status != psycopg2.extensions.STATUS_READY:
|
||||||
|
logger.error("resetting database connection")
|
||||||
|
self.connection.reset()
|
||||||
|
if self.connection.closed != 0:
|
||||||
|
logger.error("re-opening database connection")
|
||||||
|
self.connection = psycopg2.connect(self.connInfo)
|
||||||
|
self.connection.autocommit = False
|
||||||
|
with self.connection as conn:
|
||||||
|
with conn.cursor() as cursor:
|
||||||
|
func(cursor, *args)
|
||||||
|
break # in case of success we leave this retry loop
|
||||||
|
except psycopg2.InterfaceError as e:
|
||||||
|
retryCnt += 1
|
||||||
|
logger.error(f"InterfaceError when talking to database: {e}, {retryCnt}. retry of {MAX_RETRY_CNT}")
|
||||||
|
if retryCnt > MAX_RETRY_CNT:
|
||||||
|
logger.error("Giving up")
|
||||||
|
raise e
|
||||||
|
except psycopg2.Error as e:
|
||||||
|
raise PgDbHandleException(type(e), e.pgcode, str(e))
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user