201 lines
7.5 KiB
Python
201 lines
7.5 KiB
Python
import codecs
|
|
import csv
|
|
import datetime
|
|
import logging
|
|
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.models import Group
|
|
|
|
from todo.models import Task, TaskList
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class CSVImporter:
|
|
"""Core upsert functionality for CSV import, for re-use by `import_csv` management command, web UI and tests.
|
|
Supplies a detailed log of what was and was not imported at the end. See README for usage notes.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.errors = []
|
|
self.upserts = []
|
|
self.summaries = []
|
|
self.line_count = 0
|
|
self.upsert_count = 0
|
|
|
|
def upsert(self, fileobj, as_string_obj=False):
|
|
"""Expects a file *object*, not a file path. This is important because this has to work for both
|
|
the management command and the web uploader; the web uploader will pass in in-memory file
|
|
with no path!
|
|
|
|
Header row is:
|
|
Title, Group, Task List, Created Date, Due Date, Completed, Created By, Assigned To, Note, Priority
|
|
"""
|
|
|
|
if as_string_obj:
|
|
# fileobj comes from mgmt command
|
|
csv_reader = csv.DictReader(fileobj)
|
|
else:
|
|
# fileobj comes from browser upload (in-memory)
|
|
csv_reader = csv.DictReader(codecs.iterdecode(fileobj, "utf-8"))
|
|
|
|
# DI check: Do we have expected header row?
|
|
header = csv_reader.fieldnames
|
|
expected = [
|
|
"Title",
|
|
"Group",
|
|
"Task List",
|
|
"Created By",
|
|
"Created Date",
|
|
"Due Date",
|
|
"Completed",
|
|
"Assigned To",
|
|
"Note",
|
|
"Priority",
|
|
]
|
|
if header != expected:
|
|
self.errors.append(
|
|
f"Inbound data does not have expected columns.\nShould be: {expected}"
|
|
)
|
|
return
|
|
|
|
for row in csv_reader:
|
|
self.line_count += 1
|
|
|
|
newrow = self.validate_row(row)
|
|
if newrow:
|
|
# newrow at this point is fully validated, and all FK relations exist,
|
|
# e.g. `newrow.get("Assigned To")`, is a Django User instance.
|
|
assignee = newrow.get("Assigned To") if newrow.get("Assigned To") else None
|
|
created_date = (
|
|
newrow.get("Created Date")
|
|
if newrow.get("Created Date")
|
|
else datetime.datetime.today()
|
|
)
|
|
due_date = newrow.get("Due Date") if newrow.get("Due Date") else None
|
|
priority = newrow.get("Priority") if newrow.get("Priority") else None
|
|
|
|
obj, created = Task.objects.update_or_create(
|
|
created_by=newrow.get("Created By"),
|
|
task_list=newrow.get("Task List"),
|
|
title=newrow.get("Title"),
|
|
defaults={
|
|
"assigned_to": assignee,
|
|
"completed": newrow.get("Completed"),
|
|
"created_date": created_date,
|
|
"due_date": due_date,
|
|
"note": newrow.get("Note"),
|
|
"priority": priority,
|
|
},
|
|
)
|
|
self.upsert_count += 1
|
|
msg = (
|
|
f'Upserted task {obj.id}: "{obj.title}"'
|
|
f' in list "{obj.task_list}" (group "{obj.task_list.group}")'
|
|
)
|
|
self.upserts.append(msg)
|
|
|
|
self.summaries.append(f"Processed {self.line_count} CSV rows")
|
|
self.summaries.append(f"Upserted {self.upsert_count} rows")
|
|
self.summaries.append(f"Skipped {self.line_count - self.upsert_count} rows")
|
|
|
|
return {"summaries": self.summaries, "upserts": self.upserts, "errors": self.errors}
|
|
|
|
def validate_row(self, row):
|
|
"""Perform data integrity checks and set default values. Returns a valid object for insertion, or False.
|
|
Errors are stored for later display. Intentionally not broken up into separate validator functions because
|
|
there are interdpendencies, such as checking for existing `creator` in one place and then using
|
|
that creator for group membership check in others."""
|
|
|
|
row_errors = []
|
|
|
|
# #######################
|
|
# Task creator must exist
|
|
if not row.get("Created By"):
|
|
msg = f"Missing required task creator."
|
|
row_errors.append(msg)
|
|
|
|
creator = get_user_model().objects.filter(username=row.get("Created By")).first()
|
|
if not creator:
|
|
msg = f"Invalid task creator {row.get('Created By')}"
|
|
row_errors.append(msg)
|
|
|
|
# #######################
|
|
# If specified, Assignee must exist
|
|
assignee = None # Perfectly valid
|
|
if row.get("Assigned To"):
|
|
assigned = get_user_model().objects.filter(username=row.get("Assigned To"))
|
|
if assigned.exists():
|
|
assignee = assigned.first()
|
|
else:
|
|
msg = f"Missing or invalid task assignee {row.get('Assigned To')}"
|
|
row_errors.append(msg)
|
|
|
|
# #######################
|
|
# Group must exist
|
|
try:
|
|
target_group = Group.objects.get(name=row.get("Group"))
|
|
except Group.DoesNotExist:
|
|
msg = f"Could not find group {row.get('Group')}."
|
|
row_errors.append(msg)
|
|
target_group = None
|
|
|
|
# #######################
|
|
# Task creator must be in the target group
|
|
if creator and target_group not in creator.groups.all():
|
|
msg = f"{creator} is not in group {target_group}"
|
|
row_errors.append(msg)
|
|
|
|
# #######################
|
|
# Assignee must be in the target group
|
|
if assignee and target_group not in assignee.groups.all():
|
|
msg = f"{assignee} is not in group {target_group}"
|
|
row_errors.append(msg)
|
|
|
|
# #######################
|
|
# Task list must exist in the target group
|
|
try:
|
|
tasklist = TaskList.objects.get(name=row.get("Task List"), group=target_group)
|
|
row["Task List"] = tasklist
|
|
except TaskList.DoesNotExist:
|
|
msg = f"Task list {row.get('Task List')} in group {target_group} does not exist"
|
|
row_errors.append(msg)
|
|
|
|
# #######################
|
|
# Validate Dates
|
|
datefields = ["Due Date", "Created Date"]
|
|
for datefield in datefields:
|
|
datestring = row.get(datefield)
|
|
if datestring:
|
|
valid_date = self.validate_date(datestring)
|
|
if valid_date:
|
|
row[datefield] = valid_date
|
|
else:
|
|
msg = f"Could not convert {datefield} {datestring} to valid date instance"
|
|
row_errors.append(msg)
|
|
|
|
# #######################
|
|
# Group membership checks have passed
|
|
row["Created By"] = creator
|
|
row["Group"] = target_group
|
|
if assignee:
|
|
row["Assigned To"] = assignee
|
|
|
|
# Set Completed
|
|
row["Completed"] = row["Completed"] == "Yes"
|
|
|
|
# #######################
|
|
if row_errors:
|
|
self.errors.append({self.line_count: row_errors})
|
|
return False
|
|
|
|
# No errors:
|
|
return row
|
|
|
|
def validate_date(self, datestring):
|
|
"""Inbound date string from CSV translates to a valid python date."""
|
|
try:
|
|
date_obj = datetime.datetime.strptime(datestring, "%Y-%m-%d")
|
|
return date_obj
|
|
except ValueError:
|
|
return False
|