Source code for timelink.pandas.entities_with_attribute

# pyright: reportArgumentType=false
# pyright: reportCallIssue=false
# pyright: reportAssignmentType=false
# pyright: reportAttributeAccessIssue=false

from typing import List
import pandas as pd
from sqlalchemy.sql import select
from sqlalchemy.orm import Session
from timelink.api.database import TimelinkDatabase
from timelink.api.models.entity import Entity


def _expand_extra_info(df: pd.DataFrame, extra_info_column: str, base_column: str) -> None:
    """Copy "original"/"comment" annotations from an extra-info column into ``df``.

    Each cell of ``extra_info_column`` is expected to hold a dict keyed by
    database column name, whose values are dicts that may contain
    "original", "comment" and "entity_attr_name" entries. For every
    annotation found, a column named ``<base_column>[.<attr>].original`` or
    ``<base_column>[.<attr>].comment`` is created on demand and filled for
    that row. The dataframe is modified in place.
    """
    pending_edits = []
    for row_number, (_, row) in enumerate(df.iterrows()):
        extra_info = row[extra_info_column]
        # Cells without extra info hold None (or NaN); skip anything that
        # is not a dict instead of failing on ``.items()``.
        if not isinstance(extra_info, dict):
            continue
        for key, value in extra_info.items():
            if key != "the_value":
                # Use the ORM attribute name ("type", "date", ...) stored
                # under "entity_attr_name" rather than the raw column name.
                attr_name = value.get("entity_attr_name", key)
                target = f"{base_column}.{attr_name}"
            else:
                # Annotations on the value itself hang off the base column.
                target = base_column
            original = value.get("original", None)
            if original is not None:
                pending_edits.append((row_number, f"{target}.original", original))
            comment = value.get("comment", None)
            if comment is not None:
                pending_edits.append((row_number, f"{target}.comment", comment))

    # Edits are applied after the scan so that columns are not added to the
    # dataframe while it is being iterated.
    for row_number, target, avalue in pending_edits:
        if target not in df.columns:
            df[target] = None
        df.iat[row_number, df.columns.get_loc(target)] = avalue


def entities_with_attribute(
    the_type: str | List[str],
    the_value=None,
    column_name=None,
    entity_type="entity",
    show_elements=None,
    dates_in=None,
    name_like=None,
    filter_by=None,
    more_attributes=None,
    db: TimelinkDatabase | None = None,
    session: Session | None = None,
    sql_echo=False,
):
    """Generate a pandas dataframe with entities that have a given attribute.

    Args:
        the_type: type of attribute; a string (matched with SQL LIKE, so it
            can contain SQL wildcards) or a list of strings (matched with IN).
        the_value: if present, limit to this value; a string (SQL LIKE) or
            a list of strings (IN).
        column_name: if present, use this name for the attribute column,
            otherwise use the_type (list items joined with "_"). Useful
            when the_type is a list.
        entity_type: limit to this entity type; must be one of
            ``db.get_models_ids()``.
        show_elements: list of entity columns to add to the dataframe.
        dates_in: (after, before); if present keep only attributes dated
            strictly between those dates (both ends exclusive).
        name_like: if present, limit to entities whose name matches
            (SQL LIKE, can have wildcards).
        filter_by: list of ids, limit to these entities. Ids in the list
            that have no matching attribute row are appended to the result
            with only the entity columns filled.
        more_attributes: list of additional attribute types to join as
            extra columns, when available.
        db: a TimelinkDatabase object (required).
        session: a SQLAlchemy session; if None, ``db.session()`` is used.
        sql_echo: if True, print the generated SQL.

    Returns:
        A dataframe indexed by entity "id", ordered by attribute date, or
        None if nothing was found.

    Raises:
        Exception: if ``db`` is None.
        ValueError: if ``entity_type`` is unknown, an item of
            ``show_elements`` is not a column of the entity model,
            ``name_like`` is used with a model that has no "name" column,
            or ``the_value`` is neither a string nor a list.

    Example:
        # name, sex and function of people living in the same place
        neighbors = entities_with_attribute(
            entity_type="person",
            show_elements=["groupname", "names", "sex"],
            the_type="residencia",
            the_value="soure",
            column_name="local",  # use this instead of "residencia"
            more_attributes=["profissao"],
            db=dbsystem,
        )

    Ideas:
        Add:
            the_value_in: (list of values)
            the_value_between_inc (min, max, get >=min and <= max)
            the_value_between_exc (min, max, get >min and < max)
    """
    # We try to use an existing connection and table introspection
    # to avoid extra parameters and going to the database too much.
    if db is None:
        raise (
            Exception(
                "db: TimelinkDatabase required. "
                "Must create to set up a database connection"
            )
        )

    mysession: Session
    if session is None:
        mysession = db.session()
    else:
        mysession = session

    # If we don't have a name for the column we derive it from the_type.
    if column_name is None:
        if isinstance(the_type, list):
            column_name = "_".join(the_type)
        else:
            column_name = str(the_type)

    date_column_name = f"{column_name}.date"
    obs_column_name = f"{column_name}.obs"
    type_column_name = f"{column_name}.type"
    line_column_name = f"{column_name}.line"
    level_column_name = f"{column_name}.level"
    atr_id_column_name = f"{column_name}.attr_id"
    attribute_extra_info_column_name = f"{column_name}.extra_info"

    entity_types = db.get_models_ids()
    if entity_type not in entity_types:
        raise ValueError(f"entity_type must be one of {entity_types}")

    entity_model: Entity = db.get_model(entity_type)  # type: ignore

    # Columns of the entity table, used to validate show_elements.
    model_columns = select(entity_model).selected_columns  # type: ignore
    entity_columns = model_columns.keys()
    entity_id_col = model_columns["id"]

    if show_elements is None:
        show_elements = []

    # Collect the column objects for the select list.
    extra_cols = []
    for mi in show_elements:
        if mi not in entity_columns:
            raise ValueError(f"{mi} is not a valid column for {entity_type}")
        extra_cols.append(model_columns[mi])

    if name_like is not None:
        if "name" not in entity_columns:
            raise (ValueError("To filter by name requires name in the show_elements list."))

    attr = db.get_view("eattributes")

    cols = [entity_id_col]
    cols.extend(extra_cols)
    # Entity-only columns, used to fetch filter_by ids lacking the attribute.
    more_info_cols = cols.copy()
    cols.extend(
        [
            attr.c.attr_id.label(atr_id_column_name),
            attr.c.the_type.label(type_column_name),
            attr.c.the_value.label(column_name),
            attr.c.the_date.label(date_column_name),
            attr.c.the_line.label(line_column_name),
            attr.c.the_level.label(level_column_name),
            attr.c.aobs.label(obs_column_name),
            attr.c.a_extra_info.label(attribute_extra_info_column_name),
        ]
    )

    if isinstance(the_type, list):
        attribute_query = attr.c.the_type.in_(the_type)
    else:
        attribute_query = attr.c.the_type.like(the_type)

    # Filter by id list.
    filtered_df: pd.DataFrame = pd.DataFrame()
    if filter_by is not None:
        # In some cases the filter_by list may contain ids that are not in
        # the attribute table; we fetch them here so they can be added to
        # the final dataframe.
        filter_by_sql = (
            select(entity_model)  # type: ignore
            .with_only_columns(*more_info_cols, maintain_column_froms=True)
            .where(entity_model.id.in_(filter_by))  # type: ignore
        )
        # NOTE: leaving the ``with`` block closes the session (also when it
        # was supplied by the caller); it is reused for the later queries.
        with mysession as active_session:
            filtered_by_rows = active_session.execute(filter_by_sql)
            col_names = filter_by_sql.selected_columns.keys()
            filtered_df = pd.DataFrame.from_records(
                filtered_by_rows, index=["id"], columns=col_names
            )

        stmt = (
            (select(entity_model).where(entity_id_col.in_(filter_by)))
            .join(attr, attr.c.entity == entity_model.id, isouter=True)
            .where(attribute_query)
            .with_only_columns(*cols)
        )
    else:
        stmt = (
            select(entity_model)
            .join(attr, attr.c.entity == entity_model.id)
            .where(attribute_query)
            .with_only_columns(*cols)
        )

    # Filter by value.
    if the_value is not None:
        if isinstance(the_value, list):
            stmt = stmt.where(attr.c.the_value.in_(the_value))
        elif isinstance(the_value, str):
            stmt = stmt.where(attr.c.the_value.like(the_value))
        else:
            raise ValueError("the_value must be either a string or a list of strings")

    # Filter by date (exclusive on both ends).
    if dates_in is not None:
        after_date, before_date = dates_in
        stmt = stmt.where(attr.c.the_date > after_date, attr.c.the_date < before_date)

    # Filter by name.
    if name_like is not None:
        stmt = stmt.where(model_columns["name"].like(name_like))

    stmt = stmt.order_by(attr.c.the_date)

    if sql_echo:
        print(f"Query for {the_type}:\n", stmt)

    with mysession as active_session:
        records = active_session.execute(stmt)
        col_names = stmt.selected_columns.keys()
        df = pd.DataFrame.from_records(records, index=["id"], columns=col_names)

    # ``iloc[0]`` raises IndexError on a dataframe without rows, so the
    # emptiness test has to come first.
    if df.empty or df.iloc[0].count() == 0:
        return None  # nothing found we return None

    # The extra_info dict contains information stored during the import
    # process that is not stored directly in the attribute columns of the
    # database: comments and original wording of the value, type or date.
    _expand_extra_info(df, attribute_extra_info_column_name, column_name)

    if filter_by is not None:
        fb_ids = filtered_df.index.unique()
        attr_ids = df.index.unique()
        missing = list(set(fb_ids) - set(attr_ids))
        if missing:
            missing_df = filtered_df.loc[missing]
            df = pd.concat([df, missing_df])

    if df.empty or df.iloc[0].count() == 0:
        return None  # nothing found

    more_columns = [] if more_attributes is None else more_attributes

    for mcol in more_columns:
        more_date_column_name = f"{mcol}.date"
        more_obs_column_name = f"{mcol}.obs"
        more_extra_info_column_name = f"{mcol}.extra_info"

        stmt = (
            select(
                attr.c.entity.label("id"),
                attr.c.the_value.label(mcol),
                attr.c.the_date.label(more_date_column_name),
                attr.c.aobs.label(more_obs_column_name),
                attr.c.a_extra_info.label(more_extra_info_column_name),
            )
            .where(attr.c.the_type == mcol)
            .where(attr.c.entity.in_(df.index))
        )

        with mysession as active_session:
            records = active_session.execute(stmt)
            col_names = stmt.selected_columns.keys()
            df2 = pd.DataFrame.from_records(records, index=["id"], columns=col_names)

        if sql_echo:
            print(f"Query for more_attributes={mcol}:\n", stmt)

        if df2.empty or df2.iloc[0].count() == 0:
            df[mcol] = None  # nothing found we set the column to nulls
        else:
            _expand_extra_info(df2, more_extra_info_column_name, mcol)
            df = df.join(df2)

    return df