394 Refactor a few ingest scripts from R to dbt Python #434

Draft · wants to merge 12 commits into master
58 changes: 58 additions & 0 deletions aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py
@@ -0,0 +1,58 @@
import pandas as pd # noqa: E402
import pyarrow as pa # noqa: E402
import pyarrow.parquet as pq # noqa: E402
import requests # noqa: E402
from openpyxl import load_workbook # noqa: E402

# URL of the webpage
url = "https://price-index.housingstudies.org"

# Make a request to fetch the webpage content
response = requests.get(url)
content = response.text

# Locate the Excel file link within the fetched HTML content
link_start = content.find("/data/")
link_end = content.find(".xlsx", link_start) + len(".xlsx")
xlsx_link = content[link_start:link_end]
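# NOTE: this assumes the page exposes exactly one "/data/...xlsx" link; if the
# markup changes, link_start will be -1 and the slice above will be wrong.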

# Form the complete URL for the Excel file
most_recent_ihs_data_url = (
    url + xlsx_link if xlsx_link.startswith("/") else xlsx_link
)

# Print the URL
print(most_recent_ihs_data_url)

# Download the Excel file
response = requests.get(most_recent_ihs_data_url)
with open("temp.xlsx", "wb") as f:
    f.write(response.content)

# Load the second sheet of the downloaded Excel file using openpyxl
data = pd.read_excel("temp.xlsx", engine="openpyxl", sheet_name=1)

data = data.drop(columns="Unnamed: 1")
data = data.drop(columns="Unnamed: 2")
data = data.drop(columns="Unnamed: 3")

data = data.transpose()

data = data.astype(str)

data.reset_index(inplace=True)

data = data.replace({"Unnamed: 0": "puma", "YEARQ": "name"})

data.to_parquet("output.parquet")


def model(dbt, spark_session):
    dbt.config(materialized="table")

    input = dbt.ref("ihs_housing_input")

    spark_df = spark_session.createDataFrame(data)

    return spark_df
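# NOTE: in a dbt Python model, dbt calls model(dbt, spark_session) and
# materializes the returned DataFrame. The top-level download code above runs
# when the module is executed, and the `input` ref is currently unused.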
96 changes: 96 additions & 0 deletions aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py
@@ -0,0 +1,96 @@
import os
import shutil
import tempfile
import urllib.request

import boto3
import geopandas as gpd
import pandas as pd
from botocore.exceptions import ClientError
from dotenv import load_dotenv

dotenv_path = ".py.env"
load_result = load_dotenv(dotenv_path)

load_dotenv("./data-architecture/aws-s3/py.env")
# Set AWS S3 variables
AWS_S3_RAW_BUCKET = os.getenv("AWS_S3_RAW_BUCKET")
output_bucket = os.path.join(AWS_S3_RAW_BUCKET, "spatial", "access")

# List APIs from city site
sources_list = [
    # INDUSTRIAL CORRIDORS
    {
        "source": "https://data.cityofchicago.org/api/geospatial/",
        "api_url": "e6xh-nr8w?method=export&format=GeoJSON",
        "boundary": "industrial_corridor",
        "year": "2013",
    },
    # PARKS
    {
        "source": "https://opendata.arcgis.com/datasets/",
        "api_url": "74d19d6bd7f646ecb34c252ae17cd2f7_7.geojson",
        "boundary": "park",
        "year": "2021",
    },
]
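# Each dict above maps directly onto the keyword arguments of get_data_to_s3()
# below and is unpacked with **source when the sources are looped over.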


# Function to call referenced API, pull requested data, and write it to S3
def get_data_to_s3(source, api_url, boundary, year):
    s3 = boto3.client("s3")

    response = urllib.request.urlopen(source + api_url)
    data = response.read().decode("utf-8")

    s3_path = os.path.join(output_bucket, boundary, f"{year}.geojson")

    try:
        s3.put_object(
            Bucket=os.environ["AWS_S3_RAW_BUCKET"],
            Key=s3_path,
            Body=data,
        )
    except ClientError as e:
        print(e)
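# NOTE: output_bucket already includes the bucket name, so the Key written
# above is prefixed with it; the object lands under a folder named after the
# bucket unless that prefix is stripped from s3_path.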


for source in sources_list:
    get_data_to_s3(**source)

# CMAP WALKABILITY
# 2017 Data is no longer available online
raw_walk = pd.DataFrame(
    {
        "url": [
            "https://services5.arcgis.com/LcMXE3TFhi1BSaCY/arcgis/rest/services/Walkability/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson"
        ],
        "year": ["2018"],
    }
)


def get_walkability(url, year):
    s3_uri = os.path.join(output_bucket, "walkability", f"{year}.geojson")

    if not os.path.exists(s3_uri):
        tmp_file = tempfile.NamedTemporaryFile(suffix=".geojson", delete=False)
        tmp_dir = os.path.join(tempfile.gettempdir(), "walkability")

        # Download the walkability GeoJSON from the CMAP ArcGIS endpoint to a
        # temporary file
        with urllib.request.urlopen(url) as response, open(
            tmp_file.name, "wb"
        ) as out_file:
            shutil.copyfileobj(response, out_file)

        s3 = boto3.client("s3")
        s3_path = os.path.join(output_bucket, "walkability", f"{year}.geojson")

        with open(tmp_file.name, "rb") as file:
            s3.upload_fileobj(file, os.environ["AWS_S3_RAW_BUCKET"], s3_path)

        os.unlink(tmp_file.name)
        shutil.rmtree(tmp_dir, ignore_errors=True)
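# NOTE: os.path.exists() above checks a local path, not S3, so the existence
# guard will not detect objects that are already in the bucket.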


for index, row in raw_walk.iterrows():
    get_walkability(row["url"], row["year"])
@@ -0,0 +1,61 @@
import os
import re

import boto3
import pandas as pd
import pyarrow.parquet as pq

# Load environment variables
AWS_S3_RAW_BUCKET = os.getenv("AWS_S3_RAW_BUCKET")
AWS_S3_WAREHOUSE_BUCKET = os.getenv("AWS_S3_WAREHOUSE_BUCKET")
output_bucket = f"{AWS_S3_WAREHOUSE_BUCKET}/housing/ihs_index"

# Initialize S3 client
s3 = boto3.client("s3")


# Get the list of files from the raw S3 bucket
def get_bucket_keys(bucket, prefix):
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    if "Contents" in response:
        return [content["Key"] for content in response["Contents"]]
    return []
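# NOTE: list_objects_v2 returns at most 1,000 keys per call; a paginator would
# be needed if the prefix ever holds more objects than that.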


# Find the raw file
raw_keys = get_bucket_keys(AWS_S3_RAW_BUCKET, "housing/ihs_index/")
raw_file_key = raw_keys[0] if raw_keys else None

if raw_file_key:
    # Download the raw file locally
    local_raw_file = "/tmp/ihs_index.parquet"
    s3.download_file(AWS_S3_RAW_BUCKET, raw_file_key, local_raw_file)

    # Read the parquet file
    df = pd.read_parquet(local_raw_file)

    # Rename columns and modify geoid
    df.rename(columns={"puma": "geoid"}, inplace=True)
    df["geoid"] = df["geoid"].apply(lambda x: re.sub(r"p", "170", x))

    # Convert from wide to long
    df_long = df.melt(
        id_vars=["geoid", "name"], var_name="time", value_name="ihs_index"
    )

    # Split 'time' column into 'year' and 'quarter'
    df_long[["year", "quarter"]] = df_long["time"].str.split("Q", expand=True)
    df_long.drop(columns=["time"], inplace=True)

    # Reorder columns
    df_long = df_long[["geoid", "name", "ihs_index", "quarter", "year"]]


def model(dbt, spark_session):
    dbt.config(materialized="table")

    input = dbt.ref("housing/ihs_index")

    # Materialize the long-format frame built above (df_long, not the wide df)
    spark_df = spark_session.createDataFrame(df_long)

    return spark_df
136 changes: 136 additions & 0 deletions aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py
@@ -0,0 +1,136 @@
import os
import tempfile
from datetime import datetime

import boto3
import geopandas as gpd
import pyarrow.parquet as pq
from botocore.exceptions import ClientError

# Set up AWS credentials
AWS_S3_RAW_BUCKET = os.getenv("AWS_S3_RAW_BUCKET")
AWS_S3_WAREHOUSE_BUCKET = os.getenv("AWS_S3_WAREHOUSE_BUCKET")
s3 = boto3.client("s3")

current_year = datetime.now().strftime("%Y")


# Helper function to save to S3
def upload_to_s3(local_file, bucket, s3_key):
    s3.upload_file(local_file, bucket, s3_key)


# Helper function to check if an S3 object exists
def object_exists(bucket, key):
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError:
        return False

# Bike Trail Data Processing
def process_bike_trail(spark_session):
    bike_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/bike_trail/2021.geojson"
    bike_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/bike_trail/year=2021/part-0.parquet"

    if not object_exists(AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse):
        with tempfile.NamedTemporaryFile(suffix=".geojson") as temp:
            s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/bike_trail/2021.geojson", temp.name)
            df_bike = gpd.read_file(temp.name).to_crs(epsg=4326)
            df_bike.columns = map(str.lower, df_bike.columns)
            df_bike["geometry_3435"] = df_bike["geometry"].to_crs(epsg=3435)
            df_bike = df_bike.rename(
                columns={
                    "spdlimit": "speed_limit",
                    "onstreet": "on_street",
                    "edtdate": "edit_date",
                    "trailwdth": "trail_width",
                    "trailtype": "trail_type",
                    "trailsurfa": "trail_surface",
                }
            ).drop(columns=["created_us", "shape_stle"])

            spark_df = spark_session.createDataFrame(df_bike)
            spark_df.write.parquet(bike_key_warehouse)
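# NOTE: the *_key_warehouse strings are bare "bucket/key" paths; unless the
# Spark session's default filesystem points at S3, write.parquet needs a
# scheme-qualified URI (e.g. s3a://bucket/key) to land in the warehouse bucket.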

# Park Data Processing
def process_parks(spark_session):
    park_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/park/year=2021/part-0.parquet"

    if not object_exists(AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse):
        with tempfile.NamedTemporaryFile(suffix=".geojson") as temp:
            cook_boundary_key = "spatial/ccao/county/2019.parquet"
            s3.download_file(AWS_S3_WAREHOUSE_BUCKET, cook_boundary_key, temp.name)
            cook_boundary = gpd.read_parquet(temp.name).to_crs(epsg=4326)

            parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs(epsg=4326)
            parks_df["geometry_3435"] = parks_df["geometry"].to_crs(epsg=3435)
            parks_df_filtered = parks_df.loc[parks_df.intersects(cook_boundary.unary_union)]

            spark_df = spark_session.createDataFrame(parks_df_filtered)
            spark_df.write.parquet(park_key_warehouse)

# Industrial Corridor Data Processing
def process_industrial_corridor(spark_session):
    indc_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson"
    indc_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/industrial_corridor/year=2013/part-0.parquet"

    if not object_exists(AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse):
        with tempfile.NamedTemporaryFile(suffix=".geojson") as temp:
            s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/industrial_corridor/2013.geojson", temp.name)
            df_indc = gpd.read_file(temp.name).to_crs(epsg=4326)
            df_indc.columns = map(str.lower, df_indc.columns)
            df_indc["geometry_3435"] = df_indc["geometry"].to_crs(epsg=3435)
            df_indc = df_indc.rename(columns={"no": "num"}).loc[
                :, ["name", "region", "num", "hud_qualif", "acres", "geometry", "geometry_3435"]
            ]

            spark_df = spark_session.createDataFrame(df_indc)
            spark_df.write.parquet(indc_key_warehouse)

# Cemetery Data Processing
def process_cemetery(spark_session):
    ceme_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/cemetery/2021.geojson"
    ceme_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/cemetery/year=2021/part-0.parquet"

    if not object_exists(AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse):
        with tempfile.NamedTemporaryFile(suffix=".geojson") as temp:
            s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/cemetery/2021.geojson", temp.name)
            df_ceme = gpd.read_file(temp.name).to_crs(epsg=4326)
            df_ceme.columns = map(str.lower, df_ceme.columns)
            df_ceme["geometry_3435"] = df_ceme["geometry"].to_crs(epsg=3435)
            df_ceme = df_ceme.rename(columns={"cfname": "name"}).loc[
                :, ["name", "address", "gniscode", "source", "community", "comment", "mergeid", "geometry", "geometry_3435"]
            ]

            spark_df = spark_session.createDataFrame(df_ceme)
            spark_df.write.parquet(ceme_key_warehouse)

# Walkability Data Processing
def process_walkability(spark_session):
    walk_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/walkability/2017.geojson"
    walk_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/walkability/year=2017/part-0.parquet"

    if not object_exists(AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse):
        with tempfile.NamedTemporaryFile(suffix=".geojson") as temp:
            s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/walkability/2017.geojson", temp.name)
            df_walk = gpd.read_file(temp.name).to_crs(epsg=4326)
            df_walk.columns = map(str.lower, df_walk.columns)
            df_walk.columns = [col.replace("sc", "_score") for col in df_walk.columns]
            df_walk.rename(
                columns={
                    "walkabilit": "walkability_rating",
                    "amenities": "amenities_score",
                    "transitacc": "transitaccess",
                },
                inplace=True,
            )
            # standardize_expand_geo is not a geopandas function; this appears
            # to stand in for the project's own geometry-standardization helper
            df_walk = gpd.standardize_expand_geo(df_walk)
            df_walk["year"] = "2017"

            spark_df = spark_session.createDataFrame(df_walk)
            spark_df.write.parquet(walk_key_warehouse)
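# NOTE: none of the process_* functions above are invoked in this script as
# written; a dbt Python model() entrypoint (or a __main__ block) would need to
# call them and supply an active spark_session.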