Skip to content
Snippets Groups Projects
Commit ed13704f authored by Christopher Bohn's avatar Christopher Bohn :thinking:
Browse files

created script to convert data from Timeline into csv data for burndown charts

parent eea9a8a1
No related branches found
No related tags found
No related merge requests found
[
{
"name": "Airline Crew Scheduler 1",
"file": "airline_crew_scheduler_1.json"
"file": "airline_crew_scheduler_1.json",
"csv": "airline_crew_scheduler_1.csv"
},
{
"name": "Airline Crew Scheduler 2",
"file": "airline_crew_scheduler_2.json"
"file": "airline_crew_scheduler_2.json",
"csv": "airline_crew_scheduler_2.csv"
},
{
"name": "Airline Crew Scheduler 3",
"file": "airline_crew_scheduler_3.json"
"file": "airline_crew_scheduler_3.json",
"csv": "airline_crew_scheduler_3.csv"
},
{
"name": "Boggle 1",
"file": "boggle_1.json"
"file": "boggle_1.json",
"csv": "boggle_1.csv"
},
{
"name": "Boggle 2",
"file": "boggle_2.json"
"file": "boggle_2.json",
"csv": "boggle_2.csv"
},
{
"name": "Boggle 3",
"file": "boggle_3.json"
"file": "boggle_3.json",
"csv": "boggle_3.csv"
},
{
"name": "Chess 1",
"file": "chess_1.json"
"file": "chess_1.json",
"csv": "chess_1.csv"
},
{
"name": "Chess 2",
"file": "chess_2.json"
"file": "chess_2.json",
"csv": "chess_2.csv"
},
{
"name": "Chess 3",
"file": "chess_4.json"
"file": "chess_3.json",
"csv": "chess_3.csv"
},
{
"name": "Chess 4",
"file": "chess_4.json"
"file": "chess_4.json",
"csv": "chess_4.csv"
},
{
"name": "Package Tracker 1",
"file": "package_tracker_1.json"
"file": "package_tracker_1.json",
"csv": "package_tracker_1.csv"
},
{
"name": "Package Tracker 2",
"file": "package_tracker_2.json"
"file": "package_tracker_2.json",
"csv": "package_tracker_2.csv"
}
]
import csv
import json
from datetime import datetime
from datetime import timedelta
from pytz import timezone
# Board columns tracked for each issue; these become the CSV series between
# the 'Opened' and 'Closed' rows, and the keys of Issue.labels.
board_labels = ('Sprint Backlog', 'Developing', 'Done')
# Timeline event-type names. NOTE(review): neither constant is referenced in the
# visible code -- presumably they name the event-kind field in the Timeline JSON
# events (e[0] in Issue.place_dates_on_labels); confirm against the data format.
add_label = 'Add Label'
remove_label = 'Remove Label'
class TimelineData:
    """Timeline data for one project, loaded from a Timeline JSON export.

    Holds the project's issues and observational periods, and computes
    per-timestamp issue/board counts (an IssueBoardTimeline) for burndown charts.
    """

    # noinspection PyShadowingNames
    def __init__(self, project_name, json_filename, csv_filename):
        """Load the Timeline JSON in `json_filename`; `csv_filename` is where the
        resulting IssueBoardTimeline will write its output."""
        super().__init__()
        with open(json_filename, mode='r') as json_file:
            this_dict = json.load(json_file)
        self.start_time = get_tz_aware_datetime(this_dict['start_time'])
        self.observational_periods = self.process_observational_periods(this_dict['observational_periods'])
        self.issues = self.process_issues(this_dict['issues'])
        self.project_name = project_name
        self.csv_filename = csv_filename

    @staticmethod
    def process_observational_periods(periods_with_strings):
        """Convert {'from', 'to'} string dicts into (from, to) tz-aware datetime
        tuples, sorted by period start."""
        periods_with_dates = [
            (get_tz_aware_datetime(period['from']), get_tz_aware_datetime(period['to']))
            for period in periods_with_strings
        ]
        return sorted(periods_with_dates, key=lambda p: p[0])

    @staticmethod
    def process_issues(issues_with_strings):
        """Build Issue objects (label dates attached), sorted by issue number."""
        issues_with_dates = []
        for issue in issues_with_strings:
            revised_issue = Issue(issue['number'], issue['opened'], issue['closed'])
            revised_issue.place_dates_on_labels(issue['events'])
            issues_with_dates.append(revised_issue)
        return sorted(issues_with_dates, key=lambda i: i.number)

    def collect_event_dates(self):
        """Return every open/close/label-start/label-stop timestamp across all
        issues, sorted ascending (duplicates kept)."""
        timestamps = []
        for issue in self.issues:
            timestamps.append(issue.opened)
            if issue.closed is not None:
                timestamps.append(issue.closed)
            for start, stop in issue.labels.values():
                if start is not None:
                    timestamps.append(start)
                if stop is not None:
                    timestamps.append(stop)
        return sorted(timestamps)

    def issue_status_count_over_time(self, start, stop, event_list):
        """Build an IssueBoardTimeline covering [start, stop].

        Counts are seeded with the issues already open (and already on each
        board label) at `start`, then stepped forward through each timestamp in
        `event_list` (expected sorted ascending, within the window).
        """
        # Issues open at `start`: opened on/before it and not yet closed then.
        initially_open = [
            issue for issue in self.issues
            if (issue.opened <= start and issue.closed is None)
            or (issue.closed is not None and issue.opened <= start <= issue.closed)
        ]
        issue_board_timeline = IssueBoardTimeline(start, len(initially_open), self.csv_filename)
        for label in board_labels:
            # Issues carrying `label` at `start`: label added on/before start and
            # not yet removed then.
            on_board = [
                issue for issue in self.issues
                if issue.labels[label][0] is not None and issue.labels[label][0] <= start
                and (issue.labels[label][1] is None or start < issue.labels[label][1])
            ]
            issue_board_timeline.initialize_count(label, len(on_board))
        for event in event_list:
            # One tick per event kind that matches this timestamp; issues sharing
            # the exact same timestamp are counted once (matches original logic).
            if any(issue.opened == event for issue in self.issues):
                issue_board_timeline.open_issue(event)
            if any(issue.closed == event for issue in self.issues):
                issue_board_timeline.close_issue(event)
            for label in board_labels:
                if any(issue.labels[label][0] == event for issue in self.issues):
                    issue_board_timeline.add_label(label, event)
                if any(issue.labels[label][1] == event for issue in self.issues):
                    issue_board_timeline.remove_label(label, event)
        issue_board_timeline.finalize_board_timeline(stop)
        return issue_board_timeline
class Issue:
    """One tracked issue: its number, open/close times, and per-label times."""

    def __init__(self, number, opened_date_string, closed_date_string):
        """Parse the ISO date strings; `closed_date_string` may be None for a
        still-open issue."""
        super().__init__()
        self.number = number
        self.opened = get_tz_aware_datetime(opened_date_string)
        self.closed = (get_tz_aware_datetime(closed_date_string)
                       if closed_date_string is not None else None)
        # label name -> (added, removed) datetimes; filled by place_dates_on_labels.
        self.labels = dict()

    # noinspection PyShadowingNames
    def place_dates_on_labels(self, events_list):
        """For each board label, record the first two matching event times as
        the (start, stop) pair in self.labels; missing entries are None.

        NOTE(review): events are matched on label name (e[1]) only; the event
        kind in e[0] is never inspected -- confirm the data guarantees the
        first matching event is the add and the second the remove.
        """
        for label in board_labels:
            matching = sorted((e for e in events_list if e[1] == label),
                              key=lambda e: e[2])
            start = get_tz_aware_datetime(matching[0][2]) if matching else None
            stop = get_tz_aware_datetime(matching[1][2]) if len(matching) > 1 else None
            self.labels[label] = (start, stop)
class IssueBoardTimeline:
    """Per-timestamp counts of opened/closed issues and board-label membership,
    written out as one CSV row per series with timestamps as columns.

    Each *_count dict maps a timestamp (column) to that series' value, plus the
    'Board Name' key holding the row's label; `headers` is the ordered column
    list fed to csv.DictWriter.
    """

    def __init__(self, start_time, initial_opened_count, csv_filename):
        """Seed every series at `start_time`; `initial_opened_count` is the
        number of issues already open then (closed and board counts start at 0
        until initialize_count is called)."""
        super().__init__()
        board_name = 'Board Name'
        self.csv_filename = csv_filename
        self.headers = [board_name, start_time]
        self.start_time = start_time
        self.last_board_count = dict()
        self.opened_count = dict()
        self.opened_count[board_name] = 'Opened'
        self.opened_count[start_time] = self.last_opened_count = initial_opened_count
        self.closed_count = dict()
        self.closed_count[board_name] = 'Closed'
        self.closed_count[start_time] = self.last_closed_count = 0
        self.board_count = dict()
        for label in board_labels:
            self.board_count[label] = dict()
            self.board_count[label][board_name] = label
            self.board_count[label][start_time] = 0
            self.last_board_count[label] = 0

    def initialize_count(self, label, count):
        """Set the starting count for one board label at start_time."""
        self.board_count[label][self.start_time] = self.last_board_count[label] = count

    def _append_header(self, timestamp):
        # Each distinct timestamp is one CSV column; avoid duplicate columns.
        if self.headers[-1] != timestamp:
            self.headers.append(timestamp)

    def _carry_board_counts(self, timestamp):
        # Board-label series are unchanged by an open/close event; carry each
        # forward so every series has a value in this column.
        for name in self.board_count:
            self.board_count[name][timestamp] = self.last_board_count[name]

    def open_issue(self, timestamp):
        """Record an issue opening at `timestamp` (opened count +1)."""
        self.mark_previous_microsecond(timestamp)
        self._append_header(timestamp)
        self.opened_count[timestamp] = self.last_opened_count + 1
        self.last_opened_count = self.opened_count[timestamp]
        self.closed_count[timestamp] = self.last_closed_count
        self._carry_board_counts(timestamp)

    def close_issue(self, timestamp):
        """Record an issue closing at `timestamp` (closed +1, opened -1).

        Fix: the original checked/appended the header twice in this method;
        the second check was dead code (always false after the first append).
        """
        self.mark_previous_microsecond(timestamp)
        self._append_header(timestamp)
        self.closed_count[timestamp] = self.last_closed_count + 1
        self.last_closed_count = self.closed_count[timestamp]
        self.opened_count[timestamp] = self.last_opened_count - 1
        self.last_opened_count = self.opened_count[timestamp]
        self._carry_board_counts(timestamp)

    def _shift_label(self, label, timestamp, delta):
        # Shared body of add_label/remove_label: bump one label's count by
        # `delta`, carry every other series forward unchanged.
        self.mark_previous_microsecond(timestamp)
        self._append_header(timestamp)
        for name in self.board_count:
            if name == label:
                self.board_count[name][timestamp] = self.last_board_count[name] + delta
                self.last_board_count[name] = self.board_count[name][timestamp]
            else:
                self.board_count[name][timestamp] = self.last_board_count[name]
        self.opened_count[timestamp] = self.last_opened_count
        self.closed_count[timestamp] = self.last_closed_count

    def add_label(self, label, timestamp):
        """Record `label` being added to an issue at `timestamp` (count +1)."""
        self._shift_label(label, timestamp, 1)

    def remove_label(self, label, timestamp):
        """Record `label` being removed from an issue at `timestamp` (count -1)."""
        self._shift_label(label, timestamp, -1)

    def mark_previous_microsecond(self, timestamp):
        """Repeat the current counts one microsecond before `timestamp`, so a
        chart of the CSV shows vertical steps instead of slopes between events."""
        one_microsecond_ago = timestamp - timedelta.resolution
        if self.headers[-1] != one_microsecond_ago:
            self.headers.append(one_microsecond_ago)
        self.opened_count[one_microsecond_ago] = self.last_opened_count
        self.closed_count[one_microsecond_ago] = self.last_closed_count
        self._carry_board_counts(one_microsecond_ago)

    def finalize_board_timeline(self, stop_time):
        """Extend every series to `stop_time` so all lines reach the chart end."""
        self.opened_count[stop_time] = self.last_opened_count
        self.closed_count[stop_time] = self.last_closed_count
        self._carry_board_counts(stop_time)
        self._append_header(stop_time)

    def write_to_csv(self):
        """Write the series to self.csv_filename: Opened row, one row per board
        label, then the Closed row, with timestamps as the column headers."""
        with open(self.csv_filename, 'w', newline='') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=self.headers)
            writer.writeheader()
            writer.writerow(self.opened_count)
            for name in self.board_count:
                writer.writerow(self.board_count[name])
            writer.writerow(self.closed_count)
def get_tz_aware_datetime(datetime_string):
    """Parse an ISO-8601 string into a timezone-aware datetime.

    Strings that carry no UTC offset are interpreted as UTC; strings with an
    explicit offset are returned as parsed.
    """
    parsed = datetime.fromisoformat(datetime_string)
    if parsed.tzinfo is not None and parsed.tzinfo.utcoffset(parsed) is not None:
        return parsed
    # Naive timestamp: re-parse with an explicit UTC offset appended.
    return datetime.fromisoformat(f'{datetime_string}+00:00')
"""
if __name__ == '__main__':
name = 'Chess 1'
files = ('sample_project.json', 'sample_project.csv')
sprint_start = timezone('US/Central').localize(datetime(2019, 11, 20, 14, 30, 0, 0))
sprint_stop = timezone('US/Central').localize(datetime(2019, 12, 4, 14, 30, 0, 0))
timeline_data = TimelineData(name, files[0], files[1])
events = timeline_data.collect_event_dates()
truncated_events = list(filter(lambda event: sprint_start <= event < sprint_stop, events))
counts = timeline_data.issue_status_count_over_time(sprint_start, sprint_stop, truncated_events)
counts.write_to_csv()
"""
if __name__ == '__main__':
sprint_start = timezone('US/Central').localize(datetime(2019, 11, 20, 14, 30, 0, 0))
sprint_stop = timezone('US/Central').localize(datetime(2019, 12, 4, 14, 30, 0, 0))
with open('1198-capstones.json', mode='r') as json_file:
capstones = json.load(json_file)
for capstone in capstones:
timeline_data = TimelineData(capstone['name'], capstone['file'], capstone['csv'])
events = timeline_data.collect_event_dates()
truncated_events = list(filter(lambda event: sprint_start <= event < sprint_stop, events))
counts = timeline_data.issue_status_count_over_time(sprint_start, sprint_stop, truncated_events)
counts.write_to_csv()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment