import os
import sys
import subprocess
from loguru import logger
import json
import datetime
from dateutil.relativedelta import relativedelta

# The defectdojo_api / dependencytrack_api packages are looked up in the two
# sibling directories below, so sys.path must be extended before importing them.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "defectdojo-client"))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "dependencytrack-client"))

import defectdojo_api
from defectdojo_api.rest import ApiException as DefectDojoApiException
import dependencytrack_api
from dependencytrack_api.rest import ApiException as DependencyTrackApiException


class ApiException(Exception):
    """Client-independent wrapper around an API exception of either client.

    Copies ``status``, ``reason``, ``body``, ``data`` and ``headers`` from the
    wrapped exception and keeps the wrapped exception itself in ``cause``.
    """

    def __init__(self, cause):
        super().__init__(cause)
        self.cause = cause
        self.status = cause.status
        self.reason = cause.reason
        self.body = cause.body
        self.data = cause.data
        self.headers = cause.headers


class ApiCallExecutor:
    """Mixin that performs a logged call of an API endpoint method.

    It is combined with an ``ApiClient`` class below; the instance itself is
    passed to ``ApiClass`` as its constructor argument.
    """

    def __init__(self, verbose):
        # When truthy, request and additional parameters are logged at debug level.
        self.verbose = verbose

    def innerExecuteApiCall(
        self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
    ):
        """Instantiate ``ApiClass`` with this client and invoke ``EndpointMethod``.

        Args:
            ApiClass: API class, instantiated as ``ApiClass(self)``.
            EndpointMethod: unbound method of ``ApiClass`` to call.
            RequestClass: request model class, or a falsy value when the
                endpoint takes no request object.
            requestParams: keyword arguments for ``RequestClass``; only used
                when ``RequestClass`` is truthy.
            additionalParams: positional arguments passed to the endpoint
                before the request object.

        Returns:
            Whatever ``EndpointMethod`` returns.
        """
        logger.info(f"Calling {ApiClass=}.{EndpointMethod=} with {RequestClass=})")
        if self.verbose:
            logger.debug(f"{additionalParams=}, {requestParams=}")

        instance = ApiClass(self)
        if RequestClass:
            # The request object is always the last positional argument.
            request = RequestClass(**requestParams)
            response = EndpointMethod(instance, *additionalParams, request)
        else:
            response = EndpointMethod(instance, *additionalParams)

        logger.info(f"Response is {response}")
        return response


class DefectDojoApiClient(defectdojo_api.ApiClient, ApiCallExecutor):
    """DefectDojo ``ApiClient`` that can execute logged API calls."""

    def __init__(self, config, verbose):
        defectdojo_api.ApiClient.__init__(self, config)
        ApiCallExecutor.__init__(self, verbose)

    def executeApiCall(
        self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
    ):
        """Run ``innerExecuteApiCall`` and translate DefectDojo API errors.

        Raises:
            ApiException: wrapping any ``defectdojo_api.exceptions.ApiException``.
        """
        try:
            return self.innerExecuteApiCall(
                ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
            )
        except defectdojo_api.exceptions.ApiException as e:
            raise ApiException(e) from e


class DependencyTrackApiClient(dependencytrack_api.ApiClient, ApiCallExecutor):
    """DependencyTrack ``ApiClient`` that can execute logged API calls."""

    def __init__(self, config, verbose):
        dependencytrack_api.ApiClient.__init__(self, config)
        ApiCallExecutor.__init__(self, verbose)

    def executeApiCall(
        self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
    ):
        """Run ``innerExecuteApiCall`` and translate DependencyTrack API errors.

        Raises:
            ApiException: wrapping any
                ``dependencytrack_api.exceptions.ApiException``.
        """
        try:
            return self.innerExecuteApiCall(
                ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
            )
        except dependencytrack_api.exceptions.ApiException as e:
            raise ApiException(e) from e


def generateSBOM(target=".", name="dummyName", version="0.0.0"):
    """Run ``syft scan`` on ``target`` and return the parsed CycloneDX JSON.

    Args:
        target: scan target handed to ``syft scan``.
        name: value for ``--source-name``.
        version: value for ``--source-version``.

    Returns:
        The scanner's stdout parsed with ``json.loads``.

    Raises:
        MyLocalException: when syft exits with a non-zero status.
    """
    command = [
        "syft",
        "scan",
        target,
        "-o",
        "cyclonedx-json@1.5",
        "--source-name",
        name,
        "--source-version",
        version,
    ]
    try:
        result = subprocess.run(
            command,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        logger.error(f"SBOM scanner failed: {e.stderr}")
        # NOTE(review): MyLocalException is not defined in the visible part of
        # this file -- confirm it exists, otherwise this line raises NameError.
        raise MyLocalException(e) from e

    return json.loads(result.stdout)


def loadToDTrackAndDefectDojo(
    config,
    projectName,
    projectVersion,
    projectClassifier,
    projectDescription,
    productType,
    sbom,
    reImport,
):
    """Upload an SBOM to DependencyTrack, linking it to DefectDojo on first import.

    On a first import (``reImport`` falsy) a DefectDojo product and engagement
    are created, then a DependencyTrack project is created and given the
    ``integrations`` properties that reference the engagement. In every case
    the SBOM is uploaded to DependencyTrack afterwards.

    Args:
        config: mapping providing ``DEFECTDOJO_URL``, ``DEFECTDOJO_TOKEN``,
            ``DTRACK_API_URL``, ``DTRACK_TOKEN`` and ``VERBOSE``.
        projectName: DependencyTrack project name; also the first part of the
            DefectDojo product name.
        projectVersion: DependencyTrack project version; also the second part
            of the DefectDojo product name.
        projectClassifier: classifier for the new DependencyTrack project.
        projectDescription: description for the new DefectDojo product.
        productType: ``prod_type`` for the new DefectDojo product.
        sbom: SBOM passed to the BOM upload call.
        reImport: when truthy, skip all creation steps and only upload.

    Raises:
        ApiException: when any DefectDojo or DependencyTrack call fails.
    """
    # ------- create product and engagement in DefectDojo -------
    if not reImport:
        # In case of a reimport no modifications in DefectDojo are required.
        defectdojo_configuration = defectdojo_api.Configuration(
            host=config["DEFECTDOJO_URL"]
        )
        defectdojo_configuration.api_key["tokenAuth"] = config["DEFECTDOJO_TOKEN"]
        defectdojo_configuration.api_key_prefix["tokenAuth"] = "Token"

        with DefectDojoApiClient(defectdojo_configuration, config["VERBOSE"]) as client:
            print("Create product in DefectDojo")
            productName = f"{projectName}:{projectVersion}"
            product_response = client.executeApiCall(
                defectdojo_api.ProductsApi,
                defectdojo_api.ProductsApi.products_create,
                defectdojo_api.ProductRequest,
                {
                    "name": productName,
                    "description": projectDescription,
                    "prod_type": productType,
                },
                [],
            )
            product_id = product_response.id
            print(f"{product_id=}")

            print("Create engagement in DefectDojo")
            # The engagement spans ten years starting today.
            start_time = datetime.date.today()
            end_time = start_time + relativedelta(years=10)
            engagementName = f"{productName} DTrack Link"
            engagement_response = client.executeApiCall(
                defectdojo_api.EngagementsApi,
                defectdojo_api.EngagementsApi.engagements_create,
                defectdojo_api.EngagementRequest,
                {
                    "name": engagementName,
                    "target_start": start_time,
                    "target_end": end_time,
                    "status": "In Progress",
                    "product": product_id,
                },
                [],
            )
            engagement_id = engagement_response.id
            print(f"{engagement_id=}")

    # ------- create project in DependencyTrack, connect project to engagement
    # ------- in DefectDojo, upload SBOM --------
    dependencytrack_configuration = dependencytrack_api.Configuration(
        host=f"{config['DTRACK_API_URL']}/api"
    )
    dependencytrack_configuration.debug = False
    dependencytrack_configuration.api_key["ApiKeyAuth"] = config["DTRACK_TOKEN"]

    with DependencyTrackApiClient(
        dependencytrack_configuration, config["VERBOSE"]
    ) as client:
        if not reImport:
            # In case of a reimport it is not necessary to create the project.
            project_response = client.executeApiCall(
                dependencytrack_api.ProjectApi,
                dependencytrack_api.ProjectApi.create_project,
                dependencytrack_api.Project,
                {
                    "name": projectName,
                    "version": projectVersion,
                    "classifier": projectClassifier,
                    "uuid": "",
                    "last_bom_import": 0,
                },
                [],
            )
            project_uuid = project_response.uuid
            print(f"{project_uuid=}")

            # Project properties in the "integrations" group; the first one
            # stores the id of the DefectDojo engagement created above.
            properties = [
                {
                    "group_name": "integrations",
                    "property_name": "defectdojo.engagementId",
                    "property_value": str(engagement_id),
                    "property_type": "STRING",
                },
                {
                    "group_name": "integrations",
                    "property_name": "defectdojo.doNotReactivate",
                    "property_value": "true",
                    "property_type": "BOOLEAN",
                },
                {
                    "group_name": "integrations",
                    "property_name": "defectdojo.reimport",
                    "property_value": "true",
                    "property_type": "BOOLEAN",
                },
            ]
            for project_property in properties:
                client.executeApiCall(
                    dependencytrack_api.ProjectPropertyApi,
                    dependencytrack_api.ProjectPropertyApi.create_property1,
                    dependencytrack_api.ProjectProperty,
                    project_property,
                    [project_uuid],
                )

        # NOTE(review): the positional arguments must match the signature of
        # BomApi.upload_bom in the dependencytrack_api package -- verify there
        # which slot each value fills.
        bom_response = client.executeApiCall(
            dependencytrack_api.BomApi,
            dependencytrack_api.BomApi.upload_bom,
            None,
            None,
            [
                None,
                False,
                projectName,
                projectVersion,
                None,
                None,
                None,
                None,
                True,
                sbom,
            ],
        )