Files
dtrack-defectdojo-automation/src/sbom_dt_dd_api.py
Wolfgang Hottgenroth b2db5b35ad
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
prepare second dockerfile
2025-07-15 14:33:07 +02:00

117 lines
4.1 KiB
Python

import os
import json
import yaml
from loguru import logger
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from converter import minimalSbomFormatConverter
from sbom_dt_dd import generateSBOM, loadToDTrackAndDefectDojo, ApiException
app = FastAPI(
title="SBOM DTrack DefectDojo Synchronization API",
version="0.0.1",
description=""
)
config = {}
try:
config['DTRACK_API_URL'] = os.environ["DTRACK_API_URL"]
config['DTRACK_TOKEN'] = os.environ["DTRACK_TOKEN"]
config['DEFECTDOJO_URL'] = os.environ["DEFECTDOJO_URL"]
config['DEFECTDOJO_TOKEN'] = os.environ["DEFECTDOJO_TOKEN"]
config['VERBOSE'] = True
except KeyError as e:
raise Exception(f"Env variable {e} is shall be set")
app.state.config = config
@app.get("/hello")
async def say_hello(name: str):
"""
Returns a friendly greeting.
---
parameters:
- name: name
in: query
required: true
schema:
type: string
responses:
200:
description: Successful Response
content:
application/json:
schema:
type: object
properties:
message:
type: string
"""
return JSONResponse(content={"message": f"Hello, {name}!"})
@app.post("/uploadMinimalSBOM/")
async def uploadMinimalSBOM(
file: UploadFile = File(...),
reimport: bool = Form(...)
):
"""
Endpoint to upload a minimal SBOM definition
"""
try:
sbom = await file.read()
logger.info("Start converting from minimal format into cyclonedx")
(sbom, projectName, projectVersion, projectClassifier, projectDescription) = minimalSbomFormatConverter(sbom)
logger.info("Converted")
loadToDTrackAndDefectDojo(app.state.config, projectName, projectVersion, projectClassifier, projectDescription, 1, sbom, reimport)
logger.info("Done.")
except yaml.scanner.ScannerError as e:
logger.warning(f"uploadMinimalSBOM, yaml ScannerError: {e.context=}, {e.context_mark=}, {e.problem=}, {e.problem_mark=}, {e.note=}")
raise HTTPException(status_code=400, detail=f"yaml ScannerError: {e.context=}, {e.context_mark=}, {e.problem=}, {e.problem_mark=}, {e.note=}")
except ApiException as e:
logger.warning(f"uploadMinimalSBOM, ApiException: {e.status=}, {e.reason=}, {e.body=}")
raise HTTPException(status_code=e.status, detail=f"{e.reason=}, {e.body=}, {e.data=}")
except Exception as e:
logger.warning(f"uploadMinimalSBOM, Exception: {type(e)=}, {str(e)=}, {e.msg=}")
raise HTTPException(status_code=500, detail=f"Exception: {type(e)=}, {str(e)=}, {e.msg=}")
return JSONResponse(content={
"message": "Upload successful!"
})
@app.post("/uploadSBOM/")
async def uploadSBOM(
file: UploadFile = File(...),
projectName: str = Form(...),
projectVersion: str = Form(...),
projectClassifier: str = Form(...),
projectDescription: str = Form(...),
reimport: bool = Form(...)
):
"""
Endpoint to upload a CycloneDX SBOM
"""
sbom = await file.read()
try:
sbomJson = json.loads(sbom)
sbom = json.dumps(sbomJson)
loadToDTrackAndDefectDojo(app.state.config, projectName, projectVersion, projectClassifier, projectDescription, 1, str(sbom), reimport)
logger.info("Done.")
except json.decoder.JSONDecodeError as e:
logger.warning(f"uploadSBOM, JSONDecodeError: {e.msg=}")
raise HTTPException(status_code=400, detail=f"JSON decoding error: {e.msg=}, {e.doc=}, {e.pos=}, {e.lineno=}, {e.colno=}")
except ApiException as e:
logger.warning(f"uploadSBOM, ApiException: {e.status=}, {e.reason=}, {e.body=}")
raise HTTPException(status_code=e.status, detail=f"{e.reason=}, {e.body=}, {e.data=}")
except Exception as e:
logger.warning(f"uploadSBOM, Exception: {type(e)=}, {str(e)=}, {e.msg=}")
raise HTTPException(status_code=500, detail=f"Exception: {type(e)=}, {str(e)=}, {e.msg=}")
return JSONResponse(content={
"message": "Upload successful!"
})