Skip to content
Snippets Groups Projects
Select Git revision
  • 4844bb2f6ac848d370b53a4f085e9b64bc64b8fd
  • master default protected
  • revert-12-master
  • v1.1.4
  • 1.1.3
  • v.1.1.2
  • v.1.1.1
  • 1.1.0
  • 1.0.9
  • 1.0.8
  • 1.07
  • v1.0.6
  • v1.0.5
  • v1.0.4
  • v1.0.3
  • v.1.0.2
  • v.1.0.1
  • 1
  • 0.1.1
  • 0.1.0
  • 0.0.0
  • 0.0.9
  • 0.0.7
23 results

dojo_ci_cd.py

Blame
  • dojo_ci_cd.py 11.08 KiB
    """
    Example written by Aaron Weaver <aaron.weaver@owasp.org>
    as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects
    
    Description: CI/CD example for DefectDojo
    """
    from defectdojo_api import defectdojo
    from datetime import datetime, timedelta
    import os, sys
    import argparse
    import time
    
    test_cases = []
    
    def junit(toolName, file):
    
        junit_xml = junit_xml_output.JunitXml(toolName, test_cases, total_tests=None, total_failures=None)
        with open(file, 'w') as file:
            print "Writing Junit test files"
            file.write(junit_xml.dump())
    
    def dojo_connection(host, api_key, user, proxy):
        #Optionally, specify a proxy
        proxies = None
        if proxy:
            proxies = {
              'http': proxy,
              'https': proxy,
            }
    
        # Instantiate the DefectDojo api wrapper
        dd = defectdojo.DefectDojoAPI(host, api_key, user, proxies=proxies, verify_ssl=False, timeout=360, debug=False)
    
        return dd
        # Workflow as follows:
        # 1. Scan tool is run against build
        # 2. Reports is saved from scan tool
        # 3. Call this script to load scan data, specifying scanner type
        # 4. Script returns along with a pass or fail results: Example: 2 new critical vulns, 1 low out of 10 vulnerabilities
    
    def return_engagement(dd, product_id, user):
        #Specify the product id
        product_id = product_id
        engagement_id = None
    
        # Check for a CI/CD engagement_id
        engagements = dd.list_engagements(product_in=product_id, status="In Progress")
    
        if engagements.success:
            for engagement in engagements.data["objects"]:
                if "Recurring CI/CD Integration" == engagement['name']:
                    engagement_id = engagement['id']
    
        if engagement_id == None:
            start_date = datetime.now()
            end_date = start_date+timedelta(days=180)
            users = dd.list_users(user)
            user_id = None
    
            if users.success:
                user_id = users.data["objects"][0]["id"]
    
            engagement_id = dd.create_engagement("Recurring CI/CD Integration", product_id, str(user_id),
            "In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))
        return engagement_id
    
    def process_findings(dd, engagement_id, dir, build=None):
        test_ids = []
        for root, dirs, files in os.walk(dir):
            for name in files:
                file = os.path.join(os.getcwd(),root, name)
                test_id = processFiles(dd, engagement_id, file)
                if test_id is not None:
                    test_ids.append(str(test_id))
        return ','.join(test_ids)
    
    def processFiles(dd, engagement_id, file, scanner=None, build=None):
        upload_scan = None
        scannerName = None
        path=os.path.dirname(file)
        name = os.path.basename(file)
        tool = os.path.basename(path)
        tool = tool.lower()
    
        test_id = None
        date = datetime.now()
        dojoDate = date.strftime("%Y-%m-%d")
    
        #Tools without an importer in Dojo; attempted to import as generic
        if "generic" in name:
            scanner = "Generic Findings Import"
            if tool == "nikto":
                print "Uploading nikto scan: " + file
                test_id = dd.upload_scan(engagement_id, scanner, file, "true", dojoDate, build)
            elif tool == "bandit":
                print "Uploading bandit scan: " + file
                test_id = dd.upload_scan(engagement_id, scanner, file, "true", dojoDate, build)
        else:
            if tool == "burp":
                scannerName = "Burp Scan"
            elif tool == "nessus":
                scannerName = "Nessus Scan"
            elif tool == "nmap":
                scannerName = "Nmap Scan"
            elif tool == "nexpose":
                scannerName = "Nexpose Scan"
            elif tool == "veracode":
                scannerName = "Veracode Scan"
            elif tool == "checkmarx":
                scannerName = "Checkmarx Scan"
            elif tool == "zap":
                scannerName = "ZAP Scan"
            elif tool == "appspider":
                scannerName = "AppSpider Scan"
            elif tool == "Arachni Scan":
                scannerName = "Arachni Scan"
            elif tool == "vcg":
                scannerName = "VCG Scan"
            elif tool == "dependency":
                scannerName = "Dependency Check Scan"
            elif tool == "retirejs":
                scannerName = "Retire.js Scan"
            elif tool == "nodesecurity":
                scannerName = "Node Security Platform Scan"
            elif tool == "qualys":
                scannerName = "Qualys Scan"
            elif tool == "qualyswebapp":
                scannerName = "Qualys Webapp Scan"
            elif tool == "openvas":
                scannerName = "OpenVAS CSV"
            elif tool == "snyk":
                scannerName = "Snyk Scan"
    
            if scannerName is not None:
                print "Uploading " + scannerName + " scan: " + file
                test_id = dd.upload_scan(engagement_id, scannerName, file, "true", dojoDate, build)
    
        if test_id.success == False:
            print "Upload failed: Detailed error message: " + test_id.data
    
        return test_id
    
    def create_findings(dd, engagement_id, scanner, file, build=None):
        # Upload the scanner export
        if engagement_id > 0:
            print "Uploading scanner data."
            date = datetime.now()
    
            upload_scan = dd.upload_scan(engagement_id, scanner, file, "true", date.strftime("%Y-%m-%d"), build=build)
    
            if upload_scan.success:
                test_id = upload_scan.id()
            else:
                print upload_scan.message
                quit()
    
    def summary(dd, engagement_id, test_ids, max_critical=0, max_high=0, max_medium=0):
            findings = dd.list_findings(engagement_id_in=engagement_id, duplicate="false", active="true", verified="true")
            print"=============================================="
            print "Total Number of Vulnerabilities: " + str(findings.data["meta"]["total_count"])
            print"=============================================="
            print_findings(sum_severity(findings))
            print
            findings = dd.list_findings(test_id_in=test_ids, duplicate="true")
            print"=============================================="
            print "Total Number of Duplicate Findings: " + str(findings.data["meta"]["total_count"])
            print"=============================================="
            print_findings(sum_severity(findings))
            print
            #Delay while de-dupes
            sys.stdout.write("Sleeping for 10 seconds for de-dupe celery process:")
            sys.stdout.flush()
            for i in range(5):
                time.sleep(2)
                sys.stdout.write(".")
                sys.stdout.flush()
    
            findings = dd.list_findings(test_id_in=test_ids, duplicate="false", limit=500)
            if findings.count() > 0:
                """
                for finding in findings.data["objects"]:
                    test_cases.append(junit_xml_output.TestCase(finding["title"] + " Severity: " + finding["severity"], finding["description"],"failure"))
                if not os.path.exists("reports"):
                    os.mkdir("reports")
                junit("DefectDojo", "reports/junit_dojo.xml")
                """
    
            print"\n=============================================="
            print "Total Number of New Findings: " + str(findings.data["meta"]["total_count"])
            print"=============================================="
            sum_new_findings = sum_severity(findings)
            print_findings(sum_new_findings)
            print
            print"=============================================="
    
            strFail = None
            if max_critical is not None:
                if sum_new_findings[4] > max_critical:
                    strFail =  "Build Failed: Max Critical"
            if max_high is not None:
                if sum_new_findings[3] > max_high:
                    strFail = strFail +  " Max High"
            if max_medium is not None:
                if sum_new_findings[2] > max_medium:
                    strFail = strFail +  " Max Medium"
            if strFail is None:
                print "Build Passed!"
            else:
                print "Build Failed: " + strFail
            print"=============================================="
    
    def sum_severity(findings):
        severity = [0,0,0,0,0]
        for finding in findings.data["objects"]:
            if finding["severity"] == "Critical":
                severity[4] = severity[4] + 1
            if finding["severity"] == "High":
                severity[3] = severity[3] + 1
            if finding["severity"] == "Medium":
                severity[2] = severity[2] + 1
            if finding["severity"] == "Low":
                severity[1] = severity[1] + 1
            if finding["severity"] == "Info":
                severity[0] = severity[0] + 1
    
        return severity
    
    def print_findings(findings):
        print "Critical: " + str(findings[4])
        print "High: " + str(findings[3])
        print "Medium: " + str(findings[2])
        print "Low: " + str(findings[1])
        print "Info: " + str(findings[0])
    
    class Main:
        if __name__ == "__main__":
            parser = argparse.ArgumentParser(description='CI/CD integration for DefectDojo')
            parser.add_argument('--host', help="DefectDojo Hostname", required=True)
            parser.add_argument('--proxy', help="Proxy ex:localhost:8080", required=False, default=None)
            parser.add_argument('--api_key', help="API Key", required=True)
            parser.add_argument('--user', help="User", required=True)
            parser.add_argument('--product', help="DefectDojo Product ID", required=True)
            parser.add_argument('--file', help="Scanner file", required=False)
            parser.add_argument('--dir', help="Scanner directory, needs to have the scanner name with the scan file in the folder. Ex: reports/nmap/nmap.csv", required=False)
            parser.add_argument('--scanner', help="Type of scanner", required=False)
            parser.add_argument('--build', help="Build ID", required=False)
            parser.add_argument('--engagement', help="Engagement ID (optional)", required=False)
            parser.add_argument('--critical', help="Maximum new critical vulns to pass the build.", required=False)
            parser.add_argument('--high', help="Maximum new high vulns to pass the build.", required=False)
            parser.add_argument('--medium', help="Maximum new medium vulns to pass the build.", required=False)
    
            #Parse out arguments
            args = vars(parser.parse_args())
            host = args["host"]
            api_key = args["api_key"]
            user = args["user"]
            product_id = args["product"]
            file = args["file"]
            dir = args["dir"]
            scanner = args["scanner"]
            engagement_id = args["engagement"]
            max_critical = args["critical"]
            max_high = args["high"]
            max_medium = args["medium"]
            build = args["build"]
            proxy = args["proxy"]
    
            if dir is not None or file is not None:
                dd = dojo_connection(host, api_key, user, proxy=proxy)
                engagement_id = return_engagement(dd, product_id, user)
                test_ids = None
                if file is not None:
                    if scanner is not None:
                        test_ids = processFiles(dd, engagement_id, file, scanner=scanner)
                    else:
                        print "Scanner type must be specified for a file import. --scanner"
                else:
                    test_ids = process_findings(dd, engagement_id, dir, build)
    
                summary(dd, engagement_id, test_ids, max_critical, max_high, max_medium)
            else:
                print "No file or directory to scan specified."