diff --git a/atd-etl/populate_cr3_file_metadata/.dockerignore b/atd-etl/populate_cr3_file_metadata/.dockerignore new file mode 100644 index 000000000..0a764a4de --- /dev/null +++ b/atd-etl/populate_cr3_file_metadata/.dockerignore @@ -0,0 +1 @@ +env diff --git a/atd-etl/populate_cr3_file_metadata/.gitignore b/atd-etl/populate_cr3_file_metadata/.gitignore index 5f8bb4330..3c4b7906b 100644 --- a/atd-etl/populate_cr3_file_metadata/.gitignore +++ b/atd-etl/populate_cr3_file_metadata/.gitignore @@ -1 +1,3 @@ *pdf +env + diff --git a/atd-etl/populate_cr3_file_metadata/Dockerfile b/atd-etl/populate_cr3_file_metadata/Dockerfile index 6d5fdcb57..8a2f1dfe5 100644 --- a/atd-etl/populate_cr3_file_metadata/Dockerfile +++ b/atd-etl/populate_cr3_file_metadata/Dockerfile @@ -8,7 +8,5 @@ RUN apt-get install -y file WORKDIR /app COPY . /app -RUN chmod -R 755 /app/* - # Install requirements RUN pip install -r requirements.txt diff --git a/atd-etl/populate_cr3_file_metadata/README.md b/atd-etl/populate_cr3_file_metadata/README.md new file mode 100644 index 000000000..6c4c0bcf1 --- /dev/null +++ b/atd-etl/populate_cr3_file_metadata/README.md @@ -0,0 +1,29 @@ +# CR3 metadata generator ETL + +## Intent + +This program is used to inspect the CR3s on file in S3 and to generate some metadata about them. +This metadata is then stored in the database and used by the various portions of the stack to determine +if and when a CR3 can be made available to users. + +## Airflow DAG + +This program is a Python script which is bundled up into a Docker image and run by [Airflow](https://github.com/cityofaustin/atd-airflow/blob/production/dags/vz_populate_cr3_metadata.py). + +## Local use + +This program also comes with a docker compose stack. To run it locally, you can do something along these lines: + +```bash +cp env_template env; + +# Edit the env file to include the necessary environment variables + +# The following command will drop you in a bash shell in the metadata container. 
+docker compose run metadata; + +# some example invocations +./populate_cr3_file_metadata.py -h; # see the options +./populate_cr3_file_metadata.py -a; # process the entire queue, quietly, with a progress bar +./populate_cr3_file_metadata.py -t 10 -s 100 -v; # process the first 100 CR3s in the queue, verbosely, using 10 threads. +``` diff --git a/atd-etl/populate_cr3_file_metadata/docker-compose.yaml b/atd-etl/populate_cr3_file_metadata/docker-compose.yaml new file mode 100644 index 000000000..c28f930f4 --- /dev/null +++ b/atd-etl/populate_cr3_file_metadata/docker-compose.yaml @@ -0,0 +1,10 @@ +services: + metadata: + build: + context: . + dockerfile: Dockerfile + volumes: + - .:/app + entrypoint: /bin/bash + env_file: + - env diff --git a/atd-etl/populate_cr3_file_metadata/env_template b/atd-etl/populate_cr3_file_metadata/env_template new file mode 100644 index 000000000..267112955 --- /dev/null +++ b/atd-etl/populate_cr3_file_metadata/env_template @@ -0,0 +1,7 @@ +PDF_MAX_RECORDS=100 +AWS_BUCKET_NAME= +AWS_BUCKET_ENVIRONMENT= +HASURA_ENDPOINT= +HASURA_ADMIN_KEY= +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= diff --git a/atd-etl/populate_cr3_file_metadata/populate_cr3_file_metadata.py b/atd-etl/populate_cr3_file_metadata/populate_cr3_file_metadata.py index f968828fe..6af157cdf 100755 --- a/atd-etl/populate_cr3_file_metadata/populate_cr3_file_metadata.py +++ b/atd-etl/populate_cr3_file_metadata/populate_cr3_file_metadata.py @@ -3,14 +3,50 @@ import os, json, boto3, datetime, requests from typing import Optional, Set from string import Template +import argparse +from tqdm import tqdm +import tempfile +from concurrent.futures import ThreadPoolExecutor PDF_MIME_COMMAND = Template("/usr/bin/file -b --mime $PDF_FILE") -PDF_MAX_RECORDS = os.getenv("PDF_MAX_RECORDS", 100) - AWS_BUCKET_NAME = os.getenv("AWS_BUCKET_NAME", "") AWS_BUCKET_ENVIRONMENT = os.getenv("AWS_BUCKET_ENVIRONMENT", "") AWS_S3_CLIENT = boto3.client("s3") +def main(): + """ + Main Loop + """ + parser = 
argparse.ArgumentParser(description='Process some CR3s for metadata') + parser.add_argument('-t', '--threads', type=int, default=25, help='Number of threads') + parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output') + parser.add_argument('-s', '--batch-size', type=int, default=os.getenv('PDF_MAX_RECORDS', 100), help='Batch size') + parser.add_argument('-a', '--all', action='store_true', help='Process all records') + + args = parser.parse_args() + + if args.threads <= 0: + raise argparse.ArgumentTypeError("Number of threads must be a positive integer") + + if args.batch_size < 0: + raise argparse.ArgumentTypeError("Batch size must be a non-negative integer") + + PDF_MAX_RECORDS = 0 if args.all else args.batch_size + + todo_count = get_todo_count() if args.all else args.batch_size + print(f"\nItems to process: {todo_count}\n") + + with ThreadPoolExecutor(max_workers=args.threads) as executor: + futures = [] + for crash_id in get_records(limit=PDF_MAX_RECORDS): + futures.append(executor.submit(process_record, crash_id=crash_id, args=args)) + + if not args.verbose: + for future in tqdm(futures, total=todo_count): + future.result() + else: + for future in futures: + future.result() def is_crash_id(crash_id: int) -> bool: """ @@ -146,7 +182,7 @@ def get_file_metadata(crash_id: int) -> dict: } -def process_record(crash_id: int) -> bool: +def process_record(crash_id: int, args) -> bool: """ Controls the process for a single file """ @@ -154,7 +190,8 @@ def process_record(crash_id: int) -> bool: print(f"Invalid crash_id: {crash_id}") return False - print(f"Processing crash_id: {crash_id}") + if args.verbose: + print(f"Processing crash_id: {crash_id}") # 1. 
Download file to disk if not download_file(crash_id): @@ -171,10 +208,11 @@ def process_record(crash_id: int) -> bool: print(f"Invalid metadata for file for crash_id: {crash_id}") return False - print("Metadata: " + json.dumps(metadata)) + if args.verbose: + print("Metadata: " + json.dumps(metadata)) # 3. Execute GraphQL with new metadata - update_metadata(crash_id, metadata) + update_metadata(crash_id, metadata, args) # 4. Delete the file from disk delete_file(crash_id) @@ -214,23 +252,69 @@ def get_file_metadata_cloud(crash_id: int) -> Optional[dict]: except (IndexError, TypeError, KeyError): return None +def get_todo_count() -> int: + """ + Returns the count of all CR3 crashes without CR3 PDF metadata + :return int: + """ + query_get_unverified_cr3_count = """ + query getUnverifiedCr3Count { + atd_txdot_crashes_aggregate(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}) { + aggregate { + count + } + } + } + """ -def get_records(limit: int = 100) -> Set[int]: + response = requests.post( + url=os.getenv("HASURA_ENDPOINT"), + headers={ + "Accept": "*/*", + "content-type": "application/json", + "x-hasura-admin-secret": os.getenv("HASURA_ADMIN_KEY"), + }, + json={"query": query_get_unverified_cr3_count}, + ) + + try: + response.encoding = "utf-8" + count = response.json() + + except Exception as e: + print("Error: " + str(e)) + count = {"data": {"atd_txdot_crashes_aggregate": {"aggregate": {"count": 0}}}} + + return count['data']['atd_txdot_crashes_aggregate']['aggregate']['count'] + +def get_records(limit: int = 0) -> Set[int]: """ Returns a list of all CR3 crashes without CR3 PDF metadata :return Set[int]: """ - if not str(limit).isdigit() or limit == 0: + if not str(limit).isdigit(): return set() # Executes GraphQL to fetch any crashes without CR3 PDF metadata, with a limit as indicated - query_get_records = """ - query getUnverifiedCr3($limit:Int!) 
{ - atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, limit: $limit, order_by: {crash_id: desc}) { - crash_id - } - } - """ + if limit > 0: + query_get_records = """ + query getUnverifiedCr3($limit:Int!) { + atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, limit: $limit, order_by: {crash_id: desc}) { + crash_id + } + } + """ + variables = {"limit": int(limit)} + else: + query_get_records = """ + query getUnverifiedCr3 { + atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, order_by: {crash_id: desc}) { + crash_id + } + } + """ + variables = {} + response = requests.post( url=os.getenv("HASURA_ENDPOINT"), headers={ @@ -238,7 +322,7 @@ def get_records(limit: int = 100) -> Set[int]: "content-type": "application/json", "x-hasura-admin-secret": os.getenv("HASURA_ADMIN_KEY"), }, - json={"query": query_get_records, "variables": {"limit": int(limit)}}, + json={"query": query_get_records, "variables": variables}, ) try: @@ -249,14 +333,9 @@ def get_records(limit: int = 100) -> Set[int]: print("Error: " + str(e)) records = {"data": {"atd_txdot_crashes": []}} - try: - return set(map(lambda x: x["crash_id"], records["data"]["atd_txdot_crashes"])) - except (KeyError, TypeError): - print("No data available. Response: ", records) - return set() - + return {record['crash_id'] for record in records['data']['atd_txdot_crashes']} -def update_metadata(crash_id: int, metadata: dict) -> bool: +def update_metadata(crash_id: int, metadata: dict, args) -> bool: """ Returns True if it manages to update the metadata for an object. 
:param int crash_id: The crash id to be updated @@ -270,7 +349,8 @@ def update_metadata(crash_id: int, metadata: dict) -> bool: ): return False - print(f"Updating crash_id: {crash_id}, with metadata: {json.dumps(metadata)}") + if args.verbose: + print(f"Updating crash_id: {crash_id}, with metadata: {json.dumps(metadata)}") mutation_update_crash_cr3_metadata = """ mutation updateCrashCr3Metadata($crash_id:Int!, $metadata: jsonb) { @@ -306,14 +386,5 @@ def update_metadata(crash_id: int, metadata: dict) -> bool: return False -def main(): - """ - Main Loop - """ - # Processes each one of these crashes using process_record function - for crash_id in get_records(PDF_MAX_RECORDS): - process_record(crash_id) - - if __name__ == "__main__": main() diff --git a/atd-etl/populate_cr3_file_metadata/requirements.txt b/atd-etl/populate_cr3_file_metadata/requirements.txt index 2e08e9be3..2634628b7 100644 --- a/atd-etl/populate_cr3_file_metadata/requirements.txt +++ b/atd-etl/populate_cr3_file_metadata/requirements.txt @@ -1,2 +1,3 @@ boto3==1.33.* requests==2.31.* +tqdm==4.* diff --git a/docker-compose-docker-volume.yml b/docker-compose-docker-volume.yml index b91395d90..01d32d1cb 100644 --- a/docker-compose-docker-volume.yml +++ b/docker-compose-docker-volume.yml @@ -1,4 +1,3 @@ -version: '3.7' services: postgis: container_name: visionzero-postgis diff --git a/docker-compose-ram-disk.yml b/docker-compose-ram-disk.yml index c962c04f5..fc87c1e54 100644 --- a/docker-compose-ram-disk.yml +++ b/docker-compose-ram-disk.yml @@ -1,4 +1,3 @@ -version: '3.7' services: postgis: container_name: visionzero-postgis diff --git a/docker-compose.yml b/docker-compose.yml index b4ad1fe86..505da6aed 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.7' services: graphql-engine: image: hasura/graphql-engine:v2.38.1