From 955937dea716f5e9fd41afdf95d2420bc3829290 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Wed, 30 Jul 2025 09:08:33 +0200 Subject: [PATCH] code styling --- src/prepare-local-env.sh | 2 + src/sbom_dt_dd.py | 189 +++++++++++++++++++++++++------------ src/sbom_dt_dd_cli.py | 195 ++++++++++++++++++++++++--------------- 3 files changed, 249 insertions(+), 137 deletions(-) diff --git a/src/prepare-local-env.sh b/src/prepare-local-env.sh index 11ecf06..56d8c0f 100755 --- a/src/prepare-local-env.sh +++ b/src/prepare-local-env.sh @@ -45,3 +45,5 @@ python3 -m venv .venv pip install -r requirements.txt pip install -r $LOCALLBIS/dependencytrack-client/requirements.txt pip install -r $LOCALLBIS/defectdojo-client/requirements.txt + + diff --git a/src/sbom_dt_dd.py b/src/sbom_dt_dd.py index 58217ad..5b5139e 100644 --- a/src/sbom_dt_dd.py +++ b/src/sbom_dt_dd.py @@ -1,5 +1,6 @@ import os import sys +import subprocess from loguru import logger import json @@ -7,8 +8,8 @@ import json import datetime from dateutil.relativedelta import relativedelta -sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'defectdojo-client')) -sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'dependencytrack-client')) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "defectdojo-client")) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "dependencytrack-client")) import defectdojo_api from defectdojo_api.rest import ApiException as DefectDojoApiException @@ -16,6 +17,7 @@ from defectdojo_api.rest import ApiException as DefectDojoApiException import dependencytrack_api from dependencytrack_api.rest import ApiException as DependencyTrackApiException + class ApiException(Exception): def __init__(self, cause): self.cause = cause @@ -25,11 +27,14 @@ class ApiException(Exception): self.data = cause.data self.headers = cause.headers + class ApiCallExecutor: def __init__(self, verbose): self.verbose = verbose - def innerExecuteApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams): + def innerExecuteApiCall( + self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams + ): logger.info(f"Calling {ApiClass=}.{EndpointMethod=} with {RequestClass=})") if self.verbose: logger.debug(f"{additionalParams=}, {requestParams=}") @@ -42,38 +47,57 @@ class ApiCallExecutor: logger.info(f"Response is {response}") return response + class DefectDojoApiClient(defectdojo_api.ApiClient, ApiCallExecutor): def __init__(self, config, verbose): defectdojo_api.ApiClient.__init__(self, config) ApiCallExecutor.__init__(self, verbose) - def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams): + def executeApiCall( + self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams + ): try: - return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams) + return self.innerExecuteApiCall( + ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams + ) except defectdojo_api.exceptions.ApiException as e: raise ApiException(e) + class DependencyTrackApiClient(dependencytrack_api.ApiClient, ApiCallExecutor): def __init__(self, config, verbose): dependencytrack_api.ApiClient.__init__(self, config) ApiCallExecutor.__init__(self, verbose) - def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams): + def executeApiCall( + self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams + ): try: - return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams) + return self.innerExecuteApiCall( + ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams + ) except dependencytrack_api.exceptions.ApiException as e: raise ApiException(e) - -def generateSBOM(target='.', name='dummyName', version='0.0.0'): +def generateSBOM(target=".", name="dummyName", version="0.0.0"): try: result = subprocess.run( - ["syft", "scan", target, "-o", "cyclonedx-json", "--source-name", name, "--source-version", version], + [ + "syft", + "scan", + target, + "-o", + "cyclonedx-json@1.5", + "--source-name", + name, + "--source-version", + version, + ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - text=True + text=True, ) sbom = json.loads(result.stdout) return sbom @@ -82,28 +106,39 @@ def generateSBOM(target='.', name='dummyName', version='0.0.0'): raise MyLocalException(e) - -def loadToDTrackAndDefectDojo(config, projectName, projectVersion, projectClassifier, projectDescription, productType, sbom, reImport): +def loadToDTrackAndDefectDojo( + config, + projectName, + projectVersion, + projectClassifier, + projectDescription, + productType, + sbom, + reImport, +): # ------- create product and engagement in DefectDojo ------- if not reImport: # in case of a reimport no modification on DefectDojo are required defectdojo_configuration = defectdojo_api.Configuration( - host = config['DEFECTDOJO_URL'] + host=config["DEFECTDOJO_URL"] ) - defectdojo_configuration.api_key['tokenAuth'] = config['DEFECTDOJO_TOKEN'] - defectdojo_configuration.api_key_prefix['tokenAuth'] = 'Token' + defectdojo_configuration.api_key["tokenAuth"] = config["DEFECTDOJO_TOKEN"] + defectdojo_configuration.api_key_prefix["tokenAuth"] = "Token" - with DefectDojoApiClient(defectdojo_configuration, config['VERBOSE']) as client: + with DefectDojoApiClient(defectdojo_configuration, config["VERBOSE"]) as client: print("Create product in DefectDojo") productName = f"{projectName}:{projectVersion}" - product_response = \ - client.executeApiCall( - defectdojo_api.ProductsApi, - defectdojo_api.ProductsApi.products_create, - defectdojo_api.ProductRequest, - { 'name': productName, 'description': projectDescription, 'prod_type': productType }, - [] - ) + product_response = client.executeApiCall( + defectdojo_api.ProductsApi, + defectdojo_api.ProductsApi.products_create, + defectdojo_api.ProductRequest, + { + "name": productName, + "description": projectDescription, + "prod_type": productType, + }, + [], + ) product_id = product_response.id print(f"{product_id=}") @@ -112,48 +147,71 @@ def loadToDTrackAndDefectDojo(config, projectName, projectVersion, projectClassi start_time = datetime.date.today() end_time = start_time + relativedelta(years=10) engagementName = f"{productName} DTrack Link" - engagement_response = \ - client.executeApiCall( - defectdojo_api.EngagementsApi, - defectdojo_api.EngagementsApi.engagements_create, - defectdojo_api.EngagementRequest, - { 'name': engagementName, 'target_start': start_time, 'target_end': end_time, 'status': 'In Progress', 'product': product_id }, - [] - ) - + engagement_response = client.executeApiCall( + defectdojo_api.EngagementsApi, + defectdojo_api.EngagementsApi.engagements_create, + defectdojo_api.EngagementRequest, + { + "name": engagementName, + "target_start": start_time, + "target_end": end_time, + "status": "In Progress", + "product": product_id, + }, + [], + ) + engagement_id = engagement_response.id print(f"{engagement_id=}") - -# ------- create project in DependencyTrack, connect project to engagement in DefectDojo, upload SBOM -------- + # ------- create project in DependencyTrack, connect project to engagement in DefectDojo, upload SBOM -------- dependencytrack_configuration = dependencytrack_api.Configuration( - host = f"{config['DTRACK_API_URL']}/api" + host=f"{config['DTRACK_API_URL']}/api" ) dependencytrack_configuration.debug = False - dependencytrack_configuration.api_key['ApiKeyAuth'] = config['DTRACK_TOKEN'] + dependencytrack_configuration.api_key["ApiKeyAuth"] = config["DTRACK_TOKEN"] - with DependencyTrackApiClient(dependencytrack_configuration, config['VERBOSE']) as client: + with DependencyTrackApiClient( + dependencytrack_configuration, config["VERBOSE"] + ) as client: if not reImport: # in case of a reimport it is not necessary to create the project - project_response = \ - client.executeApiCall( - dependencytrack_api.ProjectApi, - dependencytrack_api.ProjectApi.create_project, - dependencytrack_api.Project, - { 'name': projectName, 'version': projectVersion, 'classifier': projectClassifier, 'uuid': "", 'last_bom_import': 0 }, - [] - ) + project_response = client.executeApiCall( + dependencytrack_api.ProjectApi, + dependencytrack_api.ProjectApi.create_project, + dependencytrack_api.Project, + { + "name": projectName, + "version": projectVersion, + "classifier": projectClassifier, + "uuid": "", + "last_bom_import": 0, + }, + [], + ) project_uuid = project_response.uuid print(f"{project_uuid=}") properties = [ - { 'group_name': "integrations", 'property_name': "defectdojo.engagementId", - 'property_value': str(engagement_id), 'property_type': "STRING" }, - { 'group_name': "integrations", 'property_name': "defectdojo.doNotReactivate", - 'property_value': "true", 'property_type': "BOOLEAN" }, - { 'group_name': "integrations", 'property_name': "defectdojo.reimport", - 'property_value': "true", 'property_type': "BOOLEAN" } + { + "group_name": "integrations", + "property_name": "defectdojo.engagementId", + "property_value": str(engagement_id), + "property_type": "STRING", + }, + { + "group_name": "integrations", + "property_name": "defectdojo.doNotReactivate", + "property_value": "true", + "property_type": "BOOLEAN", + }, + { + "group_name": "integrations", + "property_name": "defectdojo.reimport", + "property_value": "true", + "property_type": "BOOLEAN", + }, ] for property in properties: client.executeApiCall( @@ -161,15 +219,24 @@ def loadToDTrackAndDefectDojo(config, projectName, projectVersion, projectClassi dependencytrack_api.ProjectPropertyApi.create_property1, dependencytrack_api.ProjectProperty, property, - [ project_uuid ] + [project_uuid], ) - bom_response = \ - client.executeApiCall( - dependencytrack_api.BomApi, - dependencytrack_api.BomApi.upload_bom, + bom_response = client.executeApiCall( + dependencytrack_api.BomApi, + dependencytrack_api.BomApi.upload_bom, + None, + None, + [ + None, + False, + projectName, + projectVersion, None, None, - [ None, False, projectName, projectVersion, None, None, None, None, True, sbom ] - ) - + None, + None, + True, + sbom, + ], + ) diff --git a/src/sbom_dt_dd_cli.py b/src/sbom_dt_dd_cli.py index aa88439..fc31fd4 100644 --- a/src/sbom_dt_dd_cli.py +++ b/src/sbom_dt_dd_cli.py @@ -2,72 +2,102 @@ import os import sys from loguru import logger import argparse -import subprocess import json - from converter import minimalSbomFormatConverter from sbom_dt_dd import generateSBOM, loadToDTrackAndDefectDojo +class CliException(Exception): + pass + # ---- main starts here with preparation of config ----------------------------------------------------------------------- - -parser = argparse.ArgumentParser(description='sbom-dt-dd glue logic') -parser.add_argument('--name', '-n', - help='Project Name', - required=False, - default=''), -parser.add_argument('--version', '-v', - help='Project Version', - required=False, - default='') -parser.add_argument('--description', '-d', - help='Project Description', - required=False, - default='') -parser.add_argument('--type', '-t', - help='Product Type from DefectDojo', - type=int, - required=True) -parser.add_argument('--classifier', '-c', - help='Project Classifier from DependencyTrack', - choices=['APPLICATION', 'FRAMEWORK', 'LIBRARY', 'CONTAINER', 'OPERATING_SYSTEM', 'DEVICE', - 'FIRMWARE', 'FILE', 'PLATFORM', 'DEVICE_DRIVER', 'MACHINE_LEARNING_MODEL', 'DATA'], - required=False, - default='') -parser.add_argument('--uploadsbom', '-U', - help='Upload a already existing SBOM instead of generating it. Give the SBOM file at -F instead of a target', - required=False, - action='store_true', - default=False) -parser.add_argument('--sbomfile', '-F', - help='Filename of existing SBOM file to upload, use together with -U, do not use together with -T', - required=False) -parser.add_argument('--minimalsbomformat', '-K', - help='SBOM file comes in dedicated minimal format and will be converted into cyclonedx before uploading', - action='store_true', - default=False) -parser.add_argument('--overwritemetadata', '-O', - help='Overwrite name, version, description and classifier with data from minimal SBOM', - action='store_true', - default=False) -parser.add_argument('--target', '-T', - help='Target to scan, either path name for sources or docker image tag', - required=False) -parser.add_argument('--reimport', '-R', - help='Import the SBOM for an existing project/product once again', - required=False, - action='store_true', - default=False) -parser.add_argument('--verbose', '-V', - help='A lot of debug output', - required=False, - action='store_true', - default=False) +parser = argparse.ArgumentParser(description="sbom-dt-dd glue logic") +parser.add_argument("--name", "-n", help="Project Name", required=False, default="") +parser.add_argument( + "--version", "-v", help="Project Version", required=False, default="" +) +parser.add_argument( + "--description", "-d", help="Project Description", required=False, default="" +) +parser.add_argument( + "--type", "-t", help="Product Type from DefectDojo", type=int, required=True +) +parser.add_argument( + "--classifier", + "-c", + help="Project Classifier from DependencyTrack", + choices=[ + "APPLICATION", + "FRAMEWORK", + "LIBRARY", + "CONTAINER", + "OPERATING_SYSTEM", + "DEVICE", + "FIRMWARE", + "FILE", + "PLATFORM", + "DEVICE_DRIVER", + "MACHINE_LEARNING_MODEL", + "DATA", + ], + required=False, + default="", +) +parser.add_argument( + "--uploadsbom", + "-U", + help="Upload a already existing SBOM instead of generating it. Give the SBOM file at -F instead of a target", + required=False, + action="store_true", + default=False, +) +parser.add_argument( + "--sbomfile", + "-F", + help="Filename of existing SBOM file to upload, use together with -U, do not use together with -T", + required=False, +) +parser.add_argument( + "--minimalsbomformat", + "-K", + help="SBOM file comes in dedicated minimal format and will be converted into cyclonedx before uploading", + action="store_true", + default=False, +) +parser.add_argument( + "--overwritemetadata", + "-O", + help="Overwrite name, version, description and classifier with data from minimal SBOM", + action="store_true", + default=False, +) +parser.add_argument( + "--target", + "-T", + help="Target to scan, either path name for sources or docker image tag", + required=False, +) +parser.add_argument( + "--reimport", + "-R", + help="Import the SBOM for an existing project/product once again", + required=False, + action="store_true", + default=False, +) +parser.add_argument( + "--verbose", + "-V", + help="A lot of debug output", + required=False, + action="store_true", + default=False, +) args = parser.parse_args() projectName = args.name projectVersion = args.version @@ -77,41 +107,48 @@ projectClassifier = args.classifier reImport = args.reimport uploadSbomFlag = args.uploadsbom -if uploadSbomFlag: - sbomFileName = args.sbomfile - minimalSbomFormat = args.minimalsbomformat -else: - target = args.target +sbomFileName = args.sbomfile +minimalSbomFormat = args.minimalsbomformat +target = args.target -if minimalSbomFormat: - overwriteMetadata = args.overwritemetadata +overwriteMetadata = args.overwritemetadata -if not overwriteMetadata and not (projectName and projectVersion and projectClassifier and projectDescription): - raise MyLocalException("If overwriteMetadata is not selected, projectName, projectVersion, projectClassifier and projectDescription must be set.") +if not overwriteMetadata and not ( + projectName and projectVersion and projectClassifier and projectDescription +): + raise CliException( + "If overwriteMetadata is not selected, projectName, projectVersion, projectClassifier and projectDescription must be set." + ) CONFIG = {} try: - CONFIG['DTRACK_API_URL'] = os.environ["DTRACK_API_URL"] - CONFIG['DTRACK_TOKEN'] = os.environ["DTRACK_TOKEN"] - CONFIG['DEFECTDOJO_URL'] = os.environ["DEFECTDOJO_URL"] - CONFIG['DEFECTDOJO_TOKEN'] = os.environ["DEFECTDOJO_TOKEN"] + CONFIG["DTRACK_API_URL"] = os.environ["DTRACK_API_URL"] + CONFIG["DTRACK_TOKEN"] = os.environ["DTRACK_TOKEN"] + CONFIG["DEFECTDOJO_URL"] = os.environ["DEFECTDOJO_URL"] + CONFIG["DEFECTDOJO_TOKEN"] = os.environ["DEFECTDOJO_TOKEN"] except KeyError as e: - raise Exception(f"Env variable {e} is shall be set") + raise CliException(f"Env variable {e} is shall be set") from e -CONFIG['VERBOSE'] = args.verbose +CONFIG["VERBOSE"] = args.verbose # ---- main starts here -------------------------------------------------------------------------------------------------- if uploadSbomFlag: # ------- read uploaded SBOM ------------- logger.info(f"Reading SBOM from file {sbomFileName}") - with open(sbomFileName, 'r') as sbomFile: + with open(sbomFileName, "r") as sbomFile: sbom = sbomFile.read() logger.info("SBOM file read.") if minimalSbomFormat: logger.info("Start converting from minimal format into cyclonedx") - (sbom, nameFromMinimalSbom, versionFromMinimalSbom, classifierFromMinimalSbom, descriptionFromMinimalSbom) = minimalSbomFormatConverter(sbom) + ( + sbom, + nameFromMinimalSbom, + versionFromMinimalSbom, + classifierFromMinimalSbom, + descriptionFromMinimalSbom, + ) = minimalSbomFormatConverter(sbom) logger.info("Converted") if overwriteMetadata: projectName = nameFromMinimalSbom @@ -127,7 +164,13 @@ else: logger.info("Done.") - -loadToDTrackAndDefectDojo(CONFIG, projectName, projectVersion, projectClassifier, projectDescription, productType, sbom, reImport) - - +loadToDTrackAndDefectDojo( + CONFIG, + projectName, + projectVersion, + projectClassifier, + projectDescription, + productType, + sbom, + reImport, +)