Thread cr3 file metadata ETL #1436

Merged: 13 commits, May 2, 2024
Changes from 10 commits
2 changes: 2 additions & 0 deletions atd-etl/populate_cr3_file_metadata/.gitignore
@@ -1 +1,3 @@
*pdf
env

2 changes: 0 additions & 2 deletions atd-etl/populate_cr3_file_metadata/Dockerfile
Original file line number Diff line number Diff line change
@@ -8,7 +8,5 @@ RUN apt-get install -y file
WORKDIR /app
COPY . /app

-RUN chmod -R 755 /app/*
-
# Install requirements
RUN pip install -r requirements.txt
10 changes: 10 additions & 0 deletions atd-etl/populate_cr3_file_metadata/docker-compose.yaml
@@ -0,0 +1,10 @@
services:
  metadata:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - .:/app
    entrypoint: /bin/bash
    env_file:
      - env
7 changes: 7 additions & 0 deletions atd-etl/populate_cr3_file_metadata/env_template
frankhereford (Member, Author) commented:
Here's where you would get these values: https://github.com/cityofaustin/atd-airflow/blob/production/dags/vz_populate_cr3_metadata.py#L20-L45. Also, you can point it at your local stack with something like this for the endpoint:

HASURA_ENDPOINT=http://host.docker.internal:8084/v1/graphql

NB: It takes a little extra fiddling to work around that magic DNS name on Linux. I have no idea why Docker Desktop provides it but good ol' normal Docker does not. 🤯
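For reference, one workaround on plain Linux Docker (assuming Docker Engine 20.10+ and Compose v2) is to map the magic name to the host gateway yourself. This is a hypothetical addition to the compose file above, not part of this PR:

```yaml
# Hypothetical tweak to docker-compose.yaml: on Linux, `host.docker.internal`
# is not provided automatically, so map it to the host gateway explicitly.
services:
  metadata:
    extra_hosts:
      - "host.docker.internal:host-gateway"
```

With that in place, the `HASURA_ENDPOINT` value above should resolve from inside the container the same way it does under Docker Desktop.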

@@ -0,0 +1,7 @@
PDF_MAX_RECORDS=100
AWS_BUCKET_NAME=
AWS_BUCKET_ENVIRONMENT=
HASURA_ENDPOINT=
HASURA_ADMIN_KEY=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
139 changes: 105 additions & 34 deletions atd-etl/populate_cr3_file_metadata/populate_cr3_file_metadata.py
@@ -3,14 +3,50 @@
import os, json, boto3, datetime, requests
from typing import Optional, Set
from string import Template
import argparse
from tqdm import tqdm
import tempfile
from concurrent.futures import ThreadPoolExecutor

PDF_MIME_COMMAND = Template("/usr/bin/file -b --mime $PDF_FILE")
PDF_MAX_RECORDS = int(os.getenv("PDF_MAX_RECORDS", 100))  # env vars arrive as strings

AWS_BUCKET_NAME = os.getenv("AWS_BUCKET_NAME", "")
AWS_BUCKET_ENVIRONMENT = os.getenv("AWS_BUCKET_ENVIRONMENT", "")
AWS_S3_CLIENT = boto3.client("s3")

def main():
    """
    Main Loop
    """
    parser = argparse.ArgumentParser(description='Process some CR3s for metadata')
    parser.add_argument('-t', '--threads', type=int, default=25, help='Number of threads')
    parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
    parser.add_argument('-s', '--batch-size', type=int, default=os.getenv('PDF_MAX_RECORDS', 100), help='Batch size')
    parser.add_argument('-a', '--all', action='store_true', help='Process all records')
Comment on lines +20 to +24

frankhereford (Member, Author) commented:
Naming arguments and picking the right ones is always a challenge, but I like these OK. In airflow, we'll run this like:

populate_cr3_file_metadata.py -v 

And while we're catching up manually on our local machines, something like:

./populate_cr3_file_metadata.py -a


    args = parser.parse_args()

    if args.threads <= 0:
        raise argparse.ArgumentTypeError("Number of threads must be a positive integer")

    if args.batch_size < 0:
        raise argparse.ArgumentTypeError("Batch size must be a non-negative integer")

    PDF_MAX_RECORDS = 0 if args.all else args.batch_size

    todo_count = get_todo_count() if args.all else args.batch_size
    print(f"\nItems to process: {todo_count}\n")

    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = []
        for crash_id in get_records(limit=PDF_MAX_RECORDS):
            futures.append(executor.submit(process_record, crash_id=crash_id, args=args))

        if not args.verbose:
            for future in tqdm(futures, total=todo_count):
                future.result()
        else:
            for future in futures:
                future.result()

frankhereford (Member, Author) commented on the tqdm line: ✨ fancy progress bar. this library is a gem.

Contributor replied: TIL!
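As a minimal illustration of what tqdm does in the loop above (a sketch, not part of the PR): wrapping any iterable yields the same items unchanged while rendering a progress bar on stderr, so the surrounding logic needs no modification.

```python
from tqdm import tqdm

# tqdm wraps an iterable: it yields identical items while drawing a
# progress bar to stderr, so the loop body is unaffected.
results = []
for item in tqdm(range(5), desc="processing"):
    results.append(item * 2)

print(results)  # → [0, 2, 4, 8 doubled values: 0, 2, 4, 6, 8]
```

In the PR itself, `tqdm(futures, total=todo_count)` advances the bar as each `future.result()` call returns.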

def is_crash_id(crash_id: int) -> bool:
    """
@@ -146,15 +182,16 @@ def get_file_metadata(crash_id: int) -> dict:
    }


-def process_record(crash_id: int) -> bool:
+def process_record(crash_id: int, args) -> bool:
    """
    Controls the process for a single file
    """
    if not is_crash_id(crash_id):
        print(f"Invalid crash_id: {crash_id}")
        return False

-    print(f"Processing crash_id: {crash_id}")
+    if args.verbose:
+        print(f"Processing crash_id: {crash_id}")
Comment on lines +193 to +194

mddilley (Contributor) commented on Apr 17, 2024:

Not a showstopper, but I like john's comment on the other threading PR about introducing logging levels to control verbosity. I think it would be a good goal for us to do that when possible, and it could totally be a spin-off, since this PR was aimed at getting this up and running to get caught up.

frankhereford (Member, Author) replied on Apr 17, 2024:

Yea, I totally like the idea of returning to these scripts and giving them a once-over in terms of logging. It'd be good, I think, to chat as a group about what we want to see in our logging and then, with that shared goal, come back through all the VZ ETLs and make them similar where we can: adopt argparse, logging, threading, and docker compose for local dev 🤔. @johnclary - I'm curious how you feel about that - maybe something we can put on the slow cooker in the backlog for when we're caught up?
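A sketch of the logging-level idea floated in this thread (hypothetical, not what the PR implements): the `-v` flag would set the logger to DEBUG once at startup, and the per-record `print` calls would become `logger.debug` calls that are filtered out at the default level.

```python
import logging

# Hypothetical refactor sketch: -v maps to DEBUG, the default stays INFO,
# and per-record chatter moves from print() to logger.debug().
logger = logging.getLogger("populate_cr3_file_metadata")


def configure_logging(verbose: bool) -> None:
    logging.basicConfig(format="%(levelname)s %(message)s")
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)


def process_record(crash_id: int) -> None:
    logger.debug("Processing crash_id: %s", crash_id)  # suppressed unless -v
    logger.info("Finished crash_id: %s", crash_id)     # always shown


configure_logging(verbose=False)
process_record(18)
```

The appeal over a hand-rolled `args.verbose` check is that every module gets the same verbosity switch for free, with no `if` guard around each message.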


    # 1. Download file to disk
    if not download_file(crash_id):
@@ -171,10 +208,11 @@ def process_record(crash_id: int) -> bool:
        print(f"Invalid metadata for file for crash_id: {crash_id}")
        return False

-    print("Metadata: " + json.dumps(metadata))
+    if args.verbose:
+        print("Metadata: " + json.dumps(metadata))

    # 3. Execute GraphQL with new metadata
-    update_metadata(crash_id, metadata)
+    update_metadata(crash_id, metadata, args)

    # 4. Delete the file from disk
    delete_file(crash_id)
@@ -214,31 +252,77 @@ def get_file_metadata_cloud(crash_id: int) -> Optional[dict]:
    except (IndexError, TypeError, KeyError):
        return None

def get_todo_count() -> int:
    """
    Returns the count of all CR3 crashes without CR3 PDF metadata
    :return int:
    """
    query_get_unverified_cr3_count = """
    query getUnverifiedCr3Count {
        atd_txdot_crashes_aggregate(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}) {
            aggregate {
                count
            }
        }
    }
    """

    response = requests.post(
        url=os.getenv("HASURA_ENDPOINT"),
        headers={
            "Accept": "*/*",
            "content-type": "application/json",
            "x-hasura-admin-secret": os.getenv("HASURA_ADMIN_KEY"),
        },
        json={"query": query_get_unverified_cr3_count},
    )

    try:
        response.encoding = "utf-8"
        count = response.json()

    except Exception as e:
        print("Error: " + str(e))
        count = {"data": {"atd_txdot_crashes_aggregate": {"aggregate": {"count": 0}}}}

    return count['data']['atd_txdot_crashes_aggregate']['aggregate']['count']


-def get_records(limit: int = 100) -> Set[int]:
+def get_records(limit: int = 0) -> Set[int]:
    """
    Returns a list of all CR3 crashes without CR3 PDF metadata
    :return Set[int]:
    """
-    if not str(limit).isdigit() or limit == 0:
+    if not str(limit).isdigit():
        return set()

    # Executes GraphQL to fetch any crashes without CR3 PDF metadata, with a limit as indicated
-    query_get_records = """
-    query getUnverifiedCr3($limit:Int!) {
-        atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, limit: $limit, order_by: {crash_id: desc}) {
-            crash_id
-        }
-    }
-    """
+    if limit > 0:
+        query_get_records = """
+        query getUnverifiedCr3($limit:Int!) {
+            atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, limit: $limit, order_by: {crash_id: desc}) {
+                crash_id
+            }
+        }
+        """
+        variables = {"limit": int(limit)}
+    else:
+        query_get_records = """
+        query getUnverifiedCr3 {
+            atd_txdot_crashes(where: {cr3_file_metadata: {_is_null: true}, cr3_stored_flag: {_eq: "Y"}}, order_by: {crash_id: desc}) {
+                crash_id
+            }
+        }
+        """
+        variables = {}

    response = requests.post(
        url=os.getenv("HASURA_ENDPOINT"),
        headers={
            "Accept": "*/*",
            "content-type": "application/json",
            "x-hasura-admin-secret": os.getenv("HASURA_ADMIN_KEY"),
        },
-        json={"query": query_get_records, "variables": {"limit": int(limit)}},
+        json={"query": query_get_records, "variables": variables},
    )

    try:
@@ -249,14 +333,9 @@ def get_records(limit: int = 100) -> Set[int]:
        print("Error: " + str(e))
        records = {"data": {"atd_txdot_crashes": []}}

-    try:
-        return set(map(lambda x: x["crash_id"], records["data"]["atd_txdot_crashes"]))
-    except (KeyError, TypeError):
-        print("No data available. Response: ", records)
-        return set()
-
+    return {record['crash_id'] for record in records['data']['atd_txdot_crashes']}

-def update_metadata(crash_id: int, metadata: dict) -> bool:
+def update_metadata(crash_id: int, metadata: dict, args) -> bool:
    """
    Returns True if it manages to update the metadata for an object.
    :param int crash_id: The crash id to be updated
@@ -270,7 +349,8 @@ def update_metadata(crash_id: int, metadata: dict) -> bool:
    ):
        return False

-    print(f"Updating crash_id: {crash_id}, with metadata: {json.dumps(metadata)}")
+    if args.verbose:
+        print(f"Updating crash_id: {crash_id}, with metadata: {json.dumps(metadata)}")

    mutation_update_crash_cr3_metadata = """
    mutation updateCrashCr3Metadata($crash_id:Int!, $metadata: jsonb) {
@@ -306,14 +386,5 @@ def update_metadata(crash_id: int, metadata: dict) -> bool:
        return False


-def main():
-    """
-    Main Loop
-    """
-    # Processes each one of these crashes using process_record function
-    for crash_id in get_records(PDF_MAX_RECORDS):
-        process_record(crash_id)


if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions atd-etl/populate_cr3_file_metadata/requirements.txt
@@ -1,2 +1,3 @@
boto3==1.33.*
requests==2.31.*
tqdm==4.*
1 change: 0 additions & 1 deletion docker-compose-docker-volume.yml
frankhereford (Member, Author) commented on Apr 15, 2024:

This is a tag-along change. We need to, non-urgently, remove the deprecated version designation from our compose files. The same goes for the other two compose files.

@@ -1,4 +1,3 @@
-version: '3.7'
services:
  postgis:
    container_name: visionzero-postgis
1 change: 0 additions & 1 deletion docker-compose-ram-disk.yml
@@ -1,4 +1,3 @@
-version: '3.7'
services:
  postgis:
    container_name: visionzero-postgis
1 change: 0 additions & 1 deletion docker-compose.yml
@@ -1,4 +1,3 @@
-version: '3.7'
services:
  graphql-engine:
    image: hasura/graphql-engine:v2.38.1