diff --git a/defectdojo_api/defectdojo.py b/defectdojo_api/defectdojo.py index f76f89b5310e773c3fedb2316d6b846e3ba22feb..79e4de6bf8bf57feb05a746ac9368a242994ae3c 100644 --- a/defectdojo_api/defectdojo.py +++ b/defectdojo_api/defectdojo.py @@ -18,7 +18,7 @@ class DefectDojoAPI(object): :param api_version: API version to call, the default is v1. :param verify_ssl: Specify if API requests will verify the host's SSL certificate, defaults to true. :param timeout: HTTP timeout in seconds, default is 30. - :param proxies: Proxy for API requests. + :param proxis: Proxy for API requests. :param user_agent: HTTP user agent string, default is "DefectDojo_api/[version]". :param cert: You can also specify a local cert to use as client side certificate, as a single file (containing the private key and the certificate) or as a tuple of both file's path @@ -193,7 +193,23 @@ class DefectDojoAPI(object): return self._request('POST', 'engagements/', data=data) - def set_engagement(self, id, name=None, product_id=None, lead_id=None, status=None, target_start=None, + def close_engagement(self, id, user_id=None): + + """Closes an engagement with the given properties. + :param id: Engagement id. + :param user_id: User from the user table. + """ + engagement = self.get_engagement(id).data + + #if user isn't provided then close with the lead ID + if user_id is None: + user_id = self.get_id_from_url(engagement["lead"]) + + product_id = engagement["product_id"] + + self.set_engagement(id, name=engagement["name"], lead_id=user_id, product_id=product_id, status="Completed", active=False) + + def set_engagement(self, id, product_id=None, lead_id=None, name=None, status=None, target_start=None, target_end=None, active=None, pen_test=None, check_list=None, threat_model=None, risk_path=None, test_strategy=None, progress=None, done_testing=None): @@ -225,7 +241,7 @@ class DefectDojoAPI(object): data['product'] = self.get_product_uri(product_id) if lead_id: - data['lead'] = self.get_user_uri(user_id) + data['lead'] = self.get_user_uri(lead_id) if status: data['status'] = status @@ -236,7 +252,7 @@ class DefectDojoAPI(object): if target_end: data['target_end'] = target_end - if active: + if active is not None: data['active'] = active if pen_test: diff --git a/examples/dojo_ci_cd.py b/examples/dojo_ci_cd.py index 15dd0ba306ccfa5e51cb804ef4e3f381265bd78e..25971c15ee4e62be77039b5e8196002a73a2fbf6 100644 --- a/examples/dojo_ci_cd.py +++ b/examples/dojo_ci_cd.py @@ -38,11 +38,12 @@ def dojo_connection(host, api_key, user, proxy): # 3. Call this script to load scan data, specifying scanner type # 4. Script returns along with a pass or fail results: Example: 2 new critical vulns, 1 low out of 10 vulnerabilities -def return_engagement(dd, product_id, user): +def return_engagement(dd, product_id, user, build_id=None): #Specify the product id product_id = product_id engagement_id = None + """ # Check for a CI/CD engagement_id engagements = dd.list_engagements(product_in=product_id, status="In Progress") @@ -52,16 +53,21 @@ def return_engagement(dd, product_id, user): engagement_id = engagement['id'] if engagement_id == None: - start_date = datetime.now() - end_date = start_date+timedelta(days=180) - users = dd.list_users(user) - user_id = None + """ + start_date = datetime.now() + end_date = start_date+timedelta(days=1) + users = dd.list_users(user) + user_id = None - if users.success: - user_id = users.data["objects"][0]["id"] + if users.success: + user_id = users.data["objects"][0]["id"] - engagement_id = dd.create_engagement("Recurring CI/CD Integration", product_id, str(user_id), - "In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")) + engagementText = "CI/CD Integration" + if build_id is not None: + engagementText = engagementText + " - Build #" + build_id + + engagement_id = dd.create_engagement(engagementText, product_id, str(user_id), + "In Progress", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")) return engagement_id def process_findings(dd, engagement_id, dir, build=None): @@ -238,6 +244,7 @@ class Main: parser.add_argument('--host', help="DefectDojo Hostname", required=True) parser.add_argument('--proxy', help="Proxy ex:localhost:8080", required=False, default=None) parser.add_argument('--api_key', help="API Key", required=True) + parser.add_argument('--build_id', help="Reference to external build id", required=False) parser.add_argument('--user', help="User", required=True) parser.add_argument('--product', help="DefectDojo Product ID", required=True) parser.add_argument('--file', help="Scanner file", required=False) @@ -264,10 +271,11 @@ class Main: max_medium = args["medium"] build = args["build"] proxy = args["proxy"] + build_id = args["build_id"] if dir is not None or file is not None: dd = dojo_connection(host, api_key, user, proxy=proxy) - engagement_id = return_engagement(dd, product_id, user) + engagement_id = return_engagement(dd, product_id, user, build_id=build_id) test_ids = None if file is not None: if scanner is not None: @@ -277,6 +285,8 @@ class Main: else: test_ids = process_findings(dd, engagement_id, dir, build) + #Close the engagement_id + dd.close_engagement(engagement_id) summary(dd, engagement_id, test_ids, max_critical, max_high, max_medium) else: print "No file or directory to scan specified."