"""HTTP API that accepts SBOM uploads and forwards them via loadToDTrackAndDefectDojo."""

import json
import os

import yaml
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.templating import Jinja2Templates
from loguru import logger

from converter import minimalSbomFormatConverter
from sbom_dt_dd import generateSBOM, loadToDTrackAndDefectDojo, ApiException

app = FastAPI(
    title="SBOM DTrack DefectDojo Synchronization API",
    version="0.0.1",
    description="",
    root_path="/sbom-integrator/v1"
)

# Environment variables that must be present at import time; their values are
# copied into the configuration dict handed to loadToDTrackAndDefectDojo.
_REQUIRED_ENV_VARS = (
    "DTRACK_API_URL",
    "DTRACK_TOKEN",
    "DEFECTDOJO_URL",
    "DEFECTDOJO_TOKEN",
)

try:
    config = {name: os.environ[name] for name in _REQUIRED_ENV_VARS}
except KeyError as e:
    # Fail fast at startup with the name of the missing variable.
    raise RuntimeError(f"Env variable {e} shall be set") from e
config['VERBOSE'] = True

app.state.config = config


def _api_exception_to_http(endpoint: str, e: ApiException) -> HTTPException:
    """Log an ApiException raised while serving *endpoint* and build the HTTP error for it.

    The returned exception carries the upstream status code. When the
    ApiException has no usable status (None or 0), 502 is used instead so that
    a valid HTTP response can still be produced.
    """
    logger.warning(f"{endpoint}, ApiException: {type(e.cause)=}, {e.status=}, {e.reason=}, {e.body=}")
    return HTTPException(
        status_code=e.status or 502,
        detail={
            "type": str(type(e.cause)),
            "reason": e.reason,
            "body": e.body,
            "data": e.data
        }
    )


def _unexpected_exception_to_http(endpoint: str, e: Exception) -> HTTPException:
    """Log an unexpected exception raised while serving *endpoint* and build a 500 error for it."""
    logger.warning(f"{endpoint}, Exception: {type(e)=}, {str(e)=}")
    return HTTPException(
        status_code=500,
        detail={
            "error": "Exception occurred",
            "type": str(type(e)),
            "message": str(e)
        }
    )


@app.get("/upload-form", response_class=HTMLResponse)
async def upload_form(request: Request):
    """
    Route serving an HTML page with the upload form
    """
    # BE AWARE OF THE HARDCODED ROOT_PATH BELOW
    # NOTE(review): the literal below is reproduced verbatim; as seen in this
    # file it contains no HTML tags or <form> element - confirm the markup was
    # not lost before relying on this page.
    html_content = """ Upload Minimal SBOM

Upload Minimal SBOM






"""
    return HTMLResponse(content=html_content)


@app.post("/upload-minimal-sbom/")
async def uploadMinimalSBOM(
    file: UploadFile = File(...),
    reimport: bool = Form(...)
):
    """
    Endpoint to upload a minimal SBOM definition
    """
    # Error mapping: YAML scanner errors -> 400, ApiException -> its own status
    # (502 when it has none), anything else -> 500.
    try:
        sbom = await file.read()
        logger.info("Start converting from minimal format into cyclonedx")
        (sbom, projectName, projectVersion, projectClassifier, projectDescription) = minimalSbomFormatConverter(sbom)
        logger.info("Converted")
        # NOTE(review): this call is not awaited, so it runs synchronously
        # inside the async handler - confirm blocking here is acceptable.
        # The meaning of the positional literal 1 is defined by
        # loadToDTrackAndDefectDojo and is not visible from this file.
        loadToDTrackAndDefectDojo(
            app.state.config,
            projectName,
            projectVersion,
            projectClassifier,
            projectDescription,
            1,
            sbom,
            reimport,
        )
        logger.info("Done.")
    except yaml.scanner.ScannerError as e:
        logger.warning(f"uploadMinimalSBOM, yaml ScannerError: {e.context=}, {e.context_mark=}, {e.problem=}, {e.problem_mark=}, {e.note=}")
        raise HTTPException(
            status_code=400,
            detail={
                "error": "yaml ScannerError",
                "context": e.context,
                "context_mark": str(e.context_mark),
                "problem": e.problem,
                "problem_mark": str(e.problem_mark),
                "note": e.note
            }
        ) from e
    except ApiException as e:
        raise _api_exception_to_http("uploadMinimalSBOM", e) from e
    except Exception as e:
        raise _unexpected_exception_to_http("uploadMinimalSBOM", e) from e

    return JSONResponse(content={
        "message": "Upload successful!"
    })


@app.post("/upload-sbom/")
async def uploadSBOM(
    file: UploadFile = File(...),
    projectName: str = Form(...),
    projectVersion: str = Form(...),
    projectClassifier: str = Form(...),
    projectDescription: str = Form(...),
    reimport: bool = Form(...)
):
    """
    Endpoint to upload a CycloneDX SBOM
    """
    # Error mapping: invalid JSON -> 400, ApiException -> its own status
    # (502 when it has none), anything else -> 500.
    sbom = await file.read()
    try:
        # Round-trip through the JSON parser: rejects invalid documents and
        # re-serialises the payload as a str (json.dumps already returns str).
        sbom = json.dumps(json.loads(sbom))
        # NOTE(review): this call is not awaited, so it runs synchronously
        # inside the async handler - confirm blocking here is acceptable.
        loadToDTrackAndDefectDojo(
            app.state.config,
            projectName,
            projectVersion,
            projectClassifier,
            projectDescription,
            1,
            sbom,
            reimport,
        )
        logger.info("Done.")
    except json.decoder.JSONDecodeError as e:
        logger.warning(f"uploadSBOM, JSONDecodeError: {e.msg=}")
        # NOTE(review): "doc" echoes the entire uploaded document back to the
        # client - confirm this is intended for large uploads.
        raise HTTPException(
            status_code=400,
            detail={
                "error": "JSON decoding error",
                "msg": e.msg,
                "doc": e.doc,
                "pos": e.pos,
                "lineno": e.lineno,
                "colno": e.colno
            }
        ) from e
    except ApiException as e:
        raise _api_exception_to_http("uploadSBOM", e) from e
    except Exception as e:
        raise _unexpected_exception_to_http("uploadSBOM", e) from e

    return JSONResponse(content={
        "message": "Upload successful!"
    })