diff --git a/src/python/CMSSpark/wmarchive_crab_file_access.py b/src/python/CMSSpark/wmarchive_crab_file_access.py
index 48e6669e..d12b73b3 100644
--- a/src/python/CMSSpark/wmarchive_crab_file_access.py
+++ b/src/python/CMSSpark/wmarchive_crab_file_access.py
@@ -13,16 +13,14 @@
 """
 # system modules
-import json
 import logging
-import os
 import sys
 import time
 from datetime import date, datetime, timezone
-
+from collections import defaultdict
 import click
 from pyspark.sql.functions import (
-    col as _col, countDistinct, first, greatest, lit, lower, when,
+    col as _col, countDistinct, explode, posexplode, first, greatest, lit, lower, upper, when,
     avg as _avg,
     count as _count,
     hex as _hex,
@@ -35,13 +33,7 @@
 
 # CMSSpark modules
 from CMSSpark.spark_utils import get_spark_session, get_candidate_files
-
-# CMSMonitoring modules
-try:
-    from CMSMonitoring.StompAMQ7 import StompAMQ7
-except ImportError:
-    print("ERROR: Could not import StompAMQ")
-    sys.exit(1)
+from es_opensearch.es import EsInterface
 
 # global variables
 TODAY = datetime.today().strftime('%Y-%m-%d')
@@ -90,9 +82,13 @@ def info_logs():
 
 def get_df_ds_general_info(spark):
     """Calculate real size and total file counts of dataset: RealSize, TotalFileCnt
+
+    Explanations:
+      - The LastCreation date of a DATASET is deliberately calculated as the max of its FILEs' LAST_MODIFICATION_DATE,
+        because the CREATION_DATE field is null for most of the files.
     """
-    dbs_files = spark.read.format('avro').load(HDFS_DBS_FILES).select(['DATASET_ID', 'FILE_SIZE',
-                                                                       'LOGICAL_FILE_NAME', 'CREATION_DATE'])
+    dbs_files = spark.read.format('avro').load(HDFS_DBS_FILES) \
+        .select(['DATASET_ID', 'FILE_SIZE', 'LOGICAL_FILE_NAME', 'LAST_MODIFICATION_DATE'])
     dbs_datasets = spark.read.format('avro').load(HDFS_DBS_DATASETS).select(['DATASET_ID'])
     dbs_data_tiers = spark.read.format('avro').load(HDFS_DBS_DATA_TIERS)
     dbs_physics_group = spark.read.format('avro').load(HDFS_DBS_PHYSICS_GROUPS)
@@ -102,7 +98,7 @@ def get_df_ds_general_info(spark):
         .groupby('DATASET_ID') \
         .agg(_sum('FILE_SIZE').alias('RealSize'),
              countDistinct(_col('LOGICAL_FILE_NAME')).alias('TotalFileCnt'),
-             _max('CREATION_DATE').alias('LastCreation'),
+             _max('LAST_MODIFICATION_DATE').alias('LastCreation'),
              ) \
         .select(['DATASET_ID', 'RealSize', 'TotalFileCnt', 'LastCreation'])
     dbs_datasets = spark.read.format('avro').load(HDFS_DBS_DATASETS)
@@ -123,6 +119,51 @@
                     'PhysicsGroupName', 'AcquisitionEraName', 'DatasetAccessType'])
 
 
+def get_tier_ids(spark, tiers):
+    """Returns dataset tier ids of the given tier names"""
+    df_data_tiers = spark.read.format('avro').load(HDFS_DBS_DATA_TIERS).toPandas()
+    # Get tier ids of the given (filtered) data tier names
+    return df_data_tiers[df_data_tiers['DATA_TIER_NAME'].isin(tiers)]['DATA_TIER_ID'].to_list()
+
+
+def get_all_parents_of_datasets(spark):
+    """Returns datasets and all of their parents in a list (parents, grandparents, great-grandparents, etc.)
+
+    Examples:
+        First gather all of them:
+            Row(id=1, pids=[2]), Row(id=2, pids=[3, 4]),
+        Then flatten again:
+            Row(id=1, pid=2), Row(id=1, pid=3), Row(id=1, pid=4),
+            Row(id=2, pid=3), Row(id=2, pid=4),
+    The operation is not costly.
+    """
+    schema_in = StructType([
+        StructField("this_dataset_id", LongType(), True),
+        StructField("parent_dataset_id", LongType(), True)])
+    dbs_dataset_parents = spark.read.csv('/tmp/cmssqoop/dbs_global/2022-10-04/DATASET_PARENTS/*.gz', header=False,
+                                         schema=schema_in)
+    dct = defaultdict(set)
+    for ds_id, parent_id in dbs_dataset_parents.toPandas().to_records(index=False).tolist():
+        if dct[parent_id]:
+            # Say ds_id:3 has parent_id:2, and 2 itself has parent {1}; then we should add {1, 2} to ds_id 3's values
+            t = dct[parent_id].copy()
+            t.add(parent_id)
+            dct[ds_id].update(t)
+        else:
+            # First addition to the empty set object
+            dct[ds_id].add(parent_id)
+
+    schema = StructType([
+        StructField('DatasetId', LongType(), True),
+        StructField('ParentDatasetId', ArrayType(LongType()), True)
+    ])
+    # convert set to list for pyspark
+    dct = {k: list(v) for k, v in dct.items() if v is not None}
+    df = spark.createDataFrame(data=dct.items(), schema=schema)
+    df = df.select(df.DatasetId, explode(df.ParentDatasetId).alias('ParentDatasetId'))
+    return df
+
+
 def get_df_dbs_f_d(spark):
     """Create a dataframe for FILE-DATASET membership/ownership map
 
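For clarity, a minimal, self-contained sketch of the parent-flattening idea that `get_all_parents_of_datasets` above implements; the dataset ids and the `pairs` list are invented for illustration, and, like the function itself, it assumes a child's row is read after its parent's ancestry is already known:

    from collections import defaultdict

    # (this_dataset_id, parent_dataset_id) pairs, e.g. the chain 3 -> 2 -> 1
    pairs = [(2, 1), (3, 2)]
    dct = defaultdict(set)
    for ds_id, parent_id in pairs:
        if dct[parent_id]:
            # the parent already has known ancestors: inherit them plus the parent itself
            dct[ds_id].update(dct[parent_id] | {parent_id})
        else:
            dct[ds_id].add(parent_id)
    assert dct[3] == {1, 2}  # dataset 3 ends up with both its parent and its grandparent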
@@ -146,20 +187,20 @@
 # =====================================================================================================================
 # WMArchive Data Preparation
 # =====================================================================================================================
-def get_schema():
+def get_wm_agg_schema():
     """Final schema for LFNArray files"""
     return StructType(
         [
             StructField('wmaid', StringType(), nullable=False),
             StructField('access_ts', LongType(), nullable=False),
-            StructField('jobtype', StringType(), nullable=False),
+            # StructField('jobtype', StringType(), nullable=False),
             StructField('jobstate', StringType(), nullable=True),
             StructField('file', StringType(), nullable=False),
         ]
     )
 
 
-def get_raw_schema():
+def get_wma_raw_read_schema():
     """Raw schema while reading WMArchive data
 
     WMArchive schema had changed in years, we need to use a custom schema while reading it to not get errors like
@@ -182,7 +223,7 @@
     ])
 
 
-def udf_lfn_extract(row):
+def udf_wma_lfn_extract(row):
     """
     Borrowed from wmarchive.py
 
@@ -190,50 +231,60 @@
     Returns list of files from LFNArray with its metadata
     """
     # Spark reads only data.meta_data, data.LFNArray, data.wmaid and data.timestamp
-    if row['jobtype'] and row['LFNArray']:
+    if row['LFNArray']:
         # prepare metadata
-        result = {'wmaid': row.wmaid, 'access_ts': row.ts * 1000, 'jobtype': row.jobtype, 'jobstate': row.jobstate}
+        result = {'wmaid': row.wmaid, 'access_ts': row.ts * 1000, 'jobstate': row.jobstate}
         if result:
-            # if file is not empty, for each file create a new record
-            # [{'wmaid':x, 'access_ts':x, 'jobtype':x, 'jobstate':x, 'file':x},]
-            return [{**result, **{'file': file}} for file in row['LFNArray'] if file]
+            # In case of any duplicate files in LFNArray, use a set
+
+            # if file is not empty, for each file create a new record like below
+            # ==== [{'wmaid':x, 'access_ts':x, 'jobtype':x, 'jobstate':x, 'file':x}, ...] ====
+            return [{**result, **{'file': file}} for file in set(row['LFNArray']) if file]
         else:
             return []
 
 
 def get_df_main_wma(spark, start_date, end_date):
+    """Read WMArchive HDFS data, join it with DBS for the file/dataset map and run the access calculations
+
+    Explanations:
+      - WmaLastAccess: last time a WMA CRAB3 job's LFNArray contained a file of the dataset
+      - WmaFirstAccess: first time a WMA CRAB3 job's LFNArray contained a file of the dataset
+      - WmaAccessCnt: unique count of wmaids of WMA CRAB3 jobs whose LFNArray contained a file of the dataset
+    """
     print(f"WMArchive data will be processed between {str(start_date)} - {str(end_date)}")
 
-    # Get WMArchive data
-    df_raw = spark.read.schema(get_raw_schema()).option("basePath", HDFS_WMA_WMARCHIVE) \
+    # Get WMArchive data only for CRAB3 jobs
+    df_raw = spark.read.schema(get_wma_raw_read_schema()).option("basePath", HDFS_WMA_WMARCHIVE) \
        .json(get_candidate_files(start_date, end_date, spark, base=HDFS_WMA_WMARCHIVE, day_delta=1)) \
        .select(["data.meta_data.*", "data.LFNArray", "data.wmaid", "metadata.timestamp"]) \
-       .filter(f"""data.wmats >= {start_date.timestamp()} AND data.wmats < {end_date.timestamp()}""")
+       .filter(f"""data.wmats >= {start_date.timestamp()} AND data.wmats < {end_date.timestamp()}""") \
+       .filter(upper(_col('data.meta_data.jobtype')).startswith('CRAB'))
 
-    df_rdd = df_raw.rdd.flatMap(lambda r: udf_lfn_extract(r))
-    df_wma_files = spark.createDataFrame(df_rdd, schema=get_schema()).where(_col("file").isNotNull())
+    df_rdd = df_raw.rdd.flatMap(lambda r: udf_wma_lfn_extract(r))
+    df_wma_files = spark.createDataFrame(df_rdd, schema=get_wm_agg_schema()).where(_col("file").isNotNull())
 
     # Get file:dataset map from DBS
     df_dbs_f_d = get_df_dbs_f_d(spark)
 
-    # Join wma LFNArray file data with DBS to find dataset names of files
+    # Join wma LFNArray file data with DBS to find dataset names of the wma files (i.e. the files CRAB jobs requested)
     df_wma_and_dbs = df_wma_files.join(df_dbs_f_d, df_dbs_f_d.f_name == df_wma_files.file, how='left') \
         .fillna("UnknownDatasetNameOfFiles_MonitoringTag", subset=['dataset']) \
-        .select(['jobtype', 'access_ts', 'dataset_id', 'dataset'])
+        .select(['dataset_id', 'dataset', 'f_name', 'wmaid', 'access_ts'])
 
     # Calculate last and first access of files using LFNArray files
-    df = df_wma_and_dbs.groupby(['dataset_id', 'jobtype']) \
+    df = df_wma_and_dbs.groupby(['dataset_id']) \
         .agg(_max(_col('access_ts')).cast(LongType()).alias('WmaLastAccess'),
              _min(_col('access_ts')).cast(LongType()).alias('WmaFirstAccess'),
             first(_col('dataset_id')).cast(LongType()).alias('Id'),
+            countDistinct(_col('wmaid')).cast(LongType()).alias('WmaAccessCnt'),
             ) \
         .withColumnRenamed('dataset_id', 'DatasetId') \
-        .withColumnRenamed('jobtype', 'WmaJobtype') \
-        .select(['DatasetId', 'WmaJobtype', 'WmaLastAccess', 'WmaFirstAccess'])
+        .withColumn('WmaJobtype', lit('CRAB')) \
+        .select(['DatasetId', 'WmaJobtype', 'WmaLastAccess', 'WmaFirstAccess', 'WmaAccessCnt'])
 
     print('WMA Schema:')
     df.printSchema()
-    # ['DatasetId', 'WmaJobtype', 'WmaLastAccess', 'WmaFirstAccess']
     return df
 
 
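As a worked illustration of the three Wma* quantities explained in the docstring above (the ids and timestamps here are invented, not real WMArchive data), the per-dataset aggregation boils down to a max/min/distinct-count over the flattened (wmaid, access_ts) records of one dataset:

    # toy records for a single dataset_id after the DBS join: (wmaid, access_ts in epoch millis)
    records = [("w1", 1664000000000), ("w2", 1664100000000), ("w1", 1664200000000)]
    wma_last_access = max(ts for _, ts in records)          # 1664200000000
    wma_first_access = min(ts for _, ts in records)         # 1664000000000
    wma_access_cnt = len({wmaid for wmaid, _ in records})   # 2 distinct CRAB job wmaids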
@@ -440,6 +491,27 @@ def get_df_main_rucio(spark):
     return df
 
 
+def get_parent_dataset_calculations(spark, df_rucio_and_wma):
+    """Calculate parent datasets' access counts from their children's access counts
+
+    To better understand the logic, please have a look at https://gist.github.com/mrceyhun/b2b13dab8bc401d7f6b0e6035866d154
+    """
+    # WmaAccessCnt0 will be used to calculate the higher levels of the dataset hierarchy (grandparents, etc.) iteratively
+    df_rucio_and_wma = df_rucio_and_wma.select(['DatasetId', 'WmaAccessCnt']) \
+        .withColumnRenamed('WmaAccessCnt', 'WmaAccessCnt0')
+
+    df_dataset_parents = get_all_parents_of_datasets(spark=spark)
+    df = df_dataset_parents.join(df_rucio_and_wma, ['DatasetId'], 'left').fillna(0, ['WmaAccessCnt0'])
+
+    # hierarchy depth is at most 3: NANOAOD, MINIAOD, AOD, RAW
+    for i in range(5):
+        df_tmp = df.groupby('ParentDatasetId').agg(_sum("WmaAccessCnt" + str(i)).alias('WmaAccessCnt' + str(i + 1))) \
+            .withColumnRenamed('ParentDatasetId', 'DatasetId')
+        df = df.join(df_tmp, ['DatasetId'], 'left').fillna(0, ['WmaAccessCnt' + str(i + 1)])
+    df = df.withColumn('WmaTotalAccessCnt', sum(df[c] for c in df.columns if c.startswith('WmaAccessCnt')))
+    return df.select(['DatasetId', 'WmaTotalAccessCnt'])
+
+
 # =====================================================================================================================
 # Join WMArchive and Rucio
 # =====================================================================================================================
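To make the iterative roll-up above concrete, here is a small sketch with an invented three-level chain (NANOAOD as the child of MINIAOD, which is the child of AOD) and made-up direct access counts; under this reading of the loop, each dataset's WmaTotalAccessCnt ends up as its own count plus the counts of all of its descendants:

    # illustrative only; dataset names and counts are hypothetical
    own_cnt = {"NANOAOD": 5, "MINIAOD": 2, "AOD": 1}
    children = {"AOD": ["MINIAOD"], "MINIAOD": ["NANOAOD"], "NANOAOD": []}

    def total_access_cnt(ds):
        # analogue of summing the WmaAccessCnt0..N columns for one dataset
        return own_cnt[ds] + sum(total_access_cnt(c) for c in children[ds])

    assert total_access_cnt("NANOAOD") == 5
    assert total_access_cnt("MINIAOD") == 7   # 2 + 5
    assert total_access_cnt("AOD") == 8       # 1 + 2 + 5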
@@ -447,7 +519,9 @@ def get_df_main(spark, df_rucio, df_wma):
     """Joins WMArchive results and Rucio results, and then adds DBS tags to datasets"""
 
     # To reduce the Spark memory heap, we used DatasetId as common column instead of dataset name
-    df = df_rucio.join(df_wma, ['DatasetId'], how='outer')
+    df_rucio_and_wma = df_rucio.join(df_wma, ['DatasetId'], how='outer')
+    df_parent_children_acc_cnt = get_parent_dataset_calculations(spark, df_rucio_and_wma)
+    df = df_rucio_and_wma.join(df_parent_children_acc_cnt, ['DatasetId'], how='left')
 
     # Important logic,
     #   - LastAccess will be filled firstly with WmaLastAccess and then with RucioLastAccess
@@ -463,25 +537,17 @@
         .select(['Id', 'Dataset', 'LastAccess', 'LastCreation', 'DatasetAccessType', 'RealSize', 'TotalFileCnt',
                  'IsDatasetValid', 'TierName', 'PhysicsGroupName', 'AcquisitionEraName',
                  'RucioLastAccess', 'Max', 'Min', 'Avg', 'Sum',
-                 'WmaJobtype', 'WmaLastAccess', 'WmaFirstAccess'])
+                 'WmaJobtype', 'WmaLastAccess', 'WmaFirstAccess', 'WmaAccessCnt', 'WmaTotalAccessCnt'])
     df = df.fillna(value=NULL_STR_TYPE_COLUMN_VALUE, subset=STR_TYPE_COLUMNS)
+    print('Main Schema:')
+    df.printSchema()
     return df
 
 
 # =====================================================================================================================
-# Send data with STOMP AMQ
+# Send data to ES
 # =====================================================================================================================
-def credentials(f_name):
-    """Reads AMQ credentials JSON file and returns its python dict object.
-
-    See required keys in `send_to_amq` function
-    """
-    if os.path.exists(f_name):
-        return json.load(open(f_name))
-    return {}
-
-
 def drop_nulls_in_dict(d):  # d: dict
     """Drops the dict key if the value is None
 
@@ -490,51 +556,16 @@ def drop_nulls_in_dict(d):  # d: dict
     return {k: v for k, v in d.items() if v is not None}  # dict
 
 
-def to_chunks(data, samples=1000):
-    length = len(data)
-    for i in range(0, length, samples):
-        yield data[i:i + samples]
-
-
-def send_to_amq(data, confs, batch_size):
-    """Sends list of dictionary in chunks"""
-    wait_seconds = 0.001
-    if confs:
-        username = confs.get('username', '')
-        password = confs.get('password', '')
-        producer = confs.get('producer')
-        doc_type = confs.get('type', None)
-        topic = confs.get('topic')
-        host = confs.get('host')
-        port = int(confs.get('port'))
-        cert = confs.get('cert', None)
-        ckey = confs.get('ckey', None)
-        stomp_amq = StompAMQ7(username=username, password=password, producer=producer, topic=topic,
-                              key=ckey, cert=cert, validation_schema=None, host_and_ports=[(host, port)],
-                              loglevel=logging.WARNING)
-        # Slow: stomp_amq.send_as_tx(chunk, docType=doc_type)
-        #
-        for chunk in to_chunks(data, batch_size):
-            messages = []
-            for msg in chunk:
-                notif, _, _ = stomp_amq.make_notification(payload=msg, doc_type=doc_type, producer=producer)
-                messages.append(notif)
-            if messages:
-                stomp_amq.send(messages)
-                time.sleep(wait_seconds)
-        time.sleep(1)
-        print("Message sending is finished")
-
-
 @click.command()
 @click.option("--start_date", default=None, type=click.DateTime(_VALID_DATE_FORMATS))
 @click.option("--end_date", default=None, type=click.DateTime(_VALID_DATE_FORMATS))
-@click.option("--creds", required=True, help="amq credentials json file")
-@click.option("--amq_batch_size", type=click.INT, required=False, help="AMQ producer chunk size", default=1000)
+@click.option("--es_conf", required=True, help="ES credentials json file, see es_opensearch under CMSSpark/src/python")
+@click.option("--index", required=True, help="ES index name", default="test-dima")
+@click.option("--batch_size", type=click.INT, required=False, help="ES producer chunk size", default=1000)
 @click.option("--test", is_flag=True, default=False, required=False,
               help="It will send only 10 documents to ElasticSearch. "
                    "[!Attention!] Please provide test/training AMQ topic.")
-def main(start_date, end_date, creds, amq_batch_size, test):
+def main(start_date, end_date, index, es_conf, batch_size, test):
     info_logs()
     spark = get_spark_session(app_name='wmarchive_crab_file_access')
 
@@ -549,32 +580,70 @@
     df_wma = get_df_main_wma(spark, start_date, end_date)
     df_rucio = get_df_main_rucio(spark)
     df = get_df_main(spark, df_rucio, df_wma)
+    # spark.sparkContext.addPyFile('/eos/user/c/cuzunogl/CMSSpark/src/python/es_opensearch.zip')
+    # spark.sparkContext.addFile(es_conf)
 
-    creds_json = credentials(f_name=creds)
-    print('Final Schema:')
-    df.printSchema()
     total_size = 0
-    if test:
-        _topic = creds_json["topic"].lower()
-        if ("train" not in _topic) and ("test" not in _topic):
-            print(f"Test failed. Topic \"{_topic}\" is not training or test topic.")
-            sys.exit(1)
-
-        for part in df.rdd.mapPartitions(lambda p: [[drop_nulls_in_dict(x.asDict()) for x in p]]).toLocalIterator():
-            part_size = len(part)
-            print(f"Length of partition: {part_size}")
-            send_to_amq(data=part[:10], confs=creds_json, batch_size=amq_batch_size)
-            print(f"Test successfully finished and sent 10 documents to {_topic} AMQ topic.")
-            sys.exit(0)
-    else:
-        # Iterate over list of dicts returned from spark
-        for part in df.rdd.mapPartitions(lambda p: [[drop_nulls_in_dict(x.asDict()) for x in p]]).toLocalIterator():
-            part_size = len(part)
-            print(f"Length of partition: {part_size}")
-            send_to_amq(data=part, confs=creds_json, batch_size=amq_batch_size)
-            total_size += part_size
-        print(f"Total document size: {total_size}")
+    for part in df.rdd.mapPartitions(lambda p: [[drop_nulls_in_dict(x.asDict()) for x in p]]).toLocalIterator():
+        part_size = len(part)
+        print(f"Length of partition: {part_size}")
+        es = EsInterface(es_conf, logging_level=logging.WARNING)
+        es.post_bulk(index=index, data=part, metadata={"timestamp": round(time.time() * 1000)},
+                     is_daily_index=True, batch_size=batch_size)
+        total_size += part_size
+    print(f"Total document size: {total_size}")
 
 
 if __name__ == '__main__':
     main()
+
+# TODO: temporary mapping creation
+#   - The ES producer is not ready: there are python "elasticsearch" module version differences on the Spark nodes,
+#     so we cannot create the mapping via a python REST call for now.
+#   - ES OpenSearch index templates are not working as expected with the current mappings.
+#   - Because of these issues, for this unfinished task, we can create the mapping via curl for now:
+
+# curl -X PUT --negotiate -u : https://es-cms1.cern.ch/es/_index_template/test-dima -H 'Content-Type: application/json' -d'
+# {
+#   "index_patterns": [
+#     "test-dima-*"
+#   ],
+#   "template": {
+#     "aliases": {
+#       "test-dima": {}
+#     },
+#     "settings": {
+#       "number_of_shards": 2,
+#       "number_of_replicas": 1
+#     },
+#     "mappings": {
+#       "properties": {
+#         "LastCreation": {
+#           "type": "date",
+#           "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_second"
+#         },
+#         "LastAccess": {
+#           "type": "date",
+#           "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
+#         },
+#         "WmaLastAccess": {
+#           "type": "date",
+#           "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
+#         },
+#         "WmaFirstAccess": {
+#           "type": "date",
+#           "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
+#         },
+#         "metadata": {
+#           "properties": {
+#             "timestamp": {
+#               "type": "date",
+#               "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
+#             }
+#           }
+#         }
+#       }
+#     }
+#   }
+# }
+# '
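A note on the date formats in the template above: LastCreation appears to use epoch_second because it comes from DBS LAST_MODIFICATION_DATE (stored as epoch seconds), while the Wma* access timestamps are built as row.ts * 1000 in udf_wma_lfn_extract and therefore arrive in epoch milliseconds. A quick sanity check with a made-up timestamp value:

    from datetime import datetime, timezone

    last_creation = 1664928000              # hypothetical DBS LAST_MODIFICATION_DATE, epoch seconds
    wma_last_access = last_creation * 1000  # the same instant as the producer would store it, epoch millis
    assert datetime.fromtimestamp(last_creation, tz=timezone.utc) == \
        datetime.fromtimestamp(wma_last_access / 1000, tz=timezone.utc)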
diff --git a/src/python/es-opensearch/es.py b/src/python/es_opensearch/es.py
similarity index 81%
rename from src/python/es-opensearch/es.py
rename to src/python/es_opensearch/es.py
index 2840128b..f09069ef 100644
--- a/src/python/es-opensearch/es.py
+++ b/src/python/es_opensearch/es.py
@@ -9,7 +9,6 @@
 import datetime
 import json
 import logging
-import socket
 import sys
 import time
 
@@ -22,7 +21,7 @@ class EsInterface(object):
     It uses port 443 HTTPS port and url should end with /es, see `prepare_hostname` function
     """
     # Logging format
-    logging_fmt = '[%(asctime)s' + time.strftime('%z') + '] [%(levelname)s] %(message)s'
+    logging_fmt = '[%(asctime)s' + time.strftime('%z') + '] [ES] [%(levelname)s] %(message)s'
     # This producer adds this reserved timestamp key to all documents
     metadata_time_field = "EsProducerTime"
 
@@ -54,30 +53,30 @@ def __init__(self, es_conf, logging_level=logging.INFO, logger=None):
         if not es_conf:
             self.logger.error("Failed to create ElasticSearch interface, please provide es-conf parameter")
             sys.exit(1)
-        domain = socket.getfqdn().split(".", 1)[-1]
-        if domain == "cern.ch":
-            try:
-                with open(es_conf) as f:
-                    es_creds = json.load(f)
-            except Exception as e:
-                self.logger.error("Failed to read es_conf: %s - err: %s", es_conf, str(e))
-                sys.exit(1)
-
-            if any((key not in es_creds) for key in ["username", "password", "hostname"]):
-                self.logger.error("Not all required keys are provided: username, password, hostname. Conf file: %s",
-                                  es_conf)
-                sys.exit(1)
-            hostname = self.prepare_hostname(es_creds["hostname"])
-            self.logger.info("ElasticSearch host: %s", hostname)
-            self.handle = elasticsearch.Elasticsearch(
-                hosts=[hostname],
-                http_auth=(es_creds["username"], es_creds["password"]),
-                verify_certs=True,
-                ca_certs="/etc/pki/tls/certs/ca-bundle.trust.crt",
-            )
-        else:
-            # Default localhost ES connection interface
-            self.handle = elasticsearch.Elasticsearch()
+
+        try:
+            with open(es_conf) as f:
+                es_creds = json.load(f)
+        except Exception as e:
+            self.logger.error("Failed to read es_conf: %s - err: %s", es_conf, str(e))
+            sys.exit(1)
+
+        self.logger.info("Will connect to ElasticSearch host: %s", es_creds['hostname'])
+        self.logger.info("ElasticSearch py version: %s", str(elasticsearch.__version__))
+
+        if any((key not in es_creds) for key in ["username", "password", "hostname"]):
+            self.logger.error("Not all required keys are provided: username, password, hostname. Conf file: %s",
+                              es_conf)
+            sys.exit(1)
+
+        hostname = self.prepare_hostname(es_creds["hostname"])
+        self.logger.info("ElasticSearch host: %s", hostname)
+        self.handle = elasticsearch.Elasticsearch(
+            hosts=[hostname],
+            http_auth=(es_creds["username"], es_creds["password"]),
+            verify_certs=True,
+            ca_certs="/etc/pki/tls/certs/ca-bundle.trust.crt",
+        )
 
     @staticmethod
     def prepare_hostname(hostname):
@@ -130,6 +129,7 @@ def prepare_mappings(self, int_vals=(), text_vals=(), keyword_vals=(), date_vals
             }
         }
         mappings = {"dynamic_templates": [dynamic_string_template], "properties": props}
+        self.logger.debug("Mappings will be put: %s", str(mappings))
         return mappings
 
     @staticmethod
@@ -223,11 +223,11 @@ def put_mapping_and_setting(self, index, mappings, settings=None):
         try:
             if settings and mappings:
                 self.handle.indices.put_mapping(mappings, index=index)
-                self.handle.indices.put_settings(index=index, settings=settings)
+                self.handle.indices.put_settings(settings, index=index)
             elif mappings:
                 self.handle.indices.put_mapping(mappings, index=index)
             elif settings:
-                self.handle.indices.put_settings(index=index, settings=settings)
+                self.handle.indices.put_settings(settings, index=index)
             else:
                 return
         except Exception as e:
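For reference, a minimal sketch of the es_conf JSON file that __init__ above expects; the three keys are exactly the ones checked in the code, while the values here are placeholders rather than real credentials (the hostname is normalized by prepare_hostname, which the class docstring says should use port 443 and end with /es):

    import json

    example_conf = {
        "username": "es_user_placeholder",
        "password": "es_password_placeholder",
        "hostname": "https://es-cms1.cern.ch/es",  # placeholder host, taken from the curl example above
    }
    with open("/tmp/es_conf.json", "w") as f:
        json.dump(example_conf, f)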
@@ -238,6 +238,8 @@ def prepare_daily_index(self, index_template, **kwargs):
     def prepare_daily_index(self, index_template, **kwargs):
         """Special function to create index, settings and mappings
 
+        Supports only flat documents, not nested data.
+
         Args:
             index_template:
             kwargs:
@@ -284,37 +286,43 @@
         self.put_mapping_and_setting(index=idx, mappings=mappings, settings=settings)
         self.logger.info("Index mappings and settings are ready: %s", idx)
 
-    def post_bulk(self, index, data, metadata, is_daily_index, **kwargs):
-        """Send data
+    @staticmethod
+    def drop_nulls_in_dict(d):  # d: dict
+        """Drops the dict key if the value is None
 
-        If index is daily and mapping/setting provided in kwargs, it creates mapping and settings for daily index.
-        This option should be considered.
+        ES mapping does not allow None values and drops the document completely.
+        """
+        return {k: v for k, v in d.items() if v is not None}  # dict
 
-        ATTENTION: If index template is used, do not use kwargs to set mappings.
+    @staticmethod
+    def to_chunks(data, batch_size):
+        length = len(data)
+        for i in range(0, length, batch_size):
+            yield data[i:i + batch_size]
+
+    def post_bulk(self, index, data, metadata, is_daily_index, batch_size=100):
+        """Send data
 
         Args:
             index: index template name if is_daily_index True, otherwise full index name
             data: list
             is_daily_index: if it is True, daily index will be created from `index`
             metadata: general metadata for documents
+            batch_size: chunk size
         """
         if is_daily_index:
             idx = self.get_daily_index(time.time(), index)
-            if kwargs:
-                self.prepare_daily_index(index_template=index, **kwargs)
         else:
             idx = index
         # Check and create daily index
         self.create_index(idx)
-        # Check and create daily index
-        self.create_index(idx)
-
         data = self.prepare_body(idx, data, metadata)
         try:
-            res = self.handle.bulk(body=data, index=idx, request_timeout=60)
-            if res.get("errors"):
-                return self.parse_errors(res)
+            for chunk in self.to_chunks(data, batch_size):
+                res = self.handle.bulk(body=chunk, index=idx, request_timeout=60)
+                if res.get("errors"):
+                    self.logger.error(self.parse_errors(res))
         except Exception as e:
             self.logger.error("Bulk post finished with error: %s", str(e))
diff --git a/src/python/es-opensearch/test-es.py b/src/python/es_opensearch/test-es.py
similarity index 100%
rename from src/python/es-opensearch/test-es.py
rename to src/python/es_opensearch/test-es.py
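To tie the two diffs together, a hedged usage sketch of the renamed module as the CRAB file-access producer now calls it; the conf path, index name and documents are placeholders, and it assumes an es_conf JSON like the sketch further above exists:

    import logging
    import time

    from es_opensearch.es import EsInterface

    docs = [{"Dataset": "/A/B/NANOAOD", "WmaAccessCnt": 5}]  # hypothetical documents
    es = EsInterface("/tmp/es_conf.json", logging_level=logging.WARNING)
    es.post_bulk(index="test-dima", data=docs,
                 metadata={"timestamp": round(time.time() * 1000)},
                 is_daily_index=True, batch_size=100)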