23 Commits

Author SHA1 Message Date
10d14d87fb add deploy stage
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-07-15 15:07:53 +02:00
58795aca81 rename dockerfiles, fix 2 2025-07-15 14:45:32 +02:00
13271a6d5e rename dockerfiles, fix 2025-07-15 14:44:18 +02:00
5a9493fe32 rename dockerfiles
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-07-15 14:42:53 +02:00
708b99852f add second dockerfile, add ci snippet
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-07-15 14:40:40 +02:00
e15973db53 add second dockerfile 2025-07-15 14:40:06 +02:00
b2db5b35ad prepare second dockerfile
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-07-15 14:33:07 +02:00
b21bd408f7 there is still an error
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2025-07-14 23:13:30 +02:00
e1aa900f4d works
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2025-07-14 22:47:08 +02:00
91dd245318 add server
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-07-14 22:06:16 +02:00
921a784fc0 add webservice boilerplate snippet
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2025-07-14 17:58:43 +02:00
43678c69fb hidden imports, 9 2025-07-14 16:55:56 +02:00
4577f8f0a5 hidden imports, 8 2025-07-14 16:49:32 +02:00
4dd3e9e799 hidden imports, 7 2025-07-14 16:40:35 +02:00
46ce0e1d54 hidden imports, 6 2025-07-14 16:37:16 +02:00
07b5a2a512 hidden imports, 5 2025-07-14 16:32:22 +02:00
d30abf3d0c hidden imports, 4 2025-07-14 16:28:23 +02:00
f8061aaa7a hidden imports, 3 2025-07-14 16:23:01 +02:00
e1cce96308 hidden imports, 2 2025-07-14 16:18:08 +02:00
3bd9882beb hidden imports
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-07-14 16:13:18 +02:00
6004f6aeb4 add windows build step, 9
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-07-11 16:22:11 +02:00
3ffcf262e5 add windows build step, 8
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-07-11 16:17:24 +02:00
550b5ff28a add windows build step, 7
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-07-11 16:10:35 +02:00
14 changed files with 599 additions and 290 deletions

View File

@@ -1,6 +1,7 @@
stages: stages:
- generate-api-clients - generate-api-clients
- build - build
- deploy
variables: variables:
REGISTRY: devnexus.krohne.com:18079/repository/docker-krohne REGISTRY: devnexus.krohne.com:18079/repository/docker-krohne
@@ -68,7 +69,7 @@ generate-defectdojo-api:
-o defectdojo-client \ -o defectdojo-client \
--package-name defectdojo_api --package-name defectdojo_api
dockerize: .dockerize:
stage: build stage: build
image: devnexus.krohne.com:18079/repository/docker-krohne/krohnedockerbash:0.5 image: devnexus.krohne.com:18079/repository/docker-krohne/krohnedockerbash:0.5
tags: tags:
@@ -83,36 +84,77 @@ dockerize:
--tag $IMAGE_NAME:latest --tag $IMAGE_NAME:latest
--tag $IMAGE_NAME:$CI_COMMIT_SHA --tag $IMAGE_NAME:$CI_COMMIT_SHA
--tag $IMAGE_NAME:$CI_COMMIT_TAG --tag $IMAGE_NAME:$CI_COMMIT_TAG
-f $DOCKERFILE
. .
- docker login -u $NEXUS_USER -p $NEXUS_PASSWORD $REGISTRY - docker login -u $NEXUS_USER -p $NEXUS_PASSWORD $REGISTRY
- docker push $IMAGE_NAME:latest - docker push $IMAGE_NAME:latest
- docker push $IMAGE_NAME:$CI_COMMIT_SHA - docker push $IMAGE_NAME:$CI_COMMIT_SHA
- docker push $IMAGE_NAME:$CI_COMMIT_TAG - docker push $IMAGE_NAME:$CI_COMMIT_TAG
build-windows-binary: dockerize-cli:
stage: build extends: .dockerize
tags: variables:
- windows DOCKERFILE: Dockerfile-cli
- pwsh
- python3.13 dockerize-server:
rules: extends: .dockerize
- if: '$CI_COMMIT_TAG' variables:
artifacts: DOCKERFILE: Dockerfile-server
paths:
- sbom-dt-dd.exe .deploy:
stage: deploy
image: wollud1969/docker-bash:latest
image: devnexus.krohne.com:18079/repository/docker-krohne/krohnedockerbash:0.5
variables:
GIT_STRATEGY: none
SERVICE: sbom-dd-dt-integrator
dependencies:
- dockerize
script: script:
- dir - VERSION=$CI_COMMIT_SHA
- CONTAINER_NAME=$SERVICE"-"$INSTANCE_SPECIFIER
- SERVICE_VOLUME=$SERVICE"-"$INSTANCE_SPECIFIER"-data"
- docker volume inspect $SERVICE_VOLUME || docker volume create $SERVICE_VOLUME
- docker stop $CONTAINER_NAME || echo "$CONTAINER_NAME not running, anyway okay"
- docker rm $CONTAINER_NAME || echo "$CONTAINER_NAME not running, anyway okay"
- docker login -u $NEXUS_USER -p $NEXUS_PASSWORD $REGISTRY
- docker pull $IMAGE_NAME:$VERSION
- | - |
cd src cat - > /start-scripts/${CONTAINER_NAME}.sh << EOT
mv dependencytrack-client .\src docker run \
mv defectdojo-client .\src -d \
& 'C:\Program Files\Python313\python.exe' -m venv venv --restart always
.\venv\Scripts\pip.exe install --upgrade pip --name $CONTAINER_NAME \
.\venv\Scripts\pip.exe install -r requirements.txt -e DTRACK_API_URL=$DTRACK_API_URL \
.\venv\Scripts\pip.exe install -r dependencytrack-client\requirements.txt -e DTRACK_TOKEN=$DTRACK_TOKEN \
.\venv\Scripts\pip.exe install -r defectdojo-client\requirements.txt -e DEFECTDOJO_URL=$DEFECTDOJO_URL \
.\venv\Scripts\pip.exe install pyinstaller -e DEFECTDOJO_TOKEN=$DEFECTDOJO_TOKEN \
.\venv\Scripts\pyinstaller.exe --onefile sbom-dt-dd.py $IMAGE_NAME:$VERSION
mv dist\sbom-dt-dd.exe .. EOT
- chmod 755 /start-scripts/${CONTAINER_NAME}.sh
- /start-scripts/${CONTAINER_NAME}.sh
deploy-test:
extends: .deploy
only:
refs:
- master
tags:
- test-deployment-de01rdtst01
variables:
INSTANCE_SPECIFIER: test
environment:
name: test
deploy-dev:
extends: .deploy
only:
refs:
- production_deployment
tags:
- for-common-services-prod-deployment-only
variables:
INSTANCE_SPECIFIER: prod
environment:
name: prod

View File

@@ -31,9 +31,10 @@ USER user
WORKDIR $APP_DIR WORKDIR $APP_DIR
COPY src/requirements.txt . COPY src/requirements.txt .
COPY src/sbom-dt-dd.py . COPY src/sbom_dt_dd.py .
COPY src/sbom_dt_dd_cli.py .
COPY src/converter.py . COPY src/converter.py .
COPY src/entrypoint.sh . COPY src/entrypoint-cli.sh .
COPY dependencytrack-client/ ./dependencytrack-client COPY dependencytrack-client/ ./dependencytrack-client
COPY defectdojo-client/ ./defectdojo-client COPY defectdojo-client/ ./defectdojo-client
@@ -44,7 +45,7 @@ RUN \
pip install -r dependencytrack-client/requirements.txt &&\ pip install -r dependencytrack-client/requirements.txt &&\
pip install -r defectdojo-client/requirements.txt pip install -r defectdojo-client/requirements.txt
ENTRYPOINT [ "./entrypoint.sh" ] ENTRYPOINT [ "./entrypoint-cli.sh" ]

52
Dockerfile-server Normal file
View File

@@ -0,0 +1,52 @@
FROM python:3.12.10-alpine3.22
ENV DTRACK_API_URL=""
ENV DTRACK_TOKEN=""
ENV DEFECTDOJO_URL=""
ENV DEFECTDOJO_TOKEN=""
ARG APP_DIR=/opt/app
ARG ADDITIONAL_CA_URL="x"
ARG ADDITIONAL_CA_CHECKSUM="y"
RUN \
set -e &&\
adduser -s /bin/sh -D user &&\
mkdir -p $APP_DIR &&\
chown user:user $APP_DIR &&\
echo $ADDITIONAL_CA_URL &&\
echo $ADDITIONAL_CA_CHECKSUM &&\
if [ "$ADDITIONAL_CA_URL" != "x" ]; then \
cd /usr/local/share/ca-certificates; \
wget --no-check-certificate -O custom-ca.crt $ADDITIONAL_CA_URL; \
echo "$ADDITIONAL_CA_CHECKSUM custom-ca.crt" | md5sum -c; \
/usr/sbin/update-ca-certificates; \
echo "custom ca added"; \
else \
echo "no additional ca"; \
fi
USER user
WORKDIR $APP_DIR
COPY src/requirements.txt .
COPY src/sbom_dt_dd.py .
COPY src/sbom_dt_dd_api.py .
COPY src/converter.py .
COPY src/entrypoint-server.sh .
COPY dependencytrack-client/ ./dependencytrack-client
COPY defectdojo-client/ ./defectdojo-client
RUN \
python -m venv .venv &&\
. ./.venv/bin/activate &&\
pip install -r requirements.txt &&\
pip install -r dependencytrack-client/requirements.txt &&\
pip install -r defectdojo-client/requirements.txt
EXPOSE 8000
ENTRYPOINT [ "./entrypoint-server.sh" ]

33
snippets/websrv/main.py Normal file
View File

@@ -0,0 +1,33 @@
from fastapi import FastAPI
from fastapi.responses import JSONResponse
app = FastAPI(
title="My FastAPI App",
version="1.0.0",
description="A simple FastAPI example with uvicorn and gunicorn."
)
@app.get("/hello")
async def say_hello(name: str):
"""
Returns a friendly greeting.
---
parameters:
- name: name
in: query
required: true
schema:
type: string
responses:
200:
description: Successful Response
content:
application/json:
schema:
type: object
properties:
message:
type: string
"""
return JSONResponse(content={"message": f"Hello, {name}!"})

View File

@@ -0,0 +1,3 @@
fastapi==0.116.1
gunicorn==23.0.0
uvicorn==0.35.0

4
snippets/websrv/server.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/bash
./.venv/bin/gunicorn main:app -k uvicorn.workers.UvicornWorker -w 4 -b 0.0.0.0:8000

View File

@@ -47,7 +47,7 @@ def __converterClassifierToComponentType(classifier):
def minimalSbomFormatConverter(minimalSbom, classifier): def minimalSbomFormatConverter(minimalSbom):
logger.info(f"Minimal input: {minimalSbom}") logger.info(f"Minimal input: {minimalSbom}")
lc_factory = LicenseFactory() lc_factory = LicenseFactory()

View File

@@ -7,5 +7,4 @@ PYTHONPATH="$PYTHONPATH:/opt/app/dependencytrack-client"
PYTHONPATH="$PYTHONPATH:/opt/app/defectdojo-client" PYTHONPATH="$PYTHONPATH:/opt/app/defectdojo-client"
export PYTHONPATH export PYTHONPATH
exec python /opt/app/sbom-dt-dd.py "$@" exec python /opt/app/sbom_dt_dd_cli.py "$@"

9
src/entrypoint-server.sh Executable file
View File

@@ -0,0 +1,9 @@
#!/bin/bash
source /opt/app/.venv/bin/activate
PYTHONPATH="$PYTHONPATH:/opt/app/dependencytrack-client"
PYTHONPATH="$PYTHONPATH:/opt/app/defectdojo-client"
export PYTHONPATH
gunicorn sbom_dt_dd_api:app -k uvicorn.workers.UvicornWorker -w 4 -b 0.0.0.0:8000

View File

@@ -2,4 +2,6 @@ regex==2024.11.6
loguru==0.7.3 loguru==0.7.3
PyYAML==6.0.2 PyYAML==6.0.2
cyclonedx-python-lib==10.4.1 cyclonedx-python-lib==10.4.1
fastapi==0.116.1
gunicorn==23.0.0
uvicorn==0.35.0

View File

@@ -1,259 +0,0 @@
import os
from loguru import logger
import argparse
import subprocess
import json
import defectdojo_api
from defectdojo_api.rest import ApiException as DefectDojoApiException
import datetime
from dateutil.relativedelta import relativedelta
import dependencytrack_api
from dependencytrack_api.rest import ApiException as DependencyTrackApiException
from converter import minimalSbomFormatConverter
class MyLocalException(Exception): pass
def executeApiCall(apiClient, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams=[]):
try:
logger.info(f"Calling {ApiClass=}.{EndpointMethod=} with {RequestClass=})")
if VERBOSE:
logger.debug(f"{additionalParams=}, {requestParams=}")
instance = ApiClass(apiClient)
if RequestClass:
request = RequestClass(**requestParams)
response = EndpointMethod(instance, *additionalParams, request)
else:
response = EndpointMethod(instance, *additionalParams)
logger.info(f"Response is {response}")
return response
except Exception as e:
logger.error(f"Caught error {e} with {str(e)}")
raise MyLocalException(e)
def generateSBOM(target='.', name='dummyName', version='0.0.0'):
try:
result = subprocess.run(
["syft", "scan", target, "-o", "cyclonedx-json", "--source-name", name, "--source-version", version],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
sbom = json.loads(result.stdout)
return sbom
except subprocess.CalledProcessError as e:
logger.error(f"SBOM scanner failed: {e.stderr}")
raise MyLocalException(e)
# ---- main starts here with preparation of config -----------------------------------------------------------------------
try:
DTRACK_API_URL = os.environ["DTRACK_API_URL"]
DTRACK_TOKEN = os.environ["DTRACK_TOKEN"]
DEFECTDOJO_URL = os.environ["DEFECTDOJO_URL"]
DEFECTDOJO_TOKEN = os.environ["DEFECTDOJO_TOKEN"]
except KeyError as e:
raise Exception(f"Env variable {e} is shall be set")
parser = argparse.ArgumentParser(description='sbom-dt-dd glue logic')
parser.add_argument('--name', '-n',
help='Project Name',
required=False,
default=''),
parser.add_argument('--version', '-v',
help='Project Version',
required=False,
default='')
parser.add_argument('--description', '-d',
help='Project Description',
required=False,
default='')
parser.add_argument('--type', '-t',
help='Product Type from DefectDojo',
type=int,
required=True)
parser.add_argument('--classifier', '-c',
help='Project Classifier from DependencyTrack',
choices=['APPLICATION', 'FRAMEWORK', 'LIBRARY', 'CONTAINER', 'OPERATING_SYSTEM', 'DEVICE',
'FIRMWARE', 'FILE', 'PLATFORM', 'DEVICE_DRIVER', 'MACHINE_LEARNING_MODEL', 'DATA'],
required=False,
default='')
parser.add_argument('--uploadsbom', '-U',
help='Upload a already existing SBOM instead of generating it. Give the SBOM file at -F instead of a target',
required=False,
action='store_true',
default=False)
parser.add_argument('--sbomfile', '-F',
help='Filename of existing SBOM file to upload, use together with -U, do not use together with -T',
required=False)
parser.add_argument('--minimalsbomformat', '-K',
help='SBOM file comes in dedicated minimal format and will be converted into cyclonedx before uploading',
action='store_true',
default=False)
parser.add_argument('--overwritemetadata', '-O',
help='Overwrite name, version, description and classifier with data from minimal SBOM',
action='store_true',
default=False)
parser.add_argument('--target', '-T',
help='Target to scan, either path name for sources or docker image tag',
required=False)
parser.add_argument('--reimport', '-R',
help='Import the SBOM for an existing project/product once again',
required=False,
action='store_true',
default=False)
parser.add_argument('--verbose', '-V',
help='A lot of debug output',
required=False,
action='store_true',
default=False)
args = parser.parse_args()
projectName = args.name
projectVersion = args.version
projectDescription = args.description
productType = args.type
projectClassifier = args.classifier
reImport = args.reimport
uploadSbomFlag = args.uploadsbom
if uploadSbomFlag:
sbomFileName = args.sbomfile
minimalSbomFormat = args.minimalsbomformat
else:
target = args.target
if minimalSbomFormat:
overwriteMetadata = args.overwritemetadata
if not overwriteMetadata and not (projectName and projectVersion and projectClassifier and projectDescription):
raise MyLocalException("If overwriteMetadata is not selected, projectName, projectVersion, projectClassifier and projectDescription must be set.")
VERBOSE = args.verbose
# ---- main starts here --------------------------------------------------------------------------------------------------
if uploadSbomFlag:
# ------- read uploaded SBOM -------------
logger.info(f"Reading SBOM from file {sbomFileName}")
with open(sbomFileName, 'r') as sbomFile:
sbom = sbomFile.read()
logger.info("SBOM file read.")
if minimalSbomFormat:
logger.info("Start converting from minimal format into cyclonedx")
(sbom, nameFromMinimalSbom, versionFromMinimalSbom, classifierFromMinimalSbom, descriptionFromMinimalSbom) = minimalSbomFormatConverter(sbom, projectClassifier)
logger.info("Converted")
if overwriteMetadata:
projectName = nameFromMinimalSbom
projectVersion = versionFromMinimalSbom
projectClassifier = classifierFromMinimalSbom
projectDescription = descriptionFromMinimalSbom
logger.info("Done.")
else:
# ------- generate SBOM ------------
logger.info(f"Generating SBOM for {target}")
sbomJson = generateSBOM(target, projectName, projectVersion)
sbom = json.dumps(sbomJson)
logger.info("Done.")
# ------- create product and engagement in DefectDojo -------
if not reImport:
# in case of a reimport no modification on DefectDojo are required
defectdojo_configuration = defectdojo_api.Configuration(
host = DEFECTDOJO_URL
)
defectdojo_configuration.api_key['tokenAuth'] = DEFECTDOJO_TOKEN
defectdojo_configuration.api_key_prefix['tokenAuth'] = 'Token'
with defectdojo_api.ApiClient(defectdojo_configuration) as defectdojo_api_client:
print("Create product in DefectDojo")
productName = f"{projectName}:{projectVersion}"
product_response = \
executeApiCall(
defectdojo_api_client,
defectdojo_api.ProductsApi,
defectdojo_api.ProductsApi.products_create,
defectdojo_api.ProductRequest,
{ 'name': productName, 'description': projectDescription, 'prod_type': productType },
[]
)
product_id = product_response.id
print(f"{product_id=}")
print("Create engagement in DefectDojo")
start_time = datetime.date.today()
end_time = start_time + relativedelta(years=10)
engagementName = f"{productName} DTrack Link"
engagement_response = \
executeApiCall(
defectdojo_api_client,
defectdojo_api.EngagementsApi,
defectdojo_api.EngagementsApi.engagements_create,
defectdojo_api.EngagementRequest,
{ 'name': engagementName, 'target_start': start_time, 'target_end': end_time, 'status': 'In Progress', 'product': product_id },
[]
)
engagement_id = engagement_response.id
print(f"{engagement_id=}")
# ------- create project in DependencyTrack, connect project to engagement in DefectDojo, upload SBOM --------
dependencytrack_configuration = dependencytrack_api.Configuration(
host = f"{DTRACK_API_URL}/api"
)
dependencytrack_configuration.debug = False
dependencytrack_configuration.api_key['ApiKeyAuth'] = DTRACK_TOKEN
with dependencytrack_api.ApiClient(dependencytrack_configuration) as dependencytrack_api_client:
if not reImport:
# in case of a reimport it is not necessary to create the project
project_response = \
executeApiCall(
dependencytrack_api_client,
dependencytrack_api.ProjectApi,
dependencytrack_api.ProjectApi.create_project,
dependencytrack_api.Project,
{ 'name': projectName, 'version': projectVersion, 'classifier': projectClassifier, 'uuid': "", 'last_bom_import': 0 },
[]
)
project_uuid = project_response.uuid
print(f"{project_uuid=}")
properties = [
{ 'group_name': "integrations", 'property_name': "defectdojo.engagementId",
'property_value': str(engagement_id), 'property_type': "STRING" },
{ 'group_name': "integrations", 'property_name': "defectdojo.doNotReactivate",
'property_value': "true", 'property_type': "BOOLEAN" },
{ 'group_name': "integrations", 'property_name': "defectdojo.reimport",
'property_value': "true", 'property_type': "BOOLEAN" }
]
for property in properties:
executeApiCall(
dependencytrack_api_client,
dependencytrack_api.ProjectPropertyApi,
dependencytrack_api.ProjectPropertyApi.create_property1,
dependencytrack_api.ProjectProperty,
property,
[ project_uuid ]
)
bom_response = \
executeApiCall(
dependencytrack_api_client,
dependencytrack_api.BomApi,
dependencytrack_api.BomApi.upload_bom,
None,
None,
[ None, False, projectName, projectVersion, None, None, None, None, True, sbom ]
)

174
src/sbom_dt_dd.py Normal file
View File

@@ -0,0 +1,174 @@
import os
import sys
from loguru import logger
import json
import datetime
from dateutil.relativedelta import relativedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'defectdojo-client'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'dependencytrack-client'))
import defectdojo_api
from defectdojo_api.rest import ApiException as DefectDojoApiException
import dependencytrack_api
from dependencytrack_api.rest import ApiException as DependencyTrackApiException
class ApiException(Exception):
def __init__(self, status, reason, body, data, headers):
self.status = status
self.reason = reason
self.body = body
self.data = data
self.headers = None
class ApiCallExecutor:
def __init__(self, verbose):
self.verbose = verbose
def innerExecuteApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams):
logger.info(f"Calling {ApiClass=}.{EndpointMethod=} with {RequestClass=})")
if self.verbose:
logger.debug(f"{additionalParams=}, {requestParams=}")
instance = ApiClass(self)
if RequestClass:
request = RequestClass(**requestParams)
response = EndpointMethod(instance, *additionalParams, request)
else:
response = EndpointMethod(instance, *additionalParams)
logger.info(f"Response is {response}")
return response
class DefectDojoApiClient(defectdojo_api.ApiClient, ApiCallExecutor):
def __init__(self, config, verbose):
defectdojo_api.ApiClient.__init__(self, config)
ApiCallExecutor.__init__(self, verbose)
def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams):
try:
return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams)
except defectdojo_api.exceptions.ApiException as e:
raise ApiException(e.status, e.reason, e.body, e.data, e.headers)
class DependencyTrackApiClient(dependencytrack_api.ApiClient, ApiCallExecutor):
def __init__(self, config, verbose):
dependencytrack_api.ApiClient.__init__(self, config)
ApiCallExecutor.__init__(self, verbose)
def executeApiCall(self, ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams):
try:
return self.innerExecuteApiCall(ApiClass, EndpointMethod, RequestClass, requestParams, additionalParams)
except dependencytrack_api.exceptions.ApiException as e:
raise ApiException(e.status, e.reason, e.body, e.data, e.headers)
def generateSBOM(target='.', name='dummyName', version='0.0.0'):
try:
result = subprocess.run(
["syft", "scan", target, "-o", "cyclonedx-json", "--source-name", name, "--source-version", version],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
sbom = json.loads(result.stdout)
return sbom
except subprocess.CalledProcessError as e:
logger.error(f"SBOM scanner failed: {e.stderr}")
raise MyLocalException(e)
def loadToDTrackAndDefectDojo(config, projectName, projectVersion, projectClassifier, projectDescription, productType, sbom, reImport):
# ------- create product and engagement in DefectDojo -------
if not reImport:
# in case of a reimport no modification on DefectDojo are required
defectdojo_configuration = defectdojo_api.Configuration(
host = config['DEFECTDOJO_URL']
)
defectdojo_configuration.api_key['tokenAuth'] = config['DEFECTDOJO_TOKEN']
defectdojo_configuration.api_key_prefix['tokenAuth'] = 'Token'
with DefectDojoApiClient(defectdojo_configuration, config['VERBOSE']) as client:
print("Create product in DefectDojo")
productName = f"{projectName}:{projectVersion}"
product_response = \
client.executeApiCall(
defectdojo_api.ProductsApi,
defectdojo_api.ProductsApi.products_create,
defectdojo_api.ProductRequest,
{ 'name': productName, 'description': projectDescription, 'prod_type': productType },
[]
)
product_id = product_response.id
print(f"{product_id=}")
print("Create engagement in DefectDojo")
start_time = datetime.date.today()
end_time = start_time + relativedelta(years=10)
engagementName = f"{productName} DTrack Link"
engagement_response = \
client.executeApiCall(
defectdojo_api.EngagementsApi,
defectdojo_api.EngagementsApi.engagements_create,
defectdojo_api.EngagementRequest,
{ 'name': engagementName, 'target_start': start_time, 'target_end': end_time, 'status': 'In Progress', 'product': product_id },
[]
)
engagement_id = engagement_response.id
print(f"{engagement_id=}")
# ------- create project in DependencyTrack, connect project to engagement in DefectDojo, upload SBOM --------
dependencytrack_configuration = dependencytrack_api.Configuration(
host = f"{config['DTRACK_API_URL']}/api"
)
dependencytrack_configuration.debug = False
dependencytrack_configuration.api_key['ApiKeyAuth'] = config['DTRACK_TOKEN']
with DependencyTrackApiClient(dependencytrack_configuration, config['VERBOSE']) as client:
if not reImport:
# in case of a reimport it is not necessary to create the project
project_response = \
client.executeApiCall(
dependencytrack_api.ProjectApi,
dependencytrack_api.ProjectApi.create_project,
dependencytrack_api.Project,
{ 'name': projectName, 'version': projectVersion, 'classifier': projectClassifier, 'uuid': "", 'last_bom_import': 0 },
[]
)
project_uuid = project_response.uuid
print(f"{project_uuid=}")
properties = [
{ 'group_name': "integrations", 'property_name': "defectdojo.engagementId",
'property_value': str(engagement_id), 'property_type': "STRING" },
{ 'group_name': "integrations", 'property_name': "defectdojo.doNotReactivate",
'property_value': "true", 'property_type': "BOOLEAN" },
{ 'group_name': "integrations", 'property_name': "defectdojo.reimport",
'property_value': "true", 'property_type': "BOOLEAN" }
]
for property in properties:
client.executeApiCall(
dependencytrack_api.ProjectPropertyApi,
dependencytrack_api.ProjectPropertyApi.create_property1,
dependencytrack_api.ProjectProperty,
property,
[ project_uuid ]
)
bom_response = \
client.executeApiCall(
dependencytrack_api.BomApi,
dependencytrack_api.BomApi.upload_bom,
None,
None,
[ None, False, projectName, projectVersion, None, None, None, None, True, sbom ]
)

116
src/sbom_dt_dd_api.py Normal file
View File

@@ -0,0 +1,116 @@
import os
import json
import yaml
from loguru import logger
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from converter import minimalSbomFormatConverter
from sbom_dt_dd import generateSBOM, loadToDTrackAndDefectDojo, ApiException
app = FastAPI(
title="SBOM DTrack DefectDojo Synchronization API",
version="0.0.1",
description=""
)
config = {}
try:
config['DTRACK_API_URL'] = os.environ["DTRACK_API_URL"]
config['DTRACK_TOKEN'] = os.environ["DTRACK_TOKEN"]
config['DEFECTDOJO_URL'] = os.environ["DEFECTDOJO_URL"]
config['DEFECTDOJO_TOKEN'] = os.environ["DEFECTDOJO_TOKEN"]
config['VERBOSE'] = True
except KeyError as e:
raise Exception(f"Env variable {e} is shall be set")
app.state.config = config
@app.get("/hello")
async def say_hello(name: str):
"""
Returns a friendly greeting.
---
parameters:
- name: name
in: query
required: true
schema:
type: string
responses:
200:
description: Successful Response
content:
application/json:
schema:
type: object
properties:
message:
type: string
"""
return JSONResponse(content={"message": f"Hello, {name}!"})
@app.post("/uploadMinimalSBOM/")
async def uploadMinimalSBOM(
file: UploadFile = File(...),
reimport: bool = Form(...)
):
"""
Endpoint to upload a minimal SBOM definition
"""
try:
sbom = await file.read()
logger.info("Start converting from minimal format into cyclonedx")
(sbom, projectName, projectVersion, projectClassifier, projectDescription) = minimalSbomFormatConverter(sbom)
logger.info("Converted")
loadToDTrackAndDefectDojo(app.state.config, projectName, projectVersion, projectClassifier, projectDescription, 1, sbom, reimport)
logger.info("Done.")
except yaml.scanner.ScannerError as e:
logger.warning(f"uploadMinimalSBOM, yaml ScannerError: {e.context=}, {e.context_mark=}, {e.problem=}, {e.problem_mark=}, {e.note=}")
raise HTTPException(status_code=400, detail=f"yaml ScannerError: {e.context=}, {e.context_mark=}, {e.problem=}, {e.problem_mark=}, {e.note=}")
except ApiException as e:
logger.warning(f"uploadMinimalSBOM, ApiException: {e.status=}, {e.reason=}, {e.body=}")
raise HTTPException(status_code=e.status, detail=f"{e.reason=}, {e.body=}, {e.data=}")
except Exception as e:
logger.warning(f"uploadMinimalSBOM, Exception: {type(e)=}, {str(e)=}, {e.msg=}")
raise HTTPException(status_code=500, detail=f"Exception: {type(e)=}, {str(e)=}, {e.msg=}")
return JSONResponse(content={
"message": "Upload successful!"
})
@app.post("/uploadSBOM/")
async def uploadSBOM(
file: UploadFile = File(...),
projectName: str = Form(...),
projectVersion: str = Form(...),
projectClassifier: str = Form(...),
projectDescription: str = Form(...),
reimport: bool = Form(...)
):
"""
Endpoint to upload a CycloneDX SBOM
"""
sbom = await file.read()
try:
sbomJson = json.loads(sbom)
sbom = json.dumps(sbomJson)
loadToDTrackAndDefectDojo(app.state.config, projectName, projectVersion, projectClassifier, projectDescription, 1, str(sbom), reimport)
logger.info("Done.")
except json.decoder.JSONDecodeError as e:
logger.warning(f"uploadSBOM, JSONDecodeError: {e.msg=}")
raise HTTPException(status_code=400, detail=f"JSON decoding error: {e.msg=}, {e.doc=}, {e.pos=}, {e.lineno=}, {e.colno=}")
except ApiException as e:
logger.warning(f"uploadSBOM, ApiException: {e.status=}, {e.reason=}, {e.body=}")
raise HTTPException(status_code=e.status, detail=f"{e.reason=}, {e.body=}, {e.data=}")
except Exception as e:
logger.warning(f"uploadSBOM, Exception: {type(e)=}, {str(e)=}, {e.msg=}")
raise HTTPException(status_code=500, detail=f"Exception: {type(e)=}, {str(e)=}, {e.msg=}")
return JSONResponse(content={
"message": "Upload successful!"
})

133
src/sbom_dt_dd_cli.py Normal file
View File

@@ -0,0 +1,133 @@
import os
import sys
from loguru import logger
import argparse
import subprocess
import json
from converter import minimalSbomFormatConverter
from sbom_dt_dd import generateSBOM, loadToDTrackAndDefectDojo
# ---- main starts here with preparation of config -----------------------------------------------------------------------
parser = argparse.ArgumentParser(description='sbom-dt-dd glue logic')
parser.add_argument('--name', '-n',
help='Project Name',
required=False,
default=''),
parser.add_argument('--version', '-v',
help='Project Version',
required=False,
default='')
parser.add_argument('--description', '-d',
help='Project Description',
required=False,
default='')
parser.add_argument('--type', '-t',
help='Product Type from DefectDojo',
type=int,
required=True)
parser.add_argument('--classifier', '-c',
help='Project Classifier from DependencyTrack',
choices=['APPLICATION', 'FRAMEWORK', 'LIBRARY', 'CONTAINER', 'OPERATING_SYSTEM', 'DEVICE',
'FIRMWARE', 'FILE', 'PLATFORM', 'DEVICE_DRIVER', 'MACHINE_LEARNING_MODEL', 'DATA'],
required=False,
default='')
parser.add_argument('--uploadsbom', '-U',
help='Upload a already existing SBOM instead of generating it. Give the SBOM file at -F instead of a target',
required=False,
action='store_true',
default=False)
parser.add_argument('--sbomfile', '-F',
help='Filename of existing SBOM file to upload, use together with -U, do not use together with -T',
required=False)
parser.add_argument('--minimalsbomformat', '-K',
help='SBOM file comes in dedicated minimal format and will be converted into cyclonedx before uploading',
action='store_true',
default=False)
parser.add_argument('--overwritemetadata', '-O',
help='Overwrite name, version, description and classifier with data from minimal SBOM',
action='store_true',
default=False)
parser.add_argument('--target', '-T',
help='Target to scan, either path name for sources or docker image tag',
required=False)
parser.add_argument('--reimport', '-R',
help='Import the SBOM for an existing project/product once again',
required=False,
action='store_true',
default=False)
parser.add_argument('--verbose', '-V',
help='A lot of debug output',
required=False,
action='store_true',
default=False)
args = parser.parse_args()
projectName = args.name
projectVersion = args.version
projectDescription = args.description
productType = args.type
projectClassifier = args.classifier
reImport = args.reimport
uploadSbomFlag = args.uploadsbom
if uploadSbomFlag:
sbomFileName = args.sbomfile
minimalSbomFormat = args.minimalsbomformat
else:
target = args.target
if minimalSbomFormat:
overwriteMetadata = args.overwritemetadata
if not overwriteMetadata and not (projectName and projectVersion and projectClassifier and projectDescription):
raise MyLocalException("If overwriteMetadata is not selected, projectName, projectVersion, projectClassifier and projectDescription must be set.")
CONFIG = {}
try:
CONFIG['DTRACK_API_URL'] = os.environ["DTRACK_API_URL"]
CONFIG['DTRACK_TOKEN'] = os.environ["DTRACK_TOKEN"]
CONFIG['DEFECTDOJO_URL'] = os.environ["DEFECTDOJO_URL"]
CONFIG['DEFECTDOJO_TOKEN'] = os.environ["DEFECTDOJO_TOKEN"]
except KeyError as e:
raise Exception(f"Env variable {e} is shall be set")
CONFIG['VERBOSE'] = args.verbose
# ---- main starts here --------------------------------------------------------------------------------------------------
if uploadSbomFlag:
# ------- read uploaded SBOM -------------
logger.info(f"Reading SBOM from file {sbomFileName}")
with open(sbomFileName, 'r') as sbomFile:
sbom = sbomFile.read()
logger.info("SBOM file read.")
if minimalSbomFormat:
logger.info("Start converting from minimal format into cyclonedx")
(sbom, nameFromMinimalSbom, versionFromMinimalSbom, classifierFromMinimalSbom, descriptionFromMinimalSbom) = minimalSbomFormatConverter(sbom)
logger.info("Converted")
if overwriteMetadata:
projectName = nameFromMinimalSbom
projectVersion = versionFromMinimalSbom
projectClassifier = classifierFromMinimalSbom
projectDescription = descriptionFromMinimalSbom
logger.info("Done.")
else:
# ------- generate SBOM ------------
logger.info(f"Generating SBOM for {target}")
sbomJson = generateSBOM(target, projectName, projectVersion)
sbom = json.dumps(sbomJson)
logger.info("Done.")
loadToDTrackAndDefectDojo(CONFIG, projectName, projectVersion, projectClassifier, projectDescription, productType, sbom, reImport)