diff --git a/defectdojo_api/defectdojo.py b/defectdojo_api/defectdojo.py index 09b41bd5f44b8f69338e02de5383512f08e27796..ad73582000912c5735458649ff592ef0afc7b130 100644 --- a/defectdojo_api/defectdojo.py +++ b/defectdojo_api/defectdojo.py @@ -111,10 +111,11 @@ class DefectDojoAPI(object): return self._request('GET', 'users/' + str(user_id) + '/') ###### Engagements API ####### - def list_engagements(self, status=None, product_in=None,limit=20): + def list_engagements(self, status=None, product_in=None, name_contains=None,limit=20): """Retrieves all the engagements. :param product_in: List of product ids (1,2). + :param name_contains: Engagement name :param limit: Number of records to return. """ @@ -129,6 +130,9 @@ class DefectDojoAPI(object): if status: params['status'] = status + if name_contains: + params['name_contains'] = name_contains + return self._request('GET', 'engagements/', params) def get_engagement(self, engagement_id): @@ -404,7 +408,7 @@ class DefectDojoAPI(object): ###### Findings API ####### def list_findings(self, active=None, duplicate=None, mitigated=None, severity=None, verified=None, severity_lt=None, severity_gt=None, severity_contains=None, title_contains=None, url_contains=None, date_lt=None, - date_gt=None, date=None, product_id_in=None, engagement_id_in=None, test_id_in=None, limit=20): + date_gt=None, date=None, product_id_in=None, engagement_id_in=None, test_id_in=None, build=None, limit=20): """Returns filtered list of findings. @@ -424,6 +428,7 @@ class DefectDojoAPI(object): :param product_id_in: Product id(s) associated with a finding. (1,2 or 1) :param engagement_id_in: Engagement id(s) associated with a finding. (1,2 or 1) :param test_in: Test id(s) associated with a finding. (1,2 or 1) + :param build_id: User specified build id relating to the build number from the build server. (Jenkins, Travis etc.). :param limit: Number of records to return. """ @@ -480,6 +485,9 @@ class DefectDojoAPI(object): if test_id_in: params['test__id__in'] = test_id_in + if build: + params['build_id__contains'] = build + return self._request('GET', 'findings/', params) def get_finding(self, finding_id): @@ -490,7 +498,7 @@ class DefectDojoAPI(object): return self._request('GET', 'findings/' + str(finding_id) + '/') def create_finding(self, title, description, severity, cwe, date, product_id, engagement_id, test_id, user_id, - impact, active, verified, mitigation, references=None): + impact, active, verified, mitigation, references=None, build=None): """Creates a finding with the given properties. @@ -508,7 +516,7 @@ class DefectDojoAPI(object): :param verified: Finding has been verified. :param mitigation: Steps to mitigate the finding. :param references: Details on finding. - + :param build: User specified build id relating to the build number from the build server. (Jenkins, Travis etc.). """ data = { @@ -525,7 +533,8 @@ class DefectDojoAPI(object): 'active': active, 'verified': verified, 'mitigation': mitigation, - 'references': references + 'references': references, + 'build_id' : build } return self._request('POST', 'findings/', data=data) @@ -550,6 +559,7 @@ class DefectDojoAPI(object): :param verified: Finding has been verified. :param mitigation: Steps to mitigate the finding. :param references: Details on finding. + :param build: User specified build id relating to the build number from the build server. (Jenkins, Travis etc.). """ @@ -597,17 +607,25 @@ class DefectDojoAPI(object): if references: data['references'] = references + if build: + data['build_id'] = build + return self._request('PUT', 'findings/' + str(finding_id) + '/', data=data) ##### Upload API ##### - def upload_scan(self, engagement_id, scan_type, file, active, scan_date, tags=None): + def upload_scan(self, engagement_id, scan_type, file, active, scan_date, tags=None, build=None): """Uploads and processes a scan file. :param application_id: Application identifier. :param file_path: Path to the scan file to be uploaded. """ + if tags is None: + tags = '' + + if build is None: + build = '' data = { 'file': open(file, 'rb'), @@ -615,7 +633,8 @@ class DefectDojoAPI(object): 'scan_type': ('', scan_type), 'active': ('', active), 'scan_date': ('', scan_date), - 'tags': ('', tags) + 'tags': ('', tags), + 'build_id': ('', build) } return self._request( @@ -725,6 +744,9 @@ class DefectDojoResponse(object): raise ValueError('Object not created:' + json.dumps(self.data, sort_keys=True, indent=4, separators=(',', ': '))) return int(self.data) + def count(self): + return self.data["meta"]["total_count"] + def data_json(self, pretty=False): """Returns the data as a valid JSON string.""" if pretty: diff --git a/examples/dojo_ci_cd.py b/examples/dojo_ci_cd.py index f3f1e38da705dc5e37504e728a2a24fbe01938fd..ac70c2e1e275df9da67cac96788c3fc3bbbd8371 100644 --- a/examples/dojo_ci_cd.py +++ b/examples/dojo_ci_cd.py @@ -32,7 +32,7 @@ def print_findings(findings): print "Low: " + str(findings[1]) print "Info: " + str(findings[0]) -def create_findings(host, api_key, user, product_id, file, scanner, engagement_id=None, max_critical=0, max_high=0, max_medium=0): +def create_findings(host, api_key, user, product_id, file, scanner, engagement_id=None, max_critical=0, max_high=0, max_medium=0, build=None): #Optionally, specify a proxy proxies = { @@ -75,11 +75,11 @@ def create_findings(host, api_key, user, product_id, file, scanner, engagement_i "In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")) # Upload the scanner export - dir_path = os.path.dirname(os.path.realpath(__file__)) + #dir_path = os.path.dirname(os.path.realpath(__file__)) print "Uploading scanner data." date = datetime.now() - upload_scan = dd.upload_scan(engagement_id, scanner, dir_path + file, "true", date.strftime("%Y/%m/%d"), "API") + upload_scan = dd.upload_scan(engagement_id, scanner, file, "true", date.strftime("%Y-%m-%d"), build=build) if upload_scan.success: test_id = upload_scan.id() @@ -132,6 +132,7 @@ class Main: parser.add_argument('--product', help="Dojo Product ID", required=True) parser.add_argument('--file', help="Scanner file", required=True) parser.add_argument('--scanner', help="Type of scanner", required=True) + parser.add_argument('--build', help="Build ID", required=False) parser.add_argument('--engagement', help="Engagement ID (optional)", required=False) parser.add_argument('--critical', help="Maximum new critical vulns to pass the build.", required=False) parser.add_argument('--high', help="Maximum new high vulns to pass the build.", required=False) @@ -149,5 +150,6 @@ class Main: max_critical = args["critical"] max_high = args["high"] max_medium = args["medium"] + build = args["build"] - create_findings(host, api_key, user, product_id, file, scanner, engagement_id, max_critical, max_high, max_medium) + create_findings(host, api_key, user, product_id, file, scanner, engagement_id, max_critical, max_high, max_medium, build) diff --git a/examples/dojo_finding.py b/examples/dojo_finding.py new file mode 100644 index 0000000000000000000000000000000000000000..df573ee73b92caea4065281849e975b5ca16decc --- /dev/null +++ b/examples/dojo_finding.py @@ -0,0 +1,125 @@ +""" +Example written by Aaron Weaver <aaron.weaver@owasp.org> +as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects + +Description: Creates a manual finding in DefectDojo and returns information about the newly created finding +""" +from defectdojo_api import defectdojo +from datetime import datetime, timedelta +from random import randint +import os + +# Setup DefectDojo connection information +host = 'http://localhost:8000' +api_key = os.environ['DOJO_API_KEY'] +user = 'admin' +user_id = 1 #Default user + + +#Optionally, specify a proxy +proxies = { + 'http': 'http://localhost:8080', + 'https': 'http://localhost:8080', +} +""" +proxies=proxies +""" + +def create_finding_data(product_id, engagement_id, test_id, build): + cwe = [352, 22, 676, 863, 134, 759, 798] + cwe_desc = ['Cross-Site Request Forgery (CSRF)', 'Improper Limitation of a Pathname to a Restricted Directory (\'Path Traversal\')', + 'Use of Potentially Dangerous Function', 'Incorrect Authorization', 'Uncontrolled Format String', + 'Use of a One-Way Hash without a Salt', 'Use of Hard-coded Credentials'] + severity=['Low','Medium','High', 'Critical'] + user_id = 1 + finding_date = datetime.now() + finding_date = finding_date+timedelta(days=randint(-30,0)) + finding_cwe = randint(0,6) + + finding = dd.create_finding(cwe_desc[finding_cwe], cwe_desc[finding_cwe], severity[randint(0,3)], + cwe[finding_cwe], finding_date.strftime("%Y-%m-%d"), product_id, engagement_id, test_id, user_id, + "None", "true", "true", "References", build=build) + +# Instantiate the DefectDojo api wrapper +dd = defectdojo.DefectDojoAPI(host, api_key, user, debug=False, proxies=proxies) + +# Search and see if product exists so that we don't create multiple product entries +product_name = "Acme API Finding Demo" +products = dd.list_products(name_contains=product_name) +product_id = None + +if products.count() > 0: + for product in products.data["objects"]: + product_id = product['id'] +else: + # Create a product + prod_type = 1 #1 - Research and Development, product type + product = dd.create_product(product_name, "This is a detailed product description.", prod_type) + + # Get the product id + product_id = product.id() + print "Product successfully created with an id: " + str(product_id) + +# Retrieve the newly created product +product = dd.get_product(product_id) + +product_name = "Acme API Finding Demo" +engagement = dd.list_engagements(product_in=product_id, name_contains="Intial " + product_name + " Engagement") +engagement_id = None + +start_date = datetime.now() +end_date = start_date+timedelta(days=randint(2,8)) + +if engagement.count() > 0: + for engagement in engagement.data["objects"]: + engagement_id = engagement['id'] +else: + # Create an engagement + print "Creating engagement: " + "Intial " + product_name + " Engagement" + engagement = dd.create_engagement("Intial " + product_name + " Engagement", product_id, user_id, + "In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")) + engagement_id = engagement.id() + +print "Creating the test" +# Create Test +test_type = 5 #Web Test +environment = 3 #Production environment +test = dd.create_test(engagement_id, test_type, environment, +start_date.strftime("%Y-%m-%d"), start_date.strftime("%Y-%m-%d")) +test_id = test.id() + +print "Creating the finding" +build = "Jenkins-" + str(randint(100,999)) +# Create Finding +create_finding_data(product_id, engagement_id, test_id, build=build) + +print "Listing the new findings for this build" + +i = 0 +#Creating four tests +while i < 4: + test_type = i+1 #Select some random tests + environment = randint(1,6) #Select random environments + test = dd.create_test(engagement_id, test_type, environment, + start_date.strftime("%Y-%m-%d"), start_date.strftime("%Y-%m-%d")) + test_id = test.id() + + f = 0 + f_max = randint(2,4) + while f < f_max: + # Load findings + create_finding_data(product_id, engagement_id, test_id, build=build) + f = f + 1 + + i = i + 1 + +#Summarize the findings loaded +print "***************************************" +findings = dd.list_findings(build=build) +print "Build ID: " + build +print "Total Created: " + str(findings.count()) +print "***************************************" +print +if findings.count() > 0: + for finding in findings.data["objects"]: + print finding["title"] + ", Severity: " + finding["severity"]