10 Commits
0.1.7 ... 0.4.1

8 changed files with 457 additions and 220 deletions

4
.gitignore vendored
View File

@ -1,2 +1,6 @@
__pycache__/ __pycache__/
ENV ENV
config/
*~
.*~

View File

@ -16,8 +16,9 @@ ENV JWT_SECRET='streng_geheim'
RUN \ RUN \
apt update && \ apt update && \
apt install -y libmariadbclient-dev && \ apt install -y postgresql-client-common && \
pip3 install mariadb && \ pip3 install psycopg2 && \
pip3 install loguru && \
pip3 install dateparser && \ pip3 install dateparser && \
pip3 install connexion && \ pip3 install connexion && \
pip3 install connexion[swagger-ui] && \ pip3 install connexion[swagger-ui] && \

333
auth.py
View File

@ -1,48 +1,77 @@
import time import time
import connexion import connexion
from jose import JWTError, jwt from jose import JWTError, jwt, jwe
import json
from jose.exceptions import ExpiredSignatureError
import werkzeug import werkzeug
import os import os
import mariadb import psycopg2
from collections import namedtuple from collections import namedtuple
from pbkdf2 import crypt from pbkdf2 import crypt
from loguru import logger
import configparser
import random
import string
DB_USER = ""
DB_PASS = ""
DB_HOST = ""
DB_NAME = ""
JWT_ISSUER = ""
try:
DB_USER = os.environ["DB_USER"] DB_USER = os.environ["DB_USER"]
DB_PASS = os.environ["DB_PASS"] DB_PASS = os.environ["DB_PASS"]
DB_HOST = os.environ["DB_HOST"] DB_HOST = os.environ["DB_HOST"]
DB_NAME = os.environ["DB_NAME"] DB_NAME = os.environ["DB_NAME"]
JWT_ISSUER = os.environ["JWT_ISSUER"] JWT_ISSUER = os.environ["JWT_ISSUER"]
except KeyError:
config = configparser.ConfigParser()
config.read('./config/config.ini')
DB_USER = config["database"]["user"]
DB_PASS = config["database"]["pass"]
DB_HOST = config["database"]["host"]
DB_NAME = config["database"]["name"]
JWT_ISSUER = config["jwt"]["issuer"]
class NoUserException(Exception): class NoUserException(Exception):
pass pass
class NoTokenException(Exception):
pass
class NoValidTokenException(Exception):
pass
class ManyUsersException(Exception): class ManyUsersException(Exception):
pass pass
class ManyTokensException(Exception):
pass
class PasswordMismatchException(Exception): class PasswordMismatchException(Exception):
pass pass
class RefreshTokenExpiredException(Exception):
pass
UserEntry = namedtuple('UserEntry', ['id', 'login', 'expiry', 'claims'])
UserEntry = namedtuple('UserEntry', ['id', 'login', 'pwhash', 'expiry', 'claims'])
RefreshTokenEntry = namedtuple('RefreshTokenEntry', ['id', 'salt', 'login', 'app', 'expiry'])
JWT_PRIV_KEY = "" JWT_PRIV_KEY = ""
try: try:
JWT_PRIV_KEY = os.environ["JWT_PRIV_KEY"] JWT_PRIV_KEY = os.environ["JWT_PRIV_KEY"]
except KeyError: except KeyError:
with open('/opt/app/config/authservice.key', 'r') as f: with open('./config/authservice.key', 'r') as f:
JWT_PRIV_KEY = f.read() JWT_PRIV_KEY = f.read()
JWT_PUB_KEY = "" JWT_PUB_KEY = ""
try: try:
JWT_PUB_KEY = os.environ["JWT_PUB_KEY"] JWT_PUB_KEY = os.environ["JWT_PUB_KEY"]
except KeyError: except KeyError:
with open('/opt/app/config/authservice.pub', 'r') as f: with open('./config/authservice.pub', 'r') as f:
JWT_PUB_KEY = f.read() JWT_PUB_KEY = f.read()
@ -50,63 +79,147 @@ def getUserEntryFromDB(application: str, login: str):
conn = None conn = None
cur = None cur = None
try: try:
conn = mariadb.connect(user = DB_USER, password = DB_PASS, conn = psycopg2.connect(user = DB_USER, password = DB_PASS,
host = DB_HOST, database = DB_NAME) host = DB_HOST, database = DB_NAME)
conn.autocommit = False conn.autocommit = False
cur = conn.cursor(dictionary=True) userObj = None
cur.execute("SELECT id, pwhash, expiry FROM user_application" + with conn.cursor() as cur:
" WHERE application = ? AND login = ?", cur.execute("SELECT id, pwhash, expiry FROM user_application_v" +
[application, login]) " WHERE application = %s AND login = %s",
resObj = cur.next() (application, login))
print("DEBUG: getUserEntryFromDB: resObj: {}".format(resObj)) userObj = cur.fetchone()
if not resObj: logger.debug("userObj: {}".format(userObj))
if not userObj:
raise NoUserException() raise NoUserException()
invObj = cur.next() invObj = cur.fetchone()
if invObj: if invObj:
raise ManyUsersException() raise ManyUsersException()
userId = resObj["id"]
cur.execute("SELECT user, `key`, `value` FROM claims_for_user where user = ?",
[userId])
claims = {} claims = {}
with conn.cursor() as cur:
cur.execute('SELECT key, value FROM claims_for_user_v where "user" = %s and application = %s',
(userObj[0], application))
for claimObj in cur: for claimObj in cur:
print("DEBUG: getUserEntryFromDB: add claim {} -> {}".format(claimObj["key"], claimObj["value"])) logger.debug("add claim {} -> {}".format(claimObj[0], claimObj[1]))
if claimObj["key"] in claims: if claimObj[0] in claims:
if isinstance(claims[claimObj["key"]], list): if isinstance(claims[claimObj[0]], list):
claims[claimObj["key"]].append(claimObj["value"]) claims[claimObj[0]].append(claimObj[1])
else: else:
claims[claimObj["key"]] = [ claims[claimObj["key"]] ] claims[claimObj[0]] = [ claims[claimObj[0]] ]
claims[claimObj["key"]].append(claimObj["value"]) claims[claimObj[0]].append(claimObj[1])
else: else:
claims[claimObj["key"]] = claimObj["value"] claims[claimObj[0]] = claimObj[1]
userEntry = UserEntry(id=userId, login=login, expiry=resObj["expiry"], claims=claims) userEntry = UserEntry(id=userObj[0], login=login, pwhash=userObj[1], expiry=userObj[2], claims=claims)
return userEntry, resObj["pwhash"] return userEntry
except mariadb.Error as err: except psycopg2.Error as err:
raise Exception("Error when connecting to database: {}".format(err))
finally:
if conn:
conn.close()
def getRefreshTokenFromDB(application, login):
conn = None
cur = None
try:
conn = psycopg2.connect(user = DB_USER, password = DB_PASS,
host = DB_HOST, database = DB_NAME)
conn.autocommit = False
with conn:
with conn.cursor() as cur:
cur.execute('SELECT u.id, u.expiry, a.name FROM user_t u, application_t a, user_application_mapping_t m' +
' WHERE u.login = %s AND ' +
' a.name = %s AND ' +
' a.id = m.application AND u.id = m."user"',
(login, application))
userObj = cur.fetchone()
logger.debug("userObj: {}".format(userObj))
if not userObj:
raise NoUserException()
invObj = cur.fetchone()
if invObj:
raise ManyUsersException()
with conn.cursor() as cur:
salt = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(64))
cur.execute('INSERT INTO token_t ("user", salt, expiry) VALUES (%s, %s, %s) RETURNING id',
(userObj[0], salt, userObj[1]))
tokenObj = cur.fetchone()
logger.debug("tokenObj: {}".format(tokenObj))
if not tokenObj:
raise NoTokenException()
invObj = cur.fetchone()
if invObj:
raise ManyTokensException()
refreshTokenEntry = RefreshTokenEntry(id=tokenObj[0], salt=salt, login=login, app=userObj[2], expiry=userObj[1])
return refreshTokenEntry
except psycopg2.Error as err:
raise Exception("Error when connecting to database: {}".format(err)) raise Exception("Error when connecting to database: {}".format(err))
finally: finally:
if cur:
cur.close()
if conn: if conn:
conn.rollback()
conn.close() conn.close()
def getUserEntry(application, login, password): def getUserEntry(application, login, password):
userEntry, pwhash = getUserEntryFromDB(application, login) userEntry = getUserEntryFromDB(application, login)
if pwhash != crypt(password, pwhash): if userEntry.pwhash != crypt(password, userEntry.pwhash):
raise PasswordMismatchException() raise PasswordMismatchException()
return userEntry return userEntry
def generateToken(**args): def generateToken(func, **args):
try: try:
body = args["body"] body = args["body"]
application = ""
login = ""
password = ""
if (("application" in body) and
("login" in body) and
("password" in body)):
application = body["application"] application = body["application"]
login = body["login"] login = body["login"]
password = body["password"] password = body["password"]
elif ("encAleTuple" in body):
clearContent = jwe.decrypt(body["encAleTuple"], JWT_PRIV_KEY)
clearObj = json.loads(clearContent)
application = clearObj["application"]
login = clearObj["login"]
password = clearObj["password"]
else:
raise KeyError("Neither application, login and password nor encAleTuple given")
userEntry = getUserEntry(application, login, password) logger.debug(f"Tuple: {application} {login} {password}")
return func(application, login, password)
except NoTokenException:
logger.error("no token created")
raise werkzeug.exceptions.Unauthorized()
except ManyTokensException:
logger.error("too many tokens created")
raise werkzeug.exceptions.Unauthorized()
except NoUserException:
logger.error("no user found, login or application wrong")
raise werkzeug.exceptions.Unauthorized()
except ManyUsersException:
logger.error("too many users found")
raise werkzeug.exceptions.Unauthorized()
except PasswordMismatchException:
logger.error("wrong password")
raise werkzeug.exceptions.Unauthorized()
except KeyError:
logger.error("application, login or password missing")
raise werkzeug.exceptions.Unauthorized()
except Exception as e:
logger.error("unspecific exception: {}".format(str(e)))
raise werkzeug.exceptions.Unauthorized()
def _makeSimpleToken(application, login, password, refresh=False):
userEntry = getUserEntry(application, login, password) if not refresh else getUserEntryFromDB(application, login)
timestamp = int(time.time()) timestamp = int(time.time())
payload = { payload = {
@ -116,31 +229,135 @@ def generateToken(**args):
"sub": str(userEntry.id), "sub": str(userEntry.id),
"aud": application "aud": application
} }
logger.debug("claims: {}".format(userEntry.claims))
for claim in userEntry.claims.items(): for claim in userEntry.claims.items():
# print("DEBUG: generateToken: add claim {} -> {}".format(claim[0], claim[1])) logger.debug("add claim {}".format(claim))
payload[claim[0]] = claim[1] payload[claim[0]] = claim[1]
return jwt.encode(payload, JWT_PRIV_KEY, algorithm='RS256') return jwt.encode(payload, JWT_PRIV_KEY, algorithm='RS256')
except NoUserException:
print("ERROR: generateToken: no user found, login or application wrong")
raise werkzeug.exceptions.Unauthorized()
except ManyUsersException:
print("ERROR: generateToken: too many users found")
raise werkzeug.exceptions.Unauthorized()
except PasswordMismatchException:
print("ERROR: generateToken: wrong password")
raise werkzeug.exceptions.Unauthorized()
except KeyError:
print("ERROR: generateToken: application, login or password missing")
raise werkzeug.exceptions.Unauthorized()
except Exception as e:
print("ERROR: generateToken: unspecific exception: {}".format(str(e)))
raise werkzeug.exceptions.Unauthorized()
def generateTokenFromEnc(**args): def _makeRefreshToken(application, login, password):
cryptContent = args["body"] refreshTokenEntry = getRefreshTokenFromDB(application, login)
raise werkzeug.exceptions.NotImplemented("Stay tuned, will be added soon")
return str(cryptContent) timestamp = int(time.time())
payload = {
"iss": JWT_ISSUER,
"iat": int(timestamp),
"exp": int(timestamp + refreshTokenEntry.expiry),
"sub": str(refreshTokenEntry.login),
"xap": str(refreshTokenEntry.app),
"xid": str(refreshTokenEntry.id),
"xal": str(refreshTokenEntry.salt)
}
return jwt.encode(payload, JWT_PRIV_KEY, algorithm='RS256')
def _makeRefreshableTokens(application, login, password):
authToken = _makeSimpleToken(application, login, password)
refreshToken = _makeRefreshToken(application, login, password)
return {
"authToken": authToken,
"refreshToken": refreshToken
}
def generateSimpleToken(**args):
return generateToken(_makeSimpleToken, **args)
def generateRefreshableTokens(**args):
return generateToken(_makeRefreshableTokens, **args)
def getPubKey(): def getPubKey():
return JWT_PUB_KEY return JWT_PUB_KEY
def decodeToken(token):
try:
return jwt.decode(token, JWT_PUB_KEY, audience="test")
except JWTError as e:
logger.error("{}".format(e))
raise werkzeug.exceptions.Unauthorized()
def testToken(user, token_info):
return {
"message": f"You are user_id {user} and the provided token has been signed by this issuers. Fine.",
"details": token_info
}
def checkAndInvalidateRefreshToken(login, xid, xal):
conn = None
cur = None
try:
conn = psycopg2.connect(user = DB_USER, password = DB_PASS,
host = DB_HOST, database = DB_NAME)
conn.autocommit = False
with conn:
with conn.cursor() as cur:
cur.execute('SELECT t.id FROM token_t t, user_t u' +
' WHERE t.id = %s AND ' +
' t.salt = %s AND ' +
' t."user" = u.id AND ' +
' u.login = %s AND ' +
' t.valid = true',
(xid, xal, login))
tokenObj = cur.fetchone()
logger.debug("tokenObj: {}".format(tokenObj))
if not tokenObj:
raise NoValidTokenException()
invObj = cur.fetchone()
if invObj:
raise ManyTokensException()
with conn.cursor() as cur:
cur.execute('UPDATE token_t SET valid = false WHERE id = %s',
[ xid ])
except psycopg2.Error as err:
raise Exception("Error when connecting to database: {}".format(err))
finally:
if conn:
conn.close()
def refreshTokens(**args):
try:
refreshToken = args["body"]
refreshTokenObj = jwt.decode(refreshToken, JWT_PUB_KEY)
logger.info(str(refreshTokenObj))
checkAndInvalidateRefreshToken(refreshTokenObj["sub"], refreshTokenObj["xid"], refreshTokenObj["xal"])
authToken = _makeSimpleToken(refreshTokenObj["xap"], refreshTokenObj["sub"], "", refresh=True)
refreshToken = _makeRefreshToken(refreshTokenObj["xap"], refreshTokenObj["sub"], "")
return {
"authToken": authToken,
"refreshToken": refreshToken
}
except JWTError as e:
logger.error("jwt.decode failed: {}".format(e))
raise werkzeug.exceptions.Unauthorized()
except NoTokenException:
logger.error("no token created")
raise werkzeug.exceptions.Unauthorized()
except NoValidTokenException:
logger.error("no valid token found")
raise werkzeug.exceptions.Unauthorized()
except ManyTokensException:
logger.error("too many tokens created/found")
raise werkzeug.exceptions.Unauthorized()
except NoUserException:
logger.error("no user found, login or application wrong")
raise werkzeug.exceptions.Unauthorized()
except ManyUsersException:
logger.error("too many users found")
raise werkzeug.exceptions.Unauthorized()
except PasswordMismatchException:
logger.error("wrong password")
raise werkzeug.exceptions.Unauthorized()
except KeyError:
logger.error("application, login or password missing")
raise werkzeug.exceptions.Unauthorized()
#except Exception as e:
# logger.error("unspecific exception: {}".format(str(e)))
# raise werkzeug.exceptions.Unauthorized()

View File

@ -1,8 +1,8 @@
#!/bin/bash #!/bin/bash
IMAGE_NAME="registry.hottis.de/wolutator/authservice" IMAGE_NAME="registry.hottis.de/wolutator/authservice"
VERSION=0.0.1 VERSION=0.3.x
docker build -t ${IMAGE_NAME}:${VERSION} . docker build -t ${IMAGE_NAME}:${VERSION} .
docker push ${IMAGE_NAME}:${VERSION} # docker push ${IMAGE_NAME}:${VERSION}

View File

@ -1,111 +1,69 @@
CREATE DATABASE `authservice`; create sequence application_s start with 1 increment by 1;
USE `authservice`; create table application_t (
id integer primary key not null default nextval('application_s'),
name varchar(128) not null unique
);
CREATE TABLE `applications` ( create sequence user_s start with 1 increment by 1;
`id` int(10) unsigned NOT NULL AUTO_INCREMENT, create table user_t (
`name` varchar(128) NOT NULL, id integer primary key not null default nextval('user_s'),
CONSTRAINT PRIMARY KEY (`id`), login varchar(64) not null unique,
CONSTRAINT UNIQUE KEY `uk_applications_name` (`name`) pwhash varchar(64) not null,
) ENGINE=InnoDB; expiry integer not null default 600
);
CREATE TABLE `users` ( create sequence token_s start with 1 increment by 1;
`id` int(10) unsigned NOT NULL AUTO_INCREMENT, create table token_t (
`login` varchar(64) NOT NULL, id integer primary key not null default nextval('token_s'),
`pwhash` varchar(64) NOT NULL, "user" integer not null references user_t (id),
`expiry` int(10) unsigned NOT NULL DEFAULT 600, salt varchar(64) not null,
CONSTRAINT PRIMARY KEY (`id`), valid boolean not null default true
CONSTRAINT UNIQUE KEY `uk_users_login` (`login`) );
) ENGINE=InnoDB;
CREATE TABLE `claims` ( create sequence claim_s start with 1 increment by 1;
`id` int(10) unsigned NOT NULL AUTO_INCREMENT, create table claim_t (
`key` varchar(64) NOT NULL, id integer primary key not null default nextval('claim_s'),
`value` varchar(1024) NOT NULL, key varchar(64) not null,
CONSTRAINT PRIMARY KEY (`id`), value varchar(64) not null,
CONSTRAINT UNIQUE KEY `uk_claims_key_value` (`key`, `value`) application integer not null references application(id),
) ENGINE=InnoDB; unique (key, value)
);
CREATE TABLE `user_claims_mapping` ( create table user_claim_mapping_t (
`user` int(10) unsigned NOT NULL, "user" integer not null references user_t(id),
`claim` int(10) unsigned NOT NULL, claim integer not null references claim_t(id),
CONSTRAINT UNIQUE KEY `uk_user_claims_mapping` (`user`, `claim` ), unique ("user", claim)
CONSTRAINT FOREIGN KEY `fk_user_claims_mapping_user` (`user`) );
REFERENCES `users`(`id`),
CONSTRAINT FOREIGN KEY `fk_user_claims_mapping_claim` (`claim`)
REFERENCES `claims`(`id`)
) ENGINE=InnoDB;
CREATE TABLE `user_applications_mapping` ( create table user_application_mapping_t (
`user` int(10) unsigned NOT NULL, "user" integer not null references user_t(id),
`application` int(10) unsigned NOT NULL, application integer not null references application_t(id),
CONSTRAINT UNIQUE KEY `uk_user_applications_mapping` (`user`, `application` ), unique ("user", application)
CONSTRAINT FOREIGN KEY `fk_user_applications_mapping_user` (`user`) );
REFERENCES `users`(`id`),
CONSTRAINT FOREIGN KEY `fk_user_applications_mapping_application` (`application`)
REFERENCES `applications`(`id`)
) ENGINE=InnoDB;
CREATE OR REPLACE VIEW claims_for_user AS create or replace view claims_for_user_v as
SELECT u.id AS user, select u.id as "user",
c.`key` AS `key`, a.name as application,
c.`value` AS `value` c.key as key,
FROM users u, c.value as value
claims c, from user_t u,
user_claims_mapping m claim_t c,
WHERE m.user = u.id AND user_claim_mapping_t m,
m.claim = c.id; application_t a
where m.user = u.id and
m.claim = c.id and
a.id = c.application;
CREATE OR REPLACE VIEW user_application AS create or replace view user_application_v as
SELECT u.login AS login, select u.login as login,
u.pwhash AS pwhash, u.pwhash as pwhash,
u.id AS id, u.id as id,
u.expiry AS expiry, u.expiry as expiry,
a.name as application a.name as application
FROM users u, from user_t u,
applications a, application_t a,
user_applications_mapping m user_application_mapping_t m
WHERE u.id = m.user AND where u.id = m.user and
a.id = m.application; a.id = m.application;
CREATE USER 'authservice-ui'@'%' IDENTIFIED BY 'test123';
GRANT SELECT ON `user_application` TO 'authservice-ui'@'%';
GRANT SELECT ON `claims_for_user` TO 'authservice-ui'@'%';
CREATE USER 'authservice-cli'@'%' IDENTIFIED BY 'test123';
GRANT INSERT ON `users` TO 'authservice-cli'@'%';
GRANT INSERT ON `user_applications_mapping` TO 'authservice-cli'@'%';
FLUSH PRIVILEGES;
INSERT INTO `applications` (`name`) VALUES ('hv');
INSERT INTO `claims` (`key`, `value`) VALUES ('accesslevel', 'r');
INSERT INTO `claims` (`key`, `value`) VALUES ('accesslevel', 'rw');
-- password is 'test123'
INSERT INTO `users` (`login`, `pwhash`) VALUES ('wn', '$p5k2$186a0$dJXL0AjF$0HualDF92nyilDXPgSbaUn/UpFzSrpPx');
INSERT INTO `user_applications_mapping` (`user`, `application`)
VALUES(
(SELECT `id` FROM `users` WHERE `login` = 'wn'),
(SELECT `id` FROM `applications` WHERE `name` = 'hv')
);
INSERT INTO `user_claims_mapping` (`user`, `claim`)
VALUES(
(SELECT `id` FROM `users` WHERE `login` = 'wn'),
(SELECT `id` FROM `claims` WHERE `key` = 'accesslevel' AND `value` = 'rw')
);
-- password is 'geheim'
INSERT INTO `users` (`login`, `pwhash`) VALUES ('gregor', '$p5k2$186a0$Tcwps8Ar$TsypGB.y1dCB9pWOPz2X2SsxYqrTn3Fv');
INSERT INTO `user_applications_mapping` (`user`, `application`)
VALUES(
(SELECT `id` FROM `users` WHERE `login` = 'gregor'),
(SELECT `id` FROM `applications` WHERE `name` = 'hv')
);
INSERT INTO `user_claims_mapping` (`user`, `claim`)
VALUES(
(SELECT `id` FROM `users` WHERE `login` = 'gregor'),
(SELECT `id` FROM `claims` WHERE `key` = 'accesslevel' AND `value` = 'rw')
);

View File

@ -4,16 +4,18 @@ info:
version: "0.1" version: "0.1"
paths: paths:
/auth: /token:
post: post:
tags: [ "JWT" ] tags: [ "JWT" ]
summary: Accept login and password, return JWT token summary: Accepts encrypted or clear set of credentials, returns JWT token
operationId: auth.generateToken operationId: auth.generateSimpleToken
requestBody: requestBody:
content: content:
'application/json': 'application/json':
schema: schema:
$ref: '#/components/schemas/User' anyOf:
- $ref: '#/components/schemas/User'
- $ref: '#/components/schemas/EncUser'
responses: responses:
'200': '200':
description: JWT token description: JWT token
@ -21,35 +23,54 @@ paths:
'text/plain': 'text/plain':
schema: schema:
type: string type: string
/authe: /refreshable:
post: post:
tags: [ "JWT" ] tags: [ "JWT" ]
summary: Accept encrypted set of credentials, return JWT token summary: Accepts encrypted or clear set of credentials, returns tuple of AuthToken and RefreshToken
operationId: auth.generateTokenFromEnc operationId: auth.generateRefreshableTokens
requestBody:
content:
'application/json':
schema:
anyOf:
- $ref: '#/components/schemas/User'
- $ref: '#/components/schemas/EncUser'
responses:
'200':
description: Token tuple
content:
'application/json':
schema:
$ref: '#/components/schemas/TokenTuple'
/refresh:
post:
tags: [ "JWT" ]
summary: Accepts refresh token, returns tuple of AuthToken and RefreshToken
operationId: auth.refreshTokens
requestBody: requestBody:
content: content:
'text/plain': 'text/plain':
schema: schema:
type: string $ref: '#/components/schemas/RefreshToken'
responses: responses:
'200': '200':
description: JWT token description: Token tuple
content: content:
'text/plain': 'application/json':
schema: schema:
type: string $ref: '#/components/schemas/TokenTuple'
/secret: /test:
get: get:
tags: [ "JWT" ] tags: [ "Test" ]
summary: Return secret string summary: Return secret string
operationId: test.getSecret operationId: auth.testToken
responses: responses:
'200': '200':
description: secret response description: secret response
content: content:
'text/plain': 'application/json':
schema: schema:
type: string $ref: '#/components/schemas/TestOutput'
security: security:
- jwt: ['secret'] - jwt: ['secret']
/pubkey: /pubkey:
@ -72,7 +93,7 @@ components:
type: http type: http
scheme: bearer scheme: bearer
bearerFormat: JWT bearerFormat: JWT
x-bearerInfoFunc: test.decodeToken x-bearerInfoFunc: auth.decodeToken
schemas: schemas:
User: User:
description: Application/Login/Password tuple description: Application/Login/Password tuple
@ -84,3 +105,32 @@ components:
type: string type: string
password: password:
type: string type: string
EncUser:
description: Encrypted Application/Login/Password tuple
type: object
properties:
encAleTuple:
type: string
TestOutput:
description: Test Output
type: object
properties:
message:
type: string
details:
type: object
AuthToken:
description: Token for authentication purposes, just a string
type: string
RefreshToken:
description: Token for refresh purposes, just a string
type: string
TokenTuple:
description: Test Output
type: object
properties:
authToken:
$ref: '#/components/schemas/AuthToken'
refreshToken:
$ref: '#/components/schemas/RefreshToken'

24
test.py
View File

@ -1,20 +1,8 @@
from jose import JWTError, jwt import connexion
import os import logging
import werkzeug
logging.basicConfig(level=logging.DEBUG)
JWT_SECRET = os.environ['JWT_SECRET'] app = connexion.App('authservice')
app.add_api('./openapi.yaml')
def decodeToken(token): app.run(port=8080)
try:
return jwt.decode(token, JWT_SECRET)
except JWTError as e:
print("ERROR: decodeToken: {}".format(e))
raise werkzeug.exceptions.Unauthorized()
def getSecret(user, token_info):
return '''
You are user_id {user} and the secret is 'wbevuec'.
Decoded token claims: {token_info}.
'''.format(user=user, token_info=token_info)

View File

@ -1,9 +1,28 @@
import unittest
from jose import jwe from jose import jwe
import os
import json
JWT_PUB_KEY = os.environ["JWT_PUB_KEY"] JWT_PUB_KEY = os.environ["JWT_PUB_KEY"]
JWT_PRIV_KEY = os.environ["JWT_PRIV_KEY"]
class JweTestMethods(unittest.TestCase):
def test_encryptDecrypt(self):
inObj = {"application":"hv2", "login":"wn", "password":"joshua"}
plainText = json.dumps(inObj)
plainText = "BlaBlaBla123"
cryptText = jwe.encrypt(plainText, JWT_PUB_KEY, "A256GCM", "RSA-OAEP") cryptText = jwe.encrypt(plainText, JWT_PUB_KEY, "A256GCM", "RSA-OAEP")
print(cryptText) print(cryptText)
clearText = jwe.decrypt(cryptText, JWT_PRIV_KEY)
print(clearText)
outObj = json.loads(clearText)
print(outObj)
self.assertEqual(outObj, inObj)
if __name__ == '__main__':
unittest.main()