136 lines
4.3 KiB
Python
Executable File

import time
import connexion
from jose import JWTError, jwt
import werkzeug
import os
import mariadb
from collections import namedtuple
from pbkdf2 import crypt
DB_USER = os.environ["DB_USER"]
DB_PASS = os.environ["DB_PASS"]
DB_HOST = os.environ["DB_HOST"]
DB_NAME = os.environ["DB_NAME"]
JWT_ISSUER = os.environ["JWT_ISSUER"]
class NoUserException(Exception):
pass
class ManyUsersException(Exception):
pass
class PasswordMismatchException(Exception):
pass
UserEntry = namedtuple('UserEntry', ['id', 'login', 'expiry', 'claims'])
JWT_PRIV_KEY = ""
with open('/opt/app/config/authservice.key', 'r') as f:
JWT_PRIV_KEY = f.read()
print("DEBUG PRIVKEY: {}".format(JWT_PRIV_KEY))
JWT_PUB_KEY = ""
with open('/opt/app/config/authservice.pub', 'r') as f:
JWT_PUB_KEY = f.read()
print("DEBUG PUBKEY: {}".format(JWT_PUB_KEY))
def getUserEntryFromDB(application: str, login: str):
conn = None
cur = None
try:
conn = mariadb.connect(user = DB_USER, password = DB_PASS,
host = DB_HOST, database = DB_NAME)
conn.autocommit = False
cur = conn.cursor(dictionary=True)
cur.execute("SELECT id, pwhash, expiry FROM user_application" +
" WHERE application = ? AND login = ?",
[application, login])
resObj = cur.next()
print("DEBUG: getUserEntryFromDB: resObj: {}".format(resObj))
if not resObj:
raise NoUserException()
invObj = cur.next()
if invObj:
raise ManyUsersException()
userId = resObj["id"]
cur.execute("SELECT user, `key`, `value` FROM claims_for_user where user = ?",
[userId])
claims = {}
for claimObj in cur:
print("DEBUG: getUserEntryFromDB: add claim {} -> {}".format(claimObj["key"], claimObj["value"]))
if claimObj["key"] in claims:
if isinstance(claims[claimObj["key"]], list):
claims[claimObj["key"]].append(claimObj["value"])
else:
claims[claimObj["key"]] = [ claims[claimObj["key"]] ]
claims[claimObj["key"]].append(claimObj["value"])
else:
claims[claimObj["key"]] = claimObj["value"]
userEntry = UserEntry(id=userId, login=login, expiry=resObj["expiry"], claims=claims)
return userEntry, resObj["pwhash"]
except mariadb.Error as err:
raise Exception("Error when connecting to database: {}".format(err))
finally:
if cur:
cur.close()
if conn:
conn.rollback()
conn.close()
def getUserEntry(application, login, password):
userEntry, pwhash = getUserEntryFromDB(application, login)
if pwhash != crypt(password, pwhash):
raise PasswordMismatchException()
return userEntry
def generateToken(**args):
try:
body = args["body"]
application = body["application"]
login = body["login"]
password = body["password"]
userEntry = getUserEntry(application, login, password)
timestamp = int(time.time())
payload = {
"iss": JWT_ISSUER,
"iat": int(timestamp),
"exp": int(timestamp + userEntry.expiry),
"sub": str(userEntry.id)
}
for claim in userEntry.claims.items():
# print("DEBUG: generateToken: add claim {} -> {}".format(claim[0], claim[1]))
payload[claim[0]] = claim[1]
return jwt.encode(payload, JWT_PRIV_KEY, algorithm='RS256')
except NoUserException:
print("ERROR: generateToken: no user found, login or application wrong")
raise werkzeug.exceptions.Unauthorized()
except ManyUsersException:
print("ERROR: generateToken: too many users found")
raise werkzeug.exceptions.Unauthorized()
except PasswordMismatchException:
print("ERROR: generateToken: wrong password")
raise werkzeug.exceptions.Unauthorized()
except KeyError:
print("ERROR: generateToken: application, login or password missing")
raise werkzeug.exceptions.Unauthorized()
except Exception as e:
print("ERROR: generateToken: unspecific exception: {}".format(str(e)))
raise werkzeug.exceptions.Unauthorized()
def getPubKey():
return JWT_PUB_KEY