support handover of cleartext and encrypted credentials, refactor naming of methods

This commit is contained in:
Wolfgang Hottgenroth 2021-06-15 23:13:18 +02:00
parent 1f55ef0a80
commit 91178b1fa7
Signed by: wn
GPG Key ID: E49AF3B9EF6DD469
4 changed files with 48 additions and 44 deletions

53
auth.py
View File

@ -1,6 +1,7 @@
import time
import connexion
from jose import JWTError, jwt
from jose import JWTError, jwt, jwe
import json
import werkzeug
import os
import psycopg2
@ -60,7 +61,7 @@ def getUserEntryFromDB(application: str, login: str):
" WHERE application = %s AND login = %s",
(application, login))
userObj = cur.fetchone()
logger.debug("getUserEntryFromDB: userObj: {}".format(userObj))
logger.debug("userObj: {}".format(userObj))
if not userObj:
raise NoUserException()
invObj = cur.fetchone()
@ -72,7 +73,7 @@ def getUserEntryFromDB(application: str, login: str):
cur.execute('SELECT key, value FROM claims_for_user_v where "user" = %s and application = %s',
(userObj[0], application))
for claimObj in cur:
logger.debug("getUserEntryFromDB: add claim {} -> {}".format(claimObj[0], claimObj[1]))
logger.debug("add claim {} -> {}".format(claimObj[0], claimObj[1]))
if claimObj[0] in claims:
if isinstance(claims[claimObj[0]], list):
claims[claimObj[0]].append(claimObj[1])
@ -100,9 +101,26 @@ def getUserEntry(application, login, password):
def generateToken(**args):
try:
body = args["body"]
application = body["application"]
login = body["login"]
password = body["password"]
application = ""
login = ""
password = ""
if (("application" in body) and
("login" in body) and
("password" in body)):
application = body["application"]
login = body["login"]
password = body["password"]
elif ("encAleTuple" in body):
clearContent = jwe.decrypt(body["encAleTuple"], JWT_PRIV_KEY)
clearObj = json.loads(clearContent)
application = clearObj["application"]
login = clearObj["login"]
password = clearObj["password"]
else:
raise KeyError("Neither application, login and password nor encAleTuple given")
logger.debug(f"Tuple: {application} {login} {password}")
userEntry = getUserEntry(application, login, password)
@ -116,31 +134,26 @@ def generateToken(**args):
}
logger.debug("claims: {}".format(userEntry.claims))
for claim in userEntry.claims.items():
logger.debug("generateToken: add claim {}".format(claim))
logger.debug("add claim {}".format(claim))
payload[claim[0]] = claim[1]
return jwt.encode(payload, JWT_PRIV_KEY, algorithm='RS256')
except NoUserException:
logger.error("generateToken: no user found, login or application wrong")
logger.error("no user found, login or application wrong")
raise werkzeug.exceptions.Unauthorized()
except ManyUsersException:
logger.error("generateToken: too many users found")
logger.error("too many users found")
raise werkzeug.exceptions.Unauthorized()
except PasswordMismatchException:
logger.error("generateToken: wrong password")
logger.error("wrong password")
raise werkzeug.exceptions.Unauthorized()
except KeyError:
logger.error("generateToken: application, login or password missing")
logger.error("application, login or password missing")
raise werkzeug.exceptions.Unauthorized()
except Exception as e:
logger.error("generateToken: unspecific exception: {}".format(str(e)))
logger.error("unspecific exception: {}".format(str(e)))
raise werkzeug.exceptions.Unauthorized()
def generateTokenFromEnc(**args):
cryptContent = args["body"]
raise werkzeug.exceptions.NotImplemented("Stay tuned, will be added soon")
return str(cryptContent)
def getPubKey():
return JWT_PUB_KEY
@ -148,12 +161,12 @@ def decodeToken(token):
try:
return jwt.decode(token, JWT_PUB_KEY, audience="test")
except JWTError as e:
logger.error("decodeToken: {}".format(e))
logger.error("{}".format(e))
raise werkzeug.exceptions.Unauthorized()
def getSecret(user, token_info):
def testToken(user, token_info):
return '''
You are user_id {user} and the secret is 'wbevuec'.
You are user_id {user} and the provided token has been signed by this issuers. Fine.'.
Decoded token claims: {token_info}.
'''.format(user=user, token_info=token_info)

View File

@ -4,16 +4,18 @@ info:
version: "0.1"
paths:
/auth:
/token:
post:
tags: [ "JWT" ]
summary: Accept login and password, return JWT token
summary: Accept encrypted or clear set of credentials, return JWT token
operationId: auth.generateToken
requestBody:
content:
'application/json':
schema:
$ref: '#/components/schemas/User'
anyOf:
- $ref: '#/components/schemas/User'
- $ref: '#/components/schemas/EncUser'
responses:
'200':
description: JWT token
@ -21,28 +23,11 @@ paths:
'text/plain':
schema:
type: string
/authe:
post:
tags: [ "JWT" ]
summary: Accept encrypted set of credentials, return JWT token
operationId: auth.generateTokenFromEnc
requestBody:
content:
'text/plain':
schema:
type: string
responses:
'200':
description: JWT token
content:
'text/plain':
schema:
type: string
/secret:
/test:
get:
tags: [ "Test" ]
summary: Return secret string
operationId: auth.getSecret
operationId: auth.testToken
responses:
'200':
description: secret response
@ -84,3 +69,9 @@ components:
type: string
password:
type: string
EncUser:
description: Encrypted Application/Login/Password tuple
type: object
properties:
encAleTuple:
type: string

View File

@ -1,7 +1,7 @@
import connexion
import logging
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)
app = connexion.App('authservice')
app.add_api('./openapi.yaml')

View File

@ -8,7 +8,7 @@ JWT_PRIV_KEY = os.environ["JWT_PRIV_KEY"]
class JweTestMethods(unittest.TestCase):
def test_encryptDecrypt(self):
inObj = {"key1":"value1", "key2":"value2"}
inObj = {"application":"test", "login":"wn", "password":"joshua"}
plainText = json.dumps(inObj)
cryptText = jwe.encrypt(plainText, JWT_PUB_KEY, "A256GCM", "RSA-OAEP")