From 7e08907642fbec61d38cbb1429b48d99be3285f5 Mon Sep 17 00:00:00 2001 From: Aaron Weaver <aaron.weaver2@gmail.com> Date: Thu, 1 Dec 2016 16:05:32 -0500 Subject: [PATCH] API Upload Example --- defectdojo_api/defectdojo.py | 19 ++-- examples/dojo_ci_cd.py | 143 ++++++++++++++++++++++++++++++ examples/dojo_populate.py | 18 ++-- examples/dojo_product.py | 6 ++ tests/defectdojo_api_unit_test.py | 70 ++++++++++----- 5 files changed, 219 insertions(+), 37 deletions(-) create mode 100644 examples/dojo_ci_cd.py diff --git a/defectdojo_api/defectdojo.py b/defectdojo_api/defectdojo.py index a099adc..09b41bd 100644 --- a/defectdojo_api/defectdojo.py +++ b/defectdojo_api/defectdojo.py @@ -9,7 +9,7 @@ from . import __version__ as version class DefectDojoAPI(object): """An API wrapper for DefectDojo.""" - def __init__(self, host, api_key, user, api_version='v1', verify_ssl=True, timeout=30, proxies=None, user_agent=None, cert=None, debug=False): + def __init__(self, host, api_key, user, api_version='v1', verify_ssl=True, timeout=60, proxies=None, user_agent=None, cert=None, debug=False): """Initialize a DefectDojo API instance. :param host: The URL for the DefectDojo server. (e.g., http://localhost:8000/DefectDojo/) @@ -111,7 +111,7 @@ class DefectDojoAPI(object): return self._request('GET', 'users/' + str(user_id) + '/') ###### Engagements API ####### - def list_engagements(self, product_in=None,limit=20): + def list_engagements(self, status=None, product_in=None,limit=20): """Retrieves all the engagements. :param product_in: List of product ids (1,2). @@ -126,6 +126,9 @@ class DefectDojoAPI(object): if product_in: params['product__in'] = product_in + if status: + params['status'] = status + return self._request('GET', 'engagements/', params) def get_engagement(self, engagement_id): @@ -401,7 +404,7 @@ class DefectDojoAPI(object): ###### Findings API ####### def list_findings(self, active=None, duplicate=None, mitigated=None, severity=None, verified=None, severity_lt=None, severity_gt=None, severity_contains=None, title_contains=None, url_contains=None, date_lt=None, - date_gt=None, date=None, product_id_in=None, engagement_id_in=None, test_in=None, limit=20): + date_gt=None, date=None, product_id_in=None, engagement_id_in=None, test_id_in=None, limit=20): """Returns filtered list of findings. @@ -474,8 +477,8 @@ class DefectDojoAPI(object): if product_id_in: params['product__id__in'] = product_id_in - if test_in: - params['test__in'] = test_in + if test_id_in: + params['test__id__in'] = test_id_in return self._request('GET', 'findings/', params) @@ -598,7 +601,7 @@ class DefectDojoAPI(object): ##### Upload API ##### - def upload_scan(self, engagement_id, scan_type, file_path, active, scan_date, tags): + def upload_scan(self, engagement_id, scan_type, file, active, scan_date, tags=None): """Uploads and processes a scan file. :param application_id: Application identifier. @@ -607,8 +610,8 @@ class DefectDojoAPI(object): """ data = { - 'file': open(file_path, 'rb'), - 'eid': ('', str(engagement_id)), + 'file': open(file, 'rb'), + 'engagement': ('', self.get_engagement_uri(engagement_id)), 'scan_type': ('', scan_type), 'active': ('', active), 'scan_date': ('', scan_date), diff --git a/examples/dojo_ci_cd.py b/examples/dojo_ci_cd.py new file mode 100644 index 0000000..ce8a5bf --- /dev/null +++ b/examples/dojo_ci_cd.py @@ -0,0 +1,143 @@ +""" +Example written by Aaron Weaver <aaron.weaver@owasp.org> +as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects + +Description: CI/CD example for DefectDojo +""" +from defectdojo_api import defectdojo +from datetime import datetime, timedelta +import os +import argparse + +# Setup DefectDojo connection information +host = 'http://localhost:8000' +api_key = os.environ['DOJO_API_KEY'] +user = 'admin' + +#Optionally, specify a proxy +proxies = { + 'http': 'http://localhost:8080', + 'https': 'http://localhost:8080', +} +""" +proxies=proxies +""" + +def sum_severity(findings): + severity = [0,0,0,0,0] + for finding in findings.data["objects"]: + if finding["severity"] == "Critical": + severity[4] = severity[4] + 1 + if finding["severity"] == "High": + severity[3] = severity[3] + 1 + if finding["severity"] == "Medium": + severity[2] = severity[2] + 1 + if finding["severity"] == "Info": + severity[1] = severity[1] + 1 + + return severity + +def print_findings(findings): + print "Critical: " + str(findings[4]) + print "High: " + str(findings[3]) + print "Medium: " + str(findings[2]) + print "Low: " + str(findings[1]) + print "Info: " + str(findings[0]) + +def create_findings(product_id, user_id, file, scanner, engagement_id=None, max_critical=0, max_high=0, max_medium=0): + # Instantiate the DefectDojo api wrapper + dd = defectdojo.DefectDojoAPI(host, api_key, user, proxies=proxies, timeout=90, debug=False) + + # Workflow as follows: + # 1. Scan tool is run against build + # 2. Reports is saved from scan tool + # 3. Call this script to load scan data, specifying scanner type + # 4. Script returns along with a pass or fail results: Example: 2 new critical vulns, 1 low out of 10 vulnerabilities + + #Specify the product id + product_id = product_id + engagement_id = None + user_id = 1 + + # Check for a CI/CD engagement_id + engagements = dd.list_engagements(product_in=product_id, status="In Progress") + if engagements.success: + for engagement in engagements.data["objects"]: + if "Recurring CI/CD Integration" == engagement['name']: + engagement_id = engagement['id'] + + # Engagement doesn't exist, create it + if engagement_id == None: + start_date = datetime.now() + end_date = start_date+timedelta(days=180) + + engagement_id = dd.create_engagement("Recurring CI/CD Integration", product_id, user_id, + "In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")) + + # Upload the scanner export + dir_path = os.path.dirname(os.path.realpath(__file__)) + + print "Uploading scanner data." + date = datetime.now() + upload_scan = dd.upload_scan(engagement_id, scanner, dir_path + file, "true", date.strftime("%Y/%m/%d"), "API") + + if upload_scan.success: + test_id = upload_scan.id() + else: + print upload_scan.message + + findings = dd.list_findings(engagement_id_in=engagement_id, duplicate="false", active="true", verified="true") + print"==============================================" + print "Total Number of Vulnerabilities: " + str(findings.data["meta"]["total_count"]) + print"==============================================" + print_findings(sum_severity(findings)) + print + findings = dd.list_findings(test_id_in=test_id, duplicate="true") + print"==============================================" + print "Total Number of Duplicate Findings: " + str(findings.data["meta"]["total_count"]) + print"==============================================" + print_findings(sum_severity(findings)) + print + findings = dd.list_findings(test_id_in=test_id, duplicate="false") + print"==============================================" + print "Total Number of New Findings: " + str(findings.data["meta"]["total_count"]) + print"==============================================" + sum_new_findings = sum_severity(findings) + print_findings(sum_new_findings) + print + print"==============================================" + + if sum_new_findings[4] > max_critical: + print "Build Failed: Max Critical" + elif sum_new_findings[3] > max_high: + print "Build Failed: Max High" + elif sum_new_findings[2] > max_medium: + print "Build Failed: Max Medium" + else: + print "Build Passed!" + print"==============================================" + +class Main: + if __name__ == "__main__": + parser = argparse.ArgumentParser(description='CI/CD integration for DefectDojo') + parser.add_argument('--user', help="Dojo Product ID", required=True) + parser.add_argument('--product', help="Dojo Product ID", required=True) + parser.add_argument('--file', help="Scanner file", required=True) + parser.add_argument('--scanner', help="Type of scanner", required=True) + parser.add_argument('--engagement', help="Engagement ID (optional)", required=False) + parser.add_argument('--critical', default=0, help="Maximum new critical vulns to pass the build.", required=False) + parser.add_argument('--high', default=0, help="Maximum new high vulns to pass the build.", required=False) + parser.add_argument('--medium', default=0, help="Maximum new medium vulns to pass the build.", required=False) + + #Parse out arguments + args = vars(parser.parse_args()) + user_id = args["user"] + product_id = args["product"] + file = args["file"] + scanner = args["scanner"] + engagement_id = args["engagement"] + max_critical = args["critical"] + max_high = args["high"] + max_medium = args["medium"] + + create_findings(product_id, user_id, file, scanner, engagement_id, max_critical, max_high, max_medium) diff --git a/examples/dojo_populate.py b/examples/dojo_populate.py index c54afb9..d3f4b54 100644 --- a/examples/dojo_populate.py +++ b/examples/dojo_populate.py @@ -1,12 +1,15 @@ +""" +Example written by Aaron Weaver <aaron.weaver@owasp.org> +as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects + +Description: Imports test data into DefectDojo and creates products, +engagements and tests along with findings. +""" from defectdojo_api import defectdojo from random import randint import os from datetime import datetime, timedelta -""" -Imports test data into Defect DefectDojo -""" - # Setup DefectDojo connection information host = 'http://localhost:8000' api_key = os.environ['DOJO_API_KEY'] @@ -66,8 +69,9 @@ def create_load_data(product_name, product_desc, file=None, file_test_type=None) if file is not None: print "Loading scanner results from scanner export" dir_path = os.path.dirname(os.path.realpath(__file__)) - upload_scan = dd.upload_scan(engagement_id, "Burp Scan", dir_path + file, - "true", "01/11/2016", "API") + date = datetime.now() + upload_scan = dd.upload_scan(engagement_id, file_test_type, dir_path + file, + "true", date.strftime("%Y/%m/%d"), "API") i = 0 while i < 6: @@ -89,7 +93,7 @@ def create_load_data(product_name, product_desc, file=None, file_test_type=None) print product.message ##### Create Products, Engagements and Tests ######## -create_load_data("BodgeIt", "Product description.", "../tests/scans/Bodgeit-burp.xml", "Burp Scan") +create_load_data("BodgeIt", "Product description.", "/tests/scans/Bodgeit-burp.xml", "Burp Scan") create_load_data("A CRM App", "Product description.") create_load_data("An Engineering Application", "Product description.") create_load_data("A Marketing Site", "Product description.") diff --git a/examples/dojo_product.py b/examples/dojo_product.py index 413f0a8..69faf7c 100644 --- a/examples/dojo_product.py +++ b/examples/dojo_product.py @@ -1,3 +1,9 @@ +""" +Example written by Aaron Weaver <aaron.weaver@owasp.org> +as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects + +Description: Creates a product in DefectDojo and returns information about the newly created product +""" from defectdojo_api import defectdojo import os diff --git a/tests/defectdojo_api_unit_test.py b/tests/defectdojo_api_unit_test.py index 54aeabf..5c0c9c9 100644 --- a/tests/defectdojo_api_unit_test.py +++ b/tests/defectdojo_api_unit_test.py @@ -1,7 +1,14 @@ +""" +UnitTests written by Aaron Weaver <aaron.weaver@owasp.org> +as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects + +Description: Tests the functionality of the DefectDojo API. +""" from defectdojo_api import defectdojo import unittest import os +from datetime import datetime class TestDefectDojoAPI(unittest.TestCase): @@ -10,12 +17,14 @@ class TestDefectDojoAPI(unittest.TestCase): api_key = os.environ['DOJO_API_KEY'] user = 'admin' + """ proxies = { 'http': 'http://localhost:8080', 'https': 'http://localhost:8080', } - - self.dd = defectdojo.DefectDojoAPI(host, api_key, user, proxies=proxies, debug=False) + proxies=proxies + """ + self.dd = defectdojo.DefectDojoAPI(host, api_key, user, debug=False) #### USER API TESTS #### def test_01_get_user(self): @@ -31,16 +40,19 @@ class TestDefectDojoAPI(unittest.TestCase): #### Product API TESTS #### def test_03_create_product(self): product = self.dd.create_product("API Product Test", "Description", 1) + self.__class__.product_id = product.id() self.assertIsNotNone(product.id()) def test_04_get_product(self): - product = self.dd.get_product(1) + product = self.dd.get_product(self.__class__.product_id) #print product.data_json(pretty=True) self.assertIsNotNone(product.data['name']) def test_05_set_product(self): - self.dd.set_product(1, name="Product Update Test") - product = self.dd.get_product(1) + product = self.dd.create_product("API Product Test", "Description", 1) + new_product_id = product.id() + self.dd.set_product(new_product_id, name="Product Update Test") + product = self.dd.get_product(new_product_id) #print product.data_json(pretty=True) self.assertEqual("Product Update Test", product.data['name']) @@ -52,13 +64,14 @@ class TestDefectDojoAPI(unittest.TestCase): #### Engagement API TESTS #### def test_07_create_engagement(self): - product_id = 1 + product_id = self.__class__.product_id user_id = 1 engagement = self.dd.create_engagement("API Engagement", product_id, user_id, "In Progress", "2016-11-01", "2016-12-01") + self.__class__.engagement_id = engagement.id() self.assertIsNotNone(engagement.id()) def test_08_get_engagement(self): - engagement = self.dd.get_engagement(1) + engagement = self.dd.get_engagement(self.__class__.engagement_id) #print engagement.data_json(pretty=True) self.assertIsNotNone(str(engagement.data["name"])) @@ -67,9 +80,15 @@ class TestDefectDojoAPI(unittest.TestCase): #print engagements.data_json(pretty=True) self.assertTrue(engagements.data["meta"]["total_count"]>0) + #Note: Fails b/c of issue with DefectDojo's API def test_10_set_engagement(self): - self.dd.set_engagement(1, name="Engagement Update Test") - engagement = self.dd.get_engagement(1) + user_id = 1 + product_id = self.__class__.product_id + + engagement = self.dd.create_engagement("API Engagement", product_id, user_id, "In Progress", "2016-11-01", "2016-12-01") + new_engagement_id = engagement.id() + self.dd.set_engagement(new_engagement_id, name="Engagement Update Test") + engagement = self.dd.get_engagement(new_engagement_id) #print engagement.data_json(pretty=True) self.assertEqual("Engagement Update Test", engagement.data['name']) @@ -79,10 +98,11 @@ class TestDefectDojoAPI(unittest.TestCase): test_type = 1 #1 is the API Test environment = 1 #1 is the Development Environment test = self.dd.create_test(engagement_id, test_type, environment, "2016-11-01", "2016-12-01") + self.__class__.test_id = test.id() self.assertIsNotNone(test.id()) def test_12_get_test(self): - test = self.dd.get_test(1) + test = self.dd.get_test(self.__class__.test_id) #print test.data_json(pretty=True) self.assertIsNotNone(str(test.data["engagement"])) @@ -92,23 +112,26 @@ class TestDefectDojoAPI(unittest.TestCase): self.assertTrue(tests.data["meta"]["total_count"]>0) def test_14_set_test(self): - self.dd.set_test(1, percent_complete="99") - test = self.dd.get_test(1) + self.dd.set_test(self.__class__.test_id, percent_complete="99") + test = self.dd.get_test(self.__class__.test_id) #print test.data_json(pretty=True) self.assertEqual(99, test.data['percent_complete']) #### Findings API TESTS #### + #Fails b/c of DojoAPI Issue def test_15_create_finding(self): cwe = 25 - product_id = 1 - engagement_id = 1 - test_id = 1 + product_id = self.__class__.product_id + engagement_id = self.__class__.engagement_id + test_id = self.__class__.test_id user_id = 1 finding = self.dd.create_finding("API Created", "Description", "Critical", cwe, "2016-11-01", product_id, engagement_id, test_id, user_id, "None", "true", "false", "References") + self.__class__.finding_id = finding.id() self.assertIsNotNone(finding.id()) + #Fails b/c of DojoAPI Issue def test_16_get_finding(self): - finding = self.dd.get_finding(1) + finding = self.dd.get_finding(self.__class__.finding_id) #print finding.data_json(pretty=True) self.assertIsNotNone(str(finding.data["title"])) @@ -117,8 +140,10 @@ class TestDefectDojoAPI(unittest.TestCase): #print findings.data_json(pretty=True) self.assertTrue(findings.data["meta"]["total_count"]>0) + #Fails b/c of DojoAPI Issue def test_18_set_finding(self): - self.dd.set_finding(1, 1, 1, 1, title="API Finding Updates") + self.dd.set_finding(self.__class__.finding_id, self.__class__.product_id, self.__class__.engagement_id, + test_id, title="API Finding Updates") finding = self.dd.get_finding(1) #print test.data_json(pretty=True) self.assertEqual("API Finding Updates", finding.data['title']) @@ -126,11 +151,12 @@ class TestDefectDojoAPI(unittest.TestCase): #### Upload API TESTS #### def test_19_upload_scan(self): dir_path = os.path.dirname(os.path.realpath(__file__)) - engagement_id = 1 - upload_scan = self.dd.upload_scan(engagement_id, "Burp Scan", dir_path + "/scans/Bodgeit-burp.xml", - "true", "01/11/2016", "Burp Upload") - #print upload_scan.data_json(pretty=True) - self.assertEqual("Bodgeit-burp.xml", upload_scan.data['file']) + + date = datetime.now() + upload_scan = self.dd.upload_scan(self.__class__.engagement_id, "Burp Scan", dir_path + "/scans/Bodgeit-burp.xml", + "true", date.strftime("%Y/%m/%d"), "API") + + self.assertIsNotNone(upload_scan.id()) if __name__ == '__main__': unittest.main() -- GitLab