Source code for timelink.kleio.groups.kgroup

import json
import textwrap
import warnings
from os import linesep as nl
from typing import Type, List, Union, Tuple, Dict

from box import Box

from timelink.kleio.groups.kelement import KElement


[docs] class KGroup: """ KGroup(*positional_elements ,**more_elements) Abstract Kleio Group. To define a Kleio Group extend this class and set default value for _name, or use extend(name,position, guaranteed, also). Define allowed elements in the default values for _position, _guaranteed, _also (list of strings) or call allowed_as_element . Use _part to list allowed enclosed groups, or call allow_as_part For an example see timelink.kleio.groups.KPerson """ # Class scoped list of reserved element names for system use _builtin_elements = [ "line", "level", "order", "inside", "groupname", "pom_class_id", "id", ] # These are shared between all instances of the class _position: list = [] # list of positional elements _guaranteed: list = [] # list of required elements _also: list = [] # list of optional elements _part: list = [] # allowed sub groups _extends: Type["KGroup"] # name of group this is based on _pom_class_id: str = "entity" # Id of PomSom mapper for this group _element_check = True # if true validates element assignment _elementsd: Type[Dict[str, KElement]] # Current elements _inside: Type["KGroup"] # group that includes this # The following fields are generated during the translation process. # If the Kleio groups are generated programatically (for instance when # importing from csv files or from databases) then the cls.include method # will update these values _line: int # line in the source file _level: int # nesting always +1 than enclosing group _order: int # sequential number of this group in the original source # class scoped counters to ensure proper numbering across different groups _global_sequence: int = 1 # global sequence count _global_line: int = 1 # global line count # we need the classes for attributes and relations, used by the # attr and rel methods. Since those class are subclasses of this one # they need to set this latter. Also allows for differet classes to # be used by attr and rel if necessary _attr_class = None # class used by the attr method (set by KAttribute) _rel_class = None # class used by the rel method (set by KRelation) # TODO to_kleio_str generates the definition of a group # for a kleio str file. recurse=yes # collects included groups and generates for those also. @property def kname(self): """The kleio name of this group, used in the to_kleio() method""" kname = self.unpack_from_kelement(self._name) if kname is None or kname == "kgroup": kname = self.__class__._name return kname @kname.setter def kname(self, value): self._name = self.pack_as_kelement("kname", value, element_class="name") @property def inside(self): return self._inside def get_container_id(self): if self._inside is None: gid = None elif KGroup.is_kgroup(self._inside): gid = self.unpack_from_kelement(self.inside.id) return gid @inside.setter def inside(self, value): if KGroup.is_kgroup(value): self._inside = value else: self._inside = KGroup(id=str(value), element_check=False) @property def line(self): return self.unpack_from_kelement(self._line) @line.setter def line(self, value): self._line = self.pack_as_kelement("line", value) @property def level(self): return self.unpack_from_kelement(self._level) @level.setter def level(self, value): self._level = self.pack_as_kelement("level", value) @property def order(self): return self.unpack_from_kelement(self._order) @order.setter def order(self, value): self._order = self.pack_as_kelement("order", value) @property def pom_class_id(self): return self.unpack_from_kelement(self._pom_class_id) @pom_class_id.setter def pom_class_id(self, pcid): self._pom_class_id = self.pack_as_kelement("pom_class_id", pcid)
[docs] @classmethod def extend( cls, name: str, position: Union[list, str, None] = None, guaranteed: Union[list, str, None] = None, also: Union[list, str, None] = None, part: Union[list, str, None] = None, synonyms: Union[Tuple[str, str], None] = None, ): """ Create a new group definition by extending this one :param name: name of the new group :param position: list of positional elements :param guaranteed: list of required elements :param also: list of optional elements :param part: list of groups that can be included in this one :param synonyms: list of tuples defining synonym for elements e,g, [("ano","year"),...] :return: new group class """ new_group = type(name, (cls,), {}) new_group._name = name # todo: k,v in kwargs if in cls set if not error if type(position) is str: position = [position] if type(guaranteed) is str: guaranteed = [guaranteed] if type(also) is str: also = [also] if position is not None: new_group._position = position else: new_group._position = list(cls._position) if guaranteed is not None: new_group._guaranteed = guaranteed else: new_group._guaranteed = list(cls._guaranteed) if also is not None: new_group._also = also else: new_group._also = list(cls._also) if part is not None: new_group._part = part else: new_group._part = list(cls._part) if synonyms is not None: try: for word, synonym in synonyms: sym_class = KElement.get_class_for(synonym) if sym_class is None: msg = f"synonym error {synonym} is unkown" raise ValueError(msg) else: # this creates a new KElement class # named "word" that inherits from the KElement # class of synonym sym_class.extend(word) except ValueError: msg = "synonyms must be list of tuples" "e.g. [('ano','date').." raise (ValueError(msg)) new_group._extends = cls return new_group
[docs] @classmethod def get_subclasses(cls): """Generator for subclasses of this group""" for subclass in cls.__subclasses__(): yield from subclass.get_subclasses() yield subclass
[docs] @classmethod def all_subclasses(cls): """List of all the subclasses of this group""" return list(cls.get_subclasses())
[docs] @classmethod def is_kgroup(cls, g): """True g is an instance of a subclass of KGroup""" return "KGroup" in [c.__name__ for c in type(g).mro()]
[docs] @classmethod def elements_allowed(cls) -> set: """Set of Elements allowed in this Group""" return set(cls._guaranteed).union(set(cls._also)).union(cls._position)
[docs] @classmethod def allow_as_element( cls, ename: Union[str, List[str]], guaranteed=False, also=True, position=None ): """ Add element or list to list of allowed elements for this group. Optionally define if element(s) is positional, required (guaranteed) or optional :param ename: name of element :param guaranteed: if True this is element is added to list of required elements. :param also: if True this element is optional (default) :param position: int, this is a positional element, at this position (0 = first position) :return: List of allowed elements """ if type(ename) is List: for e in ename: cls.allow_as_element( e, guaranteed=guaranteed, position=position, also=also ) return elif type(ename) is str: if guaranteed: if ename not in cls._guaranteed: cls._guaranteed.append(ename) if position is not None: if ename in cls._position: cls._position.remove(ename) cls._position.insert(position, ename) if ename not in cls.elements_allowed(): cls._also.append(ename) else: raise TypeError("first argument must be string or list of strings") return
[docs] @classmethod def allow_as_part(cls, g: Union[str, type]): """Allow g to be enclosed as part of this group. Arguments: g: the name of a group, or a subclass of KGroup. A string or class. """ if g not in cls._part: cls._part.append(g)
@classmethod def inc_sequence(cls): KGroup._global_sequence += 1 return KGroup._global_sequence @classmethod def inc_line(cls): KGroup._global_line += 1 return KGroup._global_line def __init__(self, *args, **kwargs): """Creates a new instance of group with setting the value of elements. Args: *args: values for positional arguments **kwargs: values for optional elements. Example: ``ks = KSource("s1", type="test", loc="auc", ref="alumni", obs="Nested")`` Use element_check=False to turn off checking of element names """ self.id: str = None self.source_id: str = None self._name: str = "kgroup" self._containsd: dict = {} self.level = 1 self.line = 1 self.order = 1 self._element_check = True self._elementsd = {} self._inside = None self.source_id = None if len(args) > len(self._position): raise ValueError("Too many positional elements") n = 0 # set the positional arguments according to "_position" for arg in args: e = self._position[n] # setattr(self, e, KElement(e, arg)) self[e] = arg # this will go through __setitem__ n = n + 1 # keyword arguments must be in one the element lists for k, v in kwargs.items(): if k == "element_check": self._element_check = v else: if self._element_check and not self.is_allowed_as_element(k): raise ValueError(f"Element not allowed: {k}") self[k] = v # test if the compulsory (guaranteed) elements are present for g in self._guaranteed: if getattr(self, g, None) is None: raise TypeError( f"{self.kname}: element {g} in _guaranteed " f"is missing or with None value" )
[docs] def include(self, group: Type["KGroup"]): """Include a group. `group`, or its class, must in _part list or extend a class in the part list. Returns self so it is possible to chain: g.include(g2).include(g3)""" allowed = self.is_allowed_as_part(group) if allowed is None: raise ValueError(f"Group {self.kname} cannot contain {group.kname}") group.level = self.level + 1 group.line = KGroup.inc_line() group.order = KGroup.inc_sequence() # Hook to before input processing in the group being included if hasattr(group, "before_include") and callable(group.before_include): if not group.before_include(self): raise TypeError( f"{group} includding aborted by " f"group.before_include(self)" f" returning False" ) # new style, dictionary based k = self._containsd.keys() if allowed in k: self._containsd[allowed].append(group) else: self._containsd[allowed] = [group] group.inside = self # Hook to after input processing in the group being included if hasattr(group, "after_include") and callable(group.after_include): group.after_include(self) return self
[docs] def before_include(self, container_group): """Method called before this group is included into another through KGroup.include(KGroup).""" if self.id is None: if container_group.id is None: raise ValueError( "A group with no id cannot be included in another " "group also without id" ) gid = f"{container_group.id}-{self.order:02d}-{self.kname[0:3]}" self["id"] = gid return True return True
[docs] def after_include(self, group): """Method called after a new group is included in this one through KGroup.include(KGroup).""" pass
[docs] def is_allowed_as_part(self, group): """Test if a group can be included in the current one. For a group to be allowed for inclusion one of 3 conditions necessary: 1. the kname of the group is in self._pars 2. the type of the group is in self._pars 3. the type of the group inherits from a type in self._pars Return key under which the group is allowed (kname, type or super type) Return None if not allowed """ if not self.is_kgroup(group): raise TypeError("Argument must be subclass of KGroup") if group.kname not in self._part: allowed_classes = [c for c in self._part if type(c) is not str] super_classes = type(group).mro() r = list(set(super_classes).intersection(set(allowed_classes))) if len(r) == 0: allowed = None else: allowed = r[0] # kname? else: allowed = group.kname return allowed
[docs] def is_allowed_as_element(self, element_name): """ Test if this element is allowed in this group. For an element to be allowed one the following must be true: 1. part of KGroup._builtin 2. part of position list 3. part of guarenteed list 4. part of also list Note that this function is unaffected by self._element_check :param element_name: name of element to check :return: True if element allowed False otherwise """ all_elements = ( self._builtin_elements + self._position + self._guaranteed + self._also ) return element_name in all_elements
[docs] def includes(self, group: Type[Union[str, Type["KGroup"]]] = None) -> list: """Returns included groups. Groups are returned by the order in _pars. :todo: this would better be a generator, yield instead of extend. :param str group: filter by group name """ if group is not None: if group in self._containsd.keys(): return self._containsd[group] else: if type(group) is str: gname = group elif KGroup.is_kgroup(group): gname = group.kname inc_by_part_order = [] classes_in_contains = [ c for c in self._containsd.keys() if hasattr(c, "_name") ] for class_in_contains in classes_in_contains: if class_in_contains._name == gname: inc_by_part_order.extend(self._containsd[class_in_contains]) return inc_by_part_order else: # no specific subgroup, we return by pars order inc_by_part_order = [] for p in self._part: if p in self._containsd.keys(): inc_by_part_order.extend(self._containsd[p]) return inc_by_part_order
[docs] def attr( self, the_type: Union[str, KElement, Tuple[str, str, str]], value: Union[str, KElement, Tuple[str, str, str]], date: Union[str, KElement, Tuple[str, str, str]], obs=None, ): """Utility function to include a KAttribute in this KGroup The call:: KGroup.attr('age','25','2021-08-08',obs='in May') is short hand for:: KGroup.include(KAttr('age','25','2021-08-08',obs='in May')) Params google style :param str or tuple the_type: core or (core,org,comment) :param str or tuple value: core or (core,org,comment) :param str date: date as string in Kleio format, or (date,org,comment) :param str obs: options observation field """ ka = self._attr_class self.include(ka(the_type, value, date=date, obs=obs)) return self
[docs] def rel( self, the_type: Union[str, tuple], value: Union[str, tuple], destname: Union[str, tuple], destination: Union[str, tuple], date: Union[str, tuple] = None, obs: str = None, ): """include a relation in this KGroup""" kr = self._rel_class self.include(kr(the_type, value, destname, destination, date=date, obs=obs))
[docs] def to_kleio(self, indent="") -> str: """Return a kleio representation of the group.""" return self.__str__(indent=indent, recurse=True)
[docs] def to_dict( self, allow_none: bool = False, include_str: bool = False, include_kleio: bool = False, redundant_keys: bool = True, include_builtin: bool = True, ): """Return group information as a dict. Params: allow_none bool = Include null values (default False) :param allow_none: Include null values (default False) :param include_str: include a string represention of each element with keys [elementname]_str, the value will include # and % if necessary :param include_kleio: include a kleio representation of each element with keys [elementname]_kleio; this is similar to *include_str* but the value also contains the element name :param redundant_keys: include redundant keys (see bellow) :param include_builtin: include also elements like line,level,... etc... :return: a dictionary with the structure bellow *Keys of returned dict* :group[element]: core value of element; also group[element_core] :group[element_comment]: comment aspect of element :group[element_original]: original aspect of element :group[element_str]: if *include_str=True*, a string representation of elementm(with # and % if necessary); :group[element_kleio]: if *include_kleio=True*, a kleio representation in the form ``element=string`` :group[includes]: list of enclosed groups :group[includes][subgroup]: list of enclosed groups of type subgroup If *redundant_keys=True* enclosed subgroups can also be accessed in the plural form, if no name conflict with existing elements:: group['persons'] == group['includes']['persons'] group['person']['id1'] == [p for p in group['includes']['persons'] if p.id='id1'][0] :todo: add first(), last() where includes is allowed:: group['first']['person'] = group['includes']['person'][0] """ kd = dict() kd["kleio_group"] = self._name elements_to_include = self.elements_allowed() if include_builtin: els = elements_to_include.union(set(self._builtin_elements)) - set( ["inside"] ) for e in els: v: KElement = getattr(self, e, None) if v is not None: if issubclass(type(v), KElement): core, comment, original = v.to_tuple() kd[e] = core kd[e + "_core"] = core kd[e + "_comment"] = comment kd[e + "_original"] = original if include_str: kd[e + "_str"] = str(v) if include_kleio: kd[e + "_kleio"] = v.to_kleio() else: kd[e] = v if not allow_none: kd = dict([(key, value) for key, value in kd.items() if value is not None]) # we now includes subgroups ki = dict() # we now collect subgroups by name included = list(self.includes()) for i in included: n = i.kname if n not in ki.keys(): ki[n] = [ i.to_dict( include_str=include_str, include_kleio=include_kleio, include_builtin=include_builtin, redundant_keys=redundant_keys, ) ] else: ki[n].append( i.to_dict( include_str=include_str, include_kleio=include_kleio, include_builtin=include_builtin, redundant_keys=redundant_keys, ) ) if len(ki) > 0: kd["includes"] = ki # if there are no name conflicts and plural form # so g['includes']['act'] can be accessed as # g['acts'] if redundant_keys: for subgroup in ki.keys(): if subgroup + "s" not in self.elements_allowed(): kd[subgroup + "s"] = ki[subgroup] # we include subgroup indexed by id # so we can have source['act']['ac010]['person']['p01'] for group in ki[subgroup]: gid = group.get("id", None) if ( gid is not None and subgroup not in self.elements_allowed() # noqa ): if subgroup not in kd.keys(): kd[subgroup] = dict() kd[subgroup][gid] = group return kd
def to_json(self): return json.dumps( self.to_dict( include_str=False, include_kleio=False, redundant_keys=False, include_builtin=True, ), indent=4, allow_nan=False, ) @property def get(self): return self.to_dict() def to_dots(self): return Box(self.to_dict()) @property def dots(self): """ Allows easy referencing of the dictionary representation of the group. It is very usefull in list comprehensiona, e.g.: >> 'Diamantina' in [ls.value for ls in person.dots.lss if ls.type == 'nome-geografico'] >> [ls.value for ls in person.dots.lss if ls.type == 'nome-geografico'] :return: """ return self.to_dots() def __str__(self, indent="", recurse=False): sname = self.kname s = str(sname) + "$" first = True out = [] for e in self._position: v: KElement = getattr(self, e, None) if v is not None: if not first: s = s + "/" + v.to_kleio(name=False) else: s = s + v.to_kleio(name=False) first = False out.append(e) more = sorted( list( set(self._guaranteed) .union(set(self._also)) .union(self._position) .difference(out) ) ) # print(more) if "obs" in more: # we like obs elements at the end more.remove("obs") more.append("obs") for e in more: m: Union[KElement, str] = getattr(self, e, None) if m is not None and ( type(m) is str and m > "" # noqa or (issubclass(type(m), KElement) and m.to_kleio() > "") # noqa ): # m contains data, lets output if issubclass(type(m), KElement): v = m.to_kleio() else: v = KElement(e, m).to_kleio() if not first: s = s + f"/{v}" else: s = s + f"{v}" first = False if recurse: for g in self.includes(): s = s + nl + g.__str__(indent + " ", recurse=recurse) return textwrap.indent(s, indent) def __getitem__(self, arg): if self._element_check and arg not in self.elements_allowed(): raise ValueError("Element does not exist in group") return getattr(self, arg) def __setitem__(self, arg, value): if self._element_check and not self.is_allowed_as_element(arg): raise ValueError(f"Element not allowed: {arg}") el = self.pack_as_kelement(arg, value) setattr(self, arg, el) self._elementsd[arg] = el
[docs] def pack_as_kelement(self, arg, value, element_class=None): """Packs value as a KElement with name arg :param arg: name of element :param value: value of element; can be a KElement, a tuple (core, cooment, original) or a value :param element_class: class of element as string; if none arg is used to find class TODO: the question is what is the class of the KElement to be stored if value is a KElement then it already has a class """ # TODO: this is wrong. if value is a KElement it should be stored as such if element_class is None: kelement = KElement.get_class_for(arg) else: kelement = KElement.get_class_for(element_class) if kelement is None and isinstance(value, KElement): if isinstance(value.element_class, KElement): kelement = value.element_class else: kelement = KElement.get_class_for(value.element_class) if kelement is None: # if there is no KElement class we create it kelement = KElement.extend(arg) warnings.warn( f"Created a KElement class for {arg}. " f"Better to create explicit or provide " f" synonyms= in group creation.", stacklevel=2, ) # we get KElement class that matches the name # this is how we handle localized name of elements that # have a builtin meaning or are referred to by standard names # in PomSomMapping if isinstance(value, KElement): el = kelement( core=value.core, comment=value.comment, original=value.original ) else: comment = None original = None if type(value) is tuple and len(value) == 3: core, comment, original = value else: core = value el = kelement(arg, core=core, comment=comment, original=original) return el
[docs] def unpack_from_kelement(self, value): """ if value is a KElement return core if not return value as is. Useful to obtain the core value in elements that normally have no comment or original. :param value: :return: """ if not isinstance(value, KElement): # we did not get a KElement return value else: # we got a KElement object return value.core
[docs] def get_core(self, element, default=None): """get_core(element_name [, default]) Returns the core value of an element """ try: core = self.unpack_from_kelement(self[element]) except ValueError: core = None except AttributeError: core = None if core is None: return default else: return core
[docs] def get_id(self): """Return the id of the group""" return self.unpack_from_kelement(self.id)
[docs] def get_element_by_name_or_class(self, element_spec, default=None): """ Return the value of an element with a given name or that inherits from an element of the given name. An element matches a element_spec if its name is equal to the element_spec name or if it is an instance of KElement subclass with the name equal to element_spec. :param element_spec: name of column, or name of POMClassAttributes :param default: default value if not element found :return: KElement, or whatever in default. """ el: KElement for ( name, el, ) in ( self._elementsd.items() ): # same name as column or in ancestors # noqa: E501 if name == element_spec: return el # Handles synonyms created by subclassing core KElements for el in self._elementsd.values(): # if name in inherited names if element_spec in el.inherited_names(): return el # handles multiple subclassing of core KElements for ( el ) in ( self._elementsd.values() ): # check if there are alternative classes # noqa: E501 # all classes for colspec targets = KElement.get_classes_for(element_spec) # other classes for el alternatives = KElement.get_classes_for(el.name) # now check if there is a common path # collect all the ancestors of all the classes for this el alt_ancestors = [(sc, sc.inherits_from()) for sc in alternatives] # check if any of colspec classes are there for _alternative, ancestors in alt_ancestors: common = set(ancestors).intersection(set(targets)) if len(common) > 0: return el return default