This commit is contained in:
@@ -45,3 +45,5 @@ python3 -m venv .venv
|
||||
pip install -r requirements.txt
|
||||
pip install -r $LOCALLBIS/dependencytrack-client/requirements.txt
|
||||
pip install -r $LOCALLBIS/defectdojo-client/requirements.txt
|
||||
|
||||
|
||||
|
@@ -1,5 +1,6 @@
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
from loguru import logger
|
||||
import json
|
||||
@@ -7,8 +8,8 @@ import json
|
||||
import datetime
|
||||
from dateutil.relativedelta import relativedelta
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'defectdojo-client'))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'dependencytrack-client'))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "defectdojo-client"))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "dependencytrack-client"))
|
||||
|
||||
import defectdojo_api
|
||||
from defectdojo_api.rest import ApiException as DefectDojoApiException
|
||||
@@ -16,6 +17,7 @@ from defectdojo_api.rest import ApiException as DefectDojoApiException
|
||||
import dependencytrack_api
|
||||
from dependencytrack_api.rest import ApiException as DependencyTrackApiException
|
||||
|
||||
|
||||
class ApiException(Exception):
|
||||
def __init__(self, cause):
|
||||
self.cause = cause
|
||||
@@ -25,11 +27,14 @@ class ApiException(Exception):
|
||||
self.data = cause.data
|
||||
self.headers = cause.headers
|
||||
|
||||
|
||||
class ApiCallExecutor:
|
||||
def __init__(self, verbose):
|
||||
self.verbose = verbose
|
||||
|
||||
def innerExecuteApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams):
|
||||
def innerExecuteApiCall(
|
||||
self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
|
||||
):
|
||||
logger.info(f"Calling {ApiClass=}.{EndpointMethod=} with {RequestClass=})")
|
||||
if self.verbose:
|
||||
logger.debug(f"{additionalParams=}, {requestParams=}")
|
||||
@@ -42,38 +47,57 @@ class ApiCallExecutor:
|
||||
logger.info(f"Response is {response}")
|
||||
return response
|
||||
|
||||
|
||||
class DefectDojoApiClient(defectdojo_api.ApiClient, ApiCallExecutor):
|
||||
def __init__(self, config, verbose):
|
||||
defectdojo_api.ApiClient.__init__(self, config)
|
||||
ApiCallExecutor.__init__(self, verbose)
|
||||
|
||||
def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams):
|
||||
def executeApiCall(
|
||||
self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
|
||||
):
|
||||
try:
|
||||
return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams)
|
||||
return self.innerExecuteApiCall(
|
||||
ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
|
||||
)
|
||||
except defectdojo_api.exceptions.ApiException as e:
|
||||
raise ApiException(e)
|
||||
|
||||
|
||||
class DependencyTrackApiClient(dependencytrack_api.ApiClient, ApiCallExecutor):
|
||||
def __init__(self, config, verbose):
|
||||
dependencytrack_api.ApiClient.__init__(self, config)
|
||||
ApiCallExecutor.__init__(self, verbose)
|
||||
|
||||
def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams):
|
||||
def executeApiCall(
|
||||
self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
|
||||
):
|
||||
try:
|
||||
return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams)
|
||||
return self.innerExecuteApiCall(
|
||||
ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams
|
||||
)
|
||||
except dependencytrack_api.exceptions.ApiException as e:
|
||||
raise ApiException(e)
|
||||
|
||||
|
||||
|
||||
def generateSBOM(target='.', name='dummyName', version='0.0.0'):
|
||||
def generateSBOM(target=".", name="dummyName", version="0.0.0"):
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["syft", "scan", target, "-o", "cyclonedx-json", "--source-name", name, "--source-version", version],
|
||||
[
|
||||
"syft",
|
||||
"scan",
|
||||
target,
|
||||
"-o",
|
||||
"cyclonedx-json@1.5",
|
||||
"--source-name",
|
||||
name,
|
||||
"--source-version",
|
||||
version,
|
||||
],
|
||||
check=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True
|
||||
text=True,
|
||||
)
|
||||
sbom = json.loads(result.stdout)
|
||||
return sbom
|
||||
@@ -82,28 +106,39 @@ def generateSBOM(target='.', name='dummyName', version='0.0.0'):
|
||||
raise MyLocalException(e)
|
||||
|
||||
|
||||
|
||||
def loadToDTrackAndDefectDojo(config, projectName, projectVersion, projectClassifier, projectDescription, productType, sbom, reImport):
|
||||
def loadToDTrackAndDefectDojo(
|
||||
config,
|
||||
projectName,
|
||||
projectVersion,
|
||||
projectClassifier,
|
||||
projectDescription,
|
||||
productType,
|
||||
sbom,
|
||||
reImport,
|
||||
):
|
||||
# ------- create product and engagement in DefectDojo -------
|
||||
if not reImport:
|
||||
# in case of a reimport no modification on DefectDojo are required
|
||||
defectdojo_configuration = defectdojo_api.Configuration(
|
||||
host = config['DEFECTDOJO_URL']
|
||||
host=config["DEFECTDOJO_URL"]
|
||||
)
|
||||
defectdojo_configuration.api_key['tokenAuth'] = config['DEFECTDOJO_TOKEN']
|
||||
defectdojo_configuration.api_key_prefix['tokenAuth'] = 'Token'
|
||||
defectdojo_configuration.api_key["tokenAuth"] = config["DEFECTDOJO_TOKEN"]
|
||||
defectdojo_configuration.api_key_prefix["tokenAuth"] = "Token"
|
||||
|
||||
with DefectDojoApiClient(defectdojo_configuration, config['VERBOSE']) as client:
|
||||
with DefectDojoApiClient(defectdojo_configuration, config["VERBOSE"]) as client:
|
||||
print("Create product in DefectDojo")
|
||||
productName = f"{projectName}:{projectVersion}"
|
||||
product_response = \
|
||||
client.executeApiCall(
|
||||
defectdojo_api.ProductsApi,
|
||||
defectdojo_api.ProductsApi.products_create,
|
||||
defectdojo_api.ProductRequest,
|
||||
{ 'name': productName, 'description': projectDescription, 'prod_type': productType },
|
||||
[]
|
||||
)
|
||||
product_response = client.executeApiCall(
|
||||
defectdojo_api.ProductsApi,
|
||||
defectdojo_api.ProductsApi.products_create,
|
||||
defectdojo_api.ProductRequest,
|
||||
{
|
||||
"name": productName,
|
||||
"description": projectDescription,
|
||||
"prod_type": productType,
|
||||
},
|
||||
[],
|
||||
)
|
||||
|
||||
product_id = product_response.id
|
||||
print(f"{product_id=}")
|
||||
@@ -112,48 +147,71 @@ def loadToDTrackAndDefectDojo(config, projectName, projectVersion, projectClassi
|
||||
start_time = datetime.date.today()
|
||||
end_time = start_time + relativedelta(years=10)
|
||||
engagementName = f"{productName} DTrack Link"
|
||||
engagement_response = \
|
||||
client.executeApiCall(
|
||||
defectdojo_api.EngagementsApi,
|
||||
defectdojo_api.EngagementsApi.engagements_create,
|
||||
defectdojo_api.EngagementRequest,
|
||||
{ 'name': engagementName, 'target_start': start_time, 'target_end': end_time, 'status': 'In Progress', 'product': product_id },
|
||||
[]
|
||||
)
|
||||
engagement_response = client.executeApiCall(
|
||||
defectdojo_api.EngagementsApi,
|
||||
defectdojo_api.EngagementsApi.engagements_create,
|
||||
defectdojo_api.EngagementRequest,
|
||||
{
|
||||
"name": engagementName,
|
||||
"target_start": start_time,
|
||||
"target_end": end_time,
|
||||
"status": "In Progress",
|
||||
"product": product_id,
|
||||
},
|
||||
[],
|
||||
)
|
||||
|
||||
engagement_id = engagement_response.id
|
||||
print(f"{engagement_id=}")
|
||||
|
||||
|
||||
# ------- create project in DependencyTrack, connect project to engagement in DefectDojo, upload SBOM --------
|
||||
# ------- create project in DependencyTrack, connect project to engagement in DefectDojo, upload SBOM --------
|
||||
dependencytrack_configuration = dependencytrack_api.Configuration(
|
||||
host = f"{config['DTRACK_API_URL']}/api"
|
||||
host=f"{config['DTRACK_API_URL']}/api"
|
||||
)
|
||||
dependencytrack_configuration.debug = False
|
||||
dependencytrack_configuration.api_key['ApiKeyAuth'] = config['DTRACK_TOKEN']
|
||||
dependencytrack_configuration.api_key["ApiKeyAuth"] = config["DTRACK_TOKEN"]
|
||||
|
||||
with DependencyTrackApiClient(dependencytrack_configuration, config['VERBOSE']) as client:
|
||||
with DependencyTrackApiClient(
|
||||
dependencytrack_configuration, config["VERBOSE"]
|
||||
) as client:
|
||||
if not reImport:
|
||||
# in case of a reimport it is not necessary to create the project
|
||||
project_response = \
|
||||
client.executeApiCall(
|
||||
dependencytrack_api.ProjectApi,
|
||||
dependencytrack_api.ProjectApi.create_project,
|
||||
dependencytrack_api.Project,
|
||||
{ 'name': projectName, 'version': projectVersion, 'classifier': projectClassifier, 'uuid': "", 'last_bom_import': 0 },
|
||||
[]
|
||||
)
|
||||
project_response = client.executeApiCall(
|
||||
dependencytrack_api.ProjectApi,
|
||||
dependencytrack_api.ProjectApi.create_project,
|
||||
dependencytrack_api.Project,
|
||||
{
|
||||
"name": projectName,
|
||||
"version": projectVersion,
|
||||
"classifier": projectClassifier,
|
||||
"uuid": "",
|
||||
"last_bom_import": 0,
|
||||
},
|
||||
[],
|
||||
)
|
||||
|
||||
project_uuid = project_response.uuid
|
||||
print(f"{project_uuid=}")
|
||||
|
||||
properties = [
|
||||
{ 'group_name': "integrations", 'property_name': "defectdojo.engagementId",
|
||||
'property_value': str(engagement_id), 'property_type': "STRING" },
|
||||
{ 'group_name': "integrations", 'property_name': "defectdojo.doNotReactivate",
|
||||
'property_value': "true", 'property_type': "BOOLEAN" },
|
||||
{ 'group_name': "integrations", 'property_name': "defectdojo.reimport",
|
||||
'property_value': "true", 'property_type': "BOOLEAN" }
|
||||
{
|
||||
"group_name": "integrations",
|
||||
"property_name": "defectdojo.engagementId",
|
||||
"property_value": str(engagement_id),
|
||||
"property_type": "STRING",
|
||||
},
|
||||
{
|
||||
"group_name": "integrations",
|
||||
"property_name": "defectdojo.doNotReactivate",
|
||||
"property_value": "true",
|
||||
"property_type": "BOOLEAN",
|
||||
},
|
||||
{
|
||||
"group_name": "integrations",
|
||||
"property_name": "defectdojo.reimport",
|
||||
"property_value": "true",
|
||||
"property_type": "BOOLEAN",
|
||||
},
|
||||
]
|
||||
for property in properties:
|
||||
client.executeApiCall(
|
||||
@@ -161,15 +219,24 @@ def loadToDTrackAndDefectDojo(config, projectName, projectVersion, projectClassi
|
||||
dependencytrack_api.ProjectPropertyApi.create_property1,
|
||||
dependencytrack_api.ProjectProperty,
|
||||
property,
|
||||
[ project_uuid ]
|
||||
[project_uuid],
|
||||
)
|
||||
|
||||
bom_response = \
|
||||
client.executeApiCall(
|
||||
dependencytrack_api.BomApi,
|
||||
dependencytrack_api.BomApi.upload_bom,
|
||||
bom_response = client.executeApiCall(
|
||||
dependencytrack_api.BomApi,
|
||||
dependencytrack_api.BomApi.upload_bom,
|
||||
None,
|
||||
None,
|
||||
[
|
||||
None,
|
||||
False,
|
||||
projectName,
|
||||
projectVersion,
|
||||
None,
|
||||
None,
|
||||
[ None, False, projectName, projectVersion, None, None, None, None, True, sbom ]
|
||||
)
|
||||
|
||||
None,
|
||||
None,
|
||||
True,
|
||||
sbom,
|
||||
],
|
||||
)
|
||||
|
@@ -2,72 +2,102 @@ import os
|
||||
import sys
|
||||
from loguru import logger
|
||||
import argparse
|
||||
import subprocess
|
||||
import json
|
||||
|
||||
|
||||
|
||||
from converter import minimalSbomFormatConverter
|
||||
from sbom_dt_dd import generateSBOM, loadToDTrackAndDefectDojo
|
||||
|
||||
|
||||
class CliException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
# ---- main starts here with preparation of config -----------------------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description='sbom-dt-dd glue logic')
|
||||
parser.add_argument('--name', '-n',
|
||||
help='Project Name',
|
||||
required=False,
|
||||
default=''),
|
||||
parser.add_argument('--version', '-v',
|
||||
help='Project Version',
|
||||
required=False,
|
||||
default='')
|
||||
parser.add_argument('--description', '-d',
|
||||
help='Project Description',
|
||||
required=False,
|
||||
default='')
|
||||
parser.add_argument('--type', '-t',
|
||||
help='Product Type from DefectDojo',
|
||||
type=int,
|
||||
required=True)
|
||||
parser.add_argument('--classifier', '-c',
|
||||
help='Project Classifier from DependencyTrack',
|
||||
choices=['APPLICATION', 'FRAMEWORK', 'LIBRARY', 'CONTAINER', 'OPERATING_SYSTEM', 'DEVICE',
|
||||
'FIRMWARE', 'FILE', 'PLATFORM', 'DEVICE_DRIVER', 'MACHINE_LEARNING_MODEL', 'DATA'],
|
||||
required=False,
|
||||
default='')
|
||||
parser.add_argument('--uploadsbom', '-U',
|
||||
help='Upload a already existing SBOM instead of generating it. Give the SBOM file at -F instead of a target',
|
||||
required=False,
|
||||
action='store_true',
|
||||
default=False)
|
||||
parser.add_argument('--sbomfile', '-F',
|
||||
help='Filename of existing SBOM file to upload, use together with -U, do not use together with -T',
|
||||
required=False)
|
||||
parser.add_argument('--minimalsbomformat', '-K',
|
||||
help='SBOM file comes in dedicated minimal format and will be converted into cyclonedx before uploading',
|
||||
action='store_true',
|
||||
default=False)
|
||||
parser.add_argument('--overwritemetadata', '-O',
|
||||
help='Overwrite name, version, description and classifier with data from minimal SBOM',
|
||||
action='store_true',
|
||||
default=False)
|
||||
parser.add_argument('--target', '-T',
|
||||
help='Target to scan, either path name for sources or docker image tag',
|
||||
required=False)
|
||||
parser.add_argument('--reimport', '-R',
|
||||
help='Import the SBOM for an existing project/product once again',
|
||||
required=False,
|
||||
action='store_true',
|
||||
default=False)
|
||||
parser.add_argument('--verbose', '-V',
|
||||
help='A lot of debug output',
|
||||
required=False,
|
||||
action='store_true',
|
||||
default=False)
|
||||
parser = argparse.ArgumentParser(description="sbom-dt-dd glue logic")
|
||||
parser.add_argument("--name", "-n", help="Project Name", required=False, default="")
|
||||
parser.add_argument(
|
||||
"--version", "-v", help="Project Version", required=False, default=""
|
||||
)
|
||||
parser.add_argument(
|
||||
"--description", "-d", help="Project Description", required=False, default=""
|
||||
)
|
||||
parser.add_argument(
|
||||
"--type", "-t", help="Product Type from DefectDojo", type=int, required=True
|
||||
)
|
||||
parser.add_argument(
|
||||
"--classifier",
|
||||
"-c",
|
||||
help="Project Classifier from DependencyTrack",
|
||||
choices=[
|
||||
"APPLICATION",
|
||||
"FRAMEWORK",
|
||||
"LIBRARY",
|
||||
"CONTAINER",
|
||||
"OPERATING_SYSTEM",
|
||||
"DEVICE",
|
||||
"FIRMWARE",
|
||||
"FILE",
|
||||
"PLATFORM",
|
||||
"DEVICE_DRIVER",
|
||||
"MACHINE_LEARNING_MODEL",
|
||||
"DATA",
|
||||
],
|
||||
required=False,
|
||||
default="",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--uploadsbom",
|
||||
"-U",
|
||||
help="Upload a already existing SBOM instead of generating it. Give the SBOM file at -F instead of a target",
|
||||
required=False,
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--sbomfile",
|
||||
"-F",
|
||||
help="Filename of existing SBOM file to upload, use together with -U, do not use together with -T",
|
||||
required=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--minimalsbomformat",
|
||||
"-K",
|
||||
help="SBOM file comes in dedicated minimal format and will be converted into cyclonedx before uploading",
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--overwritemetadata",
|
||||
"-O",
|
||||
help="Overwrite name, version, description and classifier with data from minimal SBOM",
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--target",
|
||||
"-T",
|
||||
help="Target to scan, either path name for sources or docker image tag",
|
||||
required=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--reimport",
|
||||
"-R",
|
||||
help="Import the SBOM for an existing project/product once again",
|
||||
required=False,
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose",
|
||||
"-V",
|
||||
help="A lot of debug output",
|
||||
required=False,
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
args = parser.parse_args()
|
||||
projectName = args.name
|
||||
projectVersion = args.version
|
||||
@@ -77,41 +107,48 @@ projectClassifier = args.classifier
|
||||
reImport = args.reimport
|
||||
|
||||
uploadSbomFlag = args.uploadsbom
|
||||
if uploadSbomFlag:
|
||||
sbomFileName = args.sbomfile
|
||||
minimalSbomFormat = args.minimalsbomformat
|
||||
else:
|
||||
target = args.target
|
||||
sbomFileName = args.sbomfile
|
||||
minimalSbomFormat = args.minimalsbomformat
|
||||
target = args.target
|
||||
|
||||
if minimalSbomFormat:
|
||||
overwriteMetadata = args.overwritemetadata
|
||||
overwriteMetadata = args.overwritemetadata
|
||||
|
||||
if not overwriteMetadata and not (projectName and projectVersion and projectClassifier and projectDescription):
|
||||
raise MyLocalException("If overwriteMetadata is not selected, projectName, projectVersion, projectClassifier and projectDescription must be set.")
|
||||
if not overwriteMetadata and not (
|
||||
projectName and projectVersion and projectClassifier and projectDescription
|
||||
):
|
||||
raise CliException(
|
||||
"If overwriteMetadata is not selected, projectName, projectVersion, projectClassifier and projectDescription must be set."
|
||||
)
|
||||
|
||||
|
||||
CONFIG = {}
|
||||
try:
|
||||
CONFIG['DTRACK_API_URL'] = os.environ["DTRACK_API_URL"]
|
||||
CONFIG['DTRACK_TOKEN'] = os.environ["DTRACK_TOKEN"]
|
||||
CONFIG['DEFECTDOJO_URL'] = os.environ["DEFECTDOJO_URL"]
|
||||
CONFIG['DEFECTDOJO_TOKEN'] = os.environ["DEFECTDOJO_TOKEN"]
|
||||
CONFIG["DTRACK_API_URL"] = os.environ["DTRACK_API_URL"]
|
||||
CONFIG["DTRACK_TOKEN"] = os.environ["DTRACK_TOKEN"]
|
||||
CONFIG["DEFECTDOJO_URL"] = os.environ["DEFECTDOJO_URL"]
|
||||
CONFIG["DEFECTDOJO_TOKEN"] = os.environ["DEFECTDOJO_TOKEN"]
|
||||
except KeyError as e:
|
||||
raise Exception(f"Env variable {e} is shall be set")
|
||||
raise CliException(f"Env variable {e} is shall be set") from e
|
||||
|
||||
CONFIG['VERBOSE'] = args.verbose
|
||||
CONFIG["VERBOSE"] = args.verbose
|
||||
|
||||
# ---- main starts here --------------------------------------------------------------------------------------------------
|
||||
|
||||
if uploadSbomFlag:
|
||||
# ------- read uploaded SBOM -------------
|
||||
logger.info(f"Reading SBOM from file {sbomFileName}")
|
||||
with open(sbomFileName, 'r') as sbomFile:
|
||||
with open(sbomFileName, "r") as sbomFile:
|
||||
sbom = sbomFile.read()
|
||||
logger.info("SBOM file read.")
|
||||
if minimalSbomFormat:
|
||||
logger.info("Start converting from minimal format into cyclonedx")
|
||||
(sbom, nameFromMinimalSbom, versionFromMinimalSbom, classifierFromMinimalSbom, descriptionFromMinimalSbom) = minimalSbomFormatConverter(sbom)
|
||||
(
|
||||
sbom,
|
||||
nameFromMinimalSbom,
|
||||
versionFromMinimalSbom,
|
||||
classifierFromMinimalSbom,
|
||||
descriptionFromMinimalSbom,
|
||||
) = minimalSbomFormatConverter(sbom)
|
||||
logger.info("Converted")
|
||||
if overwriteMetadata:
|
||||
projectName = nameFromMinimalSbom
|
||||
@@ -127,7 +164,13 @@ else:
|
||||
logger.info("Done.")
|
||||
|
||||
|
||||
|
||||
loadToDTrackAndDefectDojo(CONFIG, projectName, projectVersion, projectClassifier, projectDescription, productType, sbom, reImport)
|
||||
|
||||
|
||||
loadToDTrackAndDefectDojo(
|
||||
CONFIG,
|
||||
projectName,
|
||||
projectVersion,
|
||||
projectClassifier,
|
||||
projectDescription,
|
||||
productType,
|
||||
sbom,
|
||||
reImport,
|
||||
)
|
||||
|
Reference in New Issue
Block a user