diff --git a/src/sbom_dt_dd.py b/src/sbom_dt_dd.py new file mode 100644 index 0000000..058ce2c --- /dev/null +++ b/src/sbom_dt_dd.py @@ -0,0 +1,174 @@ +import os +import sys + +from loguru import logger +import json + +import datetime +from dateutil.relativedelta import relativedelta + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'defectdojo-client')) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'dependencytrack-client')) + +import defectdojo_api +from defectdojo_api.rest import ApiException as DefectDojoApiException + +import dependencytrack_api +from dependencytrack_api.rest import ApiException as DependencyTrackApiException + +class ApiException(Exception): + def __init__(self, status, reason, body, data, headers): + self.status = status + self.reason = reason + self.body = body + self.data = data + self.headers = None + +class ApiCallExecutor: + def __init__(self, verbose): + self.verbose = verbose + + def innerExecuteApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams): + logger.info(f"Calling {ApiClass=}.{EndpointMethod=} with {RequestClass=})") + if self.verbose: + logger.debug(f"{additionalParams=}, {requestParams=}") + instance = ApiClass(self) + if RequestClass: + request = RequestClass(**requestParams) + response = EndpointMethod(instance, *additionalParams, request) + else: + response = EndpointMethod(instance, *additionalParams) + logger.info(f"Response is {response}") + return response + +class DefectDojoApiClient(defectdojo_api.ApiClient, ApiCallExecutor): + def __init__(self, config, verbose): + defectdojo_api.ApiClient.__init__(self, config) + ApiCallExecutor.__init__(self, verbose) + + def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams): + try: + return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams) + except defectdojo_api.exceptions.ApiException as e: + raise ApiException(e.status, e.reason, e.body, e.data, e.headers) + +class DependencyTrackApiClient(dependencytrack_api.ApiClient, ApiCallExecutor): + def __init__(self, config, verbose): + dependencytrack_api.ApiClient.__init__(self, config) + ApiCallExecutor.__init__(self, verbose) + + def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams): + try: + return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams) + except dependencytrack_api.exceptions.ApiException as e: + raise ApiException(e.status, e.reason, e.body, e.data, e.headers) + + + +def generateSBOM(target='.', name='dummyName', version='0.0.0'): + try: + result = subprocess.run( + ["syft", "scan", target, "-o", "cyclonedx-json", "--source-name", name, "--source-version", version], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + sbom = json.loads(result.stdout) + return sbom + except subprocess.CalledProcessError as e: + logger.error(f"SBOM scanner failed: {e.stderr}") + raise MyLocalException(e) + + + +def loadToDTrackAndDefectDojo(config, projectName, projectVersion, projectClassifier, projectDescription, productType, sbom, reImport): + # ------- create product and engagement in DefectDojo ------- + if not reImport: + # in case of a reimport no modification on DefectDojo are required + defectdojo_configuration = defectdojo_api.Configuration( + host = config['DEFECTDOJO_URL'] + ) + defectdojo_configuration.api_key['tokenAuth'] = config['DEFECTDOJO_TOKEN'] + defectdojo_configuration.api_key_prefix['tokenAuth'] = 'Token' + + with DefectDojoApiClient(defectdojo_configuration, config['VERBOSE']) as client: + print("Create product in DefectDojo") + productName = f"{projectName}:{projectVersion}" + product_response = \ + client.executeApiCall( + defectdojo_api.ProductsApi, + defectdojo_api.ProductsApi.products_create, + defectdojo_api.ProductRequest, + { 'name': productName, 'description': projectDescription, 'prod_type': productType }, + [] + ) + + product_id = product_response.id + print(f"{product_id=}") + + print("Create engagement in DefectDojo") + start_time = datetime.date.today() + end_time = start_time + relativedelta(years=10) + engagementName = f"{productName} DTrack Link" + engagement_response = \ + client.executeApiCall( + defectdojo_api.EngagementsApi, + defectdojo_api.EngagementsApi.engagements_create, + defectdojo_api.EngagementRequest, + { 'name': engagementName, 'target_start': start_time, 'target_end': end_time, 'status': 'In Progress', 'product': product_id }, + [] + ) + + engagement_id = engagement_response.id + print(f"{engagement_id=}") + + +# ------- create project in DependencyTrack, connect project to engagement in DefectDojo, upload SBOM -------- + dependencytrack_configuration = dependencytrack_api.Configuration( + host = f"{config['DTRACK_API_URL']}/api" + ) + dependencytrack_configuration.debug = False + dependencytrack_configuration.api_key['ApiKeyAuth'] = config['DTRACK_TOKEN'] + + with DependencyTrackApiClient(dependencytrack_configuration, config['VERBOSE']) as client: + if not reImport: + # in case of a reimport it is not necessary to create the project + project_response = \ + client.executeApiCall( + dependencytrack_api.ProjectApi, + dependencytrack_api.ProjectApi.create_project, + dependencytrack_api.Project, + { 'name': projectName, 'version': projectVersion, 'classifier': projectClassifier, 'uuid': "", 'last_bom_import': 0 }, + [] + ) + + project_uuid = project_response.uuid + print(f"{project_uuid=}") + + properties = [ + { 'group_name': "integrations", 'property_name': "defectdojo.engagementId", + 'property_value': str(engagement_id), 'property_type': "STRING" }, + { 'group_name': "integrations", 'property_name': "defectdojo.doNotReactivate", + 'property_value': "true", 'property_type': "BOOLEAN" }, + { 'group_name': "integrations", 'property_name': "defectdojo.reimport", + 'property_value': "true", 'property_type': "BOOLEAN" } + ] + for property in properties: + client.executeApiCall( + dependencytrack_api.ProjectPropertyApi, + dependencytrack_api.ProjectPropertyApi.create_property1, + dependencytrack_api.ProjectProperty, + property, + [ project_uuid ] + ) + + bom_response = \ + client.executeApiCall( + dependencytrack_api.BomApi, + dependencytrack_api.BomApi.upload_bom, + None, + None, + [ None, False, projectName, projectVersion, None, None, None, None, True, sbom ] + ) + diff --git a/src/sbom_dt_dd_api.py b/src/sbom_dt_dd_api.py new file mode 100644 index 0000000..f47d471 --- /dev/null +++ b/src/sbom_dt_dd_api.py @@ -0,0 +1,73 @@ +import os +from loguru import logger +from fastapi import FastAPI, UploadFile, File, Form, HTTPException +from fastapi.responses import JSONResponse +from converter import minimalSbomFormatConverter +from sbom_dt_dd import generateSBOM, loadToDTrackAndDefectDojo, ApiException + +app = FastAPI( + title="SBOM DTrack DefectDojo Synchronization API", + version="0.0.1", + description="" +) + +config = {} +try: + config['DTRACK_API_URL'] = os.environ["DTRACK_API_URL"] + config['DTRACK_TOKEN'] = os.environ["DTRACK_TOKEN"] + config['DEFECTDOJO_URL'] = os.environ["DEFECTDOJO_URL"] + config['DEFECTDOJO_TOKEN'] = os.environ["DEFECTDOJO_TOKEN"] + config['VERBOSE'] = True +except KeyError as e: + raise Exception(f"Env variable {e} is shall be set") + +app.state.config = config + + +@app.get("/hello") +async def say_hello(name: str): + """ + Returns a friendly greeting. + --- + parameters: + - name: name + in: query + required: true + schema: + type: string + responses: + 200: + description: Successful Response + content: + application/json: + schema: + type: object + properties: + message: + type: string + """ + return JSONResponse(content={"message": f"Hello, {name}!"}) + +@app.post("/uploadMinimalSBOM/") +async def uploadMinimalSBOM( + file: UploadFile = File(...), + reimport: bool = Form(...) +): + """ + Endpoint to upload a minimal SBOM definition + """ + sbom = await file.read() + + try: + logger.info("Start converting from minimal format into cyclonedx") + (sbom, projectName, projectVersion, projectClassifier, projectDescription) = minimalSbomFormatConverter(sbom) + logger.info("Converted") + + loadToDTrackAndDefectDojo(app.state.config, projectName, projectVersion, projectClassifier, projectDescription, 1, sbom, reimport) + logger.info("Done.") + except ApiException as e: + raise HTTPException(status_code=e.status, detail=f"{e.reason=}, {e.body=}, {e.data=}") + + return JSONResponse(content={ + "message": "Upload successful!" + }) diff --git a/src/sbom_dt_dd_cli.py b/src/sbom_dt_dd_cli.py new file mode 100644 index 0000000..aa88439 --- /dev/null +++ b/src/sbom_dt_dd_cli.py @@ -0,0 +1,133 @@ +import os +import sys +from loguru import logger +import argparse +import subprocess +import json + + + +from converter import minimalSbomFormatConverter +from sbom_dt_dd import generateSBOM, loadToDTrackAndDefectDojo + + + +# ---- main starts here with preparation of config ----------------------------------------------------------------------- + + + +parser = argparse.ArgumentParser(description='sbom-dt-dd glue logic') +parser.add_argument('--name', '-n', + help='Project Name', + required=False, + default=''), +parser.add_argument('--version', '-v', + help='Project Version', + required=False, + default='') +parser.add_argument('--description', '-d', + help='Project Description', + required=False, + default='') +parser.add_argument('--type', '-t', + help='Product Type from DefectDojo', + type=int, + required=True) +parser.add_argument('--classifier', '-c', + help='Project Classifier from DependencyTrack', + choices=['APPLICATION', 'FRAMEWORK', 'LIBRARY', 'CONTAINER', 'OPERATING_SYSTEM', 'DEVICE', + 'FIRMWARE', 'FILE', 'PLATFORM', 'DEVICE_DRIVER', 'MACHINE_LEARNING_MODEL', 'DATA'], + required=False, + default='') +parser.add_argument('--uploadsbom', '-U', + help='Upload a already existing SBOM instead of generating it. Give the SBOM file at -F instead of a target', + required=False, + action='store_true', + default=False) +parser.add_argument('--sbomfile', '-F', + help='Filename of existing SBOM file to upload, use together with -U, do not use together with -T', + required=False) +parser.add_argument('--minimalsbomformat', '-K', + help='SBOM file comes in dedicated minimal format and will be converted into cyclonedx before uploading', + action='store_true', + default=False) +parser.add_argument('--overwritemetadata', '-O', + help='Overwrite name, version, description and classifier with data from minimal SBOM', + action='store_true', + default=False) +parser.add_argument('--target', '-T', + help='Target to scan, either path name for sources or docker image tag', + required=False) +parser.add_argument('--reimport', '-R', + help='Import the SBOM for an existing project/product once again', + required=False, + action='store_true', + default=False) +parser.add_argument('--verbose', '-V', + help='A lot of debug output', + required=False, + action='store_true', + default=False) +args = parser.parse_args() +projectName = args.name +projectVersion = args.version +projectDescription = args.description +productType = args.type +projectClassifier = args.classifier +reImport = args.reimport + +uploadSbomFlag = args.uploadsbom +if uploadSbomFlag: + sbomFileName = args.sbomfile + minimalSbomFormat = args.minimalsbomformat +else: + target = args.target + +if minimalSbomFormat: + overwriteMetadata = args.overwritemetadata + +if not overwriteMetadata and not (projectName and projectVersion and projectClassifier and projectDescription): + raise MyLocalException("If overwriteMetadata is not selected, projectName, projectVersion, projectClassifier and projectDescription must be set.") + + +CONFIG = {} +try: + CONFIG['DTRACK_API_URL'] = os.environ["DTRACK_API_URL"] + CONFIG['DTRACK_TOKEN'] = os.environ["DTRACK_TOKEN"] + CONFIG['DEFECTDOJO_URL'] = os.environ["DEFECTDOJO_URL"] + CONFIG['DEFECTDOJO_TOKEN'] = os.environ["DEFECTDOJO_TOKEN"] +except KeyError as e: + raise Exception(f"Env variable {e} is shall be set") + +CONFIG['VERBOSE'] = args.verbose + +# ---- main starts here -------------------------------------------------------------------------------------------------- + +if uploadSbomFlag: + # ------- read uploaded SBOM ------------- + logger.info(f"Reading SBOM from file {sbomFileName}") + with open(sbomFileName, 'r') as sbomFile: + sbom = sbomFile.read() + logger.info("SBOM file read.") + if minimalSbomFormat: + logger.info("Start converting from minimal format into cyclonedx") + (sbom, nameFromMinimalSbom, versionFromMinimalSbom, classifierFromMinimalSbom, descriptionFromMinimalSbom) = minimalSbomFormatConverter(sbom) + logger.info("Converted") + if overwriteMetadata: + projectName = nameFromMinimalSbom + projectVersion = versionFromMinimalSbom + projectClassifier = classifierFromMinimalSbom + projectDescription = descriptionFromMinimalSbom + logger.info("Done.") +else: + # ------- generate SBOM ------------ + logger.info(f"Generating SBOM for {target}") + sbomJson = generateSBOM(target, projectName, projectVersion) + sbom = json.dumps(sbomJson) + logger.info("Done.") + + + +loadToDTrackAndDefectDojo(CONFIG, projectName, projectVersion, projectClassifier, projectDescription, productType, sbom, reImport) + + diff --git a/src/server.sh b/src/server.sh new file mode 100755 index 0000000..5f30c60 --- /dev/null +++ b/src/server.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./.venv/bin/gunicorn sbom_dt_dd_api:app -k uvicorn.workers.UvicornWorker -w 4 -b 0.0.0.0:8000