-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathstream_file_validator_prefect.py
92 lines (80 loc) · 2.75 KB
/
stream_file_validator_prefect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from prefect import flow
from stream_file_validator import main
from bento.common.secret_manager import get_secret
# Key used to read the submission bucket name out of the secret returned by
# get_secret (see data_hub_file_validator).
SUBMISSION_BUCKET = "submission_bucket"
@flow(name="CRDC Data Hub File Validator", log_prints=True)
def data_hub_file_validator(
    organization_id,
    submission_id,
    manifest_name,
    secret_name,
    file_name_column="file_name",
    file_size_column="file_size",
    file_md5_column="md5sum"
):
    """Validate the files of a single Data Hub submission.

    Reads the submission bucket name from the secret called ``secret_name``,
    builds the manifest URL, the file key prefix and the upload URL from the
    organization and submission ids, then delegates to the
    ``stream_file_validator`` flow with no file URL column.

    Args:
        organization_id: First path segment of the submission's S3 keys.
        submission_id: Second path segment of the submission's S3 keys.
        manifest_name: Object name of the manifest under ``metadata/``.
        secret_name: Name handed to ``get_secret``; the returned mapping is
            indexed with ``SUBMISSION_BUCKET``.
        file_name_column: Manifest column holding file names.
        file_size_column: Manifest column holding file sizes.
        file_md5_column: Manifest column holding MD5 checksums.
    """
    bucket_name = get_secret(secret_name)[SUBMISSION_BUCKET]

    # Everything for a submission lives under <organization>/<submission>/.
    manifest_file = f"s3://{bucket_name}/{organization_id}/{submission_id}/metadata/{manifest_name}"
    file_prefix = f"{organization_id}/{submission_id}/file/"
    upload_s3_url = f"s3://{bucket_name}/{organization_id}/{submission_id}/file/logs"

    # No URL column is supplied, so the bucket and prefix are passed to locate
    # the files instead.
    stream_file_validator(
        manifest_file=manifest_file,
        file_url_column=None,
        validation_s3_bucket=bucket_name,
        validation_prefix=file_prefix,
        upload_s3_url=upload_s3_url,
        file_name_column=file_name_column,
        file_size_column=file_size_column,
        file_md5_column=file_md5_column,
    )
@flow(name="CRDC Stream File Validator", log_prints=True)
def stream_file_validator(
    manifest_file,  # "s3://<bucket_name>/<file_key>"
    file_url_column,
    # When the manifest has no file URLs, the bucket name and prefix
    # (folder name) below must be provided instead.
    validation_s3_bucket,
    validation_prefix,
    upload_s3_url,  # "s3://<upload_bucket_name>/<upload_file_location>"
    # Names of the columns in the manifest file
    file_name_column="file_name",
    file_size_column="file_size",
    file_md5_column="md5sum"
):
    """Run the stream file validator against a manifest.

    Bundles the arguments into a ``Config`` object and passes it to ``main``
    from ``stream_file_validator``, printing a message before and after.

    Args:
        manifest_file: S3 URL of the manifest.
        file_url_column: Manifest column holding file URLs; callers may pass
            ``None`` and supply the bucket and prefix instead.
        validation_s3_bucket: Bucket that holds the files to validate.
        validation_prefix: Key prefix (folder) of the files to validate.
        upload_s3_url: S3 URL handed to the validator as its upload location.
        file_name_column: Manifest column holding file names.
        file_size_column: Manifest column holding file sizes.
        file_md5_column: Manifest column holding MD5 checksums.
    """
    validator_config = Config(
        manifest_file=manifest_file,
        file_name_column=file_name_column,
        file_url_column=file_url_column,
        file_size_column=file_size_column,
        file_md5_column=file_md5_column,
        validation_s3_bucket=validation_s3_bucket,
        validation_prefix=validation_prefix,
        upload_s3_url=upload_s3_url,
    )

    print("Start stream file validating")
    main(validator_config)
    print("Finish stream file validating")
class Config:
    """Plain settings holder passed to the stream file validator's ``main``.

    Each constructor argument is stored unchanged under an attribute of the
    same name. ``config_file`` is always initialised to ``None``.
    """

    def __init__(
        self,
        manifest_file,
        file_name_column,
        file_url_column,
        file_size_column,
        file_md5_column,
        validation_s3_bucket,
        validation_prefix,
        upload_s3_url
    ):
        # Manifest location and the names of its columns.
        self.manifest_file = manifest_file
        self.file_name_column = file_name_column
        self.file_url_column = file_url_column
        self.file_size_column = file_size_column
        self.file_md5_column = file_md5_column

        # Where the files to validate live, and the upload location.
        self.validation_s3_bucket = validation_s3_bucket
        self.validation_prefix = validation_prefix
        self.upload_s3_url = upload_s3_url

        # No configuration file is used when the settings come from arguments.
        self.config_file = None
if __name__ == "__main__":
    # Running this module as a script serves the stream file validator flow
    # as a deployment with the name below, rather than invoking the flow
    # function directly.
    stream_file_validator.serve(
        name="local-stream-file-validator-deployment"
    )