authservice/auth.py
2021-01-25 21:52:52 +01:00

77 lines
2.1 KiB
Python
Executable File

import time
import connexion
from jose import JWTError, jwt
import os
import mariadb
# Runtime configuration, read from the process environment at import time.
# Each value is looked up with os.environ[...], so a missing variable raises
# KeyError as soon as this module is imported.
# Value written into the "iss" claim of issued tokens.
JWT_ISSUER = os.environ['JWT_ISSUER']
# Key handed to jwt.encode when signing and to jwt.decode when verifying.
JWT_SECRET = os.environ['JWT_SECRET']
# Added to the issue timestamp to compute the "exp" claim; must parse as int.
JWT_LIFETIME_SECONDS = int(os.environ['JWT_LIFETIME_SECONDS'])
# Algorithm name used for signing and the only one accepted when decoding.
JWT_ALGORITHM = os.environ['JWT_ALGORITHM']
# Connection parameters passed to mariadb.connect (user, password, host,
# database respectively).
DB_USER = os.environ["DB_USER"]
DB_PASS = os.environ["DB_PASS"]
DB_HOST = os.environ["DB_HOST"]
DB_NAME = os.environ["DB_NAME"]
def getUserEntryFromDB(login, password):
    """Look up the single ``users`` row matching a login/password pair.

    Opens a fresh MariaDB connection, runs a parameterized SELECT and
    requires exactly one matching row. The connection is always rolled back
    and closed before returning, so the call never leaves a transaction open.

    Args:
        login: Bound to the ``login`` column in the WHERE clause.
        password: Bound to the ``password`` column in the WHERE clause.

    Returns:
        The matching row as returned by a ``dictionary=True`` cursor; it
        contains the selected ``id`` column.

    Raises:
        Exception: If no row matches, if more than one row matches, or if
            the driver raises ``mariadb.Error`` (the driver error is attached
            as ``__cause__``).
    """
    # NOTE(review): the supplied password is compared directly against the
    # stored column, which suggests passwords are stored unhashed — confirm
    # against the schema and consider a salted hash.
    conn = None
    cur = None
    try:
        conn = mariadb.connect(
            user=DB_USER,
            password=DB_PASS,
            host=DB_HOST,
            database=DB_NAME,
        )
        conn.autocommit = False
        cur = conn.cursor(dictionary=True)
        cur.execute(
            "SELECT id FROM users WHERE login = ? AND password = ?",
            [login, password],
        )
        user_entry = cur.fetchone()
        if not user_entry:
            raise Exception("No user entry found")
        # A second row means the credentials are ambiguous; refuse to guess.
        if cur.fetchone():
            raise Exception("Too many user entries found")
        return user_entry
    except mariadb.Error as err:
        # Chain explicitly so the driver error stays available to callers.
        raise Exception(
            "Error when connecting to database: {}".format(err)
        ) from err
    finally:
        # Nested try/finally: a failure while closing the cursor or rolling
        # back must not prevent the connection itself from being closed.
        try:
            if cur is not None:
                cur.close()
        finally:
            if conn is not None:
                try:
                    conn.rollback()
                finally:
                    conn.close()
def getUserEntry(login, password):
    """Return the user row for the given credentials.

    Pure pass-through to ``getUserEntryFromDB``: the arguments are forwarded
    unchanged, and the return value and any raised exception are exactly
    those of that function.
    """
    entry = getUserEntryFromDB(login, password)
    return entry
def generateToken(login, password):
    """Issue a signed JWT for the user identified by the credentials.

    The user row is fetched with ``getUserEntryFromDB``; any exception it
    raises propagates unchanged. The token carries these claims:

    * ``iss`` - ``JWT_ISSUER``
    * ``iat`` - current Unix time, truncated to whole seconds
    * ``exp`` - ``iat`` plus ``JWT_LIFETIME_SECONDS``
    * ``sub`` - the row's ``id`` converted to ``str``

    Returns:
        Whatever ``jwt.encode`` returns for the claims, signed with
        ``JWT_SECRET`` using ``JWT_ALGORITHM``.
    """
    user_row = getUserEntryFromDB(login, password)
    subject = str(user_row["id"])

    issued_at = int(time.time())
    expires_at = int(issued_at + JWT_LIFETIME_SECONDS)

    claims = dict(
        iss=JWT_ISSUER,
        iat=issued_at,
        exp=expires_at,
        sub=subject,
    )
    return jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)
def decodeToken(token):
    """Verify a JWT and return its decoded claims.

    The token is decoded with ``JWT_SECRET`` and only ``JWT_ALGORITHM`` is
    accepted.

    Returns:
        The result of ``jwt.decode`` on success. If ``jwt.decode`` raises
        ``JWTError``, a ``(message, 401)`` tuple is returned instead, where
        the message embeds the error text.
    """
    # NOTE(review): the failure path returns a tuple rather than raising or
    # returning None. If this function is registered as a connexion
    # token-info callback, confirm the framework treats that tuple as a
    # rejection and not as valid token info.
    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except JWTError as exc:
        message = "Unauthorized ({})".format(str(exc))
        return message, 401
    return claims
def getSecret(user, token_info):
    """Render the demo "secret" message for an authenticated caller.

    Args:
        user: Interpolated after ``user_id`` in the message.
        token_info: Interpolated as the decoded token claims.

    Returns:
        A multi-line string that starts and ends with a newline and
        contains both arguments formatted with ``str.format``.
    """
    template = (
        "\n"
        "You are user_id {user} and the secret is 'wbevuec'.\n"
        "Decoded token claims: {token_info}.\n"
    )
    return template.format(user=user, token_info=token_info)