Skip to content
Snippets Groups Projects
Commit 9378e493 authored by Aaron Weaver's avatar Aaron Weaver
Browse files

Example Updates

parent e48f63b1
No related branches found
No related tags found
No related merge requests found
......@@ -111,10 +111,11 @@ class DefectDojoAPI(object):
return self._request('GET', 'users/' + str(user_id) + '/')
###### Engagements API #######
def list_engagements(self, status=None, product_in=None,limit=20):
def list_engagements(self, status=None, product_in=None, name_contains=None,limit=20):
"""Retrieves all the engagements.
:param product_in: List of product ids (1,2).
:param name_contains: Engagement name
:param limit: Number of records to return.
"""
......@@ -129,6 +130,9 @@ class DefectDojoAPI(object):
if status:
params['status'] = status
if name_contains:
params['name_contains'] = name_contains
return self._request('GET', 'engagements/', params)
def get_engagement(self, engagement_id):
......@@ -404,7 +408,7 @@ class DefectDojoAPI(object):
###### Findings API #######
def list_findings(self, active=None, duplicate=None, mitigated=None, severity=None, verified=None, severity_lt=None,
severity_gt=None, severity_contains=None, title_contains=None, url_contains=None, date_lt=None,
date_gt=None, date=None, product_id_in=None, engagement_id_in=None, test_id_in=None, limit=20):
date_gt=None, date=None, product_id_in=None, engagement_id_in=None, test_id_in=None, build=None, limit=20):
"""Returns filtered list of findings.
......@@ -424,6 +428,7 @@ class DefectDojoAPI(object):
:param product_id_in: Product id(s) associated with a finding. (1,2 or 1)
:param engagement_id_in: Engagement id(s) associated with a finding. (1,2 or 1)
:param test_in: Test id(s) associated with a finding. (1,2 or 1)
:param build_id: User specified build id relating to the build number from the build server. (Jenkins, Travis etc.).
:param limit: Number of records to return.
"""
......@@ -480,6 +485,9 @@ class DefectDojoAPI(object):
if test_id_in:
params['test__id__in'] = test_id_in
if build:
params['build_id__contains'] = build
return self._request('GET', 'findings/', params)
def get_finding(self, finding_id):
......@@ -490,7 +498,7 @@ class DefectDojoAPI(object):
return self._request('GET', 'findings/' + str(finding_id) + '/')
def create_finding(self, title, description, severity, cwe, date, product_id, engagement_id, test_id, user_id,
impact, active, verified, mitigation, references=None):
impact, active, verified, mitigation, references=None, build=None):
"""Creates a finding with the given properties.
......@@ -508,7 +516,7 @@ class DefectDojoAPI(object):
:param verified: Finding has been verified.
:param mitigation: Steps to mitigate the finding.
:param references: Details on finding.
:param build: User specified build id relating to the build number from the build server. (Jenkins, Travis etc.).
"""
data = {
......@@ -525,7 +533,8 @@ class DefectDojoAPI(object):
'active': active,
'verified': verified,
'mitigation': mitigation,
'references': references
'references': references,
'build_id' : build
}
return self._request('POST', 'findings/', data=data)
......@@ -550,6 +559,7 @@ class DefectDojoAPI(object):
:param verified: Finding has been verified.
:param mitigation: Steps to mitigate the finding.
:param references: Details on finding.
:param build: User specified build id relating to the build number from the build server. (Jenkins, Travis etc.).
"""
......@@ -597,17 +607,25 @@ class DefectDojoAPI(object):
if references:
data['references'] = references
if build:
data['build_id'] = build
return self._request('PUT', 'findings/' + str(finding_id) + '/', data=data)
##### Upload API #####
def upload_scan(self, engagement_id, scan_type, file, active, scan_date, tags=None):
def upload_scan(self, engagement_id, scan_type, file, active, scan_date, tags=None, build=None):
"""Uploads and processes a scan file.
:param application_id: Application identifier.
:param file_path: Path to the scan file to be uploaded.
"""
if tags is None:
tags = ''
if build is None:
build = ''
data = {
'file': open(file, 'rb'),
......@@ -615,7 +633,8 @@ class DefectDojoAPI(object):
'scan_type': ('', scan_type),
'active': ('', active),
'scan_date': ('', scan_date),
'tags': ('', tags)
'tags': ('', tags),
'build_id': ('', build)
}
return self._request(
......@@ -725,6 +744,9 @@ class DefectDojoResponse(object):
raise ValueError('Object not created:' + json.dumps(self.data, sort_keys=True, indent=4, separators=(',', ': ')))
return int(self.data)
def count(self):
return self.data["meta"]["total_count"]
def data_json(self, pretty=False):
"""Returns the data as a valid JSON string."""
if pretty:
......
......@@ -32,7 +32,7 @@ def print_findings(findings):
print "Low: " + str(findings[1])
print "Info: " + str(findings[0])
def create_findings(host, api_key, user, product_id, file, scanner, engagement_id=None, max_critical=0, max_high=0, max_medium=0):
def create_findings(host, api_key, user, product_id, file, scanner, engagement_id=None, max_critical=0, max_high=0, max_medium=0, build=None):
#Optionally, specify a proxy
proxies = {
......@@ -75,11 +75,11 @@ def create_findings(host, api_key, user, product_id, file, scanner, engagement_i
"In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))
# Upload the scanner export
dir_path = os.path.dirname(os.path.realpath(__file__))
#dir_path = os.path.dirname(os.path.realpath(__file__))
print "Uploading scanner data."
date = datetime.now()
upload_scan = dd.upload_scan(engagement_id, scanner, dir_path + file, "true", date.strftime("%Y/%m/%d"), "API")
upload_scan = dd.upload_scan(engagement_id, scanner, file, "true", date.strftime("%Y-%m-%d"), build=build)
if upload_scan.success:
test_id = upload_scan.id()
......@@ -132,6 +132,7 @@ class Main:
parser.add_argument('--product', help="Dojo Product ID", required=True)
parser.add_argument('--file', help="Scanner file", required=True)
parser.add_argument('--scanner', help="Type of scanner", required=True)
parser.add_argument('--build', help="Build ID", required=False)
parser.add_argument('--engagement', help="Engagement ID (optional)", required=False)
parser.add_argument('--critical', help="Maximum new critical vulns to pass the build.", required=False)
parser.add_argument('--high', help="Maximum new high vulns to pass the build.", required=False)
......@@ -149,5 +150,6 @@ class Main:
max_critical = args["critical"]
max_high = args["high"]
max_medium = args["medium"]
build = args["build"]
create_findings(host, api_key, user, product_id, file, scanner, engagement_id, max_critical, max_high, max_medium)
create_findings(host, api_key, user, product_id, file, scanner, engagement_id, max_critical, max_high, max_medium, build)
"""
Example written by Aaron Weaver <aaron.weaver@owasp.org>
as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects
Description: Creates a manual finding in DefectDojo and returns information about the newly created finding
"""
from defectdojo_api import defectdojo
from datetime import datetime, timedelta
from random import randint
import os
# Setup DefectDojo connection information
host = 'http://localhost:8000'
api_key = os.environ['DOJO_API_KEY']
user = 'admin'
user_id = 1 #Default user
#Optionally, specify a proxy
proxies = {
'http': 'http://localhost:8080',
'https': 'http://localhost:8080',
}
"""
proxies=proxies
"""
def create_finding_data(product_id, engagement_id, test_id, build):
cwe = [352, 22, 676, 863, 134, 759, 798]
cwe_desc = ['Cross-Site Request Forgery (CSRF)', 'Improper Limitation of a Pathname to a Restricted Directory (\'Path Traversal\')',
'Use of Potentially Dangerous Function', 'Incorrect Authorization', 'Uncontrolled Format String',
'Use of a One-Way Hash without a Salt', 'Use of Hard-coded Credentials']
severity=['Low','Medium','High', 'Critical']
user_id = 1
finding_date = datetime.now()
finding_date = finding_date+timedelta(days=randint(-30,0))
finding_cwe = randint(0,6)
finding = dd.create_finding(cwe_desc[finding_cwe], cwe_desc[finding_cwe], severity[randint(0,3)],
cwe[finding_cwe], finding_date.strftime("%Y-%m-%d"), product_id, engagement_id, test_id, user_id,
"None", "true", "true", "References", build=build)
# Instantiate the DefectDojo api wrapper
dd = defectdojo.DefectDojoAPI(host, api_key, user, debug=False, proxies=proxies)
# Search and see if product exists so that we don't create multiple product entries
product_name = "Acme API Finding Demo"
products = dd.list_products(name_contains=product_name)
product_id = None
if products.count() > 0:
for product in products.data["objects"]:
product_id = product['id']
else:
# Create a product
prod_type = 1 #1 - Research and Development, product type
product = dd.create_product(product_name, "This is a detailed product description.", prod_type)
# Get the product id
product_id = product.id()
print "Product successfully created with an id: " + str(product_id)
# Retrieve the newly created product
product = dd.get_product(product_id)
product_name = "Acme API Finding Demo"
engagement = dd.list_engagements(product_in=product_id, name_contains="Intial " + product_name + " Engagement")
engagement_id = None
start_date = datetime.now()
end_date = start_date+timedelta(days=randint(2,8))
if engagement.count() > 0:
for engagement in engagement.data["objects"]:
engagement_id = engagement['id']
else:
# Create an engagement
print "Creating engagement: " + "Intial " + product_name + " Engagement"
engagement = dd.create_engagement("Intial " + product_name + " Engagement", product_id, user_id,
"In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))
engagement_id = engagement.id()
print "Creating the test"
# Create Test
test_type = 5 #Web Test
environment = 3 #Production environment
test = dd.create_test(engagement_id, test_type, environment,
start_date.strftime("%Y-%m-%d"), start_date.strftime("%Y-%m-%d"))
test_id = test.id()
print "Creating the finding"
build = "Jenkins-" + str(randint(100,999))
# Create Finding
create_finding_data(product_id, engagement_id, test_id, build=build)
print "Listing the new findings for this build"
i = 0
#Creating four tests
while i < 4:
test_type = i+1 #Select some random tests
environment = randint(1,6) #Select random environments
test = dd.create_test(engagement_id, test_type, environment,
start_date.strftime("%Y-%m-%d"), start_date.strftime("%Y-%m-%d"))
test_id = test.id()
f = 0
f_max = randint(2,4)
while f < f_max:
# Load findings
create_finding_data(product_id, engagement_id, test_id, build=build)
f = f + 1
i = i + 1
#Summarize the findings loaded
print "***************************************"
findings = dd.list_findings(build=build)
print "Build ID: " + build
print "Total Created: " + str(findings.count())
print "***************************************"
print
if findings.count() > 0:
for finding in findings.data["objects"]:
print finding["title"] + ", Severity: " + finding["severity"]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment