Skip to content
Snippets Groups Projects
Commit 7e089076 authored by Aaron Weaver's avatar Aaron Weaver
Browse files

API Upload Example

parent 3b666e18
Branches
Tags 0.0.6
No related merge requests found
......@@ -9,7 +9,7 @@ from . import __version__ as version
class DefectDojoAPI(object):
"""An API wrapper for DefectDojo."""
def __init__(self, host, api_key, user, api_version='v1', verify_ssl=True, timeout=30, proxies=None, user_agent=None, cert=None, debug=False):
def __init__(self, host, api_key, user, api_version='v1', verify_ssl=True, timeout=60, proxies=None, user_agent=None, cert=None, debug=False):
"""Initialize a DefectDojo API instance.
:param host: The URL for the DefectDojo server. (e.g., http://localhost:8000/DefectDojo/)
......@@ -111,7 +111,7 @@ class DefectDojoAPI(object):
return self._request('GET', 'users/' + str(user_id) + '/')
###### Engagements API #######
def list_engagements(self, product_in=None,limit=20):
def list_engagements(self, status=None, product_in=None,limit=20):
"""Retrieves all the engagements.
:param product_in: List of product ids (1,2).
......@@ -126,6 +126,9 @@ class DefectDojoAPI(object):
if product_in:
params['product__in'] = product_in
if status:
params['status'] = status
return self._request('GET', 'engagements/', params)
def get_engagement(self, engagement_id):
......@@ -401,7 +404,7 @@ class DefectDojoAPI(object):
###### Findings API #######
def list_findings(self, active=None, duplicate=None, mitigated=None, severity=None, verified=None, severity_lt=None,
severity_gt=None, severity_contains=None, title_contains=None, url_contains=None, date_lt=None,
date_gt=None, date=None, product_id_in=None, engagement_id_in=None, test_in=None, limit=20):
date_gt=None, date=None, product_id_in=None, engagement_id_in=None, test_id_in=None, limit=20):
"""Returns filtered list of findings.
......@@ -474,8 +477,8 @@ class DefectDojoAPI(object):
if product_id_in:
params['product__id__in'] = product_id_in
if test_in:
params['test__in'] = test_in
if test_id_in:
params['test__id__in'] = test_id_in
return self._request('GET', 'findings/', params)
......@@ -598,7 +601,7 @@ class DefectDojoAPI(object):
##### Upload API #####
def upload_scan(self, engagement_id, scan_type, file_path, active, scan_date, tags):
def upload_scan(self, engagement_id, scan_type, file, active, scan_date, tags=None):
"""Uploads and processes a scan file.
:param application_id: Application identifier.
......@@ -607,8 +610,8 @@ class DefectDojoAPI(object):
"""
data = {
'file': open(file_path, 'rb'),
'eid': ('', str(engagement_id)),
'file': open(file, 'rb'),
'engagement': ('', self.get_engagement_uri(engagement_id)),
'scan_type': ('', scan_type),
'active': ('', active),
'scan_date': ('', scan_date),
......
"""
Example written by Aaron Weaver <aaron.weaver@owasp.org>
as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects
Description: CI/CD example for DefectDojo
"""
from defectdojo_api import defectdojo
from datetime import datetime, timedelta
import os
import argparse
# Setup DefectDojo connection information
host = 'http://localhost:8000'
api_key = os.environ['DOJO_API_KEY']
user = 'admin'
#Optionally, specify a proxy
proxies = {
'http': 'http://localhost:8080',
'https': 'http://localhost:8080',
}
"""
proxies=proxies
"""
def sum_severity(findings):
severity = [0,0,0,0,0]
for finding in findings.data["objects"]:
if finding["severity"] == "Critical":
severity[4] = severity[4] + 1
if finding["severity"] == "High":
severity[3] = severity[3] + 1
if finding["severity"] == "Medium":
severity[2] = severity[2] + 1
if finding["severity"] == "Info":
severity[1] = severity[1] + 1
return severity
def print_findings(findings):
print "Critical: " + str(findings[4])
print "High: " + str(findings[3])
print "Medium: " + str(findings[2])
print "Low: " + str(findings[1])
print "Info: " + str(findings[0])
def create_findings(product_id, user_id, file, scanner, engagement_id=None, max_critical=0, max_high=0, max_medium=0):
# Instantiate the DefectDojo api wrapper
dd = defectdojo.DefectDojoAPI(host, api_key, user, proxies=proxies, timeout=90, debug=False)
# Workflow as follows:
# 1. Scan tool is run against build
# 2. Reports is saved from scan tool
# 3. Call this script to load scan data, specifying scanner type
# 4. Script returns along with a pass or fail results: Example: 2 new critical vulns, 1 low out of 10 vulnerabilities
#Specify the product id
product_id = product_id
engagement_id = None
user_id = 1
# Check for a CI/CD engagement_id
engagements = dd.list_engagements(product_in=product_id, status="In Progress")
if engagements.success:
for engagement in engagements.data["objects"]:
if "Recurring CI/CD Integration" == engagement['name']:
engagement_id = engagement['id']
# Engagement doesn't exist, create it
if engagement_id == None:
start_date = datetime.now()
end_date = start_date+timedelta(days=180)
engagement_id = dd.create_engagement("Recurring CI/CD Integration", product_id, user_id,
"In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))
# Upload the scanner export
dir_path = os.path.dirname(os.path.realpath(__file__))
print "Uploading scanner data."
date = datetime.now()
upload_scan = dd.upload_scan(engagement_id, scanner, dir_path + file, "true", date.strftime("%Y/%m/%d"), "API")
if upload_scan.success:
test_id = upload_scan.id()
else:
print upload_scan.message
findings = dd.list_findings(engagement_id_in=engagement_id, duplicate="false", active="true", verified="true")
print"=============================================="
print "Total Number of Vulnerabilities: " + str(findings.data["meta"]["total_count"])
print"=============================================="
print_findings(sum_severity(findings))
print
findings = dd.list_findings(test_id_in=test_id, duplicate="true")
print"=============================================="
print "Total Number of Duplicate Findings: " + str(findings.data["meta"]["total_count"])
print"=============================================="
print_findings(sum_severity(findings))
print
findings = dd.list_findings(test_id_in=test_id, duplicate="false")
print"=============================================="
print "Total Number of New Findings: " + str(findings.data["meta"]["total_count"])
print"=============================================="
sum_new_findings = sum_severity(findings)
print_findings(sum_new_findings)
print
print"=============================================="
if sum_new_findings[4] > max_critical:
print "Build Failed: Max Critical"
elif sum_new_findings[3] > max_high:
print "Build Failed: Max High"
elif sum_new_findings[2] > max_medium:
print "Build Failed: Max Medium"
else:
print "Build Passed!"
print"=============================================="
class Main:
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='CI/CD integration for DefectDojo')
parser.add_argument('--user', help="Dojo Product ID", required=True)
parser.add_argument('--product', help="Dojo Product ID", required=True)
parser.add_argument('--file', help="Scanner file", required=True)
parser.add_argument('--scanner', help="Type of scanner", required=True)
parser.add_argument('--engagement', help="Engagement ID (optional)", required=False)
parser.add_argument('--critical', default=0, help="Maximum new critical vulns to pass the build.", required=False)
parser.add_argument('--high', default=0, help="Maximum new high vulns to pass the build.", required=False)
parser.add_argument('--medium', default=0, help="Maximum new medium vulns to pass the build.", required=False)
#Parse out arguments
args = vars(parser.parse_args())
user_id = args["user"]
product_id = args["product"]
file = args["file"]
scanner = args["scanner"]
engagement_id = args["engagement"]
max_critical = args["critical"]
max_high = args["high"]
max_medium = args["medium"]
create_findings(product_id, user_id, file, scanner, engagement_id, max_critical, max_high, max_medium)
"""
Example written by Aaron Weaver <aaron.weaver@owasp.org>
as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects
Description: Imports test data into DefectDojo and creates products,
engagements and tests along with findings.
"""
from defectdojo_api import defectdojo
from random import randint
import os
from datetime import datetime, timedelta
"""
Imports test data into Defect DefectDojo
"""
# Setup DefectDojo connection information
host = 'http://localhost:8000'
api_key = os.environ['DOJO_API_KEY']
......@@ -66,8 +69,9 @@ def create_load_data(product_name, product_desc, file=None, file_test_type=None)
if file is not None:
print "Loading scanner results from scanner export"
dir_path = os.path.dirname(os.path.realpath(__file__))
upload_scan = dd.upload_scan(engagement_id, "Burp Scan", dir_path + file,
"true", "01/11/2016", "API")
date = datetime.now()
upload_scan = dd.upload_scan(engagement_id, file_test_type, dir_path + file,
"true", date.strftime("%Y/%m/%d"), "API")
i = 0
while i < 6:
......@@ -89,7 +93,7 @@ def create_load_data(product_name, product_desc, file=None, file_test_type=None)
print product.message
##### Create Products, Engagements and Tests ########
create_load_data("BodgeIt", "Product description.", "../tests/scans/Bodgeit-burp.xml", "Burp Scan")
create_load_data("BodgeIt", "Product description.", "/tests/scans/Bodgeit-burp.xml", "Burp Scan")
create_load_data("A CRM App", "Product description.")
create_load_data("An Engineering Application", "Product description.")
create_load_data("A Marketing Site", "Product description.")
"""
Example written by Aaron Weaver <aaron.weaver@owasp.org>
as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects
Description: Creates a product in DefectDojo and returns information about the newly created product
"""
from defectdojo_api import defectdojo
import os
......
"""
UnitTests written by Aaron Weaver <aaron.weaver@owasp.org>
as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects
Description: Tests the functionality of the DefectDojo API.
"""
from defectdojo_api import defectdojo
import unittest
import os
from datetime import datetime
class TestDefectDojoAPI(unittest.TestCase):
......@@ -10,12 +17,14 @@ class TestDefectDojoAPI(unittest.TestCase):
api_key = os.environ['DOJO_API_KEY']
user = 'admin'
"""
proxies = {
'http': 'http://localhost:8080',
'https': 'http://localhost:8080',
}
self.dd = defectdojo.DefectDojoAPI(host, api_key, user, proxies=proxies, debug=False)
proxies=proxies
"""
self.dd = defectdojo.DefectDojoAPI(host, api_key, user, debug=False)
#### USER API TESTS ####
def test_01_get_user(self):
......@@ -31,16 +40,19 @@ class TestDefectDojoAPI(unittest.TestCase):
#### Product API TESTS ####
def test_03_create_product(self):
product = self.dd.create_product("API Product Test", "Description", 1)
self.__class__.product_id = product.id()
self.assertIsNotNone(product.id())
def test_04_get_product(self):
product = self.dd.get_product(1)
product = self.dd.get_product(self.__class__.product_id)
#print product.data_json(pretty=True)
self.assertIsNotNone(product.data['name'])
def test_05_set_product(self):
self.dd.set_product(1, name="Product Update Test")
product = self.dd.get_product(1)
product = self.dd.create_product("API Product Test", "Description", 1)
new_product_id = product.id()
self.dd.set_product(new_product_id, name="Product Update Test")
product = self.dd.get_product(new_product_id)
#print product.data_json(pretty=True)
self.assertEqual("Product Update Test", product.data['name'])
......@@ -52,13 +64,14 @@ class TestDefectDojoAPI(unittest.TestCase):
#### Engagement API TESTS ####
def test_07_create_engagement(self):
product_id = 1
product_id = self.__class__.product_id
user_id = 1
engagement = self.dd.create_engagement("API Engagement", product_id, user_id, "In Progress", "2016-11-01", "2016-12-01")
self.__class__.engagement_id = engagement.id()
self.assertIsNotNone(engagement.id())
def test_08_get_engagement(self):
engagement = self.dd.get_engagement(1)
engagement = self.dd.get_engagement(self.__class__.engagement_id)
#print engagement.data_json(pretty=True)
self.assertIsNotNone(str(engagement.data["name"]))
......@@ -67,9 +80,15 @@ class TestDefectDojoAPI(unittest.TestCase):
#print engagements.data_json(pretty=True)
self.assertTrue(engagements.data["meta"]["total_count"]>0)
#Note: Fails b/c of issue with DefectDojo's API
def test_10_set_engagement(self):
self.dd.set_engagement(1, name="Engagement Update Test")
engagement = self.dd.get_engagement(1)
user_id = 1
product_id = self.__class__.product_id
engagement = self.dd.create_engagement("API Engagement", product_id, user_id, "In Progress", "2016-11-01", "2016-12-01")
new_engagement_id = engagement.id()
self.dd.set_engagement(new_engagement_id, name="Engagement Update Test")
engagement = self.dd.get_engagement(new_engagement_id)
#print engagement.data_json(pretty=True)
self.assertEqual("Engagement Update Test", engagement.data['name'])
......@@ -79,10 +98,11 @@ class TestDefectDojoAPI(unittest.TestCase):
test_type = 1 #1 is the API Test
environment = 1 #1 is the Development Environment
test = self.dd.create_test(engagement_id, test_type, environment, "2016-11-01", "2016-12-01")
self.__class__.test_id = test.id()
self.assertIsNotNone(test.id())
def test_12_get_test(self):
test = self.dd.get_test(1)
test = self.dd.get_test(self.__class__.test_id)
#print test.data_json(pretty=True)
self.assertIsNotNone(str(test.data["engagement"]))
......@@ -92,23 +112,26 @@ class TestDefectDojoAPI(unittest.TestCase):
self.assertTrue(tests.data["meta"]["total_count"]>0)
def test_14_set_test(self):
self.dd.set_test(1, percent_complete="99")
test = self.dd.get_test(1)
self.dd.set_test(self.__class__.test_id, percent_complete="99")
test = self.dd.get_test(self.__class__.test_id)
#print test.data_json(pretty=True)
self.assertEqual(99, test.data['percent_complete'])
#### Findings API TESTS ####
#Fails b/c of DojoAPI Issue
def test_15_create_finding(self):
cwe = 25
product_id = 1
engagement_id = 1
test_id = 1
product_id = self.__class__.product_id
engagement_id = self.__class__.engagement_id
test_id = self.__class__.test_id
user_id = 1
finding = self.dd.create_finding("API Created", "Description", "Critical", cwe, "2016-11-01", product_id, engagement_id, test_id, user_id, "None", "true", "false", "References")
self.__class__.finding_id = finding.id()
self.assertIsNotNone(finding.id())
#Fails b/c of DojoAPI Issue
def test_16_get_finding(self):
finding = self.dd.get_finding(1)
finding = self.dd.get_finding(self.__class__.finding_id)
#print finding.data_json(pretty=True)
self.assertIsNotNone(str(finding.data["title"]))
......@@ -117,8 +140,10 @@ class TestDefectDojoAPI(unittest.TestCase):
#print findings.data_json(pretty=True)
self.assertTrue(findings.data["meta"]["total_count"]>0)
#Fails b/c of DojoAPI Issue
def test_18_set_finding(self):
self.dd.set_finding(1, 1, 1, 1, title="API Finding Updates")
self.dd.set_finding(self.__class__.finding_id, self.__class__.product_id, self.__class__.engagement_id,
test_id, title="API Finding Updates")
finding = self.dd.get_finding(1)
#print test.data_json(pretty=True)
self.assertEqual("API Finding Updates", finding.data['title'])
......@@ -126,11 +151,12 @@ class TestDefectDojoAPI(unittest.TestCase):
#### Upload API TESTS ####
def test_19_upload_scan(self):
dir_path = os.path.dirname(os.path.realpath(__file__))
engagement_id = 1
upload_scan = self.dd.upload_scan(engagement_id, "Burp Scan", dir_path + "/scans/Bodgeit-burp.xml",
"true", "01/11/2016", "Burp Upload")
#print upload_scan.data_json(pretty=True)
self.assertEqual("Bodgeit-burp.xml", upload_scan.data['file'])
date = datetime.now()
upload_scan = self.dd.upload_scan(self.__class__.engagement_id, "Burp Scan", dir_path + "/scans/Bodgeit-burp.xml",
"true", date.strftime("%Y/%m/%d"), "API")
self.assertIsNotNone(upload_scan.id())
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment