import os
import sys
import subprocess
from loguru import logger
import json
import datetime
from dateutil.relativedelta import relativedelta

# The generated API clients live in sibling directories; they must be put on
# sys.path before the client packages below can be imported.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'defectdojo-client'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'dependencytrack-client'))

import defectdojo_api
from defectdojo_api.rest import ApiException as DefectDojoApiException
import dependencytrack_api
from dependencytrack_api.rest import ApiException as DependencyTrackApiException


class ApiException(Exception):
    """Client-independent wrapper around an API exception of either backend.

    The attributes of the wrapped exception are copied onto this object so
    callers can inspect the failure without knowing which client raised it.
    """

    def __init__(self, cause):
        super().__init__(cause)
        self.cause = cause
        self.status = cause.status
        self.reason = cause.reason
        self.body = cause.body
        # Not every wrapped exception necessarily carries ``data``; fall back
        # to None instead of failing while reporting another failure.
        self.data = getattr(cause, 'data', None)
        self.headers = cause.headers


class MyLocalException(Exception):
    """Error raised when a local helper step (e.g. the SBOM scanner) fails.

    NOTE(review): this name was referenced by ``generateSBOM`` but neither
    defined nor imported in the part of the file under review. This minimal
    definition makes the error path raise the intended exception instead of
    a NameError; if the project defines the class elsewhere, prefer that one.
    """


class ApiCallExecutor:
    """Mixin that performs a logged call of one API endpoint.

    It is meant to be combined with a generated ``ApiClient`` class: the
    executor passes itself (``self``) to the API class constructor.
    """

    def __init__(self, verbose):
        self.verbose = verbose

    def innerExecuteApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams):
        """Instantiate ``ApiClass`` and invoke ``EndpointMethod`` on it.

        :param ApiClass: API class, instantiated as ``ApiClass(self)``.
        :param EndpointMethod: unbound method of ``ApiClass`` to be called.
        :param RequestClass: class of the request object, or a falsy value
            if the endpoint takes no request object.
        :param requestParams: keyword arguments for ``RequestClass``; only
            used when ``RequestClass`` is truthy.
        :param additionalParams: positional arguments passed to the endpoint
            method before the request object.
        :return: whatever the endpoint method returns.
        """
        logger.info(f"Calling {ApiClass=}.{EndpointMethod=} with {RequestClass=})")
        if self.verbose:
            logger.debug(f"{additionalParams=}, {requestParams=}")

        instance = ApiClass(self)
        call_args = list(additionalParams)
        if RequestClass:
            # The request object always goes last, after the additional params.
            call_args.append(RequestClass(**requestParams))
        response = EndpointMethod(instance, *call_args)

        logger.info(f"Response is {response}")
        return response


class DefectDojoApiClient(defectdojo_api.ApiClient, ApiCallExecutor):
    """DefectDojo API client that can execute logged endpoint calls."""

    def __init__(self, config, verbose):
        defectdojo_api.ApiClient.__init__(self, config)
        ApiCallExecutor.__init__(self, verbose)

    def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams):
        """Call a DefectDojo endpoint; see ``innerExecuteApiCall`` for the parameters.

        :raises ApiException: wrapping the DefectDojo client's API exception.
        """
        try:
            return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams)
        except defectdojo_api.exceptions.ApiException as e:
            raise ApiException(e) from e


class DependencyTrackApiClient(dependencytrack_api.ApiClient, ApiCallExecutor):
    """Dependency-Track API client that can execute logged endpoint calls."""

    def __init__(self, config, verbose):
        dependencytrack_api.ApiClient.__init__(self, config)
        ApiCallExecutor.__init__(self, verbose)

    def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams):
        """Call a Dependency-Track endpoint; see ``innerExecuteApiCall`` for the parameters.

        :raises ApiException: wrapping the Dependency-Track client's API exception.
        """
        try:
            return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams)
        except dependencytrack_api.exceptions.ApiException as e:
            raise ApiException(e) from e


def generateSBOM(target='.', name='dummyName', version='0.0.0'):
    """Run ``syft scan`` on ``target`` and return the parsed CycloneDX JSON.

    :param target: scan target handed to ``syft scan``.
    :param name: value for syft's ``--source-name`` option.
    :param version: value for syft's ``--source-version`` option.
    :return: the scanner's standard output parsed with ``json.loads``.
    :raises MyLocalException: if syft exits with a non-zero status.
    """
    command = [
        "syft", "scan", target,
        "-o", "cyclonedx-json",
        "--source-name", name,
        "--source-version", version,
    ]
    try:
        result = subprocess.run(
            command,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
    except subprocess.CalledProcessError as e:
        logger.error(f"SBOM scanner failed: {e.stderr}")
        raise MyLocalException(e) from e
    return json.loads(result.stdout)


def loadToDTrackAndDefectDojo(config, projectName, projectVersion, projectClassifier, projectDescription, productType, sbom, reImport):
    """Upload an SBOM to Dependency-Track, creating the linked objects on first import.

    On a first import (``reImport`` is falsy) a product and an engagement are
    created in DefectDojo, a project is created in Dependency-Track, and the
    project receives the ``integrations`` properties that reference the
    engagement. On a re-import only the SBOM upload is performed.

    :param config: mapping providing ``DEFECTDOJO_URL``, ``DEFECTDOJO_TOKEN``,
        ``DTRACK_API_URL``, ``DTRACK_TOKEN`` and ``VERBOSE``.
    :param projectName: project name; also the first part of the product name.
    :param projectVersion: project version; also the second part of the product name.
    :param projectClassifier: classifier of the Dependency-Track project.
    :param projectDescription: description of the DefectDojo product.
    :param productType: ``prod_type`` of the DefectDojo product.
    :param sbom: the SBOM handed to the Dependency-Track BOM upload.
    :param reImport: if truthy, skip all object creation and only upload.
    :raises ApiException: if one of the API calls fails.
    """
    # ------- create product and engagement in DefectDojo -------
    if not reImport:  # in case of a reimport no modifications in DefectDojo are required
        defectdojo_configuration = defectdojo_api.Configuration(
            host=config['DEFECTDOJO_URL']
        )
        defectdojo_configuration.api_key['tokenAuth'] = config['DEFECTDOJO_TOKEN']
        defectdojo_configuration.api_key_prefix['tokenAuth'] = 'Token'

        with DefectDojoApiClient(defectdojo_configuration, config['VERBOSE']) as client:
            print("Create product in DefectDojo")
            productName = f"{projectName}:{projectVersion}"
            product_response = client.executeApiCall(
                defectdojo_api.ProductsApi,
                defectdojo_api.ProductsApi.products_create,
                defectdojo_api.ProductRequest,
                {
                    'name': productName,
                    'description': projectDescription,
                    'prod_type': productType
                },
                []
            )
            product_id = product_response.id
            print(f"{product_id=}")

            print("Create engagement in DefectDojo")
            start_time = datetime.date.today()
            end_time = start_time + relativedelta(years=10)
            engagementName = f"{productName} DTrack Link"
            engagement_response = client.executeApiCall(
                defectdojo_api.EngagementsApi,
                defectdojo_api.EngagementsApi.engagements_create,
                defectdojo_api.EngagementRequest,
                {
                    'name': engagementName,
                    'target_start': start_time,
                    'target_end': end_time,
                    'status': 'In Progress',
                    'product': product_id
                },
                []
            )
            engagement_id = engagement_response.id
            print(f"{engagement_id=}")

    # ------- create project in DependencyTrack, connect project to engagement in DefectDojo, upload SBOM --------
    dependencytrack_configuration = dependencytrack_api.Configuration(
        host=f"{config['DTRACK_API_URL']}/api"
    )
    dependencytrack_configuration.debug = False
    dependencytrack_configuration.api_key['ApiKeyAuth'] = config['DTRACK_TOKEN']

    with DependencyTrackApiClient(dependencytrack_configuration, config['VERBOSE']) as client:
        if not reImport:  # in case of a reimport it is not necessary to create the project
            project_response = client.executeApiCall(
                dependencytrack_api.ProjectApi,
                dependencytrack_api.ProjectApi.create_project,
                dependencytrack_api.Project,
                {
                    'name': projectName,
                    'version': projectVersion,
                    'classifier': projectClassifier,
                    'uuid': "",
                    'last_bom_import': 0
                },
                []
            )
            project_uuid = project_response.uuid
            print(f"{project_uuid=}")

            # These properties reference the engagement created above, so they
            # can only be written on a first import.
            properties = [
                {
                    'group_name': "integrations",
                    'property_name': "defectdojo.engagementId",
                    'property_value': str(engagement_id),
                    'property_type': "STRING"
                },
                {
                    'group_name': "integrations",
                    'property_name': "defectdojo.doNotReactivate",
                    'property_value': "true",
                    'property_type': "BOOLEAN"
                },
                {
                    'group_name': "integrations",
                    'property_name': "defectdojo.reimport",
                    'property_value': "true",
                    'property_type': "BOOLEAN"
                }
            ]
            for project_property in properties:
                client.executeApiCall(
                    dependencytrack_api.ProjectPropertyApi,
                    dependencytrack_api.ProjectPropertyApi.create_property1,
                    dependencytrack_api.ProjectProperty,
                    project_property,
                    [project_uuid]
                )

        # The upload addresses the project by name and version, so it works
        # for the first import and for re-imports alike.
        bom_response = client.executeApiCall(
            dependencytrack_api.BomApi,
            dependencytrack_api.BomApi.upload_bom,
            None,
            None,
            [None, False, projectName, projectVersion, None, None, None, None, True, sbom]
        )