Package integrators

Initialize the integrators module.

Sub-modules

integrators.docuvision_integrator

Handle API calls to DocuVision.

integrators.docuvision_task

represents a single request to the docuvision api

integrators.docuvision_utils

Utility and helper functions for docuvision_integrator.py

integrators.provider_integrator

implement provider lookups from both local tables and public API

Classes

class DocuVisionIntegrator (documents: dict[str, utilities.library_utils.PDFLibProto] = <factory>, page_map: dict[str, dict[str, list[int]]] = <factory>, split_by_pid: bool = False, min_confidence: float = 0.1, api_timeout: int = 5400, out_dir: str | None = None, aws_region: str = 'us-east-1', api_secret_name: str = '', base_url: str = '', base_path: str = '', api_key: str = '', service: str = 'docuvision-1', model: str = 'base-medrec-anesthesia', jp_query: str = 'sort_by(\n [].{label:dotmap,value:text_ocr,confidence:confidence},\n &confidence\n )', mock_ids: list[int] = <factory>, fail_on_error: bool = False, default_dos: str = '2024-08-27', table_converters: dict[str, collections.abc.Callable[[list[str]], list[dict[str, str]]]] = <factory>, dv_preferred_networks: list[str] | None = None, dv_required_page_types: set[str] | None = None, *, result_formatter: collections.abc.Callable[..., tuple[str, dict[str, typing.Any]]] | None = None, response_reducer: collections.abc.Callable[..., list[dict[str, typing.Any]]] | None = None)

Post a batch of PDFs to docuvision API, poll for results, and preprocess raw output to clean up and reformat results into form expected by downstream processes.

If values are supplied for base_url, base_path, and api_key, they override the corresponding values in the response from the boto3 secretsmanager client when api_secret_name is requested. If all three are supplied, no call to boto3 occurs at all, allowing for "sans-AWS" use cases.
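
The override rule above can be sketched as follows. This is a minimal illustration and not the class's actual implementation: resolve_api_info and the fetch_secret callable are hypothetical stand-ins for the internal logic and the boto3 secretsmanager lookup, and the secret is assumed to be a dict keyed by base_url, base_path, and api_key.

```python
from collections.abc import Callable

FIELDS = ("base_url", "base_path", "api_key")


def resolve_api_info(
    supplied: dict[str, str],
    fetch_secret: Callable[[], dict[str, str]],
) -> dict[str, str]:
    """Merge explicitly supplied values over values from the secret store.

    Hypothetical helper: `fetch_secret` stands in for the Secrets Manager
    call and is only invoked when at least one field is missing.
    """
    explicit = {k: v for k, v in supplied.items() if k in FIELDS and v}
    if len(explicit) == len(FIELDS):
        return explicit  # "sans-AWS": nothing to look up
    merged = {**fetch_secret(), **explicit}  # supplied values win
    missing = [k for k in FIELDS if not merged.get(k)]
    if missing:
        raise ValueError(f"cannot determine: {', '.join(missing)}")
    return {k: merged[k] for k in FIELDS}
```

Passing the lookup in as a callable keeps the sketch free of any AWS dependency, so the "all three supplied" path can be exercised without credentials.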

Basic field validation occurs on first call to post_tasks (or manual call to check_api_info which tests with _post_location()). Instancing process can either pass a "documents" dict in during init OR manually populate the "documents" attribute with entries of filename:bytes for each pdf to be processed. When all documents are loaded, instancing process should call the .post_tasks() method.

All documents will be posted and tasks submitted upon first call of the post_tasks() function.

All tasks will be polled and responses collected upon first access of the results property. Subsequent calls to results will return previously stored results.

Args

documents : dict[str, lu.PDFLibProto]
dict of {filename: PDFLibProto} where PDFLibProto is a namedtuple of (body, meta) where body is a bytes object and meta is a dict of metadata for the pdf.
page_map : dict[str, dict[str, list[int]]]
dict of {new_doc_id: {old_doc_id: [page_nums]}} where new_doc_id is the doc_id for the combined pdf created by docuvision and old_doc_id is the doc_id for the original pdf. page_nums is a list of page numbers from the original pdf that were included in the combined pdf.
split_by_pid : bool
if True, docuvision will split each pdf into separate documents based on patient id. If False, docuvision will combine all pdfs into a single document.
min_confidence : float
minimum confidence required for a result to be included in the final output. All results with confidence below this value will be dropped.
api_timeout : int
number of seconds to wait for a response from the docuvision API before raising an error.
out_dir : str
path to directory where output json files will be written. If None, no files will be written.
aws_region : str
AWS region to use for boto3 calls.
api_secret_name : str
name of secret in AWS secretsmanager containing the base_url, base_path, and api_key values for the docuvision API.
base_url : str
base url for docuvision API. If empty, will be pulled from AWS secretsmanager.
base_path : str
base path for docuvision API. If empty, will be pulled from AWS secretsmanager.
api_key : str
api key for docuvision API. If empty, will be pulled from AWS secretsmanager.
service : str
docuvision service to use. Defaults to "docuvision-1".
model : str
docuvision model to use. Defaults to "base-medrec-anesthesia".
jp_query : str
jmespath query to use for result formatting. Defaults to: '''sort_by( [].{label:dotmap,value:text_ocr,confidence:confidence}, &confidence )'''
mock_ids : list[int]
list of task_ids to use for mock responses. Defaults to [].
fail_on_error : bool
if True, raise an error if any task fails to post or any response fails to be collected. Defaults to False.
default_dos : str
default date of service to use if no date of service is extracted from the facesheet. Defaults to gvars.DEFAULT_DOS.
result_formatter : Callable[…, tuple[str, dict[str, Any]]]
function to use for reformatting raw results. Defaults to self._format_result.
response_reducer : Callable[…, list[dict[str, Any]]]
function to use for reducing raw results to final output. Defaults to self._condense_response.
dv_preferred_networks : list[str] | None
list of preferred DocuVision neural networks. Created for facilities where people manually upload 1-page PDFs.
table_converters : dict[str, Callable[[list[str]], list[dict[str, str]]]]
function reference for processing '*Table' labels returned by DV-1.
dv_required_page_types : set[str]
if supplied, a case will only be created for a pid if at least one of the pages assigned to that pid has a type in this set.
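
To make the default jp_query and min_confidence concrete, the sketch below reproduces their combined effect in plain Python rather than through jmespath. The field names dotmap, text_ocr, and confidence come from the default query; the helper name and the sample records are invented for illustration, and applying the confidence cut before sorting is an assumption about ordering that does not change the final result.

```python
from typing import Any


def project_and_filter(
    raw: list[dict[str, Any]], min_confidence: float = 0.1
) -> list[dict[str, Any]]:
    """Plain-Python equivalent of the default jp_query plus a confidence cut."""
    projected = [
        {"label": r["dotmap"], "value": r["text_ocr"], "confidence": r["confidence"]}
        for r in raw
    ]
    # Results below min_confidence are dropped.
    kept = [p for p in projected if p["confidence"] >= min_confidence]
    # sort_by(..., &confidence) orders ascending by confidence.
    return sorted(kept, key=lambda p: p["confidence"])


# Invented sample records, shaped like the fields the query reads.
sample = [
    {"dotmap": "Patient.Name", "text_ocr": "DOE, JANE", "confidence": 0.93},
    {"dotmap": "Patient.DOB", "text_ocr": "01/02/1960", "confidence": 0.05},
    {"dotmap": "DateEncounter", "text_ocr": "08/27/2024", "confidence": 0.41},
]
rows = project_and_filter(sample)
```

With the sample above, the 0.05 record is dropped and the remaining rows are ordered from lowest to highest confidence.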

Ancestors

  • utilities.protocols.DocuVisionIntegratorProtocol
  • typing.Protocol
  • typing.Generic

Class variables

var api_key : str
var api_secret_name : str
var api_timeout : int
var aws_region : str
var base_path : str
var base_url : str
var default_dos : str
var documents : dict[str, utilities.library_utils.PDFLibProto]
var dv_preferred_networks : list[str] | None
var dv_required_page_types : set[str] | None
var fail_on_error : bool
var jp_query : str
var min_confidence : float
var mock_ids : list[int]
var model : str
var out_dir : str | None
var page_map : dict[str, dict[str, list[int]]]
var response_reducer : collections.abc.Callable[..., list[dict[str, typing.Any]]] | None
var result_formatter : collections.abc.Callable[..., tuple[str, dict[str, typing.Any]]] | None
var service : str
var split_by_pid : bool
var table_converters : dict[str, collections.abc.Callable[[list[str]], list[dict[str, str]]]]

Instance variables

prop doc_ex

Exception to utilize if an entire document fails to extract.

Controlled by self.fail_on_error. If True, raise an exception that will halt the entire process. If False, raise a ValueError that will be caught and logged by the LogExHandler decorator.

Returns

LogExOverrideError | ValueError
exception to raise
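
The selection described above might look like the following sketch. LogExOverrideError is defined here only as a stand-in, since the real class lives elsewhere in the project, and returning the exception class (rather than an instance) is an assumption.

```python
class LogExOverrideError(Exception):
    """Stand-in for the project's halting exception (real class not shown)."""


def doc_exception_type(fail_on_error: bool) -> type[Exception]:
    """Pick the exception class to raise when a whole document fails."""
    # True: halt the entire process. False: a ValueError that a logging
    # decorator can catch and record.
    return LogExOverrideError if fail_on_error else ValueError
```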
prop pdf_library : dict[str, utilities.library_utils.PDFLibEntry]

Dict of {case_doc_id: PDFLibEntry} where each entry represents the combined_pdf of a DocuVisionCase created by a child DocuVisionTask instance.

Merged into S3Batch.pdf_library in aws_s3_batch.py for file matching and other downstream processes.

prop pdfs_by_doc_id : dict[str, bytes]

Returns a dict of pdf bytes split and/or concatenated by task pids.

prop results : dict[str, list[dict[str, str]]]

Process responses from all child tasks to produce results according to the current instance settings.

Methods

def check_api_info(self)

Test to see if api url, path, and key values have been provided and attempt to pull from AWS Secrets Manager via self.api_secret_name if not.

Raises

ValueError
if api_key, base_url, or base_path cannot be determined.
def create_tasks(self, documents: dict[str, utilities.library_utils.PDFLibProto] | None = None)

Obtain upload location and post PDFs. If mock_ids were defined, create mock tasks for each id and collect the existing responses.

Args

documents : dict[str, lu.PDFLibProto]
dict of {filename: PDFLibProto} where PDFLibProto is a namedtuple of (body, meta) where body is a bytes object and meta is a dict of metadata for the pdf. Optional. Extends self.documents if supplied.
def job_dict_entries(self, extracted_data: dict[str, dict[str, typing.Any]]) ‑> dict[str, dict[str, typing.Any]]

Dict of {job_id: job_dict} for all tasks in self._tasks where each job_dict contains values for db columns 'input', 'comments', and 'note'.

Called by aws_s3_batch.py to recombine the values for the columns noted above with their corresponding output from table_transformer.py.

Args

extracted_data : dict[str, dict[str, Any]]
TableTransformer output data supplied from aws_s3_batch.S3Batch.transformed.

Returns

dict[str, dict[str, Any]]
dict of {job_id: job_dict} for all tasks in self._tasks.
def reset(self, **kwargs)

Reset the dataclass to prepare for a new facility by clearing all documents, tasks, results, and internal variables.

KwArgs

documents : dict[str, lu.PDFLibProto]
dict of {filename: PDFLibProto} where PDFLibProto is a namedtuple of (body, meta) where body is a bytes object and meta is a dict of metadata for the pdf.
page_map : dict[str, dict[str, list[int]]]
dict of {new_doc_id: {old_doc_id: [page_nums]}} where new_doc_id is the doc_id for the combined pdf created by docuvision and old_doc_id is the doc_id for the original pdf. page_nums is a list of page numbers from the original pdf that were included in the combined pdf.
out_dir : str
path to directory where output json files will be written. If None, no files will be written.
split_by_pid : bool
if True, docuvision will split each pdf into separate documents based on patient id. If False, docuvision will combine all pdfs into a single document.
fail_on_error : bool
if True, raise an error if any task fails to post or any response fails to be collected. Defaults to False.
mock_ids : list[int]
list of task_ids to use for mock responses. Defaults to [].
default_dos : str
default date of service to use if no date of service is extracted from the facesheet. Defaults to gvars.DEFAULT_DOS.
api_secret_name : str
name of secret in AWS secretsmanager containing the base_url, base_path, and api_key values for the docuvision API.
dv_preferred_networks : list[str] | None
list of preferred DocuVision neural networks. Created for facilities where people manually upload 1-page PDFs.
table_converters : dict[str, Callable[[list[str]], list[dict[str, str]]]]
function reference for processing '*Table' labels returned by DV-1.
dv_required_page_types : set[str]
if supplied, a case will only be created for a pid if at least one of the pages assigned to that pid has a type in this set.
def tables_for(self, doc_id: str, section: str = 'DocuVision', sep: str = '.') ‑> dict[str, list[dict[str, str]]]

Get results for the supplied doc_id in a tabular format suitable for downstream processing in table_transformer.py.

Args

doc_id : str
doc_id for the document to retrieve results for.
section : str
section name to use for the table. Defaults to "DocuVision".
sep : str
separator to use for table keys. Defaults to ".".

Returns

dict[str, list[dict[str, str]]]
dict of {table_name: [table_rows]} where each table_row is a dict of {label: value}.
def task_attr_list(self, attr: str) ‑> list

List the specified attribute for all tasks in self._tasks.

class DocuVisionTask (pdf_entry: lu.PDFLibProto, api_url: str, api_key: str, split_by_pid: bool = False, service: str = 'docuvision-1', model: str = 'base-medrec-anesthesia', min_confidence: float = 0.1, task_id: int = 0, location: PostTaskLocation = <factory>, document: PostTaskDocument = <factory>, response: du.GetTaskResponse = <factory>, pdf_text_pages: list[str] = <factory>, children: list[DocuVisionCase] = <factory>, fail_on_error: bool = False, mock: bool = False, default_dos: str = '2024-08-27', required_page_types: set[str] | None = None)

Create and process a single docuvision request / response.

Attributes

pdf_entry
The PDFLibEntry object for the pdf we're processing.
api_url
The URL of the DocuVision API.
api_key
The API key for the DocuVision API.
split_by_pid
If True, split the PDF into individual child cases.
service
The DocuVision service to use.
model
The DocuVision model to use.
min_confidence
The minimum confidence level for the DocuVision response.
task_id
The DocuVision task ID.
location
The upload location for the PDF.
document
The document object for the DocuVision API request.
response
The DocuVision API response.
pdf_text_pages
The text of each page of the original PDF.
children
The child cases.
fail_on_error
If True, raise an error if the DocuVision API returns an error.
mock
If True, use the mock API.
created_db_job
If True, a placeholder job was created in the database.
default_dos
the DOS to use for the placeholder if no DateEncounter labels are returned.
required_page_types
if supplied, cases will be dropped unless they have at least one page whose type appears in this set.
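
The required_page_types rule can be illustrated with a small predicate. This is a sketch under stated assumptions: keep_case is a hypothetical name, the page type strings are invented, and treating an empty set the same as None is a guess.

```python
from __future__ import annotations


def keep_case(page_types: list[str], required: set[str] | None) -> bool:
    """Return True if a case with these page types should be kept."""
    if not required:  # None (or empty, by assumption): no filtering
        return True
    # Keep the case when at least one page has a required type.
    return any(page_type in required for page_type in page_types)
```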

Class variables

var api_key : str
var api_url : str
var children : list[DocuVisionCase]
var created_db_job : bool
var default_dos : str
var document : PostTaskDocument
var fail_on_error : bool
var location : PostTaskLocation
var min_confidence : float
var mock : bool
var model : str
var pdf_entry : utilities.library_utils.PDFLibProto
var pdf_text_pages : list[str]
var required_page_types : set[str] | None
var response : GetTaskResponse
var service : str
var split_by_pid : bool
var task_id : int

Instance variables

prop child_job_entries : dict[str, dict[str, Any]]

get a list of updated job representations to merge with other extracted data prior to pushing to DB/disk

prop db_job : dict[str, Any]

Find or create a placeholder case for this task in the database.

prop doc_id : str

The document ID of the original source PDF.

prop file_id

The filename from the original document ID

prop fname_template

a template string for producing child pdf doc_ids. supply note_type and pid when calling format.

prop job_entries

aggregated placeholder and child job entries

prop note_type

The noteType property value assigned to the "Original" input entity and the Complete Record case_pdf of all child cases.

If the original filename followed the UI file naming convention (_.pdf), use the noteType indicated in the filename. Otherwise, assign "Scanned".

prop original_doc_id_query : str

Return a jmespath query for searching an input entities array for references to the s3 key where the source pdf was originally uploaded.

prop page_map : dict[str, dict[str, list[int]]]

The collection of page map entries for all child cases.

Used when splitting by PID to segment the raw PDF and response into individual child cases.
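
As an illustration of the page_map shape, {new_doc_id: {old_doc_id: [page_nums]}}, the sketch below flattens a map into an ordered list of source pages per child document. The helper name and the document IDs are invented, and zero-based page numbers are an assumption.

```python
def pages_by_new_doc(
    page_map: dict[str, dict[str, list[int]]],
) -> dict[str, list[tuple[str, int]]]:
    """Flatten {new_doc_id: {old_doc_id: [page_nums]}} into ordered page refs."""
    return {
        new_id: [(old_id, num) for old_id, nums in sources.items() for num in nums]
        for new_id, sources in page_map.items()
    }


# Invented example: two child documents built from two source scans.
page_map = {
    "case-A.pdf": {"scan-001.pdf": [0, 1, 2]},
    "case-B.pdf": {"scan-001.pdf": [3], "scan-002.pdf": [0, 1]},
}
flat = pages_by_new_doc(page_map)
```

Each entry in the flattened list names the source document and the page to copy, which is the information a splitter needs to assemble one combined PDF per child case.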

prop pdf : bytes

bytes representing the original source PDF

prop pdf_library : dict[str, lu.PDFLibEntry]

A standard S3Batch pdf_library containing the PDFs of all child cases

prop pdf_reader : pu.PdfReader

A PdfReader for the original source PDF.

prop pdfs_out : dict[str, bytes]

A dict of {document_id: bytes} for all child cases. Used to upload PDFs to S3.

prop placeholder : DocuVisionCase

The child to which the "Original" and "Summary" input entities are posted.

prop placeholder_scan_idx : int

next scan index to use when populating noteHeader

prop prefix

The S3 prefix of the original document ID

prop splitting : bool

True if splitting by PID or page map

Methods

def create_children(self, page_map: dict[str, dict[str, list[int]]] | None = None)

Build the page map for document splitting based on the calculated 'pid' in the API response.

def get(self) ‑> bool

Poll the API for the response.

If this is a mock task, find the original PDF object and download it from S3.

If the response indicates success:

  • Collect the text from each page of the raw PDF into self.pdf_text_pages.
  • Verify PIDs cascaded to pid_found==False pages via extracted text.
  • Convert the response to a GetTaskResponse dataclass for downstream processing.
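
The polling step can be sketched generically as below. This is not the method's actual code: poll_until_done, the fetch callable, the "status" key, and its values are all hypothetical, and the sketch returns the response dict whereas get() returns a bool. The default timeout mirrors the api_timeout default of 5400 seconds.

```python
import time
from collections.abc import Callable
from typing import Any


def poll_until_done(
    fetch: Callable[[], dict[str, Any]],
    timeout: float = 5400,
    interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict[str, Any]:
    """Call `fetch` until it reports a terminal status or `timeout` elapses."""
    deadline = clock() + timeout
    while True:
        response = fetch()
        # Hypothetical terminal states; the real API's values are not shown.
        if response.get("status") in {"success", "failed"}:
            return response
        if clock() >= deadline:
            raise TimeoutError(f"no terminal status after {timeout} seconds")
        sleep(interval)
```

Injecting sleep and clock keeps the loop testable without real delays.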

def hdr_for(self, input_id: str, is_placeholder: bool)

Get the noteHeader and noteType input entity properties for the provided input_id.

Args

input_id
The document ID of the input entity.
is_placeholder
If True, this is the placeholder case.

Returns

A dict defining the noteType and noteHeader input entity properties.

def post(self)

Obtain an upload location, upload the PDF, and post the task to the API.

def update_db_job_inputs(self, docvis_summary: list[dict[str, str]])

extend current db_job with entities from this task

class ProviderIntegrator (public_only: bool = False, api_url: str = 'https://clinicaltables.nlm.nih.gov/api/npi_idv/v3/search', url_params: ProviderAPIParams = <factory>)

Performs lookups of extracted provider information against local DB tables, with fallback to the public API maintained by the National Library of Medicine at https://clinicaltables.nlm.nih.gov/. Also updates "lastOccurrence" info in local DB tables after a successful match.
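
For orientation, a request URL for the public API fallback could be assembled as in this sketch. It only builds the URL and makes no network call. The parameter names terms and maxList are believed to match the public Clinical Tables API but should be checked against its documentation; the fields actually held in ProviderAPIParams are not documented here, and build_query_url is a hypothetical helper.

```python
from urllib.parse import urlencode

# Default api_url from the class signature.
API_URL = "https://clinicaltables.nlm.nih.gov/api/npi_idv/v3/search"


def build_query_url(terms: str, max_list: int = 5) -> str:
    """Build a search URL for the given search terms (no request is sent)."""
    params = {"terms": terms, "maxList": max_list}
    return f"{API_URL}?{urlencode(params)}"


url = build_query_url("DOE JANE")
```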

Ancestors

  • utilities.protocols.ProviderIntegratorProtocol
  • typing.Protocol
  • typing.Generic

Class variables

var api_url : str
var full_name : utilities.v_str.vStr
var is_anes_provider : bool
var mode : str | None
var npi : str | utilities.v_str.vStr
var public_only : bool
var url_params : ProviderAPIParams

Static methods

def cache_check(*, provider: ProviderIntegrator | ProviderRecord, facility: str = 'Initializing', update_only: bool = False) ‑> list[ProviderRecord]

Check the provider cache before searching the public API / DB, and update the cache if an NPI has been found.

KwArgs

provider : ProviderIntegrator
provider integrator instance containing search criteria.
facility : str
facility name, resets the cache if changed
update_only : bool
initialize cache value without "found in cache" log

Returns

list[ProviderRecord]
list containing a single provider record if found in cache. The list emulates the return value of other search functions.
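
The caching behaviour described above (reset on facility change, store once an NPI is known, return a one-element list on a hit) might be sketched like this. ProviderCache and its record layout are hypothetical; the real function is a static method operating on ProviderIntegrator or ProviderRecord objects.

```python
class ProviderCache:
    """Hypothetical per-facility cache keyed by provider name."""

    def __init__(self) -> None:
        self.facility = "Initializing"
        self._records: dict[str, dict[str, str]] = {}

    def check(self, name: str, facility: str, npi: str = "") -> list[dict[str, str]]:
        """Return [record] on a cache hit, else []; mirrors a search result list."""
        if facility != self.facility:  # new facility: start with an empty cache
            self.facility = facility
            self._records.clear()
        if npi:  # an NPI has been found: store or refresh the entry
            self._records[name] = {"full_name": name, "npi": npi}
        hit = self._records.get(name)
        return [hit] if hit else []
```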

Instance variables

prop last_api_response : None | dict[str, Any]

last public API query response

prop last_url_params : dict[str, Any]

last public API query parameters

prop query_name

name of last query function

Methods

def next_query(self) ‑> Iterator[list[ProviderRecord] | list[ProviderAPIResult]]

generate all valid local and query parameterizations in order of priority for successive db searches

def search(self, is_anes_provider: bool, full_name: str | vStr, npi: str | vStr = '', mode: str | None = None) ‑> tuple[utilities.v_str.vStr, utilities.v_str.vStr]

progressively search local DB and public API with supplied provider data and return (a) fully populated vStr objects for the provider name and NPI upon a successful lookup or (b) an 'original value only' vStr object for the provider name and a "null" vStr upon failure.
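
The interplay between next_query and search can be illustrated with a generator that runs candidate queries lazily, in priority order, and a caller that stops at the first match. This is a simplified sketch: plain strings and dicts stand in for vStr and ProviderRecord, and the query callables are hypothetical.

```python
from collections.abc import Callable, Iterator

Query = Callable[[], list[dict[str, str]]]


def next_query(queries: list[Query]) -> Iterator[list[dict[str, str]]]:
    """Lazily run each query in priority order, yielding its matches."""
    for query in queries:
        yield query()


def search(full_name: str, queries: list[Query]) -> tuple[str, str]:
    """Return (name, npi) from the first query with a match.

    On failure, return the original name and an empty NPI.
    """
    for matches in next_query(queries):
        if matches:
            return matches[0]["full_name"], matches[0]["npi"]
    return full_name, ""
```

Because the generator is lazy, lower-priority queries (such as the public API) are never executed once an earlier one succeeds.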