import time import connexion from jose import JWTError, jwt import werkzeug import os import mariadb from collections import namedtuple DB_USER = os.environ["DB_USER"] DB_PASS = os.environ["DB_PASS"] DB_HOST = os.environ["DB_HOST"] DB_NAME = os.environ["DB_NAME"] class NoUserException(Exception): pass class ManyUsersException(Exception): pass UserEntry = namedtuple('UserEntry', ['id', 'login', 'issuer', 'secret', 'expiry', 'claims']) def getUserEntryFromDB(login: str, password: str) -> UserEntry: conn = None cur = None try: conn = mariadb.connect(user = DB_USER, password = DB_PASS, host = DB_HOST, database = DB_NAME) conn.autocommit = False cur = conn.cursor(dictionary=True) # print("DEBUG: getUserEntryFromDB: login: <{}>, password: <{}>".format(login, password)) cur.execute("SELECT id, issuer, secret, expiry FROM user_and_issuer WHERE login = ? AND password = ?", [login, password]) resObj = cur.next() print("DEBUG: getUserEntryFromDB: resObj: {}".format(resObj)) if not resObj: raise NoUserException() invObj = cur.next() if invObj: raise ManyUsersException() userId = resObj["id"] cur.execute("SELECT user, `key`, `value` FROM claims_for_user where user = ?", [userId]) claims = {} for claimObj in cur: print("DEBUG: getUserEntryFromDB: add claim {} -> {}".format(claimObj["key"], claimObj["value"])) if claimObj["key"] in claims: if isinstance(claimObj["key"], list): claims[claimObj["key"]].append(claimObj["value"]) else: claims[claimObj["key"]] = [ claims[claimObj["key"]] ] claims[claimObj["key"]].append(claimObj["value"]) else: claims[claimObj["key"]] = claimObj["value"] userEntry = UserEntry(id=userId, login=login, secret=resObj["secret"], issuer=resObj["issuer"], expiry=resObj["expiry"], claims=claims) return userEntry except mariadb.Error as err: raise Exception("Error when connecting to database: {}".format(err)) finally: if cur: cur.close() if conn: conn.rollback() conn.close() def getUserEntry(login: str, password: str) -> UserEntry: return getUserEntryFromDB(login, password) def generateToken(**args): try: body = args["body"] login = body["login"] password = body["password"] userEntry = getUserEntryFromDB(login, password) timestamp = int(time.time()) payload = { "iss": userEntry.issuer, "iat": int(timestamp), "exp": int(timestamp + userEntry.expiry), "sub": str(userEntry.id) } for claim in userEntry.claims.items(): # print("DEBUG: generateToken: add claim {} -> {}".format(claim[0], claim[1])) payload["x-{}".format(claim[0])] = claim[1] return jwt.encode(payload, userEntry.secret) except NoUserException: print("ERROR: generateToken: no user found, login or password wrong") raise werkzeug.exceptions.Unauthorized() except ManyUsersException: print("ERROR: generateToken: too many users found") raise werkzeug.exceptions.Unauthorized() except KeyError: print("ERROR: generateToken: login or password missing") raise werkzeug.exceptions.Unauthorized() except Exception as e: print("ERROR: generateToken: unspecific exception: {}".format(str(e))) raise werkzeug.exceptions.Unauthorized()