Package rollup_cascade
Defines the rollup_cascade subpackage for managing rollup and cascading operations when cleaning raw extracted tabular data.
Usage
>>> import rollup_cascade as rc
>>> rollup_cascade_manager = rc.RollupCascadeManager(
...     rc.RollupCascadeSpec(
...         rc.RollupColumn(
...             column="A", when_not_empty=["C"], default_value="--", post_cascade=False
...         ),
...         rc.CascadeColumn(column="C", when_not_empty=["B"]),
...         tables=["table1", "table2"],
...     ),
...     rc.RollupCascadeSpec(
...         rc.RollupColumn(column="D", when_not_empty=["C"], default_value="?"),
...         rc.CascadeColumn(column="E", when_not_empty=["F"]),
...         tables=["table2", "table3"],
...     ),
... )
>>> table_instance = {
...     "a": ["", "", "a2", "a3", "", ""],
...     "b": ["b1", "b2", "b3", "b4", "", ""],
...     "c": ["c1", "", "", "", "", ""],
...     "d": ["d1", "", "", "d2", "d3", "d4"],
... }
>>> table_name = "table2"
>>> cleaned = rollup_cascade_manager.rollup_cascade_rollup(table_name, table_instance)
>>> cleaned
{'a': ['--', '', 'a2', 'a3'],
'b': ['b1', 'b2', 'b3', 'b4'],
'c': ['c1', 'c1', 'c1', 'c1'],
'd': ['d1', '?', '?', 'd2 d3 d4']}
Sub-modules
rollup_cascade.column-
Defines the CascadeColumn (base) and RollupColumn (derived) column classes
rollup_cascade.manager-
Defines the RollupCascadeManager class, the top-level manager for all cascade/rollup operations for all tables across a single section. See …
rollup_cascade.spec-
Define RollupCascadeSpec for associating a group of cascade/rollup operations with specific table(s)
Classes
class CascadeColumn (*, column: str, when_not_empty: collections.abc.Sequence[str] = (), default_value: str = '', strict: bool = False, custom_trigger: collections.abc.Callable[[numpy.ndarray, numpy.ndarray, collections.abc.Iterable[str]], bool] | None = None)-
Specify a column whose value should be cascaded to subsequent rows when one or more reference columns has a value.
KwArgs
column:str- the column whose value should be cascaded
when_not_empty:Sequence[str]- the columns that trigger the cascade when not empty. If this setting itself is empty, the cascade is triggered for ALL rows. Default is ().
default_value:str- the value to cascade when no value has been captured for the cascade column. Default is "".
strict:bool- if True, ALL when_not_empty columns must be populated to trigger a cascade. Default is False.
custom_trigger:Callable | None- a callable accepting the current row value set, the next row value set, and an iterable of all column names, and returning a bool that indicates whether to cascade. Default is None.
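A custom trigger could look like the following minimal sketch. The function name same_group_trigger and the "group" column are hypothetical; the only details taken from the documentation above are the three positional arguments (current row values, next row values, column names) and the bool return value. Rows are shown as plain lists so the snippet runs without numpy, whereas the package passes numpy.ndarray rows.

```python
from collections.abc import Iterable, Sequence


def same_group_trigger(
    row_a: Sequence[str], row_b: Sequence[str], columns: Iterable[str]
) -> bool:
    """Hypothetical trigger: cascade only while both rows share a "group" value."""
    names = list(columns)
    if "group" not in names:
        # Without the hypothetical "group" column there is nothing to compare.
        return False
    idx = names.index("group")
    return bool(row_a[idx]) and row_a[idx] == row_b[idx]


columns = ["group", "value"]
print(same_group_trigger(["g1", "x"], ["g1", ""], columns))  # True
print(same_group_trigger(["g1", "x"], ["g2", ""], columns))  # False
```

Such a function would be supplied through the documented keyword, for example CascadeColumn(column="value", custom_trigger=same_group_trigger).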
Subclasses
Class variables
var column : str
var custom_trigger : collections.abc.Callable[[numpy.ndarray, numpy.ndarray, collections.abc.Iterable[str]], bool] | None
var default_value : str
var strict : bool
var when_not_empty : collections.abc.Sequence[str]
Instance variables
prop check_when_not_empty : collections.abc.Callable[[numpy.ndarray], bool]-
Returns the callable for testing whether or not a row meets the when_not_empty constraint.
Methods
def cascade(self, table: dict[str, list[str]]) ‑> dict[str, list[str]]-
Cascade values in the table for self.column per defined custom_trigger or when_not_empty column status.
Args
table- dict of lists of strings representing table rows
Returns
dict[str, list[str]]- table after cascading self.column.
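The cascading behaviour can be pictured with the following stand-alone sketch. It is not the package's implementation: cascade_sketch is a hypothetical helper that ignores strict, custom_trigger, and default_value, and simply carries the last non-empty value of one column forward into rows where that column is empty and any reference column is populated.

```python
def cascade_sketch(
    table: dict[str, list[str]], column: str, when_not_empty: list[str]
) -> dict[str, list[str]]:
    """Hypothetical helper: fill blanks in `column` with the last seen value."""
    # Copy the lists so the caller's table is left untouched.
    result = {name: list(values) for name, values in table.items()}
    last_value = ""
    for i, value in enumerate(result[column]):
        if value:
            last_value = value  # remember the most recent good value
            continue
        # An empty when_not_empty list triggers the cascade for every row.
        triggered = not when_not_empty or any(
            result[ref][i] for ref in when_not_empty
        )
        if triggered and last_value:
            result[column][i] = last_value
    return result


table = {"b": ["b1", "b2", "b3", ""], "c": ["c1", "", "", ""]}
print(cascade_sketch(table, column="c", when_not_empty=["b"]))
# {'b': ['b1', 'b2', 'b3', ''], 'c': ['c1', 'c1', 'c1', '']}
```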
def cascade_it(self, row_a: numpy.ndarray, row_b: numpy.ndarray, table: dict[str, list[str]]) ‑> bool-
Returns True if value from row_a should be cascaded to row_b.
If self.custom_trigger is defined, call it with the supplied parameters and return the result. Otherwise, return self.set_it(row_b, table).
Args
row_a:np.ndarray- the last row with a good value.
row_b:np.ndarray- the target of the cascade, if triggered
table- dict of lists of strings representing table rows
Returns
bool- True if the last good value should be cascaded to the next row
def column_value(self, row: numpy.ndarray, table: dict[str, list[str]]) ‑> str-
Retrieves the value of self.column from the supplied row.
Args
row:np.ndarray- the row to check
table- dict of lists of strings representing table rows
Returns
str- the value of self.column in the supplied row.
def header_check(self, table: dict[str, list[str]]) ‑> bool-
Check the columns in self (column + when_not_empty) against those in the header of a real table to determine if this operation is relevant given the columns available.
Args
table- dict of lists of strings representing table rows
Returns
bool- indicating whether this Cascade/RollupColumn is applicable given the columns available in the table instance.
def meets_when_not_empty(self, row: numpy.ndarray, table: dict[str, list[str]]) ‑> bool-
Tests the when_not_empty constraint on this row of values.
Args
row:np.ndarray- the row to check
table- dict of lists of strings representing table rows
Returns
bool- True in any of three cases: when_not_empty is empty; strict is False and a value is found at the position of any when_not_empty column; or strict is True and a value is present at the positions of ALL when_not_empty columns.
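The return rule for meets_when_not_empty can be restated as a small stand-alone function. This is a sketch under assumptions, not the package's code: meets_when_not_empty_sketch is a hypothetical name, rows are plain lists instead of numpy arrays, and column positions are looked up from an explicit list of column names.

```python
def meets_when_not_empty_sketch(
    row: list[str],
    columns: list[str],
    when_not_empty: list[str],
    strict: bool = False,
) -> bool:
    """Hypothetical restatement of the when_not_empty / strict rule."""
    if not when_not_empty:
        # No reference columns configured: every row qualifies.
        return True
    populated = [bool(row[columns.index(name)]) for name in when_not_empty]
    # strict requires ALL reference columns; otherwise ANY one is enough.
    return all(populated) if strict else any(populated)


columns = ["a", "b", "c"]
row = ["", "b1", ""]
print(meets_when_not_empty_sketch(row, columns, ["b", "c"]))               # True
print(meets_when_not_empty_sketch(row, columns, ["b", "c"], strict=True))  # False
print(meets_when_not_empty_sketch(row, columns, []))                       # True
```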
def set_column(self, value: str, row: numpy.ndarray, table: dict[str, list[str]]) ‑> numpy.ndarray-
Set the value of self.column in the supplied row to the supplied value.
Args
value:str- the new value for self.column
row:np.ndarray- the row to update
table- dict of lists of strings representing table rows
Returns
np.ndarray- the updated row
def set_it(self, row: numpy.ndarray, table: dict[str, list[str]]) ‑> bool-
Returns True if column is empty but the when_not_empty check is True. Used for defaulting and cascading.
Args
row:np.ndarray- the row to check
table- dict of lists of strings representing table rows
Returns
bool- True if this row's value should be set to the default.
class RollupCascadeManager (*args: RollupCascadeSpec, collapse_all: bool = True)-
Container for the set of rollup and cascade settings for all of the tables in a given section. rollup_casc_ref_instance[table_name] returns the combined rollup and cascade settings from all specs in which the specified table appears.
Args
*args:tuple[RollupCascadeSpec, …]- RollupCascadeSpec instances shared between a common set of tables (i.e. those of a section)
KwArgs
collapse_all:bool- If True, all tables will have a default rollup operation that rolls up any row with more blanks than the previous row. Default is True. Replaces legacy table_utils.collapse_dict() functionality.
Methods
def cascade(self, table_name: str, table: dict[str, list[str]]) ‑> dict[str, list[str]]-
Perform all cascading operations defined for the supplied table_name.
Args
table_name- name of the table currently processing
table- dict of lists of strings representing table rows
Returns
dict[str, list[str]]- dict of lists of strings representing table rows with values cascaded as specified in cascade_columns
def find_cascades(self, table_name: str, table: dict[str, list[str]]) ‑> list[CascadeColumn]-
Finds the CascadeColumns defined for this table_name and filters them according to the column names in header. See self._column_check().
Args
table_name- name of the table currently processing
table- dict of lists of strings representing table rows
Returns
list[CascadeColumn]- the cascades to perform for this table instance.
def find_rollups(self, table_name: str, pre_cascade: bool, table: dict[str, list[str]]) ‑> list[RollupColumn]-
Finds the RollupColumns defined for this table_name and filters them according to the column names in header (see self._column_check()) and whether or not cascade operations have already occurred.
Args
table_name- name of the table currently processing
pre_cascade- bool indicating whether the rollups are being collected for the pass before cascading (True) or for the pass after cascades have occurred (False).
table- dict of lists of strings representing table rows
Returns
list[RollupColumn]- the rollups to perform for this table instance.
def rollup(self, table_name: str, table: dict[str, list[str]], pre_cascade: bool = True) ‑> dict[str, list[str]]-
Perform all rollup operations defined for the supplied table_name and cascade status (pre or post).
Args
table_name- name of the table currently processing
table- dict of lists of strings representing table rows
pre_cascade:bool- indicates whether this operation is occurring before or after its sister operation cascade_column_values()
Returns
dict[str, list[str]]- dict of lists of strings representing table rows with values rolled up as specified in rollup_columns
def rollup_cascade_rollup(self, table_name: str, table: dict[str, list[str]]) ‑> dict[str, list[str]]-
Performs pre cascade rollup, cascade, and post cascade rollup in series.
Args
table_name- name of the table currently processing
table- dict of lists of strings representing table rows
Returns
dict[str, list[str]]- dict of lists of strings representing table rows with values rolled up and cascaded as specified by this RollupCascadeManager
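The three-step series described for rollup_cascade_rollup can be written out using only the rollup and cascade methods documented above. The function rollup_cascade_rollup_sketch and the RecordingManager stand-in are hypothetical; the stand-in exists only so the call order can be shown without the real package, and it does not transform the table.

```python
def rollup_cascade_rollup_sketch(manager, table_name, table):
    """Hypothetical equivalent of the pipeline, built from documented methods."""
    table = manager.rollup(table_name, table, pre_cascade=True)  # pre-cascade rollup
    table = manager.cascade(table_name, table)                   # cascade
    return manager.rollup(table_name, table, pre_cascade=False)  # post-cascade rollup


class RecordingManager:
    """Stand-in for RollupCascadeManager that only records the call order."""

    def __init__(self):
        self.calls = []

    def rollup(self, table_name, table, pre_cascade=True):
        self.calls.append("rollup(pre)" if pre_cascade else "rollup(post)")
        return table

    def cascade(self, table_name, table):
        self.calls.append("cascade")
        return table


stub = RecordingManager()
rollup_cascade_rollup_sketch(stub, "table2", {"a": [""]})
print(stub.calls)  # ['rollup(pre)', 'cascade', 'rollup(post)']
```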
class RollupCascadeSpec (*args: CascadeColumn | RollupColumn, tables: collections.abc.Sequence[str] = ())-
Settings for the rollup_column_values and cascade_column_values functions.
Args
tables:Sequence[str]- the names of the tables to which the settings should be applied. If empty, settings are applied to ALL tables.
column_specs:list[CascadeColumn]- list of CascadeColumn or RollupColumn instances (note that RollupColumn inherits from CascadeColumn) to associate with the listed tables, or with all tables if tables is empty.
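How the tables setting decides which specs apply to a table can be sketched as follows. This is an illustration only: specs are modelled as plain dicts with hypothetical "name" and "tables" keys, and specs_for_table is a hypothetical helper, not part of the package.

```python
def specs_for_table(specs: list[dict], table_name: str) -> list[dict]:
    """Hypothetical lookup: keep specs that list the table or list no tables."""
    return [s for s in specs if not s["tables"] or table_name in s["tables"]]


specs = [
    {"name": "first", "tables": ["table1", "table2"]},
    {"name": "second", "tables": ["table2", "table3"]},
    {"name": "global", "tables": []},  # empty tables: applies everywhere
]
print([s["name"] for s in specs_for_table(specs, "table2")])
# ['first', 'second', 'global']
print([s["name"] for s in specs_for_table(specs, "table1")])
# ['first', 'global']
```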
Instance variables
prop cascade_columns : list[CascadeColumn]-
Settings for columns whose values should be cascaded to subsequent rows when one or more reference columns has a value.
prop rollup_columns : list[RollupColumn]-
Settings for columns whose values should be rolled up into the first row in which one or more reference columns is not empty.
class RollupColumn (*, column: str, when_not_empty: collections.abc.Sequence[str] = (), default_value: str = '', strict: bool = False, custom_trigger: collections.abc.Callable[[numpy.ndarray, numpy.ndarray, collections.abc.Iterable[str]], bool] | None = None, pre_cascade: bool = True, post_cascade: bool = True, join_function: collections.abc.Callable[[tuple[str, str]], str] = <built-in method join of str object>)-
Specify a column whose values should be rolled up into the first row in which one or more reference columns is not empty.
KwArgs
column:str- the column whose value should be rolled up
when_not_empty:Sequence[str]- the columns that trigger the rollup when not empty.
default_value:str- the value to use when no value is present in the rollup column. Default is "".
strict:bool- if True, ALL when_not_empty columns must be populated to trigger a rollup. Default is False.
pre_cascade:bool- if True, the rollup is performed before column values have been cascaded. Default is True.
post_cascade:bool- if True, the rollup is performed after column values have been cascaded. Default is True.
join_function:Callable[[tuple[str, str]], str]- method to use to join the values when a rollup is triggered. Defaults to " ".join.
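Any callable that takes the values to combine and returns a single string should fit the join_function signature shown above. The sketch below compares the documented default, " ".join, with a hypothetical alternative named join_unique; the separator and the de-duplication rule are illustrative choices, not package behaviour.

```python
def join_unique(parts) -> str:
    """Hypothetical join function: join with "; ", skipping empty or repeated values."""
    seen = []
    for part in parts:
        if part and part not in seen:
            seen.append(part)
    return "; ".join(seen)


print(" ".join(("d2", "d3")))     # d2 d3  (the documented default)
print(join_unique(("d2", "d3")))  # d2; d3
print(join_unique(("d2", "d2")))  # d2
```

It would be supplied through the documented keyword, for example RollupColumn(column="D", join_function=join_unique).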
Ancestors
Class variables
var post_cascade : bool
var pre_cascade : bool
Methods
def join_function(iterable, /) ‑> collections.abc.Callable[[tuple[str, str]], str]-
Concatenate any number of strings.
The string whose method is called is inserted in between each given string. The result is returned as a new string.
Example: '.'.join(['ab', 'pq', 'rs']) -> 'ab.pq.rs'
def roll_it(self, row_a: numpy.ndarray, row_b: numpy.ndarray, table: dict[str, list[str]]) ‑> bool-
Check two subsequent table rows to see if row_b should be rolled into row_a.
Triggers rolling behavior when:
- row_a meets our when_not_empty constraint, and
- row_b does NOT meet our when_not_empty constraint, and
- row_a has a value in the target column, and
- row_b has a value in the target column,
** OR **
- only the target column is populated in both this and next row
Args
row_a:np.ndarray- the first row to check
row_b:np.ndarray- the second row to check
table- dict of lists of strings representing table rows
Returns
bool- True if row_b should be rolled into row_a
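One reading of the roll_it conditions is sketched below. It assumes the four "and" conditions form one rule and the "only the target column is populated" case is a separate alternative; it also assumes non-strict matching. The name roll_it_sketch, the explicit columns list, and the use of plain lists instead of numpy rows are all hypothetical.

```python
def roll_it_sketch(
    row_a: list[str],
    row_b: list[str],
    columns: list[str],
    column: str,
    when_not_empty: list[str],
) -> bool:
    """Hypothetical restatement of the roll trigger (non-strict matching)."""
    target = columns.index(column)

    def meets(row: list[str]) -> bool:
        # Non-strict when_not_empty check: any reference column populated.
        return not when_not_empty or any(
            row[columns.index(name)] for name in when_not_empty
        )

    def only_target(row: list[str]) -> bool:
        # The target column has a value and every other column is blank.
        others = [value for i, value in enumerate(row) if i != target]
        return bool(row[target]) and not any(others)

    main_rule = (
        meets(row_a)
        and not meets(row_b)
        and bool(row_a[target])
        and bool(row_b[target])
    )
    return main_rule or (only_target(row_a) and only_target(row_b))


columns = ["c", "d"]
print(roll_it_sketch(["c1", "d2"], ["", "d3"], columns, "d", ["c"]))    # True
print(roll_it_sketch(["", "d3"], ["", "d4"], columns, "d", ["c"]))      # True
print(roll_it_sketch(["c1", "d1"], ["c2", "d2"], columns, "d", ["c"]))  # False
```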
def rollup(self, table: dict[str, list[str]]) ‑> dict[str, list[str]]-
Roll up values in the table for self.column per defined custom_trigger or when_not_empty column status.
Args
table- dict of lists of strings representing table rows
Returns
dict[str, list[str]]- table after rolling up self.column.
Inherited members