import time import connexion from jose import JWTError, jwt import werkzeug import os import psycopg2 from collections import namedtuple from pbkdf2 import crypt from loguru import logger DB_USER = os.environ["DB_USER"] DB_PASS = os.environ["DB_PASS"] DB_HOST = os.environ["DB_HOST"] DB_NAME = os.environ["DB_NAME"] JWT_ISSUER = os.environ["JWT_ISSUER"] class NoUserException(Exception): pass class ManyUsersException(Exception): pass class PasswordMismatchException(Exception): pass UserEntry = namedtuple('UserEntry', ['id', 'login', 'pwhash', 'expiry', 'claims']) JWT_PRIV_KEY = "" try: JWT_PRIV_KEY = os.environ["JWT_PRIV_KEY"] except KeyError: with open('/opt/app/config/authservice.key', 'r') as f: JWT_PRIV_KEY = f.read() JWT_PUB_KEY = "" try: JWT_PUB_KEY = os.environ["JWT_PUB_KEY"] except KeyError: with open('/opt/app/config/authservice.pub', 'r') as f: JWT_PUB_KEY = f.read() def getUserEntryFromDB(application: str, login: str): conn = None cur = None try: conn = psycopg2.connect(user = DB_USER, password = DB_PASS, host = DB_HOST, database = DB_NAME) conn.autocommit = False userObj = None with conn.cursor() as cur: cur.execute("SELECT id, pwhash, expiry FROM user_application_v" + " WHERE application = %s AND login = %s", (application, login)) userObj = cur.fetchone() logger.debug("getUserEntryFromDB: userObj: {}".format(userObj)) if not userObj: raise NoUserException() invObj = cur.fetchone() if invObj: raise ManyUsersException() claims = {} with conn.cursor() as cur: cur.execute('SELECT key, value FROM claims_for_user_v where "user" = %s and application = %s', (userObj[0], application)) for claimObj in cur: logger.debug("getUserEntryFromDB: add claim {} -> {}".format(claimObj[0], claimObj[1])) if claimObj[0] in claims: if isinstance(claims[claimObj[0]], list): claims[claimObj[0]].append(claimObj[1]) else: claims[claimObj[0]] = [ claims[claimObj[0]] ] claims[claimObj[0]].append(claimObj[1]) else: claims[claimObj[0]] = claimObj[1] userEntry = UserEntry(id=userObj[0], login=login, pwhash=userObj[1], expiry=userObj[2], claims=claims) return userEntry except psycopg2.Error as err: raise Exception("Error when connecting to database: {}".format(err)) finally: if conn: conn.close() def getUserEntry(application, login, password): userEntry = getUserEntryFromDB(application, login) if userEntry.pwhash != crypt(password, userEntry.pwhash): raise PasswordMismatchException() return userEntry def generateToken(**args): try: body = args["body"] application = body["application"] login = body["login"] password = body["password"] userEntry = getUserEntry(application, login, password) timestamp = int(time.time()) payload = { "iss": JWT_ISSUER, "iat": int(timestamp), "exp": int(timestamp + userEntry.expiry), "sub": str(userEntry.id), "aud": application } logger.debug("claims: {}".format(userEntry.claims)) for claim in userEntry.claims.items(): logger.debug("generateToken: add claim {}".format(claim)) payload[claim[0]] = claim[1] return jwt.encode(payload, JWT_PRIV_KEY, algorithm='RS256') except NoUserException: logger.error("generateToken: no user found, login or application wrong") raise werkzeug.exceptions.Unauthorized() except ManyUsersException: logger.error("generateToken: too many users found") raise werkzeug.exceptions.Unauthorized() except PasswordMismatchException: logger.error("generateToken: wrong password") raise werkzeug.exceptions.Unauthorized() except KeyError: logger.error("generateToken: application, login or password missing") raise werkzeug.exceptions.Unauthorized() except Exception as e: logger.error("generateToken: unspecific exception: {}".format(str(e))) raise werkzeug.exceptions.Unauthorized() def generateTokenFromEnc(**args): cryptContent = args["body"] raise werkzeug.exceptions.NotImplemented("Stay tuned, will be added soon") return str(cryptContent) def getPubKey(): return JWT_PUB_KEY def decodeToken(token): try: return jwt.decode(token, JWT_PUB_KEY, audience="test") except JWTError as e: logger.error("decodeToken: {}".format(e)) raise werkzeug.exceptions.Unauthorized() def getSecret(user, token_info): return ''' You are user_id {user} and the secret is 'wbevuec'. Decoded token claims: {token_info}. '''.format(user=user, token_info=token_info)