"""JWT authentication service handlers.

Looks up users and their claims in PostgreSQL, verifies the password
against the stored PBKDF2 hash and issues RS256-signed JSON Web Tokens.
"""

import hmac
import json
import os
import time
from collections import namedtuple

import connexion
import psycopg2
import werkzeug
from jose import JWTError, jwt, jwe
from loguru import logger
from pbkdf2 import crypt


DB_USER = os.environ["DB_USER"]
DB_PASS = os.environ["DB_PASS"]
DB_HOST = os.environ["DB_HOST"]
DB_NAME = os.environ["DB_NAME"]

JWT_ISSUER = os.environ["JWT_ISSUER"]

# Algorithm used for signing in generateToken and enforced in decodeToken.
_JWT_ALGORITHM = 'RS256'


class NoUserException(Exception):
    """No row matched the given application/login pair."""


class ManyUsersException(Exception):
    """More than one row matched the given application/login pair."""


class PasswordMismatchException(Exception):
    """The supplied password does not match the stored hash."""


UserEntry = namedtuple('UserEntry', ['id', 'login', 'pwhash', 'expiry', 'claims'])


def _load_key(env_name, fallback_path):
    """Return the key from environment variable *env_name*, else read it from *fallback_path*."""
    try:
        return os.environ[env_name]
    except KeyError:
        with open(fallback_path, 'r') as f:
            return f.read()


JWT_PRIV_KEY = _load_key("JWT_PRIV_KEY", '/opt/app/config/authservice.key')
JWT_PUB_KEY = _load_key("JWT_PUB_KEY", '/opt/app/config/authservice.pub')


def getUserEntryFromDB(application: str, login: str):
    """Fetch the user row and its claims for *login* within *application*.

    Claims with a key that occurs once are stored as a scalar value; keys
    that occur several times are collected into a list.

    Returns:
        A ``UserEntry`` with id, login, pwhash, expiry and claims.

    Raises:
        NoUserException: no matching user row was found.
        ManyUsersException: more than one matching user row was found.
        Exception: a ``psycopg2.Error`` occurred (chained as ``__cause__``).
    """
    conn = None
    try:
        conn = psycopg2.connect(user=DB_USER, password=DB_PASS,
                                host=DB_HOST, database=DB_NAME)
        conn.autocommit = False

        with conn.cursor() as cur:
            cur.execute("SELECT id, pwhash, expiry FROM user_application_v"
                        " WHERE application = %s AND login = %s",
                        (application, login))
            userObj = cur.fetchone()
            # NOTE(review): this debug line includes the password hash.
            logger.debug("userObj: {}".format(userObj))
            if not userObj:
                raise NoUserException()
            # A second row means the application/login pair is ambiguous.
            if cur.fetchone():
                raise ManyUsersException()

        user_id, pwhash, expiry = userObj[0], userObj[1], userObj[2]

        claims = {}
        with conn.cursor() as cur:
            cur.execute('SELECT key, value FROM claims_for_user_v where "user" = %s and application = %s',
                        (user_id, application))
            for claimObj in cur:
                key, value = claimObj[0], claimObj[1]
                logger.debug("add claim {} -> {}".format(key, value))
                if key not in claims:
                    claims[key] = value
                elif isinstance(claims[key], list):
                    claims[key].append(value)
                else:
                    # Second occurrence of the key: promote scalar to list.
                    claims[key] = [claims[key], value]

        return UserEntry(id=user_id, login=login, pwhash=pwhash,
                         expiry=expiry, claims=claims)
    except psycopg2.Error as err:
        # Chain the driver error so the original traceback is preserved.
        raise Exception("Error when connecting to database: {}".format(err)) from err
    finally:
        if conn:
            conn.close()


def getUserEntry(application, login, password):
    """Return the ``UserEntry`` for *login* after verifying *password*.

    Raises:
        PasswordMismatchException: the password does not match the stored hash.
        NoUserException, ManyUsersException, Exception: propagated from
            ``getUserEntryFromDB``.
    """
    userEntry = getUserEntryFromDB(application, login)
    stored_hash = userEntry.pwhash
    computed_hash = crypt(password, stored_hash)
    # Constant-time comparison avoids leaking how many leading characters of
    # the hash matched. A non-str stored hash can never equal the str result
    # of crypt(), so it is treated as a mismatch (same outcome as `!=`).
    if not (isinstance(stored_hash, str)
            and hmac.compare_digest(stored_hash, computed_hash)):
        raise PasswordMismatchException()
    return userEntry


def generateToken(**args):
    """Issue a signed JWT for the credentials in ``args["body"]``.

    The body must contain either the plain keys ``application``, ``login``
    and ``password``, or ``encAleTuple``: a JWE (decrypted with the private
    key) whose JSON payload carries those three keys.

    Returns:
        The encoded JWT, signed with the private key using RS256.

    Raises:
        werkzeug.exceptions.Unauthorized: on any failure (unknown or
            ambiguous user, wrong password, missing fields, other errors).
    """
    try:
        body = args["body"]

        if ("application" in body) and ("login" in body) and ("password" in body):
            credentials = body
        elif "encAleTuple" in body:
            clearContent = jwe.decrypt(body["encAleTuple"], JWT_PRIV_KEY)
            credentials = json.loads(clearContent)
        else:
            raise KeyError("Neither application, login and password nor encAleTuple given")

        application = credentials["application"]
        login = credentials["login"]
        password = credentials["password"]

        # The password is deliberately not logged.
        logger.debug(f"Tuple: {application} {login}")

        userEntry = getUserEntry(application, login, password)

        timestamp = int(time.time())
        payload = {
            "iss": JWT_ISSUER,
            "iat": int(timestamp),
            "exp": int(timestamp + userEntry.expiry),
            "sub": str(userEntry.id),
            "aud": application
        }
        logger.debug("claims: {}".format(userEntry.claims))
        # NOTE(review): a claim whose key equals a registered field above
        # (e.g. "exp") overwrites it.
        for claim in userEntry.claims.items():
            logger.debug("add claim {}".format(claim))
            payload[claim[0]] = claim[1]

        return jwt.encode(payload, JWT_PRIV_KEY, algorithm=_JWT_ALGORITHM)
    except NoUserException:
        logger.error("no user found, login or application wrong")
        raise werkzeug.exceptions.Unauthorized()
    except ManyUsersException:
        logger.error("too many users found")
        raise werkzeug.exceptions.Unauthorized()
    except PasswordMismatchException:
        logger.error("wrong password")
        raise werkzeug.exceptions.Unauthorized()
    except KeyError:
        logger.error("application, login or password missing")
        raise werkzeug.exceptions.Unauthorized()
    except Exception as e:
        logger.error("unspecific exception: {}".format(str(e)))
        raise werkzeug.exceptions.Unauthorized()


def getPubKey():
    """Return the public key used to verify issued tokens."""
    return JWT_PUB_KEY


def decodeToken(token):
    """Verify *token* against the public key and return its claims.

    Raises:
        werkzeug.exceptions.Unauthorized: the token failed verification.
    """
    try:
        # Pin the algorithm to the one used for signing so the token header
        # cannot select a different one.
        # NOTE(review): audience is hard-coded to "test" - confirm intended.
        return jwt.decode(token, JWT_PUB_KEY, algorithms=[_JWT_ALGORITHM],
                          audience="test")
    except JWTError as e:
        logger.error("{}".format(e))
        raise werkzeug.exceptions.Unauthorized()


def testToken(user, token_info):
    """Return a human-readable message echoing *user* and *token_info*."""
    return ''' You are user_id {user} and the provided token has been signed by this issuers. Fine.'. Decoded token claims: {token_info}. '''.format(user=user, token_info=token_info)