import psycopg2 import psycopg2.extras from loguru import logger import os import configparser import json import werkzeug DB_USER = "" DB_PASS = "" DB_HOST = "" DB_NAME = "" try: DB_USER = os.environ["DB_USER"] DB_PASS = os.environ["DB_PASS"] DB_HOST = os.environ["DB_HOST"] DB_NAME = os.environ["DB_NAME"] except KeyError: config = configparser.ConfigParser() config.read('./config/dbconfig.ini') DB_USER = config["database"]["user"] DB_PASS = config["database"]["pass"] DB_HOST = config["database"]["host"] DB_NAME = config["database"]["name"] class NoDataFoundException(Exception): pass class TooMuchDataFoundException(Exception): pass def databaseOperation(cursor, params): cursor.execute('SELECT key, value FROM claims_for_user_v where "user" = %s and application = %s', params) for claimObj in cursor: logger.debug("add claim {} -> {}".format(claimObj[0], claimObj[1])) return [] def execDatabaseOperation(func, params): conn = None cur = None try: conn = psycopg2.connect(user = DB_USER, password = DB_PASS, host = DB_HOST, database = DB_NAME, sslmode = 'require') conn.autocommit = False with conn: with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur: params["params"] = [ v if not v=='' else None for v in params["params"] ] logger.debug("edo: {}".format(str(params))) return func(cur, params) except psycopg2.Error as err: raise Exception("Error when connecting to database: {}".format(err)) finally: if conn: conn.close() def _opGetMany(cursor, params): items = [] cursor.execute(params["statement"]) for itemObj in cursor: logger.debug("item received {}".format(str(itemObj))) items.append(itemObj) return items def dbGetMany(user, token_info, params): logger.info("params: {}, token: {}".format(params, json.dumps(token_info))) try: return execDatabaseOperation(_opGetMany, params) except Exception as e: logger.error(f"Exception: {e}") raise werkzeug.exceptions.InternalServerError def _opGetOne(cursor, params): cursor.execute(params["statement"], params["params"]) itemObj = cursor.fetchone() logger.debug(f"item received: {itemObj}") if not itemObj: raise NoDataFoundException dummyObj = cursor.fetchone() if dummyObj: raise TooMuchDataFoundException return itemObj def dbGetOne(user, token_info, params): logger.info("params: {}, token: {}".format(params, json.dumps(token_info))) try: return execDatabaseOperation(_opGetOne, params) except NoDataFoundException: logger.error("no data found") raise werkzeug.exceptions.NotFound except TooMuchDataFoundException: logger.error("too much data found") raise werkzeug.exceptions.InternalServerError except Exception as e: logger.error(f"Exception: {e}") raise werkzeug.exceptions.InternalServerError def dbInsert(user, token_info, params): logger.info("params: {}, token: {}".format(params, json.dumps(token_info))) try: return execDatabaseOperation(_opGetOne, params) except Exception as e: logger.error(f"Exception: {e}") raise werkzeug.exceptions.InternalServerError def dbUpdate(user, token_info, params): logger.info("params: {}, token: {}".format(params, json.dumps(token_info))) try: return execDatabaseOperation(_opGetOne, params) except Exception as e: logger.error(f"Exception: {e}") raise werkzeug.exceptions.InternalServerError