From 68a9f9d800199ff4172d7aeffde7001513a5f756 Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 17:02:22 +0000 Subject: [PATCH 01/12] Add IHS Index --- dbt/models/model/housing_ihs-index.py | 45 +++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 dbt/models/model/housing_ihs-index.py diff --git a/dbt/models/model/housing_ihs-index.py b/dbt/models/model/housing_ihs-index.py new file mode 100644 index 000000000..68aebb7f5 --- /dev/null +++ b/dbt/models/model/housing_ihs-index.py @@ -0,0 +1,45 @@ +import pandas as pd # noqa: E402 +import requests # noqa: E402 +from openpyxl import load_workbook # noqa: E402 +import pyarrow as pa # noqa: E402 +import pyarrow.parquet as pq # noqa: E402 + +# URL of the webpage +url = "https://price-index.housingstudies.org" + +# Make a request to fetch the webpage content +response = requests.get(url) +content = response.text + +# Locate the Excel file link within the fetched HTML content +link_start = content.find('/data/') +link_end = content.find('.xlsx', link_start) + len('.xlsx') +xlsx_link = content[link_start:link_end] + +# Form the complete URL for the Excel file +most_recent_ihs_data_url = url + xlsx_link if xlsx_link.startswith('/') else xlsx_link + +# Print the URL +print(most_recent_ihs_data_url) + +# Download the Excel file +response = requests.get(most_recent_ihs_data_url) +with open('temp.xlsx', 'wb') as f: + f.write(response.content) + +# Load the Excel file using openpyxl +data = pd.read_excel(response.content, engine='openpyxl', sheet_name=1) + +data = data.drop(columns="Unnamed: 1") +data = data.drop(columns="Unnamed: 2") +data = data.drop(columns="Unnamed: 3") + +data = data.transpose() + +data = data.astype(str) + +data.reset_index(inplace=True) + +data = data.replace({'Unnamed: 0': 'puma', 'YEARQ': 'name'}) + +data.to_parquet('output.parquet') \ No newline at end of file From eada07971d0a58b6000a2f02049a3f1d25c9db5a Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 18:13:21 +0000 Subject: [PATCH 02/12] Move IHS --- .../scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename dbt/models/model/housing_ihs-index.py => aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py (100%) diff --git a/dbt/models/model/housing_ihs-index.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py similarity index 100% rename from dbt/models/model/housing_ihs-index.py rename to aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py From 5d3c3b5425e2db5eba9432b6be55988939ace125 Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 18:35:40 +0000 Subject: [PATCH 03/12] Add spatial-access --- .../spatial-access.py | 84 +++++++++++++++++++ .../spatial-access.py | 82 ++++++++++++++++++ 2 files changed, 166 insertions(+) create mode 100644 aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py create mode 100644 aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py new file mode 100644 index 000000000..5b4375328 --- /dev/null +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py @@ -0,0 +1,84 @@ +import os +import tempfile +import urllib.request +import shutil +import pandas as pd +import boto3 +from botocore.exceptions import ClientError +import geopandas as gpd + +from dotenv import load_dotenv + +dotenv_path = '.py.env' +load_result = 
load_dotenv(dotenv_path) + +load_dotenv("./data-architecture/aws-s3/py.env") +# Set AWS S3 variables +AWS_S3_RAW_BUCKET = os.getenv("AWS_S3_RAW_BUCKET") +output_bucket = os.path.join( + print(AWS_S3_RAW_BUCKET),"spatial", "access") + +# List APIs from city site +sources_list = [ + # INDUSTRIAL CORRIDORS + { + "source": "https://data.cityofchicago.org/api/geospatial/", + "api_url": "e6xh-nr8w?method=export&format=GeoJSON", + "boundary": "industrial_corridor", + "year": "2013" + }, + # PARKS + { + "source": "https://opendata.arcgis.com/datasets/", + "api_url": "74d19d6bd7f646ecb34c252ae17cd2f7_7.geojson", + "boundary": "park", + "year": "2021" + } +] + +# Function to call referenced API, pull requested data, and write it to S3 +def get_data_to_s3(source, api_url, boundary, year): + s3 = boto3.client('s3') + + response = urllib.request.urlopen(source + api_url) + data = response.read().decode('utf-8') + + s3_path = os.path.join(output_bucket, boundary, f"{year}.geojson") + + try: + s3.put_object(Bucket=os.environ['AWS_S3_RAW_BUCKET'], Key=s3_path, Body=data) + except ClientError as e: + print(e) + +for source in sources_list: + get_data_to_s3(**source) + +# CMAP WALKABILITY +# 2017 Data is no longer available online +raw_walk = pd.DataFrame({ + "url": ["https://services5.arcgis.com/LcMXE3TFhi1BSaCY/arcgis/rest/services/Walkability/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson"], + "year": ["2018"] +}) + +def get_walkability(url, year): + s3_uri = os.path.join(output_bucket, "walkability", f"{year}.geojson") + + if not os.path.exists(s3_uri): + tmp_file = tempfile.NamedTemporaryFile(suffix=".geojson", delete=False) + tmp_dir = os.path.join(tempfile.gettempdir(), "walkability") + + # Grab file from CTA, recompress without .htm file + with urllib.request.urlopen(url) as response, open(tmp_file.name, 'wb') as out_file: + shutil.copyfileobj(response, out_file) + + s3 = boto3.client('s3') + s3_path = os.path.join(output_bucket, "walkability", f"{year}.geojson") + + with open(tmp_file.name, 'rb') as file: + s3.upload_fileobj(file, os.environ['AWS_S3_RAW_BUCKET'], s3_path) + + os.unlink(tmp_file.name) + shutil.rmtree(tmp_dir, ignore_errors=True) + +for index, row in raw_walk.iterrows(): + get_walkability(row["url"], row["year"]) \ No newline at end of file diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py new file mode 100644 index 000000000..646a95846 --- /dev/null +++ b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py @@ -0,0 +1,82 @@ +import boto3 +import geopandas as gpd +import pyarrow.parquet as pq +import tempfile +import os +import datetime +from utils import standardize_expand_geo + +# Set up AWS credentials +AWS_S3_RAW_BUCKET = os.getenv("AWS_S3_RAW_BUCKET") +AWS_S3_WAREHOUSE_BUCKET = os.getenv("AWS_S3_WAREHOUSE_BUCKET") +s3 = boto3.client('s3') +current_year = datetime.strftime("%Y") + +# Helper function to save to S3 +def upload_to_s3(local_file, bucket, s3_key): + s3.upload_file(local_file, bucket, s3_key) + +# Helper function to check if an S3 object exists +def object_exists(bucket, key): + try: + s3.head_object(Bucket=bucket, Key=key) + return True + except: + return False + +##### BIKE TRAIL ##### +bike_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/bike_trail/2021.geojson" +bike_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/bike_trail/year=2021/part-0.parquet" + +if not object_exists(AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse): + with 
tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/bike_trail/2021.geojson", temp.name) + df_bike = gpd.read_file(temp.name).to_crs(epsg=4326) + df_bike.columns = map(str.lower, df_bike.columns) + df_bike['geometry_3435'] = df_bike['geometry'].to_crs(epsg=3435) + df_bike = df_bike.rename(columns={ + 'spdlimit': 'speed_limit', + 'onstreet': 'on_street', + 'edtdate': 'edit_date', + 'trailwdth': 'trail_width', + 'trailtype': 'trail_type', + 'trailsurfa': 'trail_surface' + }).drop(columns=['created_us', 'shape_stle']) + pq.write_table(df_bike.to_parquet(), temp.name) + upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse) + +# Similar structures follow for other datasets (e.g., cemetery, hospital, park, industrial corridor, and walkability). + +##### CEMETERY ##### +ceme_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/cemetery/2021.geojson" +ceme_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/cemetery/year=2021/part-0.parquet" + +if not object_exists(AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse): + with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/cemetery/2021.geojson", temp.name) + df_ceme = gpd.read_file(temp.name).to_crs(epsg=4326) + df_ceme.columns = map(str.lower, df_ceme.columns) + df_ceme['geometry_3435'] = df_ceme['geometry'].to_crs(epsg=3435) + df_ceme = df_ceme.rename(columns={ + 'cfname': 'name' + }).loc[:, ['name', 'address', 'gniscode', 'source', 'community', 'comment', 'mergeid', 'geometry', 'geometry_3435']] + pq.write_table(df_ceme.to_parquet(), temp.name) + upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse) + +# And continue with other datasets... + +##### WALKABILITY ##### +walk_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/walkability/2017.geojson" +walk_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/walkability/year=2017/part-0.parquet" + +if not object_exists(AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse): + with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/walkability/2017.geojson", temp.name) + df_walk = gpd.read_file(temp.name).to_crs(epsg=4326) + df_walk.columns = map(str.lower, df_walk.columns) + df_walk.columns = [col.replace('sc', '_score') for col in df_walk.columns] + df_walk.rename(columns={'walkabilit': 'walkability_rating', 'amenities': 'amenities_score', 'transitacc': 'transitaccess'}, inplace=True) + df_walk = standardize_expand_geo(df_walk) + df_walk['year'] = '2017' + pq.write_table(df_walk.to_parquet(), temp.name) + upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse) From 1ad02a01d1afa78c99f0026bc01967a7a17fc3d1 Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 19:03:56 +0000 Subject: [PATCH 04/12] Add second ihs index --- .../housing-ihs-index.py | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 aws-s3/scripts-ccao-data-warehouse-us-east-1/housing-ihs-index.py diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing-ihs-index.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing-ihs-index.py new file mode 100644 index 000000000..0d3600458 --- /dev/null +++ b/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing-ihs-index.py @@ -0,0 +1,59 @@ +import boto3 +import os +import re +import pandas as pd +import pyarrow.parquet as pq + +# Load environment variables +AWS_S3_RAW_BUCKET = os.getenv("AWS_S3_RAW_BUCKET") +AWS_S3_WAREHOUSE_BUCKET = 
os.getenv("AWS_S3_WAREHOUSE_BUCKET") +output_bucket = f"{AWS_S3_WAREHOUSE_BUCKET}/housing/ihs_index" + +# Initialize S3 client +s3 = boto3.client("s3") + +# Get the list of files from the raw S3 bucket +def get_bucket_keys(bucket, prefix): + response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) + if "Contents" in response: + return [content["Key"] for content in response["Contents"]] + return [] + +# Find the raw file +raw_keys = get_bucket_keys(AWS_S3_RAW_BUCKET, "housing/ihs_index/") +raw_file_key = raw_keys[0] if raw_keys else None + +if raw_file_key: + # Download the raw file locally + local_raw_file = "/tmp/ihs_index.parquet" + s3.download_file(AWS_S3_RAW_BUCKET, raw_file_key, local_raw_file) + + # Read the parquet file + df = pd.read_parquet(local_raw_file) + + # Rename columns and modify geoid + df.rename(columns={"puma": "geoid"}, inplace=True) + df["geoid"] = df["geoid"].apply(lambda x: re.sub(r"p", "170", x)) + + # Convert from wide to long + df_long = df.melt( + id_vars=["geoid", "name"], + var_name="time", + value_name="ihs_index" + ) + + # Split 'time' column into 'year' and 'quarter' + df_long[["year", "quarter"]] = df_long["time"].str.split("Q", expand=True) + df_long.drop(columns=["time"], inplace=True) + + # Reorder columns + df_long = df_long[["geoid", "name", "ihs_index", "quarter", "year"]] + +def model(dbt, spark_session): + dbt.config(materialized="table") + + input = dbt.ref("housing/ihs_index") + + spark_df = spark_session.createDataFrame(df) + + return spark_df \ No newline at end of file From e5c5b0176438e3315b428f7b0d05a3ef5f03f114 Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 19:14:54 +0000 Subject: [PATCH 05/12] Modify spatial access --- .../spatial-access.py | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py index 646a95846..d7be3810a 100644 --- a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py @@ -45,7 +45,46 @@ def object_exists(bucket, key): pq.write_table(df_bike.to_parquet(), temp.name) upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse) -# Similar structures follow for other datasets (e.g., cemetery, hospital, park, industrial corridor, and walkability). 
+##### PARK ##### + +park_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/park/year=2021/part-0.parquet" + +if not object_exists(AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse): + with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + # Assuming you have the Cook County boundary file locally + cook_boundary_key = "spatial/ccao/county/2019.parquet" + s3.download_file(AWS_S3_WAREHOUSE_BUCKET, cook_boundary_key, temp.name) + cook_boundary = gpd.read_parquet(temp.name).to_crs(epsg=4326) + + # Replace with the actual park data processing steps or access a local GeoJSON file + parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs(epsg=4326) + + parks_df['geometry_3435'] = parks_df['geometry'].to_crs(epsg=3435) + parks_df_filtered = parks_df.loc[parks_df.intersects(cook_boundary.unary_union)] + + pq.write_table(parks_df_filtered.to_parquet(), temp.name) + upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse) + + +##### INDUSTRIAL CORRIDOR ##### + +indc_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson" +indc_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/industrial_corridor/year=2013/part-0.parquet" + +if not object_exists(AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse): + with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/industrial_corridor/2013.geojson", temp.name) + df_indc = gpd.read_file(temp.name).to_crs(epsg=4326) + df_indc.columns = map(str.lower, df_indc.columns) + df_indc['geometry_3435'] = df_indc['geometry'].to_crs(epsg=3435) + df_indc = df_indc.rename(columns={ + 'name': 'name', + 'region': 'region', + 'no': 'num', + 'hud_qualif': 'hud_qualif' + }).loc[:, ['name', 'region', 'num', 'hud_qualif', 'acres', 'geometry', 'geometry_3435']] + pq.write_table(df_indc.to_parquet(), temp.name) + upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse) ##### CEMETERY ##### ceme_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/cemetery/2021.geojson" From 5bbda0e857ec0a9d5849513f63aa36c88299b39e Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 19:25:05 +0000 Subject: [PATCH 06/12] lintr --- .../spatial-access.py | 55 ++++--- .../housing-ihs-index.py | 12 +- .../housing_ihs_index.py | 20 +-- .../spatial-access.py | 135 +++++++++++++----- 4 files changed, 150 insertions(+), 72 deletions(-) diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py index 5b4375328..e20323516 100644 --- a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py @@ -1,22 +1,21 @@ import os +import shutil import tempfile import urllib.request -import shutil -import pandas as pd + import boto3 -from botocore.exceptions import ClientError import geopandas as gpd - +import pandas as pd +from botocore.exceptions import ClientError from dotenv import load_dotenv -dotenv_path = '.py.env' +dotenv_path = ".py.env" load_result = load_dotenv(dotenv_path) load_dotenv("./data-architecture/aws-s3/py.env") # Set AWS S3 variables AWS_S3_RAW_BUCKET = os.getenv("AWS_S3_RAW_BUCKET") -output_bucket = os.path.join( - print(AWS_S3_RAW_BUCKET),"spatial", "access") +output_bucket = os.path.join(print(AWS_S3_RAW_BUCKET), "spatial", "access") # List APIs from city site sources_list = [ @@ -25,40 +24,49 @@ "source": "https://data.cityofchicago.org/api/geospatial/", "api_url": "e6xh-nr8w?method=export&format=GeoJSON", "boundary": 
"industrial_corridor", - "year": "2013" + "year": "2013", }, # PARKS { "source": "https://opendata.arcgis.com/datasets/", "api_url": "74d19d6bd7f646ecb34c252ae17cd2f7_7.geojson", "boundary": "park", - "year": "2021" - } + "year": "2021", + }, ] + # Function to call referenced API, pull requested data, and write it to S3 def get_data_to_s3(source, api_url, boundary, year): - s3 = boto3.client('s3') + s3 = boto3.client("s3") response = urllib.request.urlopen(source + api_url) - data = response.read().decode('utf-8') + data = response.read().decode("utf-8") s3_path = os.path.join(output_bucket, boundary, f"{year}.geojson") try: - s3.put_object(Bucket=os.environ['AWS_S3_RAW_BUCKET'], Key=s3_path, Body=data) + s3.put_object( + Bucket=os.environ["AWS_S3_RAW_BUCKET"], Key=s3_path, Body=data + ) except ClientError as e: print(e) + for source in sources_list: get_data_to_s3(**source) # CMAP WALKABILITY # 2017 Data is no longer available online -raw_walk = pd.DataFrame({ - "url": ["https://services5.arcgis.com/LcMXE3TFhi1BSaCY/arcgis/rest/services/Walkability/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson"], - "year": ["2018"] -}) +raw_walk = pd.DataFrame( + { + "url": [ + "https://services5.arcgis.com/LcMXE3TFhi1BSaCY/arcgis/rest/services/Walkability/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson" + ], + "year": ["2018"], + } +) + def get_walkability(url, year): s3_uri = os.path.join(output_bucket, "walkability", f"{year}.geojson") @@ -68,17 +76,20 @@ def get_walkability(url, year): tmp_dir = os.path.join(tempfile.gettempdir(), "walkability") # Grab file from CTA, recompress without .htm file - with urllib.request.urlopen(url) as response, open(tmp_file.name, 'wb') as out_file: + with urllib.request.urlopen(url) as response, open( + tmp_file.name, "wb" + ) as out_file: shutil.copyfileobj(response, out_file) - s3 = boto3.client('s3') + s3 = boto3.client("s3") s3_path = os.path.join(output_bucket, "walkability", f"{year}.geojson") - with open(tmp_file.name, 'rb') as file: - s3.upload_fileobj(file, os.environ['AWS_S3_RAW_BUCKET'], s3_path) + with open(tmp_file.name, "rb") as file: + s3.upload_fileobj(file, os.environ["AWS_S3_RAW_BUCKET"], s3_path) os.unlink(tmp_file.name) shutil.rmtree(tmp_dir, ignore_errors=True) + for index, row in raw_walk.iterrows(): - get_walkability(row["url"], row["year"]) \ No newline at end of file + get_walkability(row["url"], row["year"]) diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing-ihs-index.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing-ihs-index.py index 0d3600458..45f11bddd 100644 --- a/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing-ihs-index.py +++ b/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing-ihs-index.py @@ -1,6 +1,7 @@ -import boto3 import os import re + +import boto3 import pandas as pd import pyarrow.parquet as pq @@ -12,6 +13,7 @@ # Initialize S3 client s3 = boto3.client("s3") + # Get the list of files from the raw S3 bucket def get_bucket_keys(bucket, prefix): response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) @@ -19,6 +21,7 @@ def get_bucket_keys(bucket, prefix): return [content["Key"] for content in response["Contents"]] return [] + # Find the raw file raw_keys = get_bucket_keys(AWS_S3_RAW_BUCKET, "housing/ihs_index/") raw_file_key = raw_keys[0] if raw_keys else None @@ -37,9 +40,7 @@ def get_bucket_keys(bucket, prefix): # Convert from wide to long df_long = df.melt( - id_vars=["geoid", "name"], - var_name="time", - value_name="ihs_index" + id_vars=["geoid", "name"], var_name="time", 
value_name="ihs_index" ) # Split 'time' column into 'year' and 'quarter' @@ -49,6 +50,7 @@ def get_bucket_keys(bucket, prefix): # Reorder columns df_long = df_long[["geoid", "name", "ihs_index", "quarter", "year"]] + def model(dbt, spark_session): dbt.config(materialized="table") @@ -56,4 +58,4 @@ def model(dbt, spark_session): spark_df = spark_session.createDataFrame(df) - return spark_df \ No newline at end of file + return spark_df diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py index 68aebb7f5..c4e0ea069 100644 --- a/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py +++ b/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py @@ -1,8 +1,8 @@ import pandas as pd # noqa: E402 -import requests # noqa: E402 -from openpyxl import load_workbook # noqa: E402 import pyarrow as pa # noqa: E402 import pyarrow.parquet as pq # noqa: E402 +import requests # noqa: E402 +from openpyxl import load_workbook # noqa: E402 # URL of the webpage url = "https://price-index.housingstudies.org" @@ -12,23 +12,25 @@ content = response.text # Locate the Excel file link within the fetched HTML content -link_start = content.find('/data/') -link_end = content.find('.xlsx', link_start) + len('.xlsx') +link_start = content.find("/data/") +link_end = content.find(".xlsx", link_start) + len(".xlsx") xlsx_link = content[link_start:link_end] # Form the complete URL for the Excel file -most_recent_ihs_data_url = url + xlsx_link if xlsx_link.startswith('/') else xlsx_link +most_recent_ihs_data_url = ( + url + xlsx_link if xlsx_link.startswith("/") else xlsx_link +) # Print the URL print(most_recent_ihs_data_url) # Download the Excel file response = requests.get(most_recent_ihs_data_url) -with open('temp.xlsx', 'wb') as f: +with open("temp.xlsx", "wb") as f: f.write(response.content) # Load the Excel file using openpyxl -data = pd.read_excel(response.content, engine='openpyxl', sheet_name=1) +data = pd.read_excel(response.content, engine="openpyxl", sheet_name=1) data = data.drop(columns="Unnamed: 1") data = data.drop(columns="Unnamed: 2") @@ -40,6 +42,6 @@ data.reset_index(inplace=True) -data = data.replace({'Unnamed: 0': 'puma', 'YEARQ': 'name'}) +data = data.replace({"Unnamed: 0": "puma", "YEARQ": "name"}) -data.to_parquet('output.parquet') \ No newline at end of file +data.to_parquet("output.parquet") diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py index d7be3810a..6930969c8 100644 --- a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py @@ -1,21 +1,24 @@ +import datetime +import os +import tempfile + import boto3 import geopandas as gpd import pyarrow.parquet as pq -import tempfile -import os -import datetime from utils import standardize_expand_geo # Set up AWS credentials AWS_S3_RAW_BUCKET = os.getenv("AWS_S3_RAW_BUCKET") AWS_S3_WAREHOUSE_BUCKET = os.getenv("AWS_S3_WAREHOUSE_BUCKET") -s3 = boto3.client('s3') +s3 = boto3.client("s3") current_year = datetime.strftime("%Y") + # Helper function to save to S3 def upload_to_s3(local_file, bucket, s3_key): s3.upload_file(local_file, bucket, s3_key) + # Helper function to check if an S3 object exists def object_exists(bucket, key): try: @@ -24,30 +27,39 @@ def object_exists(bucket, key): except: return False + ##### BIKE TRAIL ##### bike_key_raw = 
f"{AWS_S3_RAW_BUCKET}/spatial/access/bike_trail/2021.geojson" bike_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/bike_trail/year=2021/part-0.parquet" if not object_exists(AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: - s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/bike_trail/2021.geojson", temp.name) + s3.download_file( + AWS_S3_RAW_BUCKET, + "spatial/access/bike_trail/2021.geojson", + temp.name, + ) df_bike = gpd.read_file(temp.name).to_crs(epsg=4326) df_bike.columns = map(str.lower, df_bike.columns) - df_bike['geometry_3435'] = df_bike['geometry'].to_crs(epsg=3435) - df_bike = df_bike.rename(columns={ - 'spdlimit': 'speed_limit', - 'onstreet': 'on_street', - 'edtdate': 'edit_date', - 'trailwdth': 'trail_width', - 'trailtype': 'trail_type', - 'trailsurfa': 'trail_surface' - }).drop(columns=['created_us', 'shape_stle']) + df_bike["geometry_3435"] = df_bike["geometry"].to_crs(epsg=3435) + df_bike = df_bike.rename( + columns={ + "spdlimit": "speed_limit", + "onstreet": "on_street", + "edtdate": "edit_date", + "trailwdth": "trail_width", + "trailtype": "trail_type", + "trailsurfa": "trail_surface", + } + ).drop(columns=["created_us", "shape_stle"]) pq.write_table(df_bike.to_parquet(), temp.name) upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse) ##### PARK ##### -park_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/park/year=2021/part-0.parquet" +park_key_warehouse = ( + f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/park/year=2021/part-0.parquet" +) if not object_exists(AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: @@ -57,10 +69,14 @@ def object_exists(bucket, key): cook_boundary = gpd.read_parquet(temp.name).to_crs(epsg=4326) # Replace with the actual park data processing steps or access a local GeoJSON file - parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs(epsg=4326) + parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs( + epsg=4326 + ) - parks_df['geometry_3435'] = parks_df['geometry'].to_crs(epsg=3435) - parks_df_filtered = parks_df.loc[parks_df.intersects(cook_boundary.unary_union)] + parks_df["geometry_3435"] = parks_df["geometry"].to_crs(epsg=3435) + parks_df_filtered = parks_df.loc[ + parks_df.intersects(cook_boundary.unary_union) + ] pq.write_table(parks_df_filtered.to_parquet(), temp.name) upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse) @@ -68,21 +84,40 @@ def object_exists(bucket, key): ##### INDUSTRIAL CORRIDOR ##### -indc_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson" +indc_key_raw = ( + f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson" +) indc_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/industrial_corridor/year=2013/part-0.parquet" if not object_exists(AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: - s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/industrial_corridor/2013.geojson", temp.name) + s3.download_file( + AWS_S3_RAW_BUCKET, + "spatial/access/industrial_corridor/2013.geojson", + temp.name, + ) df_indc = gpd.read_file(temp.name).to_crs(epsg=4326) df_indc.columns = map(str.lower, df_indc.columns) - df_indc['geometry_3435'] = df_indc['geometry'].to_crs(epsg=3435) - df_indc = df_indc.rename(columns={ - 'name': 'name', - 'region': 'region', - 'no': 'num', - 'hud_qualif': 'hud_qualif' - }).loc[:, ['name', 'region', 'num', 
'hud_qualif', 'acres', 'geometry', 'geometry_3435']] + df_indc["geometry_3435"] = df_indc["geometry"].to_crs(epsg=3435) + df_indc = df_indc.rename( + columns={ + "name": "name", + "region": "region", + "no": "num", + "hud_qualif": "hud_qualif", + } + ).loc[ + :, + [ + "name", + "region", + "num", + "hud_qualif", + "acres", + "geometry", + "geometry_3435", + ], + ] pq.write_table(df_indc.to_parquet(), temp.name) upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse) @@ -92,13 +127,28 @@ def object_exists(bucket, key): if not object_exists(AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: - s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/cemetery/2021.geojson", temp.name) + s3.download_file( + AWS_S3_RAW_BUCKET, + "spatial/access/cemetery/2021.geojson", + temp.name, + ) df_ceme = gpd.read_file(temp.name).to_crs(epsg=4326) df_ceme.columns = map(str.lower, df_ceme.columns) - df_ceme['geometry_3435'] = df_ceme['geometry'].to_crs(epsg=3435) - df_ceme = df_ceme.rename(columns={ - 'cfname': 'name' - }).loc[:, ['name', 'address', 'gniscode', 'source', 'community', 'comment', 'mergeid', 'geometry', 'geometry_3435']] + df_ceme["geometry_3435"] = df_ceme["geometry"].to_crs(epsg=3435) + df_ceme = df_ceme.rename(columns={"cfname": "name"}).loc[ + :, + [ + "name", + "address", + "gniscode", + "source", + "community", + "comment", + "mergeid", + "geometry", + "geometry_3435", + ], + ] pq.write_table(df_ceme.to_parquet(), temp.name) upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse) @@ -110,12 +160,25 @@ def object_exists(bucket, key): if not object_exists(AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: - s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/walkability/2017.geojson", temp.name) + s3.download_file( + AWS_S3_RAW_BUCKET, + "spatial/access/walkability/2017.geojson", + temp.name, + ) df_walk = gpd.read_file(temp.name).to_crs(epsg=4326) df_walk.columns = map(str.lower, df_walk.columns) - df_walk.columns = [col.replace('sc', '_score') for col in df_walk.columns] - df_walk.rename(columns={'walkabilit': 'walkability_rating', 'amenities': 'amenities_score', 'transitacc': 'transitaccess'}, inplace=True) + df_walk.columns = [ + col.replace("sc", "_score") for col in df_walk.columns + ] + df_walk.rename( + columns={ + "walkabilit": "walkability_rating", + "amenities": "amenities_score", + "transitacc": "transitaccess", + }, + inplace=True, + ) df_walk = standardize_expand_geo(df_walk) - df_walk['year'] = '2017' + df_walk["year"] = "2017" pq.write_table(df_walk.to_parquet(), temp.name) upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse) From f7edef7839e6a28793c5d451030613fd17f10731 Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 19:40:20 +0000 Subject: [PATCH 07/12] black --- .../spatial-access.py | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py index 6930969c8..1c567cf11 100644 --- a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py @@ -30,7 +30,9 @@ def object_exists(bucket, key): ##### BIKE TRAIL ##### bike_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/bike_trail/2021.geojson" -bike_key_warehouse = 
f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/bike_trail/year=2021/part-0.parquet" +bike_key_warehouse = ( + f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/bike_trail/year=2021/part-0.parquet" +) if not object_exists(AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: @@ -69,14 +71,10 @@ def object_exists(bucket, key): cook_boundary = gpd.read_parquet(temp.name).to_crs(epsg=4326) # Replace with the actual park data processing steps or access a local GeoJSON file - parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs( - epsg=4326 - ) + parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs(epsg=4326) parks_df["geometry_3435"] = parks_df["geometry"].to_crs(epsg=3435) - parks_df_filtered = parks_df.loc[ - parks_df.intersects(cook_boundary.unary_union) - ] + parks_df_filtered = parks_df.loc[parks_df.intersects(cook_boundary.unary_union)] pq.write_table(parks_df_filtered.to_parquet(), temp.name) upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse) @@ -84,9 +82,7 @@ def object_exists(bucket, key): ##### INDUSTRIAL CORRIDOR ##### -indc_key_raw = ( - f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson" -) +indc_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson" indc_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/industrial_corridor/year=2013/part-0.parquet" if not object_exists(AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse): @@ -123,7 +119,9 @@ def object_exists(bucket, key): ##### CEMETERY ##### ceme_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/cemetery/2021.geojson" -ceme_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/cemetery/year=2021/part-0.parquet" +ceme_key_warehouse = ( + f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/cemetery/year=2021/part-0.parquet" +) if not object_exists(AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: @@ -156,7 +154,9 @@ def object_exists(bucket, key): ##### WALKABILITY ##### walk_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/walkability/2017.geojson" -walk_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/walkability/year=2017/part-0.parquet" +walk_key_warehouse = ( + f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/walkability/year=2017/part-0.parquet" +) if not object_exists(AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: @@ -167,9 +167,7 @@ def object_exists(bucket, key): ) df_walk = gpd.read_file(temp.name).to_crs(epsg=4326) df_walk.columns = map(str.lower, df_walk.columns) - df_walk.columns = [ - col.replace("sc", "_score") for col in df_walk.columns - ] + df_walk.columns = [col.replace("sc", "_score") for col in df_walk.columns] df_walk.rename( columns={ "walkabilit": "walkability_rating", From e77d86e68630ef0c0bd2d4c27615f4d65eff70fa Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 19:42:20 +0000 Subject: [PATCH 08/12] Black --- .../housing-ihs_index.py} | 4 +--- aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) rename aws-s3/{scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py => scripts-ccao-data-raw-us-east-1/housing-ihs_index.py} (92%) diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py similarity index 92% rename from aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py rename to 
aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py index c4e0ea069..1132ec53c 100644 --- a/aws-s3/scripts-ccao-data-warehouse-us-east-1/housing_ihs_index.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py @@ -17,9 +17,7 @@ xlsx_link = content[link_start:link_end] # Form the complete URL for the Excel file -most_recent_ihs_data_url = ( - url + xlsx_link if xlsx_link.startswith("/") else xlsx_link -) +most_recent_ihs_data_url = url + xlsx_link if xlsx_link.startswith("/") else xlsx_link # Print the URL print(most_recent_ihs_data_url) diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py index e20323516..227d5ecea 100644 --- a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py @@ -46,9 +46,7 @@ def get_data_to_s3(source, api_url, boundary, year): s3_path = os.path.join(output_bucket, boundary, f"{year}.geojson") try: - s3.put_object( - Bucket=os.environ["AWS_S3_RAW_BUCKET"], Key=s3_path, Body=data - ) + s3.put_object(Bucket=os.environ["AWS_S3_RAW_BUCKET"], Key=s3_path, Body=data) except ClientError as e: print(e) From 33ec6576dcfe28de7d77fb73e854ae411ee95483 Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 19:50:08 +0000 Subject: [PATCH 09/12] lintr --- .../housing-ihs_index.py | 4 ++- .../spatial-access.py | 4 ++- .../spatial-access.py | 28 ++++++++++--------- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py index 1132ec53c..c4e0ea069 100644 --- a/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py @@ -17,7 +17,9 @@ xlsx_link = content[link_start:link_end] # Form the complete URL for the Excel file -most_recent_ihs_data_url = url + xlsx_link if xlsx_link.startswith("/") else xlsx_link +most_recent_ihs_data_url = ( + url + xlsx_link if xlsx_link.startswith("/") else xlsx_link +) # Print the URL print(most_recent_ihs_data_url) diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py index 227d5ecea..e20323516 100644 --- a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py @@ -46,7 +46,9 @@ def get_data_to_s3(source, api_url, boundary, year): s3_path = os.path.join(output_bucket, boundary, f"{year}.geojson") try: - s3.put_object(Bucket=os.environ["AWS_S3_RAW_BUCKET"], Key=s3_path, Body=data) + s3.put_object( + Bucket=os.environ["AWS_S3_RAW_BUCKET"], Key=s3_path, Body=data + ) except ClientError as e: print(e) diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py index 1c567cf11..6930969c8 100644 --- a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py @@ -30,9 +30,7 @@ def object_exists(bucket, key): ##### BIKE TRAIL ##### bike_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/bike_trail/2021.geojson" -bike_key_warehouse = ( - f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/bike_trail/year=2021/part-0.parquet" -) +bike_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/bike_trail/year=2021/part-0.parquet" if not object_exists(AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse): with 
tempfile.NamedTemporaryFile(suffix=".geojson") as temp: @@ -71,10 +69,14 @@ def object_exists(bucket, key): cook_boundary = gpd.read_parquet(temp.name).to_crs(epsg=4326) # Replace with the actual park data processing steps or access a local GeoJSON file - parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs(epsg=4326) + parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs( + epsg=4326 + ) parks_df["geometry_3435"] = parks_df["geometry"].to_crs(epsg=3435) - parks_df_filtered = parks_df.loc[parks_df.intersects(cook_boundary.unary_union)] + parks_df_filtered = parks_df.loc[ + parks_df.intersects(cook_boundary.unary_union) + ] pq.write_table(parks_df_filtered.to_parquet(), temp.name) upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse) @@ -82,7 +84,9 @@ def object_exists(bucket, key): ##### INDUSTRIAL CORRIDOR ##### -indc_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson" +indc_key_raw = ( + f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson" +) indc_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/industrial_corridor/year=2013/part-0.parquet" if not object_exists(AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse): @@ -119,9 +123,7 @@ def object_exists(bucket, key): ##### CEMETERY ##### ceme_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/cemetery/2021.geojson" -ceme_key_warehouse = ( - f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/cemetery/year=2021/part-0.parquet" -) +ceme_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/cemetery/year=2021/part-0.parquet" if not object_exists(AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: @@ -154,9 +156,7 @@ def object_exists(bucket, key): ##### WALKABILITY ##### walk_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/walkability/2017.geojson" -walk_key_warehouse = ( - f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/walkability/year=2017/part-0.parquet" -) +walk_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/walkability/year=2017/part-0.parquet" if not object_exists(AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse): with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: @@ -167,7 +167,9 @@ def object_exists(bucket, key): ) df_walk = gpd.read_file(temp.name).to_crs(epsg=4326) df_walk.columns = map(str.lower, df_walk.columns) - df_walk.columns = [col.replace("sc", "_score") for col in df_walk.columns] + df_walk.columns = [ + col.replace("sc", "_score") for col in df_walk.columns + ] df_walk.rename( columns={ "walkabilit": "walkability_rating", From 52df06657663f8116ed28602e209dda63339ba4f Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 21:42:17 +0000 Subject: [PATCH 10/12] Add sparksession --- .../housing-ihs_index.py | 15 +- .../spatial-access.py | 268 +++++++----------- 2 files changed, 122 insertions(+), 161 deletions(-) diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py index c4e0ea069..2f4d0190b 100644 --- a/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py @@ -17,9 +17,7 @@ xlsx_link = content[link_start:link_end] # Form the complete URL for the Excel file -most_recent_ihs_data_url = ( - url + xlsx_link if xlsx_link.startswith("/") else xlsx_link -) +most_recent_ihs_data_url = url + xlsx_link if xlsx_link.startswith("/") else xlsx_link # Print the URL print(most_recent_ihs_data_url) @@ -32,6 +30,8 @@ # Load the Excel 
file using openpyxl data = pd.read_excel(response.content, engine="openpyxl", sheet_name=1) +data = data.toPandas() + data = data.drop(columns="Unnamed: 1") data = data.drop(columns="Unnamed: 2") data = data.drop(columns="Unnamed: 3") @@ -45,3 +45,12 @@ data = data.replace({"Unnamed: 0": "puma", "YEARQ": "name"}) data.to_parquet("output.parquet") + +def model(dbt, spark_session): + dbt.config(materialized="table") + + input = dbt.ref("ihs_housing_input") + + spark_df = spark_session.createDataFrame(data) + + return spark_df \ No newline at end of file diff --git a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py index 6930969c8..19cbcbfdb 100644 --- a/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-warehouse-us-east-1/spatial-access.py @@ -1,17 +1,17 @@ import datetime import os +from datetime import datetime import tempfile - import boto3 import geopandas as gpd import pyarrow.parquet as pq -from utils import standardize_expand_geo # Set up AWS credentials AWS_S3_RAW_BUCKET = os.getenv("AWS_S3_RAW_BUCKET") AWS_S3_WAREHOUSE_BUCKET = os.getenv("AWS_S3_WAREHOUSE_BUCKET") s3 = boto3.client("s3") -current_year = datetime.strftime("%Y") + +current_year = datetime.now().strftime("%Y") # Helper function to save to S3 @@ -27,158 +27,110 @@ def object_exists(bucket, key): except: return False - -##### BIKE TRAIL ##### -bike_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/bike_trail/2021.geojson" -bike_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/bike_trail/year=2021/part-0.parquet" - -if not object_exists(AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse): - with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: - s3.download_file( - AWS_S3_RAW_BUCKET, - "spatial/access/bike_trail/2021.geojson", - temp.name, - ) - df_bike = gpd.read_file(temp.name).to_crs(epsg=4326) - df_bike.columns = map(str.lower, df_bike.columns) - df_bike["geometry_3435"] = df_bike["geometry"].to_crs(epsg=3435) - df_bike = df_bike.rename( - columns={ - "spdlimit": "speed_limit", - "onstreet": "on_street", - "edtdate": "edit_date", - "trailwdth": "trail_width", - "trailtype": "trail_type", - "trailsurfa": "trail_surface", - } - ).drop(columns=["created_us", "shape_stle"]) - pq.write_table(df_bike.to_parquet(), temp.name) - upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse) - -##### PARK ##### - -park_key_warehouse = ( - f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/park/year=2021/part-0.parquet" -) - -if not object_exists(AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse): - with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: - # Assuming you have the Cook County boundary file locally - cook_boundary_key = "spatial/ccao/county/2019.parquet" - s3.download_file(AWS_S3_WAREHOUSE_BUCKET, cook_boundary_key, temp.name) - cook_boundary = gpd.read_parquet(temp.name).to_crs(epsg=4326) - - # Replace with the actual park data processing steps or access a local GeoJSON file - parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs( - epsg=4326 - ) - - parks_df["geometry_3435"] = parks_df["geometry"].to_crs(epsg=3435) - parks_df_filtered = parks_df.loc[ - parks_df.intersects(cook_boundary.unary_union) - ] - - pq.write_table(parks_df_filtered.to_parquet(), temp.name) - upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse) - - -##### INDUSTRIAL CORRIDOR ##### - -indc_key_raw = ( - f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson" -) 
-indc_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/industrial_corridor/year=2013/part-0.parquet" - -if not object_exists(AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse): - with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: - s3.download_file( - AWS_S3_RAW_BUCKET, - "spatial/access/industrial_corridor/2013.geojson", - temp.name, - ) - df_indc = gpd.read_file(temp.name).to_crs(epsg=4326) - df_indc.columns = map(str.lower, df_indc.columns) - df_indc["geometry_3435"] = df_indc["geometry"].to_crs(epsg=3435) - df_indc = df_indc.rename( - columns={ - "name": "name", - "region": "region", - "no": "num", - "hud_qualif": "hud_qualif", - } - ).loc[ - :, - [ - "name", - "region", - "num", - "hud_qualif", - "acres", - "geometry", - "geometry_3435", - ], - ] - pq.write_table(df_indc.to_parquet(), temp.name) - upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse) - -##### CEMETERY ##### -ceme_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/cemetery/2021.geojson" -ceme_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/cemetery/year=2021/part-0.parquet" - -if not object_exists(AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse): - with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: - s3.download_file( - AWS_S3_RAW_BUCKET, - "spatial/access/cemetery/2021.geojson", - temp.name, - ) - df_ceme = gpd.read_file(temp.name).to_crs(epsg=4326) - df_ceme.columns = map(str.lower, df_ceme.columns) - df_ceme["geometry_3435"] = df_ceme["geometry"].to_crs(epsg=3435) - df_ceme = df_ceme.rename(columns={"cfname": "name"}).loc[ - :, - [ - "name", - "address", - "gniscode", - "source", - "community", - "comment", - "mergeid", - "geometry", - "geometry_3435", - ], - ] - pq.write_table(df_ceme.to_parquet(), temp.name) - upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse) - -# And continue with other datasets... 
- -##### WALKABILITY ##### -walk_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/walkability/2017.geojson" -walk_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/walkability/year=2017/part-0.parquet" - -if not object_exists(AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse): - with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: - s3.download_file( - AWS_S3_RAW_BUCKET, - "spatial/access/walkability/2017.geojson", - temp.name, - ) - df_walk = gpd.read_file(temp.name).to_crs(epsg=4326) - df_walk.columns = map(str.lower, df_walk.columns) - df_walk.columns = [ - col.replace("sc", "_score") for col in df_walk.columns - ] - df_walk.rename( - columns={ - "walkabilit": "walkability_rating", - "amenities": "amenities_score", - "transitacc": "transitaccess", - }, - inplace=True, - ) - df_walk = standardize_expand_geo(df_walk) - df_walk["year"] = "2017" - pq.write_table(df_walk.to_parquet(), temp.name) - upload_to_s3(temp.name, AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse) +# Bike Trail Data Processing +def process_bike_trail(spark_session): + bike_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/bike_trail/2021.geojson" + bike_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/bike_trail/year=2021/part-0.parquet" + + if not object_exists(AWS_S3_WAREHOUSE_BUCKET, bike_key_warehouse): + with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/bike_trail/2021.geojson", temp.name) + df_bike = gpd.read_file(temp.name).to_crs(epsg=4326) + df_bike.columns = map(str.lower, df_bike.columns) + df_bike["geometry_3435"] = df_bike["geometry"].to_crs(epsg=3435) + df_bike = df_bike.rename( + columns={ + "spdlimit": "speed_limit", + "onstreet": "on_street", + "edtdate": "edit_date", + "trailwdth": "trail_width", + "trailtype": "trail_type", + "trailsurfa": "trail_surface", + } + ).drop(columns=["created_us", "shape_stle"]) + + spark_df = spark_session.createDataFrame(df_bike) + spark_df.write.parquet(bike_key_warehouse) + +# Park Data Processing +def process_parks(spark_session): + park_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/park/year=2021/part-0.parquet" + + if not object_exists(AWS_S3_WAREHOUSE_BUCKET, park_key_warehouse): + with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + cook_boundary_key = "spatial/ccao/county/2019.parquet" + s3.download_file(AWS_S3_WAREHOUSE_BUCKET, cook_boundary_key, temp.name) + cook_boundary = gpd.read_parquet(temp.name).to_crs(epsg=4326) + + parks_df = gpd.read_file("path/to/local/park_data.geojson").to_crs(epsg=4326) + parks_df["geometry_3435"] = parks_df["geometry"].to_crs(epsg=3435) + parks_df_filtered = parks_df.loc[parks_df.intersects(cook_boundary.unary_union)] + + spark_df = spark_session.createDataFrame(parks_df_filtered) + spark_df.write.parquet(park_key_warehouse) + +# Industrial Corridor Data Processing +def process_industrial_corridor(spark_session): + indc_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/industrial_corridor/2013.geojson" + indc_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/industrial_corridor/year=2013/part-0.parquet" + + if not object_exists(AWS_S3_WAREHOUSE_BUCKET, indc_key_warehouse): + with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/industrial_corridor/2013.geojson", temp.name) + df_indc = gpd.read_file(temp.name).to_crs(epsg=4326) + df_indc.columns = map(str.lower, df_indc.columns) + df_indc["geometry_3435"] = df_indc["geometry"].to_crs(epsg=3435) + df_indc = 
df_indc.rename( + columns={ + "name": "name", + "region": "region", + "no": "num", + "hud_qualif": "hud_qualif", + } + ).loc[:, ["name", "region", "num", "hud_qualif", "acres", "geometry", "geometry_3435"]] + + spark_df = spark_session.createDataFrame(df_indc) + spark_df.write.parquet(indc_key_warehouse) + +# Cemetery Data Processing +def process_cemetery(spark_session): + ceme_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/cemetery/2021.geojson" + ceme_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/cemetery/year=2021/part-0.parquet" + + if not object_exists(AWS_S3_WAREHOUSE_BUCKET, ceme_key_warehouse): + with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/cemetery/2021.geojson", temp.name) + df_ceme = gpd.read_file(temp.name).to_crs(epsg=4326) + df_ceme.columns = map(str.lower, df_ceme.columns) + df_ceme["geometry_3435"] = df_ceme["geometry"].to_crs(epsg=3435) + df_ceme = df_ceme.rename(columns={"cfname": "name"}).loc[ + :, ["name", "address", "gniscode", "source", "community", "comment", "mergeid", "geometry", "geometry_3435"] + ] + + spark_df = spark_session.createDataFrame(df_ceme) + spark_df.write.parquet(ceme_key_warehouse) + +# Walkability Data Processing +def process_walkability(spark_session): + walk_key_raw = f"{AWS_S3_RAW_BUCKET}/spatial/access/walkability/2017.geojson" + walk_key_warehouse = f"{AWS_S3_WAREHOUSE_BUCKET}/spatial/access/walkability/year=2017/part-0.parquet" + + if not object_exists(AWS_S3_WAREHOUSE_BUCKET, walk_key_warehouse): + with tempfile.NamedTemporaryFile(suffix=".geojson") as temp: + s3.download_file(AWS_S3_RAW_BUCKET, "spatial/access/walkability/2017.geojson", temp.name) + df_walk = gpd.read_file(temp.name).to_crs(epsg=4326) + df_walk.columns = map(str.lower, df_walk.columns) + df_walk.columns = [col.replace("sc", "_score") for col in df_walk.columns] + df_walk.rename( + columns={ + "walkabilit": "walkability_rating", + "amenities": "amenities_score", + "transitacc": "transitaccess", + }, + inplace=True, + ) + df_walk = gpd.standardize_expand_geo(df_walk) + df_walk["year"] = "2017" + + spark_df = spark_session.createDataFrame(df_walk) + spark_df.write.parquet(walk_key_warehouse) From 278cbfc77cc8259f313dfa86360fd76a451cf4b6 Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 21:52:37 +0000 Subject: [PATCH 11/12] black --- aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py | 3 ++- aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py | 4 +--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py index 2f4d0190b..5d71abf99 100644 --- a/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py @@ -46,6 +46,7 @@ data.to_parquet("output.parquet") + def model(dbt, spark_session): dbt.config(materialized="table") @@ -53,4 +54,4 @@ def model(dbt, spark_session): spark_df = spark_session.createDataFrame(data) - return spark_df \ No newline at end of file + return spark_df diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py index e20323516..227d5ecea 100644 --- a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py @@ -46,9 +46,7 @@ def get_data_to_s3(source, api_url, boundary, year): s3_path = os.path.join(output_bucket, boundary, 
f"{year}.geojson") try: - s3.put_object( - Bucket=os.environ["AWS_S3_RAW_BUCKET"], Key=s3_path, Body=data - ) + s3.put_object(Bucket=os.environ["AWS_S3_RAW_BUCKET"], Key=s3_path, Body=data) except ClientError as e: print(e) From 21beabfc982ba09fe4fc23a3eeedc4a1ed487e3f Mon Sep 17 00:00:00 2001 From: Damonamajor Date: Tue, 7 May 2024 22:10:05 +0000 Subject: [PATCH 12/12] autopep8 --- aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py | 3 ++- aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py index 5d71abf99..e86a3e347 100644 --- a/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/housing-ihs_index.py @@ -17,7 +17,8 @@ xlsx_link = content[link_start:link_end] # Form the complete URL for the Excel file -most_recent_ihs_data_url = url + xlsx_link if xlsx_link.startswith("/") else xlsx_link +most_recent_ihs_data_url = url + \ + xlsx_link if xlsx_link.startswith("/") else xlsx_link # Print the URL print(most_recent_ihs_data_url) diff --git a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py index 227d5ecea..a733d21a9 100644 --- a/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py +++ b/aws-s3/scripts-ccao-data-raw-us-east-1/spatial-access.py @@ -46,7 +46,10 @@ def get_data_to_s3(source, api_url, boundary, year): s3_path = os.path.join(output_bucket, boundary, f"{year}.geojson") try: - s3.put_object(Bucket=os.environ["AWS_S3_RAW_BUCKET"], Key=s3_path, Body=data) + s3.put_object( + Bucket=os.environ["AWS_S3_RAW_BUCKET"], + Key=s3_path, + Body=data) except ClientError as e: print(e)