Feature/nicke adpsfc prepbufr #10

Open
wants to merge 78 commits into
base: develop
78 commits
48b4a3f
added dump diur
rmclaren Oct 8, 2024
df49114
ADPSFC mapping and encoder file
Oct 31, 2024
f13cc3d
remove path appends
Oct 31, 2024
51928b5
CAT mnemonic
Oct 31, 2024
d3478fd
Add mapping file, python scripts, and shell scripts for satwind goes
Nov 13, 2024
7277cc5
remove test
Nov 18, 2024
a4efbb2
Update components (mapping, Python script, configuration) for satwind
Nov 18, 2024
5689332
remove old yaml
Nov 19, 2024
16db06f
Add configuration files and test shell script
Nov 19, 2024
927bae2
rename configuration yaml
emilyhcliu Nov 19, 2024
9dfd16d
Add temporary testing script
emilyhcliu Nov 19, 2024
54305c9
Add README file
emilyhcliu Nov 19, 2024
89f429a
rename README to README.md
emilyhcliu Nov 19, 2024
b115130
Update README.md
emilyhcliu Nov 19, 2024
f0ba20f
Update readme file
emilyhcliu Nov 19, 2024
8d324d0
Update README.md
emilyhcliu Nov 19, 2024
d70867f
Update README
emilyhcliu Nov 19, 2024
e5a95e7
rename process_bufr2ioda to bufr2ioda.sh
emilyhcliu Nov 19, 2024
7dbc826
Update README.md
emilyhcliu Nov 19, 2024
6346844
Update README.md
emilyhcliu Nov 19, 2024
f91d1d1
Update README.md
emilyhcliu Nov 19, 2024
8ba4d7b
Update README.md
emilyhcliu Nov 19, 2024
87096eb
Update readme
emilyhcliu Nov 19, 2024
3e7789c
Add comments
emilyhcliu Nov 19, 2024
a645370
update comments
emilyhcliu Nov 19, 2024
c0fdb95
update bufr2ioda.sh
emilyhcliu Nov 19, 2024
c262e3d
update usage
emilyhcliu Nov 19, 2024
dde4d27
Add comments
emilyhcliu Nov 19, 2024
d080c23
add cycle in input path
emilyhcliu Nov 20, 2024
09d0061
Modify global attribute in the mapping file
emilyhcliu Nov 20, 2024
8f23b5b
Merge branch 'develop' into feature/dump_satwind_goes
emilyhcliu Nov 21, 2024
cd74eee
remove . before bufr2ioda.sh
emilyhcliu Nov 21, 2024
6351d34
Update README.md
emilyhcliu Nov 21, 2024
b2ea3ae
Update README.md
emilyhcliu Nov 22, 2024
d678190
Update README.md
emilyhcliu Nov 22, 2024
67311d6
remove wxflow from the test script
emilyhcliu Nov 22, 2024
428e600
remove wxflow from input
emilyhcliu Nov 22, 2024
6564a18
Update README.md
emilyhcliu Nov 22, 2024
f757393
Add comment block for logger
emilyhcliu Nov 24, 2024
77f2e92
add bufrtype (this is bufr dump list)
emilyhcliu Nov 24, 2024
00cd504
Update documentation
emilyhcliu Nov 24, 2024
9d9e821
Add bufrtype and update README
emilyhcliu Nov 24, 2024
e9b8833
Add split_by_category input
emilyhcliu Nov 25, 2024
aec8e56
Update README
emilyhcliu Nov 25, 2024
5679a32
Update README.md
emilyhcliu Nov 25, 2024
70f90d9
Rename bufr2ioda to encodeBufr and update readme
emilyhcliu Nov 25, 2024
1365772
Update README.md
emilyhcliu Nov 26, 2024
9a8487a
Merge branch 'feature/dump_satwind_goes' into feature/NICKE_adpsfc_pr…
nicholasesposito Nov 26, 2024
0f5dce2
adpsfc changes. works
nicholasesposito Dec 5, 2024
e523fdb
update 1
nicholasesposito Dec 5, 2024
eba41c2
name changes
nicholasesposito Dec 5, 2024
414b489
encodeBufr_Nick.sh
nicholasesposito Dec 10, 2024
4f92d5c
all works now
nicholasesposito Dec 10, 2024
282e177
small changes for script
nicholasesposito Dec 10, 2024
16d8586
update encode_Nick.sh from sfcshp
nicholasesposito Dec 12, 2024
a907f3d
add datetime, cycle_time
nicholasesposito Dec 12, 2024
3770d7e
remove longitude range for now, encodeBufr_Nick python 3.10/3.7
Dec 20, 2024
8b52af4
filename changes, mpi additions
Dec 26, 2024
3cc6f79
bufr_backend script_backend -> bufr4backend, script4backend
Dec 26, 2024
b216579
encodeBufr_Nick.sh _backend -> 4backend
Dec 26, 2024
109c39f
_backend -> 4backend
Dec 26, 2024
b39382b
cp to mapping config
Dec 26, 2024
9a83e0d
py to mapping
Dec 26, 2024
68bf722
add reference time
Dec 27, 2024
9d0e653
subsets, input filename, global attributes, dateTime min
Jan 2, 2025
bdeab47
rm some comments
Jan 2, 2025
b3e6a7a
update files in config
Jan 15, 2025
c0aec91
rm testdir files
Jan 15, 2025
0110977
rm encodeBufr_Nick.sh
Jan 15, 2025
281001b
fix
Jan 15, 2025
1e73890
move configs
Jan 15, 2025
68fbb81
Merge branch 'develop' into feature/NICKE_adpsfc_prepbufr
Jan 15, 2025
e2e31e2
cleanup
Jan 15, 2025
b2356f7
chmod encodeBufr.sh, remove bufr2ioda*satwnd*
Jan 15, 2025
788dbdd
Merge branch 'develop' into feature/NICKE_adpsfc_prepbufr
emilyhcliu Jan 22, 2025
33fa954
function names, some coding norms
Jan 23, 2025
e6222b9
function name
Jan 23, 2025
a18ab32
rm add_global referenceDateTime fornow
nicholasesposito Jan 30, 2025
255 changes: 255 additions & 0 deletions dump/mapping/bufr_adpsfc_prepbufr.py
@@ -0,0 +1,255 @@
#!/usr/bin/env python3
import os
import sys
import bufr
import argparse
import copy
import numpy as np
import numpy.ma as ma
import math
import calendar
import time
from datetime import datetime
from pyioda.ioda.Engines.Bufr import Encoder as iodaEncoder
from bufr.encoders.netcdf import Encoder as netcdfEncoder
from wxflow import Logger

# Initialize Logger
# Get log level from the environment variable; default to 'INFO' if not set
log_level = os.getenv('LOG_LEVEL', 'INFO')
logger = Logger('bufr_adpsfc_prepbufr.py', level=log_level, colored_log=False)

def logging(comm, level, message):
    """
    Logs a message to the console or log file, based on the specified logging level.

    This function ensures that logging is only performed by the root process (`rank 0`)
    in a distributed computing environment. The function maps the logging level to
    appropriate logger methods and defaults to the 'INFO' level if an invalid level is provided.

    Parameters:
        comm: object
            The communicator object, typically from a distributed computing framework
            (e.g., MPI). It must have a `rank()` method to determine the process rank.
        level: str
            The logging level as a string. Supported levels are:
                - 'DEBUG'
                - 'INFO'
                - 'WARNING'
                - 'ERROR'
                - 'CRITICAL'
            If an invalid level is provided, a warning will be logged, and the level
            will default to 'INFO'.
        message: str
            The message to be logged.

    Behavior:
        - Logs messages only on the root process (`comm.rank() == 0`).
        - Maps the provided logging level to a method of the logger object.
        - Defaults to 'INFO' and logs a warning if an invalid logging level is given.
        - Supports standard logging levels for granular control over log verbosity.

    Example:
        >>> logging(comm, 'DEBUG', 'This is a debug message.')
        >>> logging(comm, 'ERROR', 'An error occurred!')

    Notes:
        - Ensure that a global `logger` object is configured before using this function.
        - The `comm` object should conform to MPI-like conventions (e.g., `rank()` method).
    """

    if comm.rank() == 0:
        # Define a dictionary to map levels to logger methods
        log_methods = {
            'DEBUG': logger.debug,
            'INFO': logger.info,
            'WARNING': logger.warning,
            'ERROR': logger.error,
            'CRITICAL': logger.critical,
        }

        # Get the appropriate logging method, default to 'INFO'
        log_method = log_methods.get(level.upper(), logger.info)

        if log_method == logger.info and level.upper() not in log_methods:
            # Log a warning if the level is invalid
            logger.warning(f'log level = {level}: not a valid level --> set to INFO')

        # Call the logging method
        log_method(message)


def _compute_datetime(cycleTimeSinceEpoch, dhr):
    """
    Compute dateTime using the cycleTimeSinceEpoch and Observation Time
    minus Cycle Time

    Parameters:
        cycleTimeSinceEpoch: Time of cycle in Epoch Time (seconds)
        dhr: Observation Time Minus Cycle Time (hours)

    Returns:
        Masked array of dateTime values
    """

    # Entries left at 0 (masked input) are masked in the returned array
    int64_fill_value = np.int64(0)

    dateTime = np.zeros(dhr.shape, dtype=np.int64)
    for i in range(len(dateTime)):
        if ma.is_masked(dhr[i]):
            continue
        else:
            # Convert the offset from hours to seconds before adding the cycle epoch
            dateTime[i] = np.int64(dhr[i]*3600) + cycleTimeSinceEpoch

    dateTime = ma.array(dateTime)
    dateTime = ma.masked_values(dateTime, int64_fill_value)

    return dateTime


def _make_description(mapping_path, cycle_time, update=False):
    description = bufr.encoders.Description(mapping_path)

    ReferenceTime = np.int64(calendar.timegm(time.strptime(str(int(cycle_time)), '%Y%m%d%H')))

    if update:
        # Define the variables to be added in a list of dictionaries
        variables = [
            {
                'name': 'MetaData/sequenceNumber',
                'source': 'variables/sequenceNumber',
                'units': '1',
                'longName': 'Sequence Number (Obs Subtype)',
            }
        ]

        # Loop through each variable and add it to the description
        for var in variables:
            description.add_variable(
                name=var['name'],
                source=var['source'],
                units=var['units'],
                longName=var['longName']
            )

    # description.add_global(name='datetimeReference', value=str(ReferenceTime))

    return description


def _make_obs(comm, input_path, mapping_path, cycle_time):
    """
    Create the ioda adpsfc prepbufr observations:
    - reads values
    - adds sequenceNum

    Parameters
    ----------
    comm: object
        The communicator object (e.g., MPI)
    input_path: str
        The input bufr file
    mapping_path: str
        The input bufr2ioda mapping file
    cycle_time: str
        The cycle in YYYYMMDDHH format
    """

    # Get container from mapping file first
    logging(comm, 'INFO', 'Get container from bufr')
    container = bufr.Parser(input_path, mapping_path).parse(comm)

    logging(comm, 'DEBUG', f'container list (original): {container.list()}')
    logging(comm, 'DEBUG', f'Change longitude range from [0,360] to [-180,180]')
    lon = container.get('variables/longitude')
    lon_paths = container.get_paths('variables/longitude')
    lon[lon>180] -= 360
    lon = ma.round(lon, decimals=2)
    logging(comm, 'DEBUG', f'longitude max and min are {lon.max()}, {lon.min()}')

    logging(comm, 'DEBUG', f'Do DateTime calculation')
    otmct = container.get('variables/obsTimeMinusCycleTime')
    otmct_paths = container.get_paths('variables/obsTimeMinusCycleTime')
    # Pass the array as returned: np.array(otmct) would drop the mask of a masked
    # array, so missing offsets would no longer be skipped in _compute_datetime
    cycleTimeSinceEpoch = np.int64(calendar.timegm(time.strptime(str(int(cycle_time)), '%Y%m%d%H')))
    dateTime = _compute_datetime(cycleTimeSinceEpoch, otmct)
    min_dateTime_ge_zero = min(x for x in dateTime if x > -1)
    logging(comm, 'DEBUG', f'dateTime min/max = {min_dateTime_ge_zero} {dateTime.max()}')

    logging(comm, 'DEBUG', f'Make an array of 0s for MetaData/sequenceNumber')
    sequenceNum = np.zeros(lon.shape, dtype=np.int32)
    logging(comm, 'DEBUG', f'sequenceNum min/max = {sequenceNum.min()} {sequenceNum.max()}')

    logging(comm, 'DEBUG', f'Update variables in container')
    container.replace('variables/longitude', lon)
    container.replace('variables/timestamp', dateTime)

    logging(comm, 'DEBUG', f'Add variables to container')
    container.add('variables/sequenceNumber', sequenceNum, lon_paths)

    # Check
    logging(comm, 'DEBUG', f'container list (updated): {container.list()}')

    return container


def create_obs_group(input_path, mapping_path, cycle_time, env):

    comm = bufr.mpi.Comm(env["comm_name"])

    logging(comm, 'INFO', f'Make description and make obs')

    container = _make_obs(comm, input_path, mapping_path, cycle_time)
    description = _make_description(mapping_path, cycle_time, update=True)

    # Gather data from all tasks into all tasks. Each task will have the complete record
    logging(comm, 'INFO', f'Gather data from all tasks into all tasks')
    container.all_gather(comm)

    logging(comm, 'INFO', f'Encode the data')
    data = next(iter(iodaEncoder(description).encode(container).values()))

    logging(comm, 'INFO', f'Return the encoded data.')

    return data


def create_obs_file(input_path, mapping_path, output_path, cycle_time):

    comm = bufr.mpi.Comm("world")
    container = _make_obs(comm, input_path, mapping_path, cycle_time)
    container.gather(comm)

    description = _make_description(mapping_path, cycle_time, update=True)

    # Encode the data
    if comm.rank() == 0:
        netcdfEncoder(description).encode(container, output_path)

    logging(comm, 'INFO', f'Return the encoded data')


if __name__ == '__main__':
    start_time = time.time()

    bufr.mpi.App(sys.argv)
    comm = bufr.mpi.Comm("world")

    # Required input arguments as positional arguments
    parser = argparse.ArgumentParser(description="Convert BUFR to NetCDF using a mapping file.")
    parser.add_argument('input', type=str, help='Input BUFR file')
    parser.add_argument('mapping', type=str, help='BUFR2IODA Mapping File')
    parser.add_argument('output', type=str, help='Output NetCDF file')
    parser.add_argument('cycle_time', type=str, help='cycle time in YYYYMMDDHH format')

    args = parser.parse_args()
    infile = args.input
    mapping = args.mapping
    output = args.output
    cycle_time = args.cycle_time

    create_obs_file(infile, mapping, output, cycle_time)

    end_time = time.time()
    running_time = end_time - start_time
    logging(comm, 'INFO', f'Total running time: {running_time}')
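
Review note on `_compute_datetime`: the per-element loop could probably be replaced by a vectorized NumPy expression. The sketch below is only an illustration, not part of this PR. The names `cycle_time_to_epoch` and `compute_datetime_vectorized` are hypothetical, and it assumes the offsets arrive as a NumPy array (masked or plain) in hours. Unlike the function in the diff, it carries the input mask through directly instead of using 0 as a sentinel value.

```python
import calendar
import time

import numpy as np
import numpy.ma as ma


def cycle_time_to_epoch(cycle_time):
    """Convert a YYYYMMDDHH cycle (str or int) to seconds since the Unix epoch, UTC."""
    parsed = time.strptime(str(int(cycle_time)), '%Y%m%d%H')
    return np.int64(calendar.timegm(parsed))


def compute_datetime_vectorized(cycle_epoch, dhr):
    """Hypothetical vectorized variant: epoch seconds as an int64 masked array.

    dhr holds observation time minus cycle time in hours. Masked entries in
    dhr stay masked in the result.
    """
    dhr = ma.asarray(dhr, dtype=np.float64)
    mask = ma.getmaskarray(dhr)
    # Replace masked offsets with 0 so the arithmetic below cannot overflow
    seconds = ma.filled(dhr, 0.0) * 3600.0
    # astype(int64) truncates toward zero, like np.int64(x) on a scalar
    date_time = seconds.astype(np.int64) + np.int64(cycle_epoch)
    return ma.array(date_time, mask=mask)


if __name__ == '__main__':
    epoch = cycle_time_to_epoch('2024120500')
    offsets = ma.array([-0.5, 0.25, 1.0], mask=[False, False, True])
    print(compute_datetime_vectorized(epoch, offsets))
```

Because the mask is propagated explicitly, an observation whose computed time happens to equal 0 would not be masked by accident, and `dateTime.min()` can be used directly for the debug log.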
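
Review note on the longitude conversion in `_make_obs`: the change from [0,360] to [-180,180] could be factored into a small helper that does not modify the array in place. The following is a minimal sketch under that assumption. The helper name `wrap_longitude` is hypothetical and not part of the PR, and it uses only `numpy.ma` calls.

```python
import numpy as np
import numpy.ma as ma


def wrap_longitude(lon, decimals=2):
    """Map longitudes from [0, 360] to [-180, 180], keeping any mask intact."""
    lon = ma.asarray(lon, dtype=np.float64)
    # Values above 180 are shifted down by 360; everything else is unchanged
    wrapped = ma.where(lon > 180, lon - 360, lon)
    return ma.round(wrapped, decimals=decimals)


if __name__ == '__main__':
    sample = ma.array([0.0, 180.0, 181.456, 359.99], mask=[False, False, False, False])
    print(wrap_longitude(sample))
```

Returning a new array leaves the value obtained from `container.get` untouched, which may make the later `container.replace('variables/longitude', lon)` call easier to reason about.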