Skip to content
Snippets Groups Projects
Commit 60b1d9d3 authored by Aaron Weaver's avatar Aaron Weaver
Browse files

Initial commit

parent 3ba12637
Branches
Tags
No related merge requests found
Showing
with 1702 additions and 0 deletions
"""
Example written by Aaron Weaver <aaron.weaver@owasp.org>
as part of the OWASP DefectDojo and OWASP AppSec Pipeline Security projects
Description: CI/CD example for DefectDojo
"""
from defectdojo_api import defectdojo
from datetime import datetime, timedelta
import os, sys
import argparse
import time
import junit_xml_output
import shutil
DEBUG = True
test_cases = []
def junit(toolName, file):
    """Write the module-level test_cases list as a JUnit XML report to *file*.

    :param toolName: name recorded as the JUnit suite/tool name.
    :param file: destination path for the XML report.
    """
    junit_xml = junit_xml_output.JunitXml(toolName, test_cases, total_tests=None, total_failures=None)
    # BUG FIX: the 'with' target used to shadow the 'file' parameter, and the
    # message hard-coded "junit_dojo.xml" regardless of the actual destination.
    with open(file, 'w') as report:
        print("\nWriting Junit test file: " + file)
        report.write(junit_xml.dump())
def dojo_connection(host, api_key, user, proxy=None):
    """Build and return a DefectDojoAPI client.

    :param host: DefectDojo base URL/hostname.
    :param api_key: API key (GUID portion only).
    :param user: API username.
    :param proxy: optional "host:port"; routes both http and https through it.
    """
    if proxy is not None:
        proxies = {
            'http': 'http://' + proxy,
            'https': 'http://' + proxy,
        }
        print(proxy)
        # NOTE(review): the proxied client hard-codes debug=True while the
        # plain one uses debug=False -- looks like a debugging leftover;
        # behavior preserved, consider unifying via the DEBUG constant.
        dd = defectdojo.DefectDojoAPI(host, api_key, user, proxies=proxies, verify_ssl=False, timeout=360, debug=True)
    else:
        dd = defectdojo.DefectDojoAPI(host, api_key, user, verify_ssl=False, timeout=360, debug=False)
    return dd
def return_engagement(dd, product_id, user, build_id=None):
    """Create a one-day "In Progress" engagement for *product_id* and return its id.

    Looks up *user* via the API to obtain the owner id; exits the process if
    the user listing fails.

    :param dd: DefectDojoAPI client.
    :param build_id: optional CI build number, appended to the engagement name.
    """
    user_id = None
    start_date = datetime.now()
    end_date = start_date + timedelta(days=1)

    users = dd.list_users(user)
    if users.success == False:
        print("Error in listing users: " + users.message)
        print("Exiting...\n")
        sys.exit()
    else:
        user_id = users.data["objects"][0]["id"]

    engagementText = "CI/CD Integration"
    if build_id is not None:
        engagementText = engagementText + " - Build #" + build_id

    engagement_id = dd.create_engagement(engagementText, product_id, str(user_id),
        "In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))
    print("Engagement ID created: " + str(engagement_id))
    return engagement_id
def process_findings(dd, engagement_id, dir, build=None):
    """Walk *dir*, upload every supported scan file, and return the test ids.

    Files under "processed"/"error" folders are skipped (already handled);
    unsupported extensions are reported and skipped.

    :returns: comma-joined string of numeric test ids (may be empty).
    """
    test_ids = []
    for root, dirs, files in os.walk(dir):
        for name in files:
            file = os.path.join(os.getcwd(), root, name)
            if "processed" not in str(file) and "error" not in str(file):
                # Only hand known report formats to the uploader
                if file.lower().endswith(('.json', '.csv', '.txt', '.js', '.xml')):
                    # BUG FIX: *build* was accepted but never forwarded
                    test_id = processFiles(dd, engagement_id, file, build=build)
                    if test_id is not None:
                        if str(test_id).isdigit():
                            test_ids.append(str(test_id))
                else:
                    print("Skipped file, extension not supported: " + file + "\n")
    return ','.join(test_ids)
def moveFile(file, success):
    """Move *file* into a sibling "processed" (success) or "error" folder.

    Both folders are created on demand next to the file.

    :param success: True -> processed/, False -> error/.
    """
    path = os.path.dirname(file)
    name = os.path.basename(file)

    # folder for processed files
    processFolder = os.path.join(path, "processed")
    if not os.path.exists(processFolder):
        os.mkdir(processFolder)

    # folder for error files
    errorFolder = os.path.join(path, "error")
    if not os.path.exists(errorFolder):
        os.mkdir(errorFolder)

    # BUG FIX: the folder paths already contain *path*; joining *path* again
    # (os.path.join(path, processFolder, name)) duplicated the prefix for
    # relative paths.
    if success == True:
        dest = os.path.join(processFolder, name)
    else:
        dest = os.path.join(errorFolder, name)
    shutil.move(file, dest)
def processFiles(dd, engagement_id, file, scanner=None, build=None):
    """Upload a single scan *file* to DefectDojo and move it to processed/error.

    The importer is chosen from the name of the directory the file lives in
    (e.g. reports/nmap/scan.xml -> "Nmap Scan"); files whose basename contains
    "generic" use the generic CSV importer.

    :returns: the upload response (test id) on an attempted upload, or None
        when the tool directory is not recognized.
    """
    # Map tool directory name -> DefectDojo importer name.
    # (Replaces the original 17-branch if/elif chain.)
    scanners = {
        "burp": "Burp Scan",
        "nessus": "Nessus Scan",
        "nmap": "Nmap Scan",
        "nexpose": "Nexpose Scan",
        "veracode": "Veracode Scan",
        "checkmarx": "Checkmarx Scan",
        "zap": "ZAP Scan",
        "appspider": "AppSpider Scan",
        "arachni": "Arachni Scan",
        "vcg": "VCG Scan",
        "dependency-check": "Dependency Check Scan",
        "retirejs": "Retire.js Scan",
        "nodesecurity": "Node Security Platform Scan",
        "qualys": "Qualys Scan",
        "qualyswebapp": "Qualys Webapp Scan",
        "openvas": "OpenVAS CSV",
        "snyk": "Snyk Scan",
    }

    path = os.path.dirname(file)
    name = os.path.basename(file)
    tool = os.path.basename(path).lower()
    test_id = None
    dojoDate = datetime.now().strftime("%Y-%m-%d")

    if "generic" in name:
        # Tools without a dedicated importer in Dojo: import as generic CSV
        scanner = "Generic Findings Import"
        print("Uploading " + tool + " scan: " + file)
        test_id = dd.upload_scan(engagement_id, scanner, file, "true", dojoDate, build)
        if test_id.success == False:
            # typo fixes: "occured" -> "occurred", "Succesful" -> "Successful"
            print("An error occurred while uploading the scan: " + test_id.message)
            moveFile(file, False)
        else:
            print("Successful upload, TestID: " + str(test_id) + "\n")
            moveFile(file, True)
    else:
        scannerName = scanners.get(tool)
        if scannerName is None:
            print("Tool not defined in dojo_ci_cd script: " + tool)
        else:
            print("Uploading " + scannerName + " scan: " + file)
            test_id = dd.upload_scan(engagement_id, scannerName, file, "true", dojoDate, build)
            if test_id.success == False:
                print("An error occurred while uploading the scan: " + test_id.message)
                moveFile(file, False)
            else:
                print("Successful upload, TestID: " + str(test_id))
                moveFile(file, True)
    return test_id
def create_findings(dd, engagement_id, scanner, file, build=None):
    """Upload *file* with the given *scanner* importer; return the new test id.

    Does nothing and returns None when *engagement_id* is not positive; exits
    the process when the upload fails.
    """
    test_id = None
    if engagement_id > 0:
        print("Uploading scanner data.")
        date = datetime.now()
        upload_scan = dd.upload_scan(engagement_id, scanner, file, "true", date.strftime("%Y-%m-%d"), build=build)
        if upload_scan.success:
            # BUG FIX: the id was computed but never returned to the caller
            test_id = upload_scan.id()
        else:
            print(upload_scan.message)
            quit()
    return test_id
def summary(dd, engagement_id, test_ids, max_critical=0, max_high=0, max_medium=0):
    """Print totals for all / duplicate / new findings and the build verdict.

    The build fails when the new-finding counts exceed any of the max_*
    thresholds (None disables a threshold).  NOTE(review): thresholds must be
    numeric -- argparse hands over strings unless type=int is set.
    """
    # All active, verified, de-duplicated findings on the engagement
    findings = dd.list_findings(engagement_id_in=engagement_id, duplicate="false", active="true", verified="true")
    if findings.success:
        print("==============================================")
        print("Total Number of Vulnerabilities: " + str(findings.data["meta"]["total_count"]))
        print("==============================================")
        print_findings(sum_severity(findings))
        print("")
    else:
        print("An error occurred: " + findings.message)

    # Duplicates detected for the uploaded tests
    findings = dd.list_findings(test_id_in=test_ids, duplicate="true")
    if findings.success:
        print("==============================================")
        print("Total Number of Duplicate Findings: " + str(findings.data["meta"]["total_count"]))
        print("==============================================")
        print_findings(sum_severity(findings))
        print("")
        # Delay while de-dupes (kept disabled, as in the original):
        # sys.stdout.write("Sleeping for 30 seconds to wait for dedupe celery process:")
        # for i in range(15): time.sleep(2); sys.stdout.write(".")
    else:
        print("An error occurred: " + findings.message)

    # New (non-duplicate) findings drive the pass/fail decision
    findings = dd.list_findings(test_id_in=test_ids, duplicate="false", limit=500)
    if findings.success:
        if findings.count() > 0:
            for finding in findings.data["objects"]:
                test_cases.append(junit_xml_output.TestCase(finding["title"] + " Severity: " + finding["severity"], finding["description"], "failure"))
        # if not os.path.exists("reports"):
        #     os.mkdir("reports")
        # junit("DefectDojo", "reports/junit_dojo.xml")
        print("\n==============================================")
        print("Total Number of New Findings: " + str(findings.data["meta"]["total_count"]))
        print("==============================================")
        sum_new_findings = sum_severity(findings)
        print_findings(sum_new_findings)
        print("")
        print("==============================================")
        # BUG FIX: the original did `strFail = strFail + " Max High"` while
        # strFail could still be None (TypeError), and printed
        # "Build Failed: Build Failed: ..." on the critical path.
        failures = []
        if max_critical is not None and sum_new_findings[4] > max_critical:
            failures.append("Max Critical")
        if max_high is not None and sum_new_findings[3] > max_high:
            failures.append("Max High")
        if max_medium is not None and sum_new_findings[2] > max_medium:
            failures.append("Max Medium")
        if not failures:
            print("Build Passed!")
        else:
            print("Build Failed: " + " ".join(failures))
        print("==============================================")
    else:
        print("An error occurred: " + findings.message)
def sum_severity(findings):
    """Tally findings by severity.

    :param findings: API response whose data["objects"] is a list of finding
        dicts carrying a "severity" key.
    :returns: five counters ordered [Info, Low, Medium, High, Critical];
        any other severity value is ignored.
    """
    order = ["Info", "Low", "Medium", "High", "Critical"]
    counts = [0, 0, 0, 0, 0]
    for item in findings.data["objects"]:
        label = item["severity"]
        if label in order:
            counts[order.index(label)] += 1
    return counts
def print_findings(findings):
    """Print a severity breakdown, highest first.

    :param findings: the [Info, Low, Medium, High, Critical] counter list
        produced by sum_severity().
    """
    # Parenthesized single-argument print works identically on Python 2 and 3.
    print("Critical: " + str(findings[4]))
    print("High: " + str(findings[3]))
    print("Medium: " + str(findings[2]))
    print("Low: " + str(findings[1]))
    print("Info: " + str(findings[0]))
class Main:
    """CLI entry point: parse arguments and drive the upload/summary flow.

    NOTE(review): the __main__ guard sits *inside* the class body, so the
    script logic runs at class-definition time when the file is executed
    directly.  This odd layout is preserved for compatibility.
    """
    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description='CI/CD integration for DefectDojo')
        parser.add_argument('--host', help="Dojo Hostname", required=True)
        parser.add_argument('--api_key', help="API Key: user:guidvalue", required=True)
        parser.add_argument('--product', help="Dojo Product ID", required=True)
        parser.add_argument('--file', help="Scanner file", required=False)
        parser.add_argument('--dir', help="Scanner directory, needs to have the scanner name with the scan file in the folder. Ex: reports/nmap/nmap.csv", required=False, default="reports")
        parser.add_argument('--scanner', help="Type of scanner", required=False)
        parser.add_argument('--build_id', help="Build ID", required=False)
        parser.add_argument('--engagement', help="Engagement ID (optional)", required=False)
        parser.add_argument('--closeengagement', help="Close Engagement", required=False, action='store_true')
        # BUG FIX: these thresholds are compared numerically against finding
        # counts in summary(); without type=int argparse hands over strings.
        parser.add_argument('--critical', help="Maximum new critical vulns to pass the build.", required=False, type=int)
        parser.add_argument('--high', help="Maximum new high vulns to pass the build.", required=False, type=int)
        parser.add_argument('--medium', help="Maximum new medium vulns to pass the build.", required=False, type=int)
        parser.add_argument('--proxy', help="Proxy, specify as host:port, ex: localhost:8080")

        # Parse arguments
        args = vars(parser.parse_args())
        host = args["host"]
        api_key = args["api_key"]
        product_id = args["product"]
        file = args["file"]
        dir = args["dir"]
        scanner = args["scanner"]
        engagement_id = args["engagement"]
        closeEngagement = args["closeengagement"]
        max_critical = args["critical"]
        max_high = args["high"]
        max_medium = args["medium"]
        build_id = args["build_id"]
        proxy = args["proxy"]

        if dir is not None or file is not None:
            # The API key arrives as "<user>:<guid>"
            if ":" not in api_key:
                print("API Key not in the correct format, must be: <user>:<guid>")
                quit()
            apiParsed = api_key.split(':')
            user = apiParsed[0]
            api_key = apiParsed[1]

            dd = dojo_connection(host, api_key, user, proxy)
            if engagement_id is None:
                engagement_id = return_engagement(dd, product_id, user, build_id=build_id)

            test_ids = None
            if file is not None:
                if scanner is not None:
                    test_ids = processFiles(dd, engagement_id, file, scanner=scanner)
                else:
                    print("Scanner type must be specified for a file import. --scanner")
            else:
                test_ids = process_findings(dd, engagement_id, dir, build_id)

            # Close the engagement before summarizing, if requested
            if closeEngagement == True:
                dd.close_engagement(engagement_id)

            summary(dd, engagement_id, test_ids, max_critical, max_high, max_medium)
        else:
            print("No file or directory to scan specified.")
# AppSecPipeline tool definition: OWASP Dependency-Check
# (identifies components with known, publicly disclosed vulnerabilities).
dependency-check:
  version: AppSecPipeline 0.5.0
  tags:
    - "Components with known Vulnerabilities"
  type: "static"
  description: "Dependency-Check is a utility that identifies project dependencies and checks if there are any known, publicly disclosed, vulnerabilities. Currently Java and .NET are supported; additional experimental support has been added for Ruby, Node.js, Python, and limited support for C/C++ build systems (autoconf and cmake)."
  docker: "appsecpipeline/sast"
  url: https://www.owasp.org/index.php/OWASP_Dependency_Check
  documentation: https://jeremylong.github.io/DependencyCheck/
  parameters:
    LOC:
      type: runtime
      data_type: string
      description: "Location of the source code."
    PROJECT:
      type: runtime
      data_type: string
      description: "Name of the Dependency project."
  commands:
    pre:
    exec: "/usr/bin/dependency-check/bin/dependency-check.sh"
    shell: False
    post:
    report: "--out {reportname} --format XML"
    reportname: "{timestamp}.xml"
    junit:
  languages:
    - "java"
    - "nodejs"
    - "ruby"
    - ".net"
    - "python"
  profiles:
    # Runs the full dependency scan; CVE data considered fresh for one week (168h)
    all: "--project $PROJECT --scan $LOC --cveValidForHours 168"
File added
# AppSecPipeline tool definition: git (source-retrieval utility).
git:
  version: AppSecPipeline 0.5.0
  tags:
    - "Utility"
  type: "utility"
  description: "Git is a free and open source distributed version control system designed to handle everything from small to very large projects with speed and efficiency."
  docker: "appsecpipeline/base"
  url: https://git-scm.com/
  documentation: https://git-scm.com/docs/git
  parameters:
    GIT_URL:
      type: runtime
      data_type: url
      description: "URL of the source code repository."
    LOC:
      type: runtime
      data_type: string
      description: "Location of the source code."
    GIT_TAGS:
      type: runtime
      data_type: string
      description: "Checkout a specified tag."
  commands:
    pre:
    exec: "sh /usr/bin/appsecpipeline/tools/git/git.sh"
    shell: False
    post:
    report:
    reportname:
    junit:
  profiles:
    clone: "clone $GIT_URL $LOC"
    tags: "tag $GIT_URL $LOC $GIT_TAGS"
# Clone a repository and optionally check out a tag.
# Usage: git.sh <action> <repo-url> <dest-dir> [tag]
action=$1
repo=$2
dest=$3
tag=$4

git clone "$repo" "$dest"
# Guard the cd: running git in the wrong directory on failure would be unsafe
cd "$dest" || exit 1

if [ "$action" = "tag" ]; then
    git fetch --all
    # BUG FIX: previously `git checkout $3` checked out the destination
    # directory name instead of the requested tag ($4)
    git checkout "$tag"
fi
import re
from datetime import datetime
import subprocess
import shlex
import sys
def days_between(dateCompare):
    """Return the whole minutes elapsed since *dateCompare*.

    :param dateCompare: timestamp string in "%Y-%m-%d %H:%M:%S" format.
    :returns: absolute elapsed time in whole minutes (despite the function's
        name -- the caller compares the result against 720 == 12 hours).

    BUG FIX: the original used timedelta.seconds, which discards whole days,
    so a container older than 24h appeared "young" again and escaped the
    age-based termination.  total_seconds() counts the full elapsed time.
    """
    d1 = datetime.strptime(dateCompare, "%Y-%m-%d %H:%M:%S")
    return int(abs((datetime.now() - d1).total_seconds()) // 60)
# Kill switch for long-running containers: read the access time of
# /proc/1/cmdline (a proxy for the container start time) and exit non-zero
# when the container is older than 12 hours, so the scheduler recycles it.
if __name__ == "__main__":
    age = 0
    uptime = subprocess.check_output(shlex.split("stat /proc/1/cmdline"))
    # decode() keeps this working on Python 3, where check_output returns bytes
    for line in uptime.decode("utf-8", "replace").splitlines():
        # e.g. "Access: 2018-01-02 03:04:05.000000000 +0000"
        dockerStartTime = re.search(r"Access\:\s(\d{1,4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})", line)
        if dockerStartTime:
            age = days_between(dockerStartTime.group(1))
            break
    # Make configurable at some point; terminate if longer than 12 hours / 720 minutes
    if age > 720:
        sys.exit(1)
#!/usr/bin/env python
import xml.etree.ElementTree as ET
import csv
from datetime import datetime
import re
import argparse
import os
import junit_xml_output
test_cases = []
def junit(toolName, file):
    """Write the module-level test_cases list as a JUnit XML file at *file*.

    :param toolName: name recorded as the JUnit suite/tool name.
    :param file: destination path for the XML report.
    """
    junit_xml = junit_xml_output.JunitXml(toolName, test_cases, total_tests=None, total_failures=None)
    # Renamed the handle so it no longer shadows the 'file' parameter
    with open(file, 'w') as report:
        print("Writing Junit test files")
        report.write(junit_xml.dump())
if __name__ == '__main__':
    # Convert a tool's "generic_<name>.csv" findings file into a JUnit XML
    # report so CI servers can display each finding as a failed test.
    parser = argparse.ArgumentParser()
    # Command line options
    parser.add_argument("-t", "--tool", help="Tool name", required=True)
    parser.add_argument("-f", "--file", help="File to process", required=True)
    args = parser.parse_args()

    test_cases = []
    # Column indexes in the generic findings CSV
    TITLE = 1
    DESCRIPTION = 5

    base = os.path.basename(args.file)
    fileName = os.path.join(os.path.dirname(args.file), "generic_" + os.path.splitext(base)[0] + ".csv")
    csvToParse = fileName

    # BUG FIX: os.path.isfile was called with the literal string "csvToParse",
    # so the existence check always failed and the CSV was never parsed.
    if os.path.isfile(csvToParse):
        # NOTE(review): 'rb' is the Python 2 csv idiom; Python 3 needs
        # open(csvToParse, 'r', newline='') -- confirm target interpreter.
        with open(csvToParse, 'rb') as csvfile:
            reader = csv.reader(csvfile, delimiter=',')
            first = True
            for row in reader:
                if first:
                    # skip the CSV header row
                    first = False
                else:
                    # Output a junit test file; should lows/med be considered a failure?
                    test_cases.append(junit_xml_output.TestCase(row[TITLE], row[DESCRIPTION], "failure"))
        junit(args.tool, os.path.join(os.path.dirname(args.file), "junit", "junit_" + os.path.splitext(base)[0] + ".xml"))
    else:
        print("File passed in doesn't exist.")
This diff is collapsed.
# AppSecPipeline tool definition: Nikto web server scanner.
nikto:
  version: AppSecPipeline 0.5.0
  tags:
    # typo fix: "Dyanmic" -> "Dynamic"
    - "Dynamic Scanner"
  type: "dynamic"
  scan_type: "web"
  icon-sm:
  icon-lg:
  description: "Web server scanner which performs comprehensive tests against web servers for multiple items, including over 3500 potentially dangerous files/CGIs, versions on over 900 servers, and version specific problems on over 250 servers."
  url: https://cirt.net/Nikto2
  documentation: https://cirt.net/nikto2-docs/
  docker: "appsecpipeline/base-tools"
  parameters:
    URL:
      type: runtime
      data_type: url
      description: "URL of the site to scan."
  commands:
    pre:
    exec: "nikto -h $URL"
    report: "-output '{reportname}'"
    reportname: "{timestamp}.xml"
    post: "python /usr/bin/appsecpipeline/tools/nikto/parser.py -f '{reportname}'"
    junit: "junit.py -f '{reportname}' -t nikto"
  profiles:
    all: ""
    tuned: "-Tuning x 6"
    fast: "-Plugins \"headers;report_xml\""
    file_upload: "-Tuning 0"
    misconfig: "-Tuning 2"
    info: "-Tuning 3"
    xss: "-Tuning 4"
    remote: "-Tuning 57"
    dos: "-Tuning 6"
    command_exec: "-Tuning 8"
    sqli: "-Tuning 9"
    identification: "-Tuning b"
#!/usr/bin/env python
import xml.etree.ElementTree as ET
import csv
from datetime import datetime
import re
import argparse
import os
if __name__ == '__main__':
    # Convert a Nikto XML report into DefectDojo's "generic findings" CSV
    # (generic_<name>.csv, written next to the input file).
    parser = argparse.ArgumentParser()
    # Command line options
    parser.add_argument("-f", "--file", help="File to process", required=True)
    args = parser.parse_args()

    tree = None
    try:
        # Open up the XML file from the nikto output
        tree = ET.parse(args.file)
        root = tree.getroot()
        scan = root.find('scandetails')
        datestring = datetime.strftime(datetime.now(), '%m/%d/%Y')

        # Find only the base filename, save as csv
        base = os.path.basename(args.file)
        # 'with' guarantees the CSV is closed even if a row fails mid-write
        # (the original leaked the handle on error).
        # Generic-import CSV columns:
        #   Date (mm/dd/yyyy), Title, CweId (int), Url, Severity
        #   (Info/Low/Medium/High/Critical), Description, Mitigation, Impact,
        #   References, Active, Verified, FalsePositive, Duplicate
        #   (the last four: empty, True or False).
        with open(os.path.join(os.path.dirname(args.file), "generic_" + os.path.splitext(base)[0] + ".csv"), 'w') as csv_output:
            csvwriter = csv.writer(csv_output)
            csvwriter.writerow(["Date", "Title", "CweId", "Url", "Severity", "Description", "Mitigation", "Impact", "References", "Active", "Verified", "FalsePositive", "Duplicate"])
            for item in scan.findall('item'):
                finding = []
                # Date
                finding.append(datestring)
                # Title: cut the description down to its first sentence, max 900 chars
                description = item.find("description").text
                sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', description)
                if len(sentences) > 0:
                    titleText = sentences[0][:900]
                else:
                    titleText = description[:900]
                finding.append(titleText)
                # CweId: Nikto does not map findings to CWEs
                finding.append("0")
                # Url: strip the default ports 80/443
                ip = item.find("iplink").text
                ip = ip.replace(":80", "")
                ip = ip.replace(":443", "")
                finding.append(ip)
                # Severity: Nikto doesn't assign severity, default to low
                finding.append("Low")
                # Description
                finding.append(item.find("description").text)
                # Mitigation
                finding.append("")
                # Impact
                finding.append("")
                # References
                finding.append("")
                # Active
                finding.append("False")
                # Verified
                finding.append("False")
                # FalsePositive
                finding.append("False")
                # Duplicate
                finding.append("False")
                csvwriter.writerow(finding)
    except Exception:
        # BUG FIX: was a bare `except:`, which also swallowed
        # SystemExit/KeyboardInterrupt.  An empty/absent report is expected
        # when the scan found nothing, so this stays best-effort.
        print("Nothing in report")
# AppSecPipeline tool definition: Nmap network scanner.
nmap:
  version: AppSecPipeline 0.5.0
  tags:
    # typo fix: "Dyanmic" -> "Dynamic"
    - "Dynamic Scanner"
  type: "dynamic"
  scan_type: "infrastructure"
  icon-sm:
  icon-lg:
  description: "Nmap is a free and open source (license) utility for network discovery and security auditing. Many systems and network administrators also find it useful for tasks such as network inventory, managing service upgrade schedules, and monitoring host or service uptime. Nmap uses raw IP packets in novel ways to determine what hosts are available on the network, what services (application name and version) those hosts are offering, what operating systems (and OS versions) they are running, what type of packet filters/firewalls are in use, and dozens of other characteristics. It was designed to rapidly scan large networks, but works fine against single hosts."
  url: https://nmap.org/
  documentation: https://nmap.org/book/man.html
  docker: "appsecpipeline/base-tools"
  parameters:
    TARGET:
      type: runtime
      data_type: host
      description: "Target hostname of the site to scan."
  commands:
    pre:
    exec: "nmap"
    shell: False
    report: "-oX {reportname}"
    reportname: "{timestamp}.xml"
    post:
    junit:
  credentials:
    simple:
  profiles:
    # BUG FIX below: several profiles used Unicode en-dashes ("–sS",
    # "–version-light", ...) instead of ASCII hyphens, which nmap rejects.
    # Full handshake, fairly fast
    intensive_evident: "-sT -p- -A -T4 $TARGET"
    # Scans all TCP ports
    all: "-p 1-65535 -T4 -A -v $TARGET"
    # Default everything. Will issue a TCP SYN scan for the most common 1000 TCP ports, using ICMP Echo request (ping) for host detection.
    regular: "$TARGET"
    # Scan the most common TCP ports. It will make an effort in determining the OS type and what services and their versions are running.
    intense: "-T4 -A -v $TARGET"
    # Same as the intense scan and will also scan UDP ports (-sU)
    intense_udp: "-sS -sU -T4 -A -v $TARGET"
    # Do only a ping on the target, no port scan.
    ping: "-sn $TARGET"
    # Limiting the number of TCP ports scanned to only the top 100 most common TCP ports
    quick: "-T4 -F $TARGET"
    # Version and OS detection
    quick_light: "-sV -T4 -O -F --version-light $TARGET"
    # Determine hosts and routers in a network scan. It will traceroute and ping all hosts defined in the target.
    traceroute: "-sn --traceroute $TARGET"
    # Intense scan plus UDP, highly intrusive and very slow
    comprehensive: "-sS -sU -T4 -A -v -PE -PP -PS80,443 -PA3389 -PU40125 -PY -g 53 -script \"default or (discovery and safe)\" $TARGET"
    # SYN scan
    syn: "-sS $TARGET"
    # UDP scan
    udp: "-sU $TARGET"
    # SCTP INIT Scan. Like SYN scan, INIT scan is relatively unobtrusive and stealthy, since it never completes SCTP associations.
    sctp: "-sY $TARGET"
    # TCP Window Scan, exactly the same as ACK scan, except that it exploits an implementation detail of certain systems to differentiate open ports from closed ones, rather than always printing unfiltered when an RST is returned.
    windows: "-sW $TARGET"
    # Stealth scan
    stealth: "-sS -p- -T2 $TARGET"
File added
This diff is collapsed.
This diff is collapsed.
# AppSecPipeline tool definition: Retire.js (JS libraries with known vulnerabilities).
retirejs:
  version: AppSecPipeline 0.5.0
  tags:
    - "Components with known Vulnerabilities"
  type: "static"
  description: "There is a plethora of JavaScript libraries for use on the Web and in Node.JS apps out there. This greatly simplifies development,but we need to stay up-to-date on security fixes. Using 'Components with Known Vulnerabilities' is now a part of the OWASP Top 10 list of security risks and insecure libraries can pose a huge risk to your Web app. The goal of Retire.js is to help you detect the use of JS-library versions with known vulnerabilities."
  docker: "appsecpipeline/node"
  url: https://retirejs.github.io/retire.js/
  documentation: https://github.com/RetireJS/retire.js
  parameters:
    LOC:
      type: runtime
      data_type: string
      description: "Location of the source code."
  commands:
    pre:
    exec: "retire"
    shell: False
    post:
    report: "--outputpath {reportname} --outputformat json"
    reportname: "{timestamp}.json"
    junit:
  languages:
    - "javascript"
    - "nodejs"
  profiles:
    # Runs the full dependency scan
    all: "--path $LOC"
snyk wizard
Now redirecting you to our github auth page, go ahead and log in,
and once the auth is complete, return to this prompt and you’ll
be ready to start using snyk.
If you can’t wait use this url:
https://snyk.io/login?token=<token>
Waiting...
https://snyk.io/account/
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment