import os
import sys
import time
import copy
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import List, Optional, Union
import platform
import urllib.request
from xml.sax import handler, make_parser, SAXParseException
from sqlalchemy import func, delete # pylint: disable=import-error
from sqlalchemy.orm import Session # pylint: disable=import-error
from sqlalchemy.exc import IntegrityError # pylint: disable=import-error
from timelink.kleio.groups import KGroup, KElement
from timelink.mhk.models.db import pom_som_base_mappings as pom_som_base_mappingsMHK
from timelink.mhk.models.pom_som_mapper import (
PomSomMapper as PomSomMapperMHK,
PomClassAttributes as PomClassAttributesMHK,
)
from timelink.mhk.models.entity import Entity as EntityMHK
from timelink.mhk.models.person import Person as PersonMHK
from timelink.mhk.models.system import KleioImportedFile as KleioFileMHK
from timelink.api.models.base_mappings import (
pom_som_base_mappings as pom_som_base_mappingsTL,
)
from timelink.api.models.pom_som_mapper import PomSomMapper as PomSomMapperTL
from timelink.api.models.pom_som_mapper import (
PomClassAttributes as PomClassAttributesTL,
)
from timelink.api.models.base import Entity as EntityTL, Person as PersonTL
from timelink.api.models.system import KleioImportedFile as KleioFileTL
[docs]
class KleioContext(Enum):
"""Kleio context enumeration"""
START = (1,)
KLEIO = (2,)
CLASS = (3,)
GROUP = (4,)
ELEMENT = (5,)
CORE = (6,)
COMMENT = 7
ORIGINAL = 8
[docs]
class SaxHandler(handler.ContentHandler):
"""SAX Handler for Kleio XML files"""
_context: KleioContext
_current_class: Optional[PomSomMapperTL | PomSomMapperMHK]
_current_class_attrs: List[PomClassAttributesTL | PomClassAttributesMHK]
_current_group: Optional[KGroup]
_current_element: Optional[KElement]
_current_entry: str
def __init__(self, kleio_handler):
handler.ContentHandler.__init__(self)
self._kleio_handler = kleio_handler
[docs]
def startDocument(self):
self._context = KleioContext.START
self._current_class = None
self._current_class_attrs = []
self._current_group = None
self._current_element = None
self._current_entry = ""
[docs]
def endDocument(self):
self._kleio_handler.endKleioFile()
[docs]
def startElement(self, name: str, attrs):
# https://stackoverflow.com/questions/15477363/xml-sax-parser-and-line-numbers-etc
loc = self._locator
ename = name.upper()
if ename == "KLEIO":
self._kleio_handler.newKleioFile(attrs)
self._context = KleioContext.KLEIO
elif ename == "CLASS":
# <CLASS NAME="source" SUPER="entity"
# TABLE="sources" GROUP="fonte">
if self._context != KleioContext.KLEIO:
raise SAXParseException(
exception=None,
locator=loc,
msg="CLASS out of context, should be inside KLEIO element",
)
self._current_class = self._kleio_handler.pom_som_mapper(
id=attrs["NAME"],
super_class=attrs["SUPER"],
table_name=attrs["TABLE"],
group_name=attrs["GROUP"],
)
self._current_class_attrs = []
self._context = KleioContext.CLASS
elif ename == "ATTRIBUTE":
# <ATTRIBUTE NAME="id"
# COLUMN="id"
# CLASS="id"
# TYPE="varchar"
# SIZE="64"
# PRECISION="0"
# PKEY="1" >
# </ATTRIBUTE>
if self._context != KleioContext.CLASS:
raise SAXParseException(
"ATTRIBUTE out of context, should be inside CLASS element",
None,
loc,
)
new_attribute = self._kleio_handler.pom_class_attributes()
new_attribute.the_class = self._current_class.id
new_attribute.name = attrs["NAME"]
new_attribute.colname = attrs["COLUMN"]
new_attribute.colclass = attrs["CLASS"]
new_attribute.coltype = attrs["TYPE"]
new_attribute.colsize = int(attrs["SIZE"])
new_attribute.colprecision = int(attrs["PRECISION"])
new_attribute.pkey = int(attrs["PKEY"])
self._current_class_attrs.append(new_attribute)
elif ename == "GROUP":
if self._context != KleioContext.KLEIO:
raise SAXParseException(
"GROUP out of context, should be inside KLEIO element", None, loc
)
# <GROUP ID="coja-rol-1841" NAME="fonte"
# CLASS="source" ORDER="1" LEVEL="1" LINE="4">
id = attrs["ID"]
name = attrs["NAME"]
the_class = attrs["CLASS"]
order = attrs["ORDER"]
level = attrs["LEVEL"]
line = attrs["LINE"]
# Mal [? porquĂȘ?]
self._current_group = KGroup()
self._current_group.id = id
self._current_group._name = name
self._current_group._pom_class_id = the_class
self._current_group._order = int(order)
self._current_group.level = int(level)
self._current_group.line = int(line)
self._context = KleioContext.GROUP
elif ename == "ELEMENT":
if self._context != KleioContext.GROUP:
raise SAXParseException(
"ELEMENT out of context, should be inside GROUP element",
exception=None,
locator=loc,
)
# <ELEMENT NAME="id" CLASS="id">
# <core>r1775-f1-per1</core></ELEMENT>
elname = attrs["NAME"]
self._current_group.allow_as_element(elname)
self._current_element = KElement(elname, None)
self._current_element.element_class = attrs["CLASS"]
self._context = KleioContext.ELEMENT
elif ename == "CORE":
if self._context != KleioContext.ELEMENT:
raise SAXParseException(
"CORE out of context, should be inside ELEMENT element",
exception=None,
locator=loc,
)
self._context = KleioContext.CORE
self._current_entry = ""
elif ename == "COMMENT":
if self._context != KleioContext.ELEMENT:
raise SAXParseException(
"COMMENT out of context, should be inside ELEMENT element",
exception=None,
locator=loc,
)
self._context = KleioContext.COMMENT
self._current_entry = ""
elif ename == "ORIGINAL":
if self._context != KleioContext.ELEMENT:
raise SAXParseException(
"ORIGINAL out of context, should be inside ELEMENT element",
exception=None,
locator=loc,
)
self._context = KleioContext.ORIGINAL
self._current_entry = ""
elif ename == "RELATION":
if self._context != KleioContext.KLEIO:
raise SAXParseException(
"GROUP out of context, should be inside KLEIO element",
exception=None,
locator=loc,
)
self._kleio_handler.newRelation(attrs)
else:
raise SAXParseException(
f"Unexpected element in Kleio file: {ename}",
exception=None,
locator=loc,
)
[docs]
def endElement(self, name):
ename = name.upper()
if ename == "GROUP":
self._kleio_handler.newGroup(self._current_group)
self._context = KleioContext.KLEIO
self._current_group = None
elif ename == "CLASS":
self._kleio_handler.newClass(self._current_class, self._current_class_attrs)
self._context = KleioContext.KLEIO
elif ename == "ATTRIBUTE":
self._context = KleioContext.CLASS
elif ename == "ELEMENT":
self._current_group[self._current_element.name] = self._current_element
self._context = KleioContext.GROUP
elif ename in ["CORE", "COMMENT", "ORIGINAL"]:
if ename == "CORE":
if self._current_element.core is None:
self._current_element.core = self._current_entry
if ename == "COMMENT":
if self._current_element.comment is None:
self._current_element.comment = self._current_entry
if ename == "ORIGINAL":
if self._current_element.original is None:
self._current_element.original = self._current_entry
self._context = KleioContext.ELEMENT
[docs]
def characters(self, content):
self._current_entry = self._current_entry + content
[docs]
def ignorableWhitespace(self, whitespace):
pass
[docs]
def processingInstruction(self, target, data):
pass
class KleioHandler:
pom_som_base_mappings = None
pom_som_mapper = None
pom_class_attributes = None
entity_model = None
person_model = None
kleio_file_model = None
model_type: str = None
postponed_relations = []
errors = []
warnings = []
kleio_file_origin = None
kleio_file = None
kleio_file_name = None
kleio_structure = None
kleio_translator = None
kleio_when = None
kleio_context = None
pom_som_cache = dict()
def __init__(self, session, mode="TL"):
"""
Arguments:
session: a SQLAlchemy session
mode: the mode of the import, either 'TL'(Timelink) or 'MHK'
"""
self.session = session
if mode == "TL":
self.pom_som_base_mappings = pom_som_base_mappingsTL
self.pom_som_mapper = PomSomMapperTL
self.pom_class_attributes = PomClassAttributesTL
self.pom_class_attributes = PomClassAttributesTL
self.entity_model = EntityTL
self.person_model = PersonTL
self.kleio_file_model = KleioFileTL
self.model_type = "TL"
elif mode == "MHK":
self.pom_som_base_mappings = pom_som_base_mappingsMHK
self.pom_som_mapper = PomSomMapperMHK
self.pom_class_attributes = PomClassAttributesMHK
self.entity_model = EntityMHK
self.person_model = PersonMHK
self.kleio_file_model = KleioFileMHK
self.model_type = "MHK"
else:
raise ValueError(f"Unknown mode {mode}")
def newKleioFile(self, attrs):
# <KLEIO STRUCTURE="/kleio-home/system/conf/kleio/stru/gacto2.str"
# SOURCE="/kleio-home/sources/soure-fontes/sources/1685-1720/baptismos/b1685.cli"
# TRANSLATOR="gactoxml2.str"
# WHEN="2020-8-18 17:10:32"
# OBS="" SPACE="">
self.session.commit()
self.postponed_relations = []
self.errors = []
self.warnings = []
self.kleio_file = attrs["SOURCE"]
# extract name of file from path
self.kleio_file_name = self.kleio_file.split("/")[-1]
self.kleio_structure = attrs["STRUCTURE"]
self.kleio_translator = attrs["TRANSLATOR"]
self.kleio_when = attrs["WHEN"]
self.pom_som_cache = dict()
def newClass(
self,
psm: PomSomMapperTL | PomSomMapperMHK,
attrs: List[PomClassAttributesTL | PomClassAttributesMHK],
):
if psm.id in self.pom_som_base_mappings.keys():
# we do not allow redefining of base mappings
return
session = self.session
# check if we have this class defined in the database.
existing_psm: PomSomMapperTL | PomSomMapperMHK = session.get(
self.pom_som_mapper, psm.id
)
if existing_psm is not None: # class exists: delete, insert again
try:
session.commit()
stmt = delete(self.pom_class_attributes).where(
self.pom_class_attributes.the_class == psm.id
)
session.execute(stmt)
session.delete(existing_psm)
session.commit()
except Exception as e:
session.rollback()
self.errors.append(
f"ERROR: deleting class {psm.id}: {e.__class__.__name__}: {e}"
)
# now we add the new mapping
try:
session.add(psm)
for class_attr in attrs:
session.add(class_attr)
session.commit()
except Exception as e:
session.rollback()
self.errors.append(
f"ERROR: adding class {psm.id}: {e.__class__.__name__}: {e}"
)
# ensure that the table and ORM classes are created
try:
psm.ensure_mapping(session)
session.commit()
except Exception as e:
session.rollback()
self.errors.append(
f"ERROR: creating ORM mapping for class {psm.id}: {e.__class__.__name__}: {e}"
)
def newGroup(self, group: KGroup):
# get the PomSomMapper
# pass storeKGroup
pom_mapper_for_group: PomSomMapperTL | PomSomMapperMHK
try:
if group.pom_class_id in self.pom_som_cache.keys():
pom_mapper_for_group = self.pom_som_cache[group.pom_class_id]
else:
pom_mapper_for_group = self.pom_som_mapper.get_pom_class(
group.pom_class_id, self.session
)
if pom_mapper_for_group is None:
raise ValueError(
f"Could not find PomSomMapper for class {group.pom_class_id}"
)
self.pom_som_cache[group.pom_class_id] = pom_mapper_for_group
except Exception as exc:
self.errors.append(
f"ERROR: {self.kleio_file_name} {str(group.line)}"
f" finding PomSomMapper for class "
f"{group.pom_class_id}: {exc.__class__.__name__}: {exc}"
)
return
if pom_mapper_for_group.id == "relation":
# it can happen that the destination of a relation
# is not yet in the database (forward reference in relation)
# In this case we postpone the storing of the relation
# until the end of file
exits_dest_rel = self.session.get(
self.entity_model, group.get_element_for_column("destination").core
)
if exits_dest_rel is None:
self.postponed_relations.append(
(pom_mapper_for_group.id, copy.deepcopy(group))
)
else:
try:
pom_mapper_for_group.store_KGroup(group, self.session)
except Exception as exc:
self.session.rollback()
self.errors.append(
f"ERROR: {self.kleio_file_name} {str(group.line)} "
f"storing group {group.kname}${group.id}: "
f"{exc.__class__.__name__}: {exc}"
)
self.session.rollback()
else:
try:
pom_mapper_for_group.store_KGroup(group, self.session)
except IntegrityError as ierror:
self.errors.append(
f"ERROR: {self.kleio_file_name} line {str(group.line)} "
f"** integrity error {group.kname}${group.id}: {ierror}"
)
self.session.rollback()
except Exception as exc:
self.session.rollback()
self.errors.append(
f"ERROR: {self.kleio_file_name} line {str(group.line)} "
f"storing group {group.kname}${group.id}: {exc.__class__.__name__}: {exc}"
)
def newRelation(self, attrs):
pass
def endKleioFile(self):
"""Process end of file: process postponed relations"""
# store postponed relations
postponed = len(self.postponed_relations)
if postponed > 0:
log = f"Storing {postponed} postponed relations"
print(log)
for pom_mapper_id, group in self.postponed_relations:
pom_mapper = self.pom_som_mapper.get_pom_class(pom_mapper_id, self.session)
try:
pom_mapper.store_KGroup(group, self.session)
self.session.commit()
except IntegrityError as ierror:
self.errors.append(
f"ERROR: {self.kleio_file_name} {str(group.line)} "
f"storing {group.to_kleio()}\n {ierror}"
)
self.session.rollback()
except Exception as exc:
self.errors.append(
f"ERROR: {self.kleio_file_name} {str(group.line)} "
f"storing {group.to_kleio()}\n {exc.__class__.__name__}: {exc}"
)
self.session.rollback()
self.postponed_relations = []
# store information on kleio file import
kfile = self.kleio_file_model(
path=self.kleio_file, # TODO: #20 should remove /Kleio-home from path
name=self.kleio_file_name,
structure=self.kleio_structure,
translator=self.kleio_translator,
# convert date from "2020-8-18 17:10:32" format
# to datetime object
translation_date=datetime.strptime(self.kleio_when, "%Y-%m-%d %H:%M:%S"),
nerrors=len(self.errors),
nwarnings=len(self.warnings),
)
if len(self.errors) > 0:
s = "\n\n".join(self.errors)
else:
s = "No errors"
kfile.error_rpt = s
if len(self.warnings) > 0:
s = "\n\n".join(self.warnings)
else:
s = "No warnings"
kfile.warning_rpt = s
kfile.imported = datetime.now(timezone.utc)
kfile.imported_string = datetime.now(timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S %Z"
)
kfile_exists = self.session.get(self.kleio_file_model, kfile.path)
if kfile_exists is not None:
self.session.delete(kfile_exists)
self.session.commit()
self.session.add(kfile)
self.session.commit()
[docs]
def import_from_xml(
filespec: Union[str, Path], session: Session, options: dict = None
) -> dict:
"""Import data from file or url into a timelink-mhk database.
The data file must be a XML file in the format exported by the ``kleio``
translator.
Arguments:
filespec (str,Path): a file path, URL of a data file or path in a Kleio Server.
conn_string (str): SQLAlchemy connection string to database.
options (dict): a dictionnary with options
- 'return_stats': if True import stats will be returned
- 'kleio_url': the url of kleio server;
- 'kleio_token': the authorization token for the kleio server.
- 'mode': the mode of the import, either 'TL'(Timelink) or 'MHK'
If kleio_url and kleio_token are specified the data will be fetched from
a KleioServer and the filespec should contain the "xml_path" of the file
in the server (e.g. /rest/exports/reference_sources/varia/vereacao.xml) and
the kleio_token will be inserted in the header for autentication
Returns:
If stats is True in options a dict with statistical information will be
returned.
- 'datetime': the time stamp of the start of the import
- 'machine': local machine name
- 'file': path to imported file or url
- 'import_time_seconds': elapsed time during import
- 'entities_processed': number of entities processed
- 'entity_rate': number of entities processed per second
- 'person_rate': number of persons (entities of class 'person')
- 'nerrors': number of errors during import
- 'errors': list of error messages
Examples:
Returned statistical information when stats=True
::
{
'datetime': '2022-01-02 18:11:12',
'machine': 'joaquims-mbpr.local',
'file': 'https://...b1685.xml',
'import_time_seconds': 7.022288084030151,
'entities_processed': 747,
'entity_rate': 106.37558457603042,
'person_rate': 27.483919441999827
'nerrors': 0
'errors': []
}
TODO: should use https when the kleio_url not local.
"""
collect_stats = False
kleio_url = None
kleio_token = None
mode = "TL" # determines the database model TL=Timelink, MHK=MHK
nentities_before = 0
npersons_before = 0
now = datetime.now()
if options is not None and options.get("return_stats", False):
collect_stats = True
if options is not None and options.get("mode", None) is not None:
mode = options.get("mode")
if options is not None and options.get("kleio_url", None) is not None:
kleio_url = options.get("kleio_url")
if options is not None and options.get("kleio_token", None) is not None:
kleio_token = options.get("kleio_token")
kleio_handler = KleioHandler(session, mode=mode)
sax_handler = SaxHandler(kleio_handler)
parser = make_parser()
parser.setContentHandler(sax_handler)
start = time.time()
if collect_stats:
nentities_before = session.query(
func.count(kleio_handler.entity_model.id)
).scalar()
npersons_before = session.query(
func.count(kleio_handler.person_model.id)
).scalar()
if kleio_url is not None and kleio_token is not None:
headers = {"Authorization": f"Bearer {kleio_token}"}
server_url = f"{kleio_url}{filespec}"
req = urllib.request.Request(server_url, headers=headers)
with urllib.request.urlopen(req) as source:
parser.parse(source)
elif kleio_token is not None or kleio_url is not None:
# this means that one of the options is missing
raise ValueError(
"Both kleio_url and kleio_token must be specified to fetch from kleio server"
)
elif isinstance(filespec, os.PathLike):
source = os.fspath(filespec)
parser.parse(source)
else:
source = filespec
parser.parse(source)
end = time.time()
if collect_stats:
machine = platform.node()
nentities = session.query(func.count(kleio_handler.entity_model.id)).scalar()
npersons = session.query(func.count(kleio_handler.person_model.id)).scalar()
erate = (nentities - nentities_before) / (end - start)
prate = (npersons - npersons_before) / (end - start)
stats = {
"datetime": now.timestamp(),
"machine": machine,
"database": session.bind.url,
"file": filespec,
"import_time_seconds": end - start,
"entities_processed": nentities - nentities_before,
"entity_rate": erate,
"person_rate": prate,
"nerrors": len(kleio_handler.errors),
"errors": kleio_handler.errors,
}
return stats
if (
__name__ == "__main__"
): # Not sure this works, where is the session for KleioHandler?
saxParser = make_parser()
kleioHandler = KleioHandler(None, mode="TL")
saxParser.setContentHandler(SaxHandler(kleioHandler))
saxParser.parse(sys.argv[1])