Package integrators
initialize integrators module
Sub-modules
integrators.docuvision_integrator
-
Handle API calls to DocuVision.
integrators.docuvision_task
-
Represents a single request to the DocuVision API.
integrators.docuvision_utils
-
Utility and helper functions for docuvision_integrator.py
integrators.provider_integrator
-
Implements provider lookups from both local tables and the public API.
Classes
class DocuVisionIntegrator (documents: dict[str, utilities.library_utils.PDFLibProto] = <factory>, page_map: dict[str, dict[str, list[int]]] = <factory>, split_by_pid: bool = False, min_confidence: float = 0.1, api_timeout: int = 5400, out_dir: str | None = None, aws_region: str = 'us-east-1', api_secret_name: str = '', base_url: str = '', base_path: str = '', api_key: str = '', service: str = 'docuvision-1', model: str = 'base-medrec-anesthesia', jp_query: str = 'sort_by(\n [].{label:dotmap,value:text_ocr,confidence:confidence},\n &confidence\n )', mock_ids: list[int] = <factory>, fail_on_error: bool = False, default_dos: str = '2024-08-27', table_converters: dict[str, collections.abc.Callable[[list[str]], list[dict[str, str]]]] = <factory>, dv_preferred_networks: list[str] | None = None, dv_required_page_types: set[str] | None = None, *, result_formatter: collections.abc.Callable[..., tuple[str, dict[str, typing.Any]]] | None = None, response_reducer: collections.abc.Callable[..., list[dict[str, typing.Any]]] | None = None)
-
Post a batch of PDFs to the DocuVision API, poll for results, and preprocess the raw output to clean up and reformat results into the form expected by downstream processes.
If values are supplied for base_url, base_path, and api_key, they override the corresponding values returned by the boto3 secretsmanager client when api_secret_name is requested. If all 3 are supplied, no call to boto3 occurs at all, allowing for "sans-AWS" use cases.
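The override rule above can be sketched as follows. This is a minimal illustration, not the class's actual code: the function name resolve_api_info, the fetch_secret callable, and the assumption that the secret uses the same three key names are all hypothetical.

```python
from __future__ import annotations

from typing import Callable


def resolve_api_info(
    base_url: str = "",
    base_path: str = "",
    api_key: str = "",
    fetch_secret: Callable[[], dict[str, str]] | None = None,
) -> dict[str, str]:
    """Merge explicitly supplied values over values read from a secret."""
    supplied = {"base_url": base_url, "base_path": base_path, "api_key": api_key}
    # When all three values are supplied, the secret is never fetched.
    if all(supplied.values()):
        return supplied
    # fetch_secret is a hypothetical stand-in for the Secrets Manager lookup.
    secret = fetch_secret() if fetch_secret is not None else {}
    # Non-empty supplied values win over the secret's values.
    merged = {key: value or secret.get(key, "") for key, value in supplied.items()}
    missing = [key for key, value in merged.items() if not value]
    if missing:
        raise ValueError(f"cannot determine: {', '.join(missing)}")
    return merged
```

Passing the lookup in as a callable keeps the sketch runnable without AWS access and makes the "no call at all" branch easy to verify.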
Basic field validation occurs on the first call to post_tasks (or on a manual call to check_api_info, which tests with _post_location()). The instantiating process can either pass a "documents" dict during init or manually populate the "documents" attribute with filename:bytes entries for each PDF to be processed. When all documents are loaded, the instantiating process should call the .post_tasks() method.
All documents will be posted and tasks submitted upon first call of the post_tasks() function.
All tasks will be polled and responses collected upon first access of the results property. Subsequent calls to results will return previously stored results.
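The "collect on first access, reuse afterwards" behavior of results resembles a cached property. The sketch below shows that pattern only; the class LazyResults and its poll callable are hypothetical, and the real class may store its results differently.

```python
from functools import cached_property
from typing import Any, Callable


class LazyResults:
    """Toy holder that runs an expensive collection step only once."""

    def __init__(self, poll: Callable[[], dict[str, Any]]) -> None:
        # poll is a hypothetical callable that polls every task and
        # returns the collected responses.
        self._poll = poll

    @cached_property
    def results(self) -> dict[str, Any]:
        # Runs on first access; later accesses return the stored value.
        return self._poll()
```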
Args
documents
:dict[str, lu.PDFLibProto]
- dict of {filename: PDFLibProto} where PDFLibProto is a namedtuple of (body, meta) where body is a bytes object and meta is a dict of metadata for the pdf.
page_map
:dict[str, dict[str, list[int]]]
- dict of {new_doc_id: {old_doc_id: [page_nums]}} where new_doc_id is the doc_id for the combined pdf created by docuvision and old_doc_id is the doc_id for the original pdf. page_nums is a list of page numbers from the original pdf that were included in the combined pdf.
split_by_pid
:bool
- if True, docuvision will split each pdf into separate documents based on patient id. If False, docuvision will combine all pdfs into a single document.
min_confidence
:float
- minimum confidence required for a result to be included in the final output. All results with confidence below this value will be dropped.
api_timeout
:int
- number of seconds to wait for a response from the docuvision API before raising an error.
out_dir
:str
- path to directory where output json files will be written. If None, no files will be written.
aws_region
:str
- AWS region to use for boto3 calls.
api_secret_name
:str
- name of secret in AWS secretsmanager containing the base_url, base_path, and api_key values for the docuvision API.
base_url
:str
- base URL for the DocuVision API. If not supplied, it is pulled from AWS Secrets Manager.
base_path
:str
- base path for the DocuVision API. If not supplied, it is pulled from AWS Secrets Manager.
api_key
:str
- API key for the DocuVision API. If not supplied, it is pulled from AWS Secrets Manager.
service
:str
- docuvision service to use. Defaults to "docuvision-1".
model
:str
- docuvision model to use. Defaults to "base-medrec-anesthesia".
jp_query
:str
- jmespath query to use for result formatting. Defaults to: '''sort_by( [].{label:dotmap,value:text_ocr,confidence:confidence}, &confidence )'''
mock_ids
:list[int]
- list of task_ids to use for mock responses. Defaults to [].
fail_on_error
:bool
- if True, raise an error if any task fails to post or any response fails to be collected. Defaults to False.
default_dos
:str
- default date of service to use if no date of service is extracted from the facesheet. Defaults to gvars.DEFAULT_DOS.
result_formatter
:Callable[…, tuple[str, dict[str, Any]]]
- function to use for reformatting raw results. Defaults to self._format_result.
response_reducer
:Callable[…, list[dict[str, Any]]]
- function to use for reducing raw results to final output. Defaults to self._condense_response.
dv_preferred_networks
:list[str] | None
- list of preferred DocuVision neural networks. Created for facilities where people manually upload 1-page PDFs.
table_converters
:dict[str, Callable[[list[str]], list[dict[str, str]]]]
- function reference for processing '*Table' labels returned by DV-1.
dv_required_page_types
:set[str]
- if supplied, a case will only be created for a pid if at least one of the pages assigned to that pid has a type in this set.
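To make the default jp_query and min_confidence concrete, the sketch below reproduces in plain Python what the default JMESPath expression does (project each item to label/value/confidence, then sort ascending by confidence) and then drops low-confidence rows. The function names and the sample records are made up for illustration, and the order in which the real class applies the query and the filter is an assumption.

```python
from typing import Any


def project_and_sort(raw: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Plain-Python equivalent of the default jp_query."""
    projected = [
        {
            "label": item.get("dotmap"),
            "value": item.get("text_ocr"),
            "confidence": item.get("confidence"),
        }
        for item in raw
    ]
    # sort_by(..., &confidence) orders ascending by the confidence field.
    return sorted(projected, key=lambda row: row["confidence"])


def drop_low_confidence(
    rows: list[dict[str, Any]], min_confidence: float = 0.1
) -> list[dict[str, Any]]:
    """Keep only rows whose confidence is not below min_confidence."""
    return [row for row in rows if row["confidence"] >= min_confidence]


# Made-up sample items shaped like the fields the query reads.
raw_items = [
    {"dotmap": "Patient.Name", "text_ocr": "DOE, JANE", "confidence": 0.92},
    {"dotmap": "Patient.DOB", "text_ocr": "01/02/1960", "confidence": 0.05},
    {"dotmap": "Case.Surgeon", "text_ocr": "SMITH", "confidence": 0.4},
]
kept = drop_low_confidence(project_and_sort(raw_items))
```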
Ancestors
- utilities.protocols.DocuVisionIntegratorProtocol
- typing.Protocol
- typing.Generic
Class variables
var api_key : str
var api_secret_name : str
var api_timeout : int
var aws_region : str
var base_path : str
var base_url : str
var default_dos : str
var documents : dict[str, utilities.library_utils.PDFLibProto]
var dv_preferred_networks : list[str] | None
var dv_required_page_types : set[str] | None
var fail_on_error : bool
var jp_query : str
var min_confidence : float
var mock_ids : list[int]
var model : str
var out_dir : str | None
var page_map : dict[str, dict[str, list[int]]]
var response_reducer : collections.abc.Callable[..., list[dict[str, typing.Any]]] | None
var result_formatter : collections.abc.Callable[..., tuple[str, dict[str, typing.Any]]] | None
var service : str
var split_by_pid : bool
var table_converters : dict[str, collections.abc.Callable[[list[str]], list[dict[str, str]]]]
Instance variables
prop doc_ex
-
Exception to utilize if an entire document fails to extract.
Controlled by self.fail_on_error. If True, raise an exception that will halt the entire process. If False, raise a ValueError that will be caught and logged by the LogExHandler decorator.
Returns
LogExOverrideError | ValueError
- exception to raise
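A minimal sketch of the selection that doc_ex describes, assuming it hands back an exception class. The LogExOverrideError defined here is only a stand-in, since the real definition is not shown in this reference.

```python
class LogExOverrideError(Exception):
    """Stand-in for the project's LogExOverrideError (definition not shown)."""


def doc_ex(fail_on_error: bool) -> type[Exception]:
    # With fail_on_error set, choose the exception meant to halt the run;
    # otherwise choose ValueError, which is meant to be caught and logged.
    return LogExOverrideError if fail_on_error else ValueError
```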
prop pdf_library : dict[str, utilities.library_utils.PDFLibEntry]
-
Dict of {case_doc_id: PDFLibEntry} where each entry represents the combined_pdf of a DocuVisionCase created by a child DocuVisionTask instance.
Merged into S3Batch.pdf_library in aws_s3_batch.py for file matching and other downstream processes.
prop pdfs_by_doc_id : dict[str, bytes]
-
Returns a dict of pdf bytes split and/or concatenated by task pids.
prop results : dict[str, list[dict[str, str]]]
-
Process responses from all child tasks to produce results according to the current instance settings.
Methods
def check_api_info(self)
-
Test to see if api url, path, and key values have been provided and attempt to pull from AWS Secrets Manager via self.api_secret_name if not.
Raises
ValueError
- if api_key, base_url, or base_path cannot be determined.
def create_tasks(self, documents: dict[str, utilities.library_utils.PDFLibProto] | None = None)
-
Obtain upload location and post PDFs. If mock_ids were defined, create mock tasks for each id and collect the existing responses.
Args
documents
:dict[str, lu.PDFLibProto]
- dict of {filename: PDFLibProto} where PDFLibProto is a namedtuple of (body, meta) where body is a bytes object and meta is a dict of metadata for the pdf. Optional. Extends self.documents if supplied.
def job_dict_entries(self, extracted_data: dict[str, dict[str, typing.Any]]) ‑> dict[str, dict[str, typing.Any]]
-
Dict of {job_id: job_dict} for all tasks in self._tasks where each job_dict contains values for db columns 'input', 'comments', and 'note'.
Called by aws_s3_batch.py to recombine the values for the columns noted above with their corresponding output from table_transformer.py.
Args
extracted_data
:dict[str, dict[str, Any]]
- TableTransformer output data supplied from aws_s3_batch.S3Batch.transformed.
Returns
dict[str, dict[str, Any]]
- dict of {job_id: job_dict} for all tasks in self._tasks.
def reset(self, **kwargs)
-
Reset the dataclass to prepare for a new facility by clearing all documents, tasks, results, and internal variables.
KwArgs
documents
:dict[str, lu.PDFLibProto]
- dict of {filename: PDFLibProto} where PDFLibProto is a namedtuple of (body, meta) where body is a bytes object and meta is a dict of metadata for the pdf.
page_map
:dict[str, dict[str, list[int]]]
- dict of {new_doc_id: {old_doc_id: [page_nums]}} where new_doc_id is the doc_id for the combined pdf created by docuvision and old_doc_id is the doc_id for the original pdf. page_nums is a list of page numbers from the original pdf that were included in the combined pdf.
out_dir
:str
- path to directory where output json files will be written. If None, no files will be written.
split_by_pid
:bool
- if True, docuvision will split each pdf into separate documents based on patient id. If False, docuvision will combine all pdfs into a single document.
fail_on_error
:bool
- if True, raise an error if any task fails to post or any response fails to be collected. Defaults to False.
mock_ids
:list[int]
- list of task_ids to use for mock responses. Defaults to [].
default_dos
:str
- default date of service to use if no date of service is extracted from the facesheet. Defaults to gvars.DEFAULT_DOS.
api_secret_name
:str
- name of secret in AWS secretsmanager containing the base_url, base_path, and api_key values for the docuvision API.
dv_preferred_networks
:list[str] | None
- list of preferred DocuVision neural networks. Created for facilities where people manually upload 1-page PDFs.
table_converters
:dict[str, Callable[[list[str]], list[dict[str, str]]]]
- function reference for processing '*Table' labels returned by DV-1.
dv_required_page_types
:set[str]
- if supplied, a case will only be created for a pid if at least one of the pages assigned to that pid has a type in this set.
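The page_map shape described in the arguments above, {new_doc_id: {old_doc_id: [page_nums]}}, can be read as a recipe for assembling combined documents. The sketch below uses lists of strings in place of PDF pages. The function name is hypothetical, and treating page numbers as 0-based indices is an assumption.

```python
def split_by_page_map(
    page_map: dict[str, dict[str, list[int]]],
    pages_by_doc: dict[str, list[str]],
) -> dict[str, list[str]]:
    """Assemble each combined document from the listed source pages."""
    combined: dict[str, list[str]] = {}
    for new_doc_id, sources in page_map.items():
        pages: list[str] = []
        for old_doc_id, page_nums in sources.items():
            # Assumption: page numbers index the source pages from 0.
            pages.extend(pages_by_doc[old_doc_id][num] for num in page_nums)
        combined[new_doc_id] = pages
    return combined


# Made-up documents: each string stands in for one PDF page.
source_pages = {"a.pdf": ["a0", "a1", "a2"], "b.pdf": ["b0", "b1"]}
example_map = {
    "case-1.pdf": {"a.pdf": [0, 2], "b.pdf": [1]},
    "case-2.pdf": {"a.pdf": [1]},
}
combined_docs = split_by_page_map(example_map, source_pages)
```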
def tables_for(self, doc_id: str, section: str = 'DocuVision', sep: str = '.') ‑> dict[str, list[dict[str, str]]]
-
Get results for the supplied doc_id in a tabular format suitable for downstream processing in table_transformer.py.
Args
doc_id
:str
- doc_id for the document to retrieve results for.
section
:str
- section name to use for the table. Defaults to "DocuVision".
sep
:str
- separator to use for table keys. Defaults to ".".
Returns
dict[str, list[dict[str, str]]]
- dict of {table_name: [table_rows]} where each table_row is a dict of {label: value}.
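The return shape {table_name: [table_rows]} can be illustrated with a small grouping function. Everything about how labels map to tables here is an assumption made for the example: it supposes dotted labels such as "Patient.Name", takes the part before the last separator as the table, and prefixes the table key with the section name. The real tables_for may compose keys differently.

```python
def to_tables(
    results: list[dict[str, str]],
    section: str = "DocuVision",
    sep: str = ".",
) -> dict[str, list[dict[str, str]]]:
    """Group flat label/value pairs into one-row tables (illustrative only)."""
    tables: dict[str, list[dict[str, str]]] = {}
    for item in results:
        # Hypothetical rule: "Patient.Name" -> table "Patient", column "Name".
        table, _, column = item["label"].rpartition(sep)
        key = f"{section}{sep}{table}" if table else section
        rows = tables.setdefault(key, [{}])
        rows[0][column] = item["value"]
    return tables


# Made-up sample results.
flat_results = [
    {"label": "Patient.Name", "value": "DOE, JANE"},
    {"label": "Patient.DOB", "value": "01/02/1960"},
    {"label": "Case.Surgeon", "value": "SMITH"},
]
example_tables = to_tables(flat_results)
```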
def task_attr_list(self, attr: str) ‑> list
-
List the specified attribute for all tasks in self._tasks.
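A method like this typically reduces to a getattr comprehension. The sketch below shows that idea with a stand-in task class; FakeTask and the free function are hypothetical, and the real method reads from self._tasks.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeTask:
    """Stand-in for a task object; only two fields are modeled."""

    task_id: int
    mock: bool = False


def task_attr_list(tasks: list[Any], attr: str) -> list[Any]:
    # Collect the named attribute from every task, preserving task order.
    return [getattr(task, attr) for task in tasks]
```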
class DocuVisionTask (pdf_entry: lu.PDFLibProto, api_url: str, api_key: str, split_by_pid: bool = False, service: str = 'docuvision-1', model: str = 'base-medrec-anesthesia', min_confidence: float = 0.1, task_id: int = 0, location: PostTaskLocation = <factory>, document: PostTaskDocument = <factory>, response: du.GetTaskResponse = <factory>, pdf_text_pages: list[str] = <factory>, children: list[DocuVisionCase] = <factory>, fail_on_error: bool = False, mock: bool = False, default_dos: str = '2024-08-27', required_page_types: set[str] | None = None)
-
Create and process a single docuvision request / response.
Attributes
pdf_entry
- The PDFLibEntry object for the pdf we're processing.
api_url
- The URL of the DocuVision API.
api_key
- The API key for the DocuVision API.
split_by_pid
- If True, split the PDF into individual child cases.
service
- The DocuVision service to use.
model
- The DocuVision model to use.
min_confidence
- The minimum confidence level for the DocuVision response.
task_id
- The DocuVision task ID.
location
- The upload location for the PDF.
document
- The document object for the DocuVision API request.
response
- The DocuVision API response.
pdf_text_pages
- The text of each page of the original PDF.
children
- The child cases.
fail_on_error
- If True, raise an error if the DocuVision API returns an error.
mock
- If True, use the mock API.
created_db_job
- If True, a placeholder job was created in the database.
default_dos
- the DOS to use for the placeholder if no DateEncounter labels are returned.
required_page_types
- if supplied, cases will be dropped unless they have at least one page whose type appears in this set.
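The required_page_types rule amounts to a set-intersection test per case. The sketch below assumes page types are available as a list of strings per pid. The function name and the page type names are made up, and treating None as "no filtering" follows the "if supplied" wording.

```python
from __future__ import annotations


def keep_cases(
    page_types_by_pid: dict[str, list[str]],
    required_page_types: set[str] | None = None,
) -> list[str]:
    """Return the pids whose pages include at least one required type."""
    if required_page_types is None:
        # Nothing supplied: every case is kept.
        return list(page_types_by_pid)
    return [
        pid
        for pid, page_types in page_types_by_pid.items()
        if required_page_types.intersection(page_types)
    ]


# Made-up page types for two patient ids.
example_pages = {
    "pid-1": ["facesheet", "anesthesia_record"],
    "pid-2": ["consent"],
}
```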
Class variables
var api_key : str
var api_url : str
var children : list[DocuVisionCase]
var created_db_job : bool
var default_dos : str
var document : PostTaskDocument
var fail_on_error : bool
var location : PostTaskLocation
var min_confidence : float
var mock : bool
var model : str
var pdf_entry : utilities.library_utils.PDFLibProto
var pdf_text_pages : list[str]
var required_page_types : set[str] | None
var response : GetTaskResponse
var service : str
var split_by_pid : bool
var task_id : int
Instance variables
prop child_job_entries : dict[str, dict[str, Any]]
-
get a list of updated job representations to merge with other extracted data prior to pushing to DB/disk
prop db_job : dict[str, Any]
-
Find or create a placeholder case for this task in the database.
prop doc_id : str
-
The document ID of the original source PDF.
prop file_id
-
The filename from the original document ID
prop fname_template
-
a template string for producing child pdf doc_ids. supply note_type and pid when calling format.
prop job_entries
-
aggregated placeholder and child job entries
prop note_type
-
The noteType property value assigned to the "Original" input entity and the Complete Record case_pdf of all child cases.
If the original filename followed the UI file naming convention (_ .pdf), use the noteType indicated in the filename. Otherwise, assign "Scanned".
prop original_doc_id_query : str
-
Return a jmespath query for searching an input entities array for references to the s3 key where the source pdf was originally uploaded.
prop page_map : dict[str, dict[str, list[int]]]
-
The collection of page map entries for all child cases.
Used when splitting by PID to segment the raw PDF and response into individual child cases.
prop pdf : bytes
-
bytes representing the original source PDF
prop pdf_library : dict[str, lu.PDFLibEntry]
-
A standard S3Batch pdf_library containing the PDFs of all child cases
prop pdf_reader : pu.PdfReader
-
A PdfReader for the original source PDF.
prop pdfs_out : dict[str, bytes]
-
A dict of {document_id: bytes} for all child cases. Used to upload PDFs to S3.
prop placeholder : DocuVisionCase
-
The child to which the "Original" and "Summary" input entities are posted.
prop placeholder_scan_idx : int
-
next scan index to use when populating noteHeader
prop prefix
-
The S3 prefix of the original document ID
prop splitting : bool
-
True if splitting by PID or page map
Methods
def create_children(self, page_map: dict[str, dict[str, list[int]]] | None = None)
-
Build the page map for document splitting based on the calculated 'pid' in the API response.
def get(self) ‑> bool
-
Poll the API for the response.
If this is a mock task, find the original PDF object and download it from S3.
If the response indicates success:
- Collect the text from each page of the raw PDF into self.pdf_text_pages.
- Verify PIDs cascaded to pid_found==False pages via extracted text.
- Convert the response to a GetTaskResponse dataclass for downstream processing.
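One way a caller could bound repeated polling by a timeout such as api_timeout is sketched below. It is not the actual get() implementation: fetch_status, the "done" key, and the injectable clock and sleep parameters are hypothetical and exist so the loop can run without a network.

```python
import time
from typing import Any, Callable


def poll_until_done(
    fetch_status: Callable[[], dict[str, Any]],
    timeout: float,
    interval: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Call fetch_status until it reports completion or the timeout passes."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        # Hypothetical response shape: a truthy "done" key marks completion.
        if status.get("done"):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"no completed response within {timeout} seconds")
        sleep(interval)
```

Using a monotonic clock avoids surprises if the system time changes while a long task is pending.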
def hdr_for(self, input_id: str, is_placeholder: bool)
-
Get the noteHeader and noteType input entity properties for the provided input_id.
Args
input_id
- The document ID of the input entity.
is_placeholder
- If True, this is the placeholder case.
Returns
A dict defining the noteType and noteHeader input entity properties.
def post(self)
-
Obtain an upload location, upload the PDF, and post the task to the API.
def update_db_job_inputs(self, docvis_summary: list[dict[str, str]])
-
extend current db_job with entities from this task
class ProviderIntegrator (public_only: bool = False, api_url: str = 'https://clinicaltables.nlm.nih.gov/api/npi_idv/v3/search', url_params: ProviderAPIParams = <factory>)
-
Performs lookups of extracted provider information against local DB tables, with fallback to the public API maintained by the National Library of Medicine at https://clinicaltables.nlm.nih.gov/. Also updates "lastOccurrence" info in local DB tables after a successful match.
Ancestors
- utilities.protocols.ProviderIntegratorProtocol
- typing.Protocol
- typing.Generic
Class variables
var api_url : str
var full_name : utilities.v_str.vStr
var is_anes_provider : bool
var mode : str | None
var npi : str | utilities.v_str.vStr
var public_only : bool
var url_params : ProviderAPIParams
Static methods
def cache_check(*, provider: ProviderIntegrator | ProviderRecord, facility: str = 'Initializing', update_only: bool = False) ‑> list[ProviderRecord]
-
Check the provider cache before searching the public API / DB, and update the cache if an NPI has been found.
KwArgs
provider
:ProviderIntegrator
- provider integrator instance containing search criteria.
facility
:str
- facility name, resets the cache if changed
update_only
:bool
- initialize cache value without "found in cache" log
Returns
list[ProviderRecord]
- list containing a single provider record if found in cache. The list emulates the return value of other search functions.
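The cache behavior described above (reset when the facility changes, return a one-item list on a hit) can be sketched with a small class. ProviderCache, its name-keyed dictionary, and the plain-dict records are all hypothetical; the real static method works with ProviderRecord objects whose fields are not shown here.

```python
class ProviderCache:
    """Toy per-facility cache of provider name to NPI."""

    def __init__(self) -> None:
        self.facility = "Initializing"
        self._npi_by_name: dict[str, str] = {}

    def check(self, full_name: str, facility: str, npi: str = "") -> list[dict[str, str]]:
        # A change of facility invalidates everything cached so far.
        if facility != self.facility:
            self.facility = facility
            self._npi_by_name.clear()
        # Record a newly found NPI so later lookups can skip the search.
        if npi:
            self._npi_by_name[full_name] = npi
        cached = self._npi_by_name.get(full_name)
        # A list is returned to mirror the shape of other search results.
        return [{"full_name": full_name, "npi": cached}] if cached else []
```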
Instance variables
prop last_api_response : None | dict[str, Any]
-
last public API query response
prop last_url_params : dict[str, Any]
-
last public API query parameters
prop query_name
-
name of last query function
Methods
def next_query(self) ‑> Iterator[list[ProviderRecord] | list[ProviderAPIResult]]
-
generate all valid local and query parameterizations in order of priority for successive db searches
def search(self, is_anes_provider: bool, full_name: str | vStr, npi: str | vStr = '', mode: str | None = None) ‑> tuple[utilities.v_str.vStr, utilities.v_str.vStr]
-
progressively search local DB and public API with supplied provider data and return (a) fully populated vStr objects for the provider name and NPI upon a successful lookup or (b) an 'original value only' vStr object for the provider name and a "null" vStr upon failure.
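The interplay between next_query and search, trying queries in priority order and stopping at the first success, can be sketched with a generator. This is a simplified illustration: plain strings stand in for vStr, an empty string stands in for the "null" vStr, records are plain dicts, and treating the first non-empty result as a successful lookup is an assumption.

```python
from typing import Callable, Iterator

Record = dict[str, str]
Strategy = Callable[[], list[Record]]


def next_query(strategies: list[Strategy]) -> Iterator[list[Record]]:
    """Lazily run each lookup strategy in priority order."""
    for strategy in strategies:
        yield strategy()


def search(full_name: str, strategies: list[Strategy]) -> tuple[str, str]:
    """Return (name, npi) from the first strategy that finds a match."""
    for hits in next_query(strategies):
        if hits:
            # Assumption: the first hit of the first non-empty result wins.
            return hits[0]["full_name"], hits[0]["npi"]
    # No match anywhere: keep the original name and return an empty NPI.
    return full_name, ""
```

Because the generator is lazy, lower-priority lookups such as a public API call only run when the earlier ones return nothing.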