Skip to content
Snippets Groups Projects
Commit ace67c9c authored by Aaron Weaver's avatar Aaron Weaver
Browse files

Update API to support Dojo changes and CI/CD example

parent c44ff888
No related branches found
No related tags found
No related merge requests found
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
# Config file
config/config.json
test.py
...@@ -13,6 +13,7 @@ Several quick start options are available: ...@@ -13,6 +13,7 @@ Several quick start options are available:
- Install with pip (recommended): :code:`pip install defectdojo_api` - Install with pip (recommended): :code:`pip install defectdojo_api`
- `Download the latest release <https://github.com/aaronweaver/defectdojo_api/releases/latest>`_ - `Download the latest release <https://github.com/aaronweaver/defectdojo_api/releases/latest>`_
- Clone the repository: :code:`git clone https://github.com/aaronweaver/defectdojo_api` - Clone the repository: :code:`git clone https://github.com/aaronweaver/defectdojo_api`
- If you are testing the api locally make sure to set the PYTHONPATH. export PYTHONPATH=/path/totheapi/defectdojo_api:$PYTHONPATH
Example Example
------- -------
......
__version__ = '0.1.2' __version__ = '1.0.0'
File added
...@@ -153,9 +153,9 @@ class DefectDojoAPI(object): ...@@ -153,9 +153,9 @@ class DefectDojoAPI(object):
""" """
return self._request('GET', 'engagements/' + str(engagement_id) + '/') return self._request('GET', 'engagements/' + str(engagement_id) + '/')
def create_engagement(self, name, product_id, lead_id, status, target_start, target_end, active='true', def create_engagement(self, name, product_id, lead_id, status, target_start, target_end, active='True',
pen_test='false', check_list='false', threat_model='false', risk_path="", test_strategy="", progress="", pen_test='False', check_list='False', threat_model='False', risk_path="", test_strategy="", progress="",
done_testing=""): done_testing='False'):
"""Creates an engagement with the given properties. """Creates an engagement with the given properties.
:param name: Engagement name. :param name: Engagement name.
......
File added
File added
...@@ -6,82 +6,147 @@ Description: CI/CD example for DefectDojo ...@@ -6,82 +6,147 @@ Description: CI/CD example for DefectDojo
""" """
from defectdojo_api import defectdojo from defectdojo_api import defectdojo
from datetime import datetime, timedelta from datetime import datetime, timedelta
import os import os, sys
import argparse import argparse
import time
import junit_xml_output
DEBUG = False DEBUG = True
def sum_severity(findings): test_cases = []
severity = [0,0,0,0,0]
for finding in findings.data["objects"]:
if finding["severity"] == "Critical":
severity[4] = severity[4] + 1
if finding["severity"] == "High":
severity[3] = severity[3] + 1
if finding["severity"] == "Medium":
severity[2] = severity[2] + 1
if finding["severity"] == "Low":
severity[1] = severity[1] + 1
if finding["severity"] == "Info":
severity[0] = severity[0] + 1
return severity def junit(toolName, file):
def print_findings(findings): junit_xml = junit_xml_output.JunitXml(toolName, test_cases, total_tests=None, total_failures=None)
print "Critical: " + str(findings[4]) with open(file, 'w') as file:
print "High: " + str(findings[3]) print "Writing Junit test files"
print "Medium: " + str(findings[2]) file.write(junit_xml.dump())
print "Low: " + str(findings[1])
print "Info: " + str(findings[0])
def create_findings(host, api_key, user, product_id, file, scanner, engagement_id=None, max_critical=0, max_high=0, max_medium=0, build=None):
def dojo_connection(host, api_key, user):
#Optionally, specify a proxy #Optionally, specify a proxy
proxies = { proxies = {
'http': 'http://localhost:8080', 'http': 'http://localhost:8081',
'https': 'http://localhost:8080', 'https': 'http://localhost:8081',
} }
if DEBUG: #if DEBUG:
# Instantiate the DefectDojo api wrapper # Instantiate the DefectDojo api wrapper
dd = defectdojo.DefectDojoAPI(host, api_key, user, proxies=proxies, verify_ssl=False, timeout=360, debug=False) dd = defectdojo.DefectDojoAPI(host, api_key, user, proxies=proxies, verify_ssl=False, timeout=360, debug=False)
else: #else:
dd = defectdojo.DefectDojoAPI(host, api_key, user, verify_ssl=False, timeout=360, debug=False) # dd = defectdojo.DefectDojoAPI(host, api_key, user, verify_ssl=False, timeout=360, debug=False)
return dd
# Workflow as follows: # Workflow as follows:
# 1. Scan tool is run against build # 1. Scan tool is run against build
# 2. Reports is saved from scan tool # 2. Reports is saved from scan tool
# 3. Call this script to load scan data, specifying scanner type # 3. Call this script to load scan data, specifying scanner type
# 4. Script returns along with a pass or fail results: Example: 2 new critical vulns, 1 low out of 10 vulnerabilities # 4. Script returns along with a pass or fail results: Example: 2 new critical vulns, 1 low out of 10 vulnerabilities
def return_engagement(dd, product_id):
#Specify the product id #Specify the product id
product_id = product_id product_id = product_id
engagement_id = None
# Check for a CI/CD engagement_id # Check for a CI/CD engagement_id
engagements = dd.list_engagements(product_in=product_id, status="In Progress") engagements = dd.list_engagements(product_in=product_id, status="In Progress")
if engagements.success: if engagements.success:
for engagement in engagements.data["objects"]: for engagement in engagements.data["objects"]:
if "Recurring CI/CD Integration" == engagement['name']: if "Recurring CI/CD Integration" == engagement['name']:
engagement_id = engagement['id'] engagement_id = engagement['id']
# Engagement doesn't exist, create it
if engagement_id == None: if engagement_id == None:
start_date = datetime.now() start_date = datetime.now()
end_date = start_date+timedelta(days=180) end_date = start_date+timedelta(days=180)
users = dd.list_users("admin") users = dd.list_users(user)
user_id = None user_id = None
if users.success: if users.success:
user_id = users.data["objects"][0]["id"] user_id = users.data["objects"][0]["id"]
engagement_id = dd.create_engagement("Recurring CI/CD Integration", product_id, user_id,
engagement_id = dd.create_engagement("Recurring CI/CD Integration", product_id, str(user_id),
"In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")) "In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))
return engagement_id
# Upload the scanner export def process_findings(dd, engagement_id, dir, build=None):
#dir_path = os.path.dirname(os.path.realpath(__file__)) test_ids = []
for root, dirs, files in os.walk(dir):
for name in files:
file = os.path.join(os.getcwd(),root, name)
test_id = processFiles(dd, engagement_id, file)
if test_id is not None:
test_ids.append(str(test_id))
return ','.join(test_ids)
def processFiles(dd, engagement_id, file, scanner=None, build=None):
upload_scan = None
scannerName = None
path=os.path.dirname(file)
name = os.path.basename(file)
tool = os.path.basename(path)
tool = tool.lower()
test_id = None
date = datetime.now()
dojoDate = date.strftime("%Y-%m-%d")
#Tools without an importer in Dojo; attempted to import as generic
if "generic" in name:
scanner = "Generic Findings Import"
if tool == "nikto":
print "Uploading nikto scan: " + file
test_id = dd.upload_scan(engagement_id, scanner, file, "true", dojoDate, build)
elif tool == "bandit":
print "Uploading bandit scan: " + file
test_id = dd.upload_scan(engagement_id, scanner, file, "true", dojoDate, build)
else:
if tool == "burp":
scannerName = "Burp Scan"
elif tool == "nessus":
scannerName = "Nessus Scan"
elif tool == "nmap":
scannerName = "Nmap Scan"
elif tool == "nexpose":
scannerName = "Nexpose Scan"
elif tool == "veracode":
scannerName = "Veracode Scan"
elif tool == "checkmarx":
scannerName = "Checkmarx Scan"
elif tool == "zap":
scannerName = "ZAP Scan"
elif tool == "appspider":
scannerName = "AppSpider Scan"
elif tool == "Arachni Scan":
scannerName = "Arachni Scan"
elif tool == "vcg":
scannerName = "VCG Scan"
elif tool == "dependency":
scannerName = "Dependency Check Scan"
elif tool == "retirejs":
scannerName = "Retire.js Scan"
elif tool == "nodesecurity":
scannerName = "Node Security Platform Scan"
elif tool == "qualys":
scannerName = "Qualys Scan"
elif tool == "qualyswebapp":
scannerName = "Qualys Webapp Scan"
elif tool == "openvas":
scannerName = "OpenVAS CSV"
elif tool == "snyk":
scannerName = "Snyk Scan"
if scannerName is not None:
print "Uploading " + scannerName + " scan: " + file
test_id = dd.upload_scan(engagement_id, scannerName, file, "true", dojoDate, build)
return test_id
#print os.path.basename(full_path)
def create_findings(dd, engagement_id, scanner, file, build=None):
# Upload the scanner export
if engagement_id > 0:
print "Uploading scanner data." print "Uploading scanner data."
date = datetime.now() date = datetime.now()
print scanner
upload_scan = dd.upload_scan(engagement_id, scanner, file, "true", date.strftime("%Y-%m-%d"), build=build) upload_scan = dd.upload_scan(engagement_id, scanner, file, "true", date.strftime("%Y-%m-%d"), build=build)
if upload_scan.success: if upload_scan.success:
...@@ -90,20 +155,36 @@ def create_findings(host, api_key, user, product_id, file, scanner, engagement_i ...@@ -90,20 +155,36 @@ def create_findings(host, api_key, user, product_id, file, scanner, engagement_i
print upload_scan.message print upload_scan.message
quit() quit()
def summary(dd, engagement_id, test_ids, max_critical=0, max_high=0, max_medium=0):
findings = dd.list_findings(engagement_id_in=engagement_id, duplicate="false", active="true", verified="true") findings = dd.list_findings(engagement_id_in=engagement_id, duplicate="false", active="true", verified="true")
print"==============================================" print"=============================================="
print "Total Number of Vulnerabilities: " + str(findings.data["meta"]["total_count"]) print "Total Number of Vulnerabilities: " + str(findings.data["meta"]["total_count"])
print"==============================================" print"=============================================="
print_findings(sum_severity(findings)) print_findings(sum_severity(findings))
print print
findings = dd.list_findings(test_id_in=test_id, duplicate="true") findings = dd.list_findings(test_id_in=test_ids, duplicate="true")
print"==============================================" print"=============================================="
print "Total Number of Duplicate Findings: " + str(findings.data["meta"]["total_count"]) print "Total Number of Duplicate Findings: " + str(findings.data["meta"]["total_count"])
print"==============================================" print"=============================================="
print_findings(sum_severity(findings)) print_findings(sum_severity(findings))
print print
findings = dd.list_findings(test_id_in=test_id, duplicate="false") #Delay while de-dupes
print"==============================================" sys.stdout.write("Sleeping for 30 seconds for de-dupe celery process:")
sys.stdout.flush()
for i in range(15):
time.sleep(2)
sys.stdout.write(".")
sys.stdout.flush()
findings = dd.list_findings(test_id_in=test_ids, duplicate="false", limit=500)
if findings.count() > 0:
for finding in findings.data["objects"]:
test_cases.append(junit_xml_output.TestCase(finding["title"] + " Severity: " + finding["severity"], finding["description"],"failure"))
if not os.path.exists("reports"):
os.mkdir("reports")
junit("DefectDojo", "reports/junit_dojo.xml")
print"\n=============================================="
print "Total Number of New Findings: " + str(findings.data["meta"]["total_count"]) print "Total Number of New Findings: " + str(findings.data["meta"]["total_count"])
print"==============================================" print"=============================================="
sum_new_findings = sum_severity(findings) sum_new_findings = sum_severity(findings)
...@@ -111,7 +192,7 @@ def create_findings(host, api_key, user, product_id, file, scanner, engagement_i ...@@ -111,7 +192,7 @@ def create_findings(host, api_key, user, product_id, file, scanner, engagement_i
print print
print"==============================================" print"=============================================="
strFail = None strFail = ""
if max_critical is not None: if max_critical is not None:
if sum_new_findings[4] > max_critical: if sum_new_findings[4] > max_critical:
strFail = "Build Failed: Max Critical" strFail = "Build Failed: Max Critical"
...@@ -127,6 +208,29 @@ def create_findings(host, api_key, user, product_id, file, scanner, engagement_i ...@@ -127,6 +208,29 @@ def create_findings(host, api_key, user, product_id, file, scanner, engagement_i
print "Build Failed: " + strFail print "Build Failed: " + strFail
print"==============================================" print"=============================================="
def sum_severity(findings):
severity = [0,0,0,0,0]
for finding in findings.data["objects"]:
if finding["severity"] == "Critical":
severity[4] = severity[4] + 1
if finding["severity"] == "High":
severity[3] = severity[3] + 1
if finding["severity"] == "Medium":
severity[2] = severity[2] + 1
if finding["severity"] == "Low":
severity[1] = severity[1] + 1
if finding["severity"] == "Info":
severity[0] = severity[0] + 1
return severity
def print_findings(findings):
print "Critical: " + str(findings[4])
print "High: " + str(findings[3])
print "Medium: " + str(findings[2])
print "Low: " + str(findings[1])
print "Info: " + str(findings[0])
class Main: class Main:
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description='CI/CD integration for DefectDojo') parser = argparse.ArgumentParser(description='CI/CD integration for DefectDojo')
...@@ -134,8 +238,9 @@ class Main: ...@@ -134,8 +238,9 @@ class Main:
parser.add_argument('--api_key', help="API Key", required=True) parser.add_argument('--api_key', help="API Key", required=True)
parser.add_argument('--user', help="User", required=True) parser.add_argument('--user', help="User", required=True)
parser.add_argument('--product', help="Dojo Product ID", required=True) parser.add_argument('--product', help="Dojo Product ID", required=True)
parser.add_argument('--file', help="Scanner file", required=True) parser.add_argument('--file', help="Scanner file", required=False)
parser.add_argument('--scanner', help="Type of scanner", required=True) parser.add_argument('--dir', help="Scanner directory, needs to have the scanner name with the scan file in the folder. Ex: reports/nmap/nmap.csv", required=False)
parser.add_argument('--scanner', help="Type of scanner", required=False)
parser.add_argument('--build', help="Build ID", required=False) parser.add_argument('--build', help="Build ID", required=False)
parser.add_argument('--engagement', help="Engagement ID (optional)", required=False) parser.add_argument('--engagement', help="Engagement ID (optional)", required=False)
parser.add_argument('--critical', help="Maximum new critical vulns to pass the build.", required=False) parser.add_argument('--critical', help="Maximum new critical vulns to pass the build.", required=False)
...@@ -149,6 +254,7 @@ class Main: ...@@ -149,6 +254,7 @@ class Main:
user = args["user"] user = args["user"]
product_id = args["product"] product_id = args["product"]
file = args["file"] file = args["file"]
dir = args["dir"]
scanner = args["scanner"] scanner = args["scanner"]
engagement_id = args["engagement"] engagement_id = args["engagement"]
max_critical = args["critical"] max_critical = args["critical"]
...@@ -156,4 +262,18 @@ class Main: ...@@ -156,4 +262,18 @@ class Main:
max_medium = args["medium"] max_medium = args["medium"]
build = args["build"] build = args["build"]
create_findings(host, api_key, user, product_id, file, scanner, engagement_id, max_critical, max_high, max_medium, build) if dir is not None or file is not None:
dd = dojo_connection(host, api_key, user)
engagement_id = return_engagement(dd, product_id)
test_ids = None
if file is not None:
if scanner is not None:
test_ids = processFiles(dd, engagement_id, file, scanner=scanner)
else:
print "Scanner type must be specified for a file import. --scanner"
else:
test_ids = process_findings(dd, engagement_id, dir, build)
summary(dd, engagement_id, test_ids, max_critical, max_high, max_medium)
else:
print "No file or directory to scan specified."
File added
Date,Title,CweId,Url,Severity,Description,Mitigation,Impact,References,Active,Verified,FalsePositive,Duplicate
11/09/2017,blacklist,,,3,"Using cElementTree to parse untrusted XML data is known to be vulnerable to XML attacks. Replace cElementTree with the equivalent defusedxml package, or make sure defusedxml.defuse_stdlib() is called. Filename: PyBitBucket.py Line number: 6 Line range: [6, 7, 8, 9] Issue Confidence: HIGH",,,,False,False,False,False
This diff is collapsed.
<?xml version="1.0" ?>
<testsuite failures="4" name="DefectDojo" tests="4">
<testcase name="blacklist Severity: Info">
<failure>Using cElementTree to parse untrusted XML data is known to be vulnerable to XML attacks. Replace cElementTree with the equivalent defusedxml package, or make sure defusedxml.defuse_stdlib() is called. Filename: PyBitBucket.py Line number: 6 Line range: [6, 7, 8, 9] Issue Confidence: HIGH</failure>
</testcase>
<testcase name="SQL injection Severity: High">
<failure>SQL injection vulnerabilities arise when user-controllable data is
incorporated into database SQL queries in an unsafe manner. An attacker can
supply crafted input to break out of the data context in which their input
appears and interfere with the structure of the surrounding query.
A wide range of damaging attacks can often be delivered via SQL injection,
including reading or modifying critical application data, interfering with
application logic, escalating privileges within the database and taking
control of the database server.
The **sort_column** parameter appears to be vulnerable to SQL injection
attacks. The payload **,(select*from(select(sleep(20)))a)** was submitted in
the sort_column parameter. The application took **20562** milliseconds to
respond to the request, compared with **1980** milliseconds for the original
request, indicating that the injected SQL command caused a time delay.
The database appears to be MySQL.
</failure>
</testcase>
<testcase name="Cross-origin resource sharing Severity: Info">
<failure>An HTML5 cross-origin resource sharing (CORS) policy controls whether and how
content running on other domains can perform two-way interaction with the
domain that publishes the policy. The policy is fine-grained and can apply
access controls per-request based on the URL and other features of the
request.
If another domain is allowed by the policy, then that domain can potentially
attack users of the application. If a user is logged in to the application,
and visits a domain allowed by the policy, then any malicious content running
on that domain can potentially retrieve content from the application, and
sometimes carry out actions within the security context of the logged in user.
Even if an allowed domain is not overtly malicious in itself, security
vulnerabilities within that domain could potentially be leveraged by an
attacker to exploit the trust relationship and attack the application that
allows access. CORS policies on pages containing sensitive information should
be reviewed to determine whether it is appropriate for the application to
trust both the intentions and security posture of any domains granted access.
The application implements an HTML5 cross-origin resource sharing (CORS)
policy for this request.
If the application relies on network firewalls or other IP-based access
controls, this policy is likely to present a security risk.
Since the Vary: Origin header was not present in the response, reverse proxies
and intermediate servers may cache it. This may enable an attacker to carry
out cache poisoning attacks.
</failure>
</testcase>
<testcase name="Cross-origin resource sharing: arbitrary origin trusted Severity: Info">
<failure>An HTML5 cross-origin resource sharing (CORS) policy controls whether and how
content running on other domains can perform two-way interaction with the
domain that publishes the policy. The policy is fine-grained and can apply
access controls per-request based on the URL and other features of the
request.
Trusting arbitrary origins effectively disables the same-origin policy,
allowing two-way interaction by third-party web sites. Unless the response
consists only of unprotected public content, this policy is likely to present
a security risk.
If the site specifies the header Access-Control-Allow-Credentials: true,
third-party sites may be able to carry out privileged actions and retrieve
sensitive information. Even if it does not, attackers may be able to bypass
any IP-based access controls by proxying through users' browsers.
The application implements an HTML5 cross-origin resource sharing (CORS)
policy for this request that allows access from any domain.
The application allowed access from the requested origin
**https://pfcxuvwamstc.com**
If the application relies on network firewalls or other IP-based access
controls, this policy is likely to present a security risk.
Since the Vary: Origin header was not present in the response, reverse proxies
and intermediate servers may cache it. This may enable an attacker to carry
out cache poisoning attacks.
</failure>
</testcase>
</testsuite>
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment