authservice/auth.py

160 lines
5.1 KiB
Python
Executable File

import time
import connexion
from jose import JWTError, jwt
import werkzeug
import os
import psycopg2
from collections import namedtuple
from pbkdf2 import crypt
from loguru import logger
DB_USER = os.environ["DB_USER"]
DB_PASS = os.environ["DB_PASS"]
DB_HOST = os.environ["DB_HOST"]
DB_NAME = os.environ["DB_NAME"]
JWT_ISSUER = os.environ["JWT_ISSUER"]
class NoUserException(Exception):
pass
class ManyUsersException(Exception):
pass
class PasswordMismatchException(Exception):
pass
UserEntry = namedtuple('UserEntry', ['id', 'login', 'pwhash', 'expiry', 'claims'])
JWT_PRIV_KEY = ""
try:
JWT_PRIV_KEY = os.environ["JWT_PRIV_KEY"]
except KeyError:
with open('/opt/app/config/authservice.key', 'r') as f:
JWT_PRIV_KEY = f.read()
JWT_PUB_KEY = ""
try:
JWT_PUB_KEY = os.environ["JWT_PUB_KEY"]
except KeyError:
with open('/opt/app/config/authservice.pub', 'r') as f:
JWT_PUB_KEY = f.read()
def getUserEntryFromDB(application: str, login: str):
conn = None
cur = None
try:
conn = psycopg2.connect(user = DB_USER, password = DB_PASS,
host = DB_HOST, database = DB_NAME)
conn.autocommit = False
userObj = None
with conn.cursor() as cur:
cur.execute("SELECT id, pwhash, expiry FROM user_application_v" +
" WHERE application = %s AND login = %s",
(application, login))
userObj = cur.fetchone()
logger.debug("getUserEntryFromDB: userObj: {}".format(userObj))
if not userObj:
raise NoUserException()
invObj = cur.fetchone()
if invObj:
raise ManyUsersException()
claims = {}
with conn.cursor() as cur:
cur.execute('SELECT key, value FROM claims_for_user_v where "user" = %s and application = %s',
(userObj[0], application))
for claimObj in cur:
logger.debug("getUserEntryFromDB: add claim {} -> {}".format(claimObj[0], claimObj[1]))
if claimObj[0] in claims:
if isinstance(claims[claimObj[0]], list):
claims[claimObj[0]].append(claimObj[1])
else:
claims[claimObj[0]] = [ claims[claimObj[0]] ]
claims[claimObj[0]].append(claimObj[1])
else:
claims[claimObj[0]] = claimObj[1]
userEntry = UserEntry(id=userObj[0], login=login, pwhash=userObj[1], expiry=userObj[2], claims=claims)
return userEntry
except psycopg2.Error as err:
raise Exception("Error when connecting to database: {}".format(err))
finally:
if conn:
conn.close()
def getUserEntry(application, login, password):
userEntry = getUserEntryFromDB(application, login)
if userEntry.pwhash != crypt(password, userEntry.pwhash):
raise PasswordMismatchException()
return userEntry
def generateToken(**args):
try:
body = args["body"]
application = body["application"]
login = body["login"]
password = body["password"]
userEntry = getUserEntry(application, login, password)
timestamp = int(time.time())
payload = {
"iss": JWT_ISSUER,
"iat": int(timestamp),
"exp": int(timestamp + userEntry.expiry),
"sub": str(userEntry.id),
"aud": application
}
logger.debug("claims: {}".format(userEntry.claims))
for claim in userEntry.claims.items():
logger.debug("generateToken: add claim {}".format(claim))
payload[claim[0]] = claim[1]
return jwt.encode(payload, JWT_PRIV_KEY, algorithm='RS256')
except NoUserException:
logger.error("generateToken: no user found, login or application wrong")
raise werkzeug.exceptions.Unauthorized()
except ManyUsersException:
logger.error("generateToken: too many users found")
raise werkzeug.exceptions.Unauthorized()
except PasswordMismatchException:
logger.error("generateToken: wrong password")
raise werkzeug.exceptions.Unauthorized()
except KeyError:
logger.error("generateToken: application, login or password missing")
raise werkzeug.exceptions.Unauthorized()
except Exception as e:
logger.error("generateToken: unspecific exception: {}".format(str(e)))
raise werkzeug.exceptions.Unauthorized()
def generateTokenFromEnc(**args):
cryptContent = args["body"]
raise werkzeug.exceptions.NotImplemented("Stay tuned, will be added soon")
return str(cryptContent)
def getPubKey():
return JWT_PUB_KEY
def decodeToken(token):
try:
return jwt.decode(token, JWT_PUB_KEY, audience="test")
except JWTError as e:
logger.error("decodeToken: {}".format(e))
raise werkzeug.exceptions.Unauthorized()
def getSecret(user, token_info):
return '''
You are user_id {user} and the secret is 'wbevuec'.
Decoded token claims: {token_info}.
'''.format(user=user, token_info=token_info)