import csv
import json
from datetime import datetime
from datetime import timedelta

from pytz import timezone

board_labels = ('Sprint Backlog', 'Developing', 'Done')
add_label = 'Add Label'
remove_label = 'Remove Label'


class TimelineData:
    # noinspection PyShadowingNames
    def __init__(self, project_name, json_filename, csv_filename):
        super().__init__()
        with open(json_filename, mode='r') as json_file:
            this_dict = json.load(json_file)
        self.start_time = get_tz_aware_datetime(this_dict['start_time'])
        self.observational_periods = self.process_observational_periods(this_dict['observational_periods'])
        self.issues = self.process_issues(this_dict['issues'])
        self.project_name = project_name
        self.csv_filename = csv_filename

    @staticmethod
    def process_observational_periods(periods_with_strings):
        periods_with_dates = []
        for period in periods_with_strings:
            from_date = get_tz_aware_datetime(period['from'])
            to_date = get_tz_aware_datetime(period['to'])
            periods_with_dates.append((from_date, to_date))
        return sorted(periods_with_dates, key=lambda p: p[0])

    @staticmethod
    def process_issues(issues_with_strings):
        issues_with_dates = []
        for issue in issues_with_strings:
            revised_issue = Issue(issue['number'], issue['opened'], issue['closed'])
            revised_issue.place_dates_on_labels(issue['events'])
            issues_with_dates.append(revised_issue)
        return sorted(issues_with_dates, key=lambda i: i.number)

    def collect_event_dates(self):
        timestamps = []
        for issue in self.issues:
            timestamps.append(issue.opened)
            if issue.closed is not None:
                timestamps.append(issue.closed)
            for label in issue.labels:
                if issue.labels[label][0] is not None:
                    timestamps.append(issue.labels[label][0])
                if issue.labels[label][1] is not None:
                    timestamps.append(issue.labels[label][1])
        return sorted(timestamps)

    def issue_status_count_over_time(self, start, stop, event_list):
        filtered_issues = list(filter(
            lambda issue: (issue.opened <= start and issue.closed is None) or (issue.opened <= start <= issue.closed),
            self.issues))
        issue_board_timeline = IssueBoardTimeline(start, len(filtered_issues), self.csv_filename)
        for label in board_labels:
            filtered_issues = list(filter(
                lambda issue: issue.labels[label][0] is not None and issue.labels[label][0] <= start and (
                        issue.labels[label][1] is None or start < issue.labels[label][1]),
                self.issues))
            issue_board_timeline.initialize_count(label, len(filtered_issues))
        for event in event_list:
            filtered_issues = list(filter(lambda issue: issue.opened == event, self.issues))
            if len(filtered_issues) != 0:
                issue_board_timeline.open_issue(event)
            filtered_issues = list(filter(lambda issue: issue.closed == event, self.issues))
            if len(filtered_issues) != 0:
                issue_board_timeline.close_issue(event)
            for label in board_labels:
                filtered_issues = list(filter(lambda issue: issue.labels[label][0] == event, self.issues))
                if len(filtered_issues) != 0:
                    issue_board_timeline.add_label(label, event)
                filtered_issues = list(filter(lambda issue: issue.labels[label][1] == event, self.issues))
                if len(filtered_issues) != 0:
                    issue_board_timeline.remove_label(label, event)
        issue_board_timeline.finalize_board_timeline(stop)
        return issue_board_timeline

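# A hypothetical sketch of the per-project JSON that TimelineData loads, inferred from
# the keys this module reads; it is illustration only, not data used by the tool. Only
# e[1] (the board label) and e[2] (the ISO timestamp) of each event are read by
# Issue.place_dates_on_labels, so the 'Add Label' / 'Remove Label' values in the first
# position are an assumption based on the constants at the top of the file.
_EXAMPLE_PROJECT = {
    'start_time': '2019-11-20T14:30:00-06:00',
    'observational_periods': [
        {'from': '2019-11-20T14:30:00-06:00', 'to': '2019-12-04T14:30:00-06:00'},
    ],
    'issues': [
        {
            'number': 1,
            'opened': '2019-11-21T09:00:00-06:00',
            'closed': None,
            'events': [
                ['Add Label', 'Sprint Backlog', '2019-11-21T09:05:00-06:00'],
                ['Remove Label', 'Sprint Backlog', '2019-11-25T10:00:00-06:00'],
            ],
        },
    ],
}
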
class Issue:
    def __init__(self, number, opened_date_string, closed_date_string):
        super().__init__()
        self.number = number
        self.opened = get_tz_aware_datetime(opened_date_string)
        if closed_date_string is not None:
            self.closed = get_tz_aware_datetime(closed_date_string)
        else:
            self.closed = None
        self.labels = dict()

    # noinspection PyShadowingNames
    def place_dates_on_labels(self, events_list):
        for label in board_labels:
            events = sorted(list(filter(lambda e: e[1] == label, events_list)), key=lambda e: e[2])
            if len(events) > 0:
                start = get_tz_aware_datetime(events[0][2])
            else:
                start = None
            if len(events) > 1:
                stop = get_tz_aware_datetime(events[1][2])
            else:
                stop = None
            self.labels[label] = (start, stop)


class IssueBoardTimeline:
    def __init__(self, start_time, initial_opened_count, csv_filename):
        super().__init__()
        board_name = 'Board Name'
        self.csv_filename = csv_filename
        self.headers = [board_name, start_time]
        self.start_time = start_time
        self.last_board_count = dict()
        self.opened_count = dict()
        self.opened_count[board_name] = 'Opened'
        self.opened_count[start_time] = self.last_opened_count = initial_opened_count
        self.closed_count = dict()
        self.closed_count[board_name] = 'Closed'
        self.closed_count[start_time] = self.last_closed_count = 0
        self.board_count = dict()
        for label in board_labels:
            self.board_count[label] = dict()
            self.board_count[label][board_name] = label
            self.board_count[label][start_time] = 0
            self.last_board_count[label] = 0

    def initialize_count(self, label, count):
        self.board_count[label][self.start_time] = self.last_board_count[label] = count

    def open_issue(self, timestamp):
        self.mark_previous_microsecond(timestamp)
        if timestamp not in self.headers:
            self.headers.append(timestamp)
        self.opened_count[timestamp] = self.last_opened_count + 1
        self.last_opened_count = self.opened_count[timestamp]
        self.closed_count[timestamp] = self.last_closed_count
        for board_name in self.board_count:
            self.board_count[board_name][timestamp] = self.last_board_count[board_name]

    def close_issue(self, timestamp):
        self.mark_previous_microsecond(timestamp)
        if timestamp not in self.headers:
            self.headers.append(timestamp)
        self.closed_count[timestamp] = self.last_closed_count + 1
        self.last_closed_count = self.closed_count[timestamp]
        self.opened_count[timestamp] = self.last_opened_count - 1
        self.last_opened_count = self.opened_count[timestamp]
        for board_name in self.board_count:
            self.board_count[board_name][timestamp] = self.last_board_count[board_name]

    def add_label(self, label, timestamp):
        self.mark_previous_microsecond(timestamp)
        if timestamp not in self.headers:
            self.headers.append(timestamp)
        for board_name in self.board_count:
            if board_name == label:
                self.board_count[board_name][timestamp] = self.last_board_count[board_name] + 1
                self.last_board_count[board_name] = self.board_count[board_name][timestamp]
            else:
                self.board_count[board_name][timestamp] = self.last_board_count[board_name]
        self.opened_count[timestamp] = self.last_opened_count
        self.closed_count[timestamp] = self.last_closed_count

    def remove_label(self, label, timestamp):
        self.mark_previous_microsecond(timestamp)
        if timestamp not in self.headers:
            self.headers.append(timestamp)
        for board_name in self.board_count:
            if board_name == label:
                self.board_count[board_name][timestamp] = self.last_board_count[board_name] - 1
                self.last_board_count[board_name] = self.board_count[board_name][timestamp]
            else:
                self.board_count[board_name][timestamp] = self.last_board_count[board_name]
        self.opened_count[timestamp] = self.last_opened_count
        self.closed_count[timestamp] = self.last_closed_count

    def mark_previous_microsecond(self, timestamp):
        # Record the previous counts one microsecond before the event so that each change
        # appears as a vertical step when the counts are charted. The membership check keeps
        # a timestamp from being added to the headers twice when several events share the
        # same instant.
        one_microsecond_ago = timestamp - timedelta.resolution
        if one_microsecond_ago not in self.headers:
            self.headers.append(one_microsecond_ago)
            self.opened_count[one_microsecond_ago] = self.last_opened_count
            self.closed_count[one_microsecond_ago] = self.last_closed_count
            for board_name in self.board_count:
                self.board_count[board_name][one_microsecond_ago] = self.last_board_count[board_name]

    def finalize_board_timeline(self, stop_time):
        self.opened_count[stop_time] = self.last_opened_count
        self.closed_count[stop_time] = self.last_closed_count
        for board_name in self.board_count:
            self.board_count[board_name][stop_time] = self.last_board_count[board_name]
        if stop_time not in self.headers:
            self.headers.append(stop_time)

    def write_to_csv(self):
        # Sort the timestamp columns while keeping the 'Board Name' column first.
        sorted_headers = sorted(self.headers[1:])
        sorted_headers.insert(0, self.headers[0])
        self.headers = sorted_headers
        with open(self.csv_filename, 'w', newline='') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=self.headers)
            writer.writeheader()
            writer.writerow(self.opened_count)
            for board_name in self.board_count:
                writer.writerow(self.board_count[board_name])
            writer.writerow(self.closed_count)

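# A minimal sketch of reading the wide CSV back for plotting; _read_board_timeline_csv is
# a hypothetical helper not used elsewhere in this module, and it assumes only the layout
# produced by IssueBoardTimeline.write_to_csv above: a 'Board Name' header column followed
# by timestamp columns, then one row per series ('Opened', each board label, 'Closed').
def _read_board_timeline_csv(csv_filename):
    with open(csv_filename, newline='') as csv_file:
        rows = list(csv.reader(csv_file))
    # First header cell is 'Board Name'; the rest are the recorded timestamps.
    timestamps = rows[0][1:]
    # Map each series name to its counts, aligned with the timestamp columns.
    series = {row[0]: [int(value) for value in row[1:]] for row in rows[1:]}
    return timestamps, series
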
def get_tz_aware_datetime(datetime_string):
    timestamp = datetime.fromisoformat(datetime_string)
    if timestamp.tzinfo is None or timestamp.tzinfo.utcoffset(timestamp) is None:
        timestamp = datetime.fromisoformat(f'{datetime_string}+00:00')
        # timestamp = datetime.fromisoformat(f'{datetime_string}-06:00')
    return timestamp


"""
if __name__ == '__main__':
    name = 'Chess 1'
    files = ('sample_project.json', 'sample_project.csv')
    sprint_start = timezone('US/Central').localize(datetime(2019, 11, 20, 14, 30, 0, 0))
    sprint_stop = timezone('US/Central').localize(datetime(2019, 12, 4, 14, 30, 0, 0))
    timeline_data = TimelineData(name, files[0], files[1])
    events = timeline_data.collect_event_dates()
    truncated_events = list(filter(lambda event: sprint_start <= event < sprint_stop, events))
    counts = timeline_data.issue_status_count_over_time(sprint_start, sprint_stop, truncated_events)
    counts.write_to_csv()
"""

if __name__ == '__main__':
    """
    sprint_start = timezone('US/Central').localize(datetime(2019, 11, 20, 14, 30, 0, 0))
    sprint_stop = timezone('US/Central').localize(datetime(2019, 12, 4, 14, 30, 0, 0))
    """
    sprint_start = timezone('US/Central').localize(datetime(2019, 12, 4, 14, 30, 0, 0))
    sprint_stop = timezone('US/Central').localize(datetime(2019, 12, 13, 12, 0, 0, 0))
    with open('1198-capstones.json', mode='r') as json_file:
        capstones = json.load(json_file)
    for capstone in capstones:
        timeline_data = TimelineData(capstone['name'], capstone['file'], capstone['csv'])
        events = timeline_data.collect_event_dates()
        truncated_events = list(filter(lambda event: sprint_start <= event < sprint_stop, events))
        counts = timeline_data.issue_status_count_over_time(sprint_start, sprint_stop, truncated_events)
        counts.write_to_csv()
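
# Note: '1198-capstones.json' is read above as a list of objects with 'name', 'file', and
# 'csv' keys, one per capstone project. The real file is not included here, so the entry
# below is only an illustration reusing the sample names from the commented-out block above:
#     [{"name": "Chess 1", "file": "sample_project.json", "csv": "sample_project.csv"}]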