Skip to content

Commit

Permalink
Merge pull request #1436 from cityofaustin/thread-cr3-file-metadata
Browse files Browse the repository at this point in the history
Thread cr3 file metadata ETL
  • Loading branch information
frankhereford authored May 2, 2024
2 parents 66ac440 + 0510864 commit 4b689f5
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 38 deletions.
1 change: 1 addition & 0 deletions atd-etl/populate_cr3_file_metadata/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
env
2 changes: 2 additions & 0 deletions atd-etl/populate_cr3_file_metadata/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
*pdf
env

2 changes: 0 additions & 2 deletions atd-etl/populate_cr3_file_metadata/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,5 @@ RUN apt-get install -y file
WORKDIR /app
COPY . /app

RUN chmod -R 755 /app/*

# Install requirements
RUN pip install -r requirements.txt
29 changes: 29 additions & 0 deletions atd-etl/populate_cr3_file_metadata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# CR3 metadata generator ETL

## Intent

This program is used to inspect the CR3s on file in S3 and to generate some metadata about them.
This metadata is then stored in the database and used by the various portions of the stack to determine
if and when a CR3 can be made available to users.

## Airflow DAG

This program is a Python script that is bundled into a Docker image and run by [Airflow](https://github.com/cityofaustin/atd-airflow/blob/production/dags/vz_populate_cr3_metadata.py).

## Local use

This program also comes with a docker compose stack. To run it locally, you can do something along these lines:

```bash
cp env_template env;

# Edit the env file to include the necessary environment variables

# The following command will drop you in a bash shell in the metadata container.
docker compose run metadata;

# some example invocations
./populate_cr3_file_metadata.py -h; # see the options
./populate_cr3_file_metadata.py -a; # process the entire queue, quietly, with a progress bar
./populate_cr3_file_metadata.py -t 10 -s 100 -v; # process the first 100 CR3s in the queue, verbosely, using 10 threads.
```
10 changes: 10 additions & 0 deletions atd-etl/populate_cr3_file_metadata/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Local development stack for the CR3 file-metadata ETL.
# Usage: `docker compose run metadata` drops you into a bash shell
# inside the container (see README).
services:
  metadata:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      # Mount the working tree over /app so local edits are live in the container
      - .:/app
    # Override the image entrypoint: interactive shell for manual ETL runs
    entrypoint: /bin/bash
    env_file:
      # Created locally from env_template (git-ignored; holds credentials)
      - env
7 changes: 7 additions & 0 deletions atd-etl/populate_cr3_file_metadata/env_template
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Template for the local `env` file (cp env_template env).
# Values are read by populate_cr3_file_metadata.py and the AWS SDK.

# Default batch size when -s/--batch-size is not given on the CLI
PDF_MAX_RECORDS=100
# S3 bucket holding the CR3 PDFs
AWS_BUCKET_NAME=
# Environment prefix within the bucket (e.g. production/staging)
AWS_BUCKET_ENVIRONMENT=
# Hasura GraphQL endpoint URL
HASURA_ENDPOINT=
# Hasura admin secret (sent as x-hasura-admin-secret header)
HASURA_ADMIN_KEY=
# AWS credentials used by boto3
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
137 changes: 104 additions & 33 deletions atd-etl/populate_cr3_file_metadata/populate_cr3_file_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,50 @@
import os, json, boto3, datetime, requests
from typing import Optional, Set
from string import Template
import argparse
from tqdm import tqdm
import tempfile
from concurrent.futures import ThreadPoolExecutor

PDF_MIME_COMMAND = Template("/usr/bin/file -b --mime $PDF_FILE")
PDF_MAX_RECORDS = os.getenv("PDF_MAX_RECORDS", 100)

AWS_BUCKET_NAME = os.getenv("AWS_BUCKET_NAME", "")
AWS_BUCKET_ENVIRONMENT = os.getenv("AWS_BUCKET_ENVIRONMENT", "")
AWS_S3_CLIENT = boto3.client("s3")

def main():
    """
    Entry point: parse CLI options and process the CR3 metadata queue,
    fanning `process_record` out across a thread pool.

    Options:
      -t/--threads     number of worker threads (must be > 0)
      -v/--verbose     per-record logging instead of a progress bar
      -s/--batch-size  how many records to process (defaults to $PDF_MAX_RECORDS)
      -a/--all         ignore batch size and process the whole queue
    """
    parser = argparse.ArgumentParser(description="Process some CR3s for metadata")
    parser.add_argument("-t", "--threads", type=int, default=25, help="Number of threads")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    # argparse does NOT apply `type=` to default values, so the env var must be
    # coerced here; otherwise a set PDF_MAX_RECORDS leaves batch_size a str and
    # the `< 0` validation below raises TypeError.
    parser.add_argument(
        "-s",
        "--batch-size",
        type=int,
        default=int(os.getenv("PDF_MAX_RECORDS", 100)),
        help="Batch size",
    )
    parser.add_argument("-a", "--all", action="store_true", help="Process all records")

    args = parser.parse_args()

    if args.threads <= 0:
        raise argparse.ArgumentTypeError("Number of threads must be a positive integer")

    if args.batch_size < 0:
        raise argparse.ArgumentTypeError("Batch size must be a non-negative integer")

    # A limit of 0 tells get_records() to fetch the entire queue.
    max_records = 0 if args.all else args.batch_size

    # NOTE(review): when not using --all, this assumes the queue holds at least
    # batch_size items; the progress-bar total may overshoot otherwise.
    todo_count = get_todo_count() if args.all else args.batch_size
    print(f"\nItems to process: {todo_count}\n")

    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = [
            executor.submit(process_record, crash_id=crash_id, args=args)
            for crash_id in get_records(limit=max_records)
        ]

        if not args.verbose:
            # Quiet mode: drain futures behind a progress bar.
            for future in tqdm(futures, total=todo_count):
                future.result()
        else:
            # Verbose mode: process_record prints its own progress.
            for future in futures:
                future.result()

def is_crash_id(crash_id: int) -> bool:
"""
Expand Down Expand Up @@ -146,15 +182,16 @@ def get_file_metadata(crash_id: int) -> dict:
}


def process_record(crash_id: int) -> bool:
def process_record(crash_id: int, args) -> bool:
"""
Controls the process for a single file
"""
if not is_crash_id(crash_id):
print(f"Invalid crash_id: {crash_id}")
return False

print(f"Processing crash_id: {crash_id}")
if args.verbose:
print(f"Processing crash_id: {crash_id}")

# 1. Download file to disk
if not download_file(crash_id):
Expand All @@ -171,10 +208,11 @@ def process_record(crash_id: int) -> bool:
print(f"Invalid metadata for file for crash_id: {crash_id}")
return False

print("Metadata: " + json.dumps(metadata))
if args.verbose:
print("Metadata: " + json.dumps(metadata))

# 3. Execute GraphQL with new metadata
update_metadata(crash_id, metadata)
update_metadata(crash_id, metadata, args)

# 4. Delete the file from disk
delete_file(crash_id)
Expand Down Expand Up @@ -214,31 +252,77 @@ def get_file_metadata_cloud(crash_id: int) -> Optional[dict]:
except (IndexError, TypeError, KeyError):
return None

def get_todo_count() -> int:
    """
    Return the count of CR3 crashes stored in S3 (cr3_stored_flag = "Y")
    that do not yet have CR3 PDF metadata.

    Best-effort: on any request or JSON-parse error the error is printed
    and 0 is returned rather than raising.

    :return int: number of crashes awaiting metadata
    """
    query_get_unverified_cr3_count = """
        query getUnverifiedCr3Count {
            atd_txdot_crashes_aggregate(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}) {
                aggregate {
                    count
                }
            }
        }
    """

    response = requests.post(
        url=os.getenv("HASURA_ENDPOINT"),
        headers={
            "Accept": "*/*",
            "content-type": "application/json",
            "x-hasura-admin-secret": os.getenv("HASURA_ADMIN_KEY"),
        },
        json={"query": query_get_unverified_cr3_count},
    )

    try:
        response.encoding = "utf-8"
        count = response.json()
    except Exception as e:
        # Fall back to a zero-count payload shaped like a real response so the
        # lookup below cannot KeyError.
        print("Error: " + str(e))
        count = {"data": {"atd_txdot_crashes_aggregate": {"aggregate": {"count": 0}}}}

    return count["data"]["atd_txdot_crashes_aggregate"]["aggregate"]["count"]

def get_records(limit: int = 0) -> Set[int]:
"""
Returns a list of all CR3 crashes without CR3 PDF metadata
:return Set[int]:
"""
if not str(limit).isdigit() or limit == 0:
if not str(limit).isdigit():
return set()

# Executes GraphQL to fetch any crashes without CR3 PDF metadata, with a limit as indicated
query_get_records = """
query getUnverifiedCr3($limit:Int!) {
atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, limit: $limit, order_by: {crash_id: desc}) {
crash_id
}
}
"""
if limit > 0:
query_get_records = """
query getUnverifiedCr3($limit:Int!) {
atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, limit: $limit, order_by: {crash_id: desc}) {
crash_id
}
}
"""
variables = {"limit": int(limit)}
else:
query_get_records = """
query getUnverifiedCr3 {
atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, order_by: {crash_id: desc}) {
crash_id
}
}
"""
variables = {}

response = requests.post(
url=os.getenv("HASURA_ENDPOINT"),
headers={
"Accept": "*/*",
"content-type": "application/json",
"x-hasura-admin-secret": os.getenv("HASURA_ADMIN_KEY"),
},
json={"query": query_get_records, "variables": {"limit": int(limit)}},
json={"query": query_get_records, "variables": variables},
)

try:
Expand All @@ -249,14 +333,9 @@ def get_records(limit: int = 100) -> Set[int]:
print("Error: " + str(e))
records = {"data": {"atd_txdot_crashes": []}}

try:
return set(map(lambda x: x["crash_id"], records["data"]["atd_txdot_crashes"]))
except (KeyError, TypeError):
print("No data available. Response: ", records)
return set()

return {record['crash_id'] for record in records['data']['atd_txdot_crashes']}

def update_metadata(crash_id: int, metadata: dict) -> bool:
def update_metadata(crash_id: int, metadata: dict, args) -> bool:
"""
Returns True if it manages to update the metadata for an object.
:param int crash_id: The crash id to be updated
Expand All @@ -270,7 +349,8 @@ def update_metadata(crash_id: int, metadata: dict) -> bool:
):
return False

print(f"Updating crash_id: {crash_id}, with metadata: {json.dumps(metadata)}")
if args.verbose:
print(f"Updating crash_id: {crash_id}, with metadata: {json.dumps(metadata)}")

mutation_update_crash_cr3_metadata = """
mutation updateCrashCr3Metadata($crash_id:Int!, $metadata: jsonb) {
Expand Down Expand Up @@ -306,14 +386,5 @@ def update_metadata(crash_id: int, metadata: dict) -> bool:
return False


def main():
"""
Main Loop
"""
# Processes each one of these crashes using process_record function
for crash_id in get_records(PDF_MAX_RECORDS):
process_record(crash_id)


if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions atd-etl/populate_cr3_file_metadata/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
boto3==1.33.*
requests==2.31.*
tqdm==4.*
1 change: 0 additions & 1 deletion docker-compose-docker-volume.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3.7'
services:
postgis:
container_name: visionzero-postgis
Expand Down
1 change: 0 additions & 1 deletion docker-compose-ram-disk.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3.7'
services:
postgis:
container_name: visionzero-postgis
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3.7'
services:
graphql-engine:
image: hasura/graphql-engine:v2.38.1
Expand Down

0 comments on commit 4b689f5

Please sign in to comment.