Thread cr3 file metadata ETL #1436

Merged · 13 commits · May 2, 2024
1 change: 1 addition & 0 deletions atd-etl/populate_cr3_file_metadata/.dockerignore
@@ -0,0 +1 @@
env
2 changes: 2 additions & 0 deletions atd-etl/populate_cr3_file_metadata/.gitignore
@@ -1 +1,3 @@
*pdf
env

2 changes: 0 additions & 2 deletions atd-etl/populate_cr3_file_metadata/Dockerfile
@@ -8,7 +8,5 @@ RUN apt-get install -y file
WORKDIR /app
COPY . /app

RUN chmod -R 755 /app/*

# Install requirements
RUN pip install -r requirements.txt
29 changes: 29 additions & 0 deletions atd-etl/populate_cr3_file_metadata/README.md
@@ -0,0 +1,29 @@
# CR3 metadata generator ETL

## Intent

This program inspects the CR3 PDFs on file in S3 and generates metadata about them. The metadata
is then stored in the database and used by various portions of the stack to determine
if and when a CR3 can be made available to users.

## Airflow DAG

This program is a Python script bundled into a Docker image and run by [Airflow](https://github.com/cityofaustin/atd-airflow/blob/production/dags/vz_populate_cr3_metadata.py). A hypothetical sketch of the Airflow side follows the local-use example below.

## Local use

This program also comes with a Docker Compose stack. To run it locally, you can do something along these lines:

```bash
cp env_template env;

# Edit the env file to include the necessary environment variables

# The following command will drop you in a bash shell in the metadata container.
docker compose run metadata;

# some example invocations
./populate_cr3_file_metadata.py -h; # see the options
./populate_cr3_file_metadata.py -a; # process the entire queue, quietly, with a progress bar
./populate_cr3_file_metadata.py -t 10 -s 100 -v; # process the first 100 CR3s in the queue, verbosely, using 10 threads.
```
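
For orientation, here is a hypothetical sketch of what the Airflow side could look like with the `DockerOperator`; the image name and schedule are assumptions, not the production values, and the DAG linked above is the authoritative definition:

```python
# Hypothetical sketch only; the real DAG lives in the atd-airflow repo linked above.
from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

with DAG(
    dag_id="vz_populate_cr3_metadata",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",  # assumed cadence
    catchup=False,
) as dag:
    populate_metadata = DockerOperator(
        task_id="populate_cr3_file_metadata",
        image="vz-populate-cr3-file-metadata:production",  # assumed image name/tag
        command="./populate_cr3_file_metadata.py -v",  # the invocation noted in the review discussion
        auto_remove=True,
    )
```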
10 changes: 10 additions & 0 deletions atd-etl/populate_cr3_file_metadata/docker-compose.yaml
@@ -0,0 +1,10 @@
services:
  metadata:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - .:/app
    entrypoint: /bin/bash
    env_file:
      - env
7 changes: 7 additions & 0 deletions atd-etl/populate_cr3_file_metadata/env_template
@@ -0,0 +1,7 @@
PDF_MAX_RECORDS=100
AWS_BUCKET_NAME=
AWS_BUCKET_ENVIRONMENT=
HASURA_ENDPOINT=
HASURA_ADMIN_KEY=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
137 changes: 104 additions & 33 deletions atd-etl/populate_cr3_file_metadata/populate_cr3_file_metadata.py
@@ -3,14 +3,50 @@
import os, json, boto3, datetime, requests
from typing import Optional, Set
from string import Template
import argparse
from tqdm import tqdm
import tempfile
from concurrent.futures import ThreadPoolExecutor

PDF_MIME_COMMAND = Template("/usr/bin/file -b --mime $PDF_FILE")
PDF_MAX_RECORDS = os.getenv("PDF_MAX_RECORDS", 100)

AWS_BUCKET_NAME = os.getenv("AWS_BUCKET_NAME", "")
AWS_BUCKET_ENVIRONMENT = os.getenv("AWS_BUCKET_ENVIRONMENT", "")
AWS_S3_CLIENT = boto3.client("s3")

def main():
    """
    Main Loop
    """
    parser = argparse.ArgumentParser(description='Process some CR3s for metadata')
    parser.add_argument('-t', '--threads', type=int, default=25, help='Number of threads')
    parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
    parser.add_argument('-s', '--batch-size', type=int, default=int(os.getenv('PDF_MAX_RECORDS', 100)), help='Batch size')
    parser.add_argument('-a', '--all', action='store_true', help='Process all records')
Comment on lines +20 to +24 (Member Author):
Naming arguments and picking the right ones is always a challenge, but I like these OK. In airflow, we'll run this like:

populate_cr3_file_metadata.py -v 

And while we're catching up manually on our local machines, something like:

./populate_cr3_file_metadata.py -a


    args = parser.parse_args()

    if args.threads <= 0:
        raise argparse.ArgumentTypeError("Number of threads must be a positive integer")

    if args.batch_size < 0:
        raise argparse.ArgumentTypeError("Batch size must be a non-negative integer")

    PDF_MAX_RECORDS = 0 if args.all else args.batch_size

    todo_count = get_todo_count() if args.all else args.batch_size
    print(f"\nItems to process: {todo_count}\n")

    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = []
        for crash_id in get_records(limit=PDF_MAX_RECORDS):
            futures.append(executor.submit(process_record, crash_id=crash_id, args=args))

        if not args.verbose:
            for future in tqdm(futures, total=todo_count):
Member Author:
✨ fancy progress bar. this library is a gem.

Contributor:
TIL!

                future.result()
        else:
            for future in futures:
                future.result()
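
Since `tqdm` came up in the thread above, here is a self-contained illustration of the futures-plus-progress-bar pattern used in `main()`; the worker function is a stand-in, not part of this PR:

```python
# Minimal tqdm-over-futures pattern, mirroring main() above.
from concurrent.futures import ThreadPoolExecutor
from time import sleep

from tqdm import tqdm


def work(n: int) -> int:
    sleep(0.01)  # stand-in for downloading and inspecting a CR3
    return n


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(work, n) for n in range(100)]
    # The bar advances as each future is awaited in submission order.
    for future in tqdm(futures, total=len(futures)):
        future.result()
```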

def is_crash_id(crash_id: int) -> bool:
    """
@@ -146,15 +182,16 @@ def get_file_metadata(crash_id: int) -> dict:
    }


def process_record(crash_id: int) -> bool:
def process_record(crash_id: int, args) -> bool:
    """
    Controls the process for a single file
    """
    if not is_crash_id(crash_id):
        print(f"Invalid crash_id: {crash_id}")
        return False

    print(f"Processing crash_id: {crash_id}")
    if args.verbose:
        print(f"Processing crash_id: {crash_id}")
Comment on lines +193 to +194 (Contributor @mddilley, Apr 17, 2024):
Not a showstopper but I like john's comment on the other threading PR about introducing logging levels to control verbosity. I think it would be a good goal for us to do that when possible and could totally be a spin-off since this PR was aimed at getting this up and running to get caught up.

Member Author (@frankhereford), Apr 17, 2024:
Yea i totally like the idea of returning to these scripts and giving them a once-over in terms of logging. It'd be good, I think, to chat as a group about what we want to see in our logging and then, with that shared goal, come back through all the VZ etls and make them similar where we can. Adopt argparse, logging, threading, and docker compose for local dev 🤔 . @johnclary - I'm curious how you feel about that - maybe something we can put on the slow cooker in the backlog for when we're caught up?
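
As a concrete version of the idea discussed here, a minimal sketch of level-based verbosity with the standard library `logging` module; nothing below is part of this PR, and the names are illustrative:

```python
# Sketch only: logging levels could replace the scattered `if args.verbose:` checks.
import logging

log = logging.getLogger("populate_cr3_file_metadata")


def configure_logging(verbose: bool) -> None:
    # -v maps to DEBUG; the default INFO level keeps per-crash chatter off the progress-bar run.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=logging.DEBUG if verbose else logging.INFO,
    )


# process_record() would then log unconditionally and let the level filter:
#     log.debug("Processing crash_id: %s", crash_id)
#     log.warning("Invalid crash_id: %s", crash_id)
```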


    # 1. Download file to disk
    if not download_file(crash_id):
@@ -171,10 +208,11 @@ def process_record(crash_id: int) -> bool:
        print(f"Invalid metadata for file for crash_id: {crash_id}")
        return False

    print("Metadata: " + json.dumps(metadata))
    if args.verbose:
        print("Metadata: " + json.dumps(metadata))

    # 3. Execute GraphQL with new metadata
    update_metadata(crash_id, metadata)
    update_metadata(crash_id, metadata, args)

    # 4. Delete the file from disk
    delete_file(crash_id)
@@ -214,31 +252,77 @@ def get_file_metadata_cloud(crash_id: int) -> Optional[dict]:
    except (IndexError, TypeError, KeyError):
        return None

def get_todo_count() -> int:
    """
    Returns the count of all CR3 crashes without CR3 PDF metadata
    :return int:
    """
    query_get_unverified_cr3_count = """
    query getUnverifiedCr3Count {
      atd_txdot_crashes_aggregate(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}) {
        aggregate {
          count
        }
      }
    }
    """

    response = requests.post(
        url=os.getenv("HASURA_ENDPOINT"),
        headers={
            "Accept": "*/*",
            "content-type": "application/json",
            "x-hasura-admin-secret": os.getenv("HASURA_ADMIN_KEY"),
        },
        json={"query": query_get_unverified_cr3_count},
    )

    try:
        response.encoding = "utf-8"
        count = response.json()

    except Exception as e:
        print("Error: " + str(e))
        count = {"data": {"atd_txdot_crashes_aggregate": {"aggregate": {"count": 0}}}}

    return count['data']['atd_txdot_crashes_aggregate']['aggregate']['count']

def get_records(limit: int = 100) -> Set[int]:
def get_records(limit: int = 0) -> Set[int]:
"""
Returns a list of all CR3 crashes without CR3 PDF metadata
:return Set[int]:
"""
if not str(limit).isdigit() or limit == 0:
if not str(limit).isdigit():
return set()

# Executes GraphQL to fetch any crashes without CR3 PDF metadata, with a limit as indicated
query_get_records = """
query getUnverifiedCr3($limit:Int!) {
atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, limit: $limit, order_by: {crash_id: desc}) {
crash_id
}
}
"""
if limit > 0:
query_get_records = """
query getUnverifiedCr3($limit:Int!) {
atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, limit: $limit, order_by: {crash_id: desc}) {
crash_id
}
}
"""
variables = {"limit": int(limit)}
else:
query_get_records = """
query getUnverifiedCr3 {
atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, order_by: {crash_id: desc}) {
crash_id
}
}
"""
variables = {}

response = requests.post(
url=os.getenv("HASURA_ENDPOINT"),
headers={
"Accept": "*/*",
"content-type": "application/json",
"x-hasura-admin-secret": os.getenv("HASURA_ADMIN_KEY"),
},
json={"query": query_get_records, "variables": {"limit": int(limit)}},
json={"query": query_get_records, "variables": variables},
)

try:
@@ -249,14 +333,9 @@ def get_records(limit: int = 100) -> Set[int]:
print("Error: " + str(e))
records = {"data": {"atd_txdot_crashes": []}}

try:
return set(map(lambda x: x["crash_id"], records["data"]["atd_txdot_crashes"]))
except (KeyError, TypeError):
print("No data available. Response: ", records)
return set()

return {record['crash_id'] for record in records['data']['atd_txdot_crashes']}

def update_metadata(crash_id: int, metadata: dict) -> bool:
def update_metadata(crash_id: int, metadata: dict, args) -> bool:
"""
Returns True if it manages to update the metadata for an object.
:param int crash_id: The crash id to be updated
@@ -270,7 +349,8 @@ def update_metadata(crash_id: int, metadata: dict) -> bool:
    ):
        return False

    print(f"Updating crash_id: {crash_id}, with metadata: {json.dumps(metadata)}")
    if args.verbose:
        print(f"Updating crash_id: {crash_id}, with metadata: {json.dumps(metadata)}")

    mutation_update_crash_cr3_metadata = """
    mutation updateCrashCr3Metadata($crash_id:Int!, $metadata: jsonb) {
@@ -306,14 +386,5 @@ def update_metadata(crash_id: int, metadata: dict) -> bool:
        return False


def main():
    """
    Main Loop
    """
    # Processes each one of these crashes using process_record function
    for crash_id in get_records(PDF_MAX_RECORDS):
        process_record(crash_id)


if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions atd-etl/populate_cr3_file_metadata/requirements.txt
@@ -1,2 +1,3 @@
boto3==1.33.*
requests==2.31.*
tqdm==4.*
1 change: 0 additions & 1 deletion docker-compose-docker-volume.yml
@@ -1,4 +1,3 @@
version: '3.7'
services:
postgis:
container_name: visionzero-postgis
1 change: 0 additions & 1 deletion docker-compose-ram-disk.yml
@@ -1,4 +1,3 @@
version: '3.7'
services:
postgis:
container_name: visionzero-postgis
1 change: 0 additions & 1 deletion docker-compose.yml
@@ -1,4 +1,3 @@
version: '3.7'
services:
graphql-engine:
image: hasura/graphql-engine:v2.38.1