diff --git a/src/blosc2/core.py b/src/blosc2/core.py
index 90d4a505..d948e92b 100644
--- a/src/blosc2/core.py
+++ b/src/blosc2/core.py
@@ -1229,7 +1229,7 @@ def get_cbuffer_sizes(src: object) -> tuple[(int, int, int)]:
 
 
 # Compute a decent value for chunksize based on L3 and/or heuristics
-def get_chunksize(blocksize, l3_minimum=2**20, l3_maximum=2**26):
+def get_chunksize(blocksize, l3_minimum=16 * 2**20, l3_maximum=2**26):
     # Find a decent default when L3 cannot be detected by cpuinfo
     # Based mainly in heuristics
     chunksize = blocksize
@@ -1406,10 +1406,15 @@ def compute_chunks_blocks(  # noqa: C901
     max_blocksize = blocksize
     if platform.machine() == "x86_64":
         # For modern Intel/AMD archs, experiments say to use half of the L2 cache size
-        max_blocksize = blosc2.cpu_info["l2_cache_size"] // 2
+        # max_blocksize = blosc2.cpu_info["l2_cache_size"] // 2
+        # New experiments say that using 2x the L1 cache size is also good
+        max_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 2
     elif platform.system() == "Darwin" and "arm" in platform.machine():
-        # For Apple Silicon, experiments say we can use the full L1 data cache size
-        max_blocksize = blosc2.cpu_info["l1_data_cache_size"]
+        # For Apple Silicon, experiments say we can use 2x the L1 data cache size
+        max_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 2
+    else:
+        # For other archs, use 2x the L1 cache size
+        max_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 2
     if "clevel" in cparams and cparams["clevel"] == 0:
         # Experiments show that, when no compression is used, it is not a good idea
         # to exceed half of private cache for the blocksize because speed suffers
diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py
index be7be1f7..019f1e19 100644
--- a/src/blosc2/lazyexpr.py
+++ b/src/blosc2/lazyexpr.py
@@ -439,7 +439,7 @@ def _compute_smaller_slice(larger_shape, smaller_shape, larger_slice):
         if smaller_shape[i - diff_dims] != 1:
             smaller_slice.append(larger_slice[i])
         else:
-            smaller_slice.append(slice(None))
+            smaller_slice.append(slice(0, larger_shape[i]))
     return tuple(smaller_slice)
 
 
@@ -451,7 +451,7 @@ def compute_smaller_slice(larger_shape, smaller_shape, larger_slice):
     """
     diff_dims = len(larger_shape) - len(smaller_shape)
     return tuple(
-        larger_slice[i] if smaller_shape[i - diff_dims] != 1 else slice(None)
+        larger_slice[i] if smaller_shape[i - diff_dims] != 1 else slice(0, larger_shape[i])
         for i in range(diff_dims, len(larger_shape))
     )
 
 
@@ -601,6 +601,7 @@ def validate_inputs(inputs: dict, out=None) -> tuple:  # noqa: C901
         return out.shape, None, None, True
 
     inputs = [input for input in inputs.values() if hasattr(input, "shape") and input is not np]
+    # This will raise an exception if the input shapes are not compatible
     shape = compute_broadcast_shape(inputs)
     if not all(np.array_equal(shape, input.shape) for input in inputs):
@@ -833,14 +834,6 @@ def fill_chunk_operands(  # noqa: C901
             chunk_operands[key] = value[slice_]
             continue
 
-        # TODO: broadcast is not in the fast path yet, so no need to check for it
-        # slice_shape = tuple(s.stop - s.start for s in slice_)
-        # if check_smaller_shape(value, shape, slice_shape):
-        #     # We need to fetch the part of the value that broadcasts with the operand
-        #     smaller_slice = compute_smaller_slice(shape, value.shape, slice_)
-        #     chunk_operands[key] = value[smaller_slice]
-        #     continue
-
         if not full_chunk or not isinstance(value, blosc2.NDArray):
             # The chunk is not a full one, or has padding, or is not a blosc2.NDArray,
             # so we need to go the slow path
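Note on the `compute_smaller_slice` change above: replacing `slice(None)` with `slice(0, larger_shape[i])` gives the broadcast (size-1) axes explicit start/stop bounds, so downstream code can derive lengths via `s.stop - s.start`. A self-contained sketch of the helper's logic; the function body mirrors the patched `compute_smaller_slice`, while the arrays and assertions are only illustrative:

```python
import numpy as np


def compute_smaller_slice(larger_shape, smaller_shape, larger_slice):
    # Same logic as the patched helper: keep the original slice on axes that
    # really exist in the smaller operand; use explicit bounds (instead of
    # slice(None)) on size-1 axes that merely broadcast.
    diff_dims = len(larger_shape) - len(smaller_shape)
    return tuple(
        larger_slice[i] if smaller_shape[i - diff_dims] != 1 else slice(0, larger_shape[i])
        for i in range(diff_dims, len(larger_shape))
    )


a = np.arange(48.0).reshape(6, 8)  # "larger" operand
b = np.arange(8.0).reshape(1, 8)   # broadcasts along the first axis
sl = (slice(0, 3), slice(2, 5))    # a chunk slice on the larger operand
small_sl = compute_smaller_slice(a.shape, b.shape, sl)
assert small_sl == (slice(0, 6), slice(2, 5))
# NumPy clips the out-of-range stop on the size-1 axis, so both pieces
# still broadcast together to the chunk's shape:
assert (a[sl] + b[small_sl]).shape == (3, 3)
```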
@@ -919,10 +912,9 @@ def fast_eval(  # noqa: C901
     )
     iter_disk = all_ndarray and any_persisted
 
-    chunk_operands = {}
-    chunks_idx, nchunks = get_chunks_idx(shape, chunks)
-    # Iterate over the chunks and evaluate the expression
+    chunks_idx, nchunks = get_chunks_idx(shape, chunks)
+    chunk_operands = {}
     for nchunk in range(nchunks):
         coords = tuple(np.unravel_index(nchunk, chunks_idx))
         slice_ = tuple(
@@ -933,8 +925,6 @@ def fast_eval(  # noqa: C901
         )
         chunks_ = tuple(s.stop - s.start for s in slice_)
         full_chunk = chunks_ == chunks
-        # To avoid overbooking memory, we need to clear the chunk_operands dict
-        chunk_operands.clear()
         fill_chunk_operands(
             operands, slice_, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands
         )
@@ -1080,19 +1070,20 @@ def slices_eval(  # noqa: C901
     orig_slice = _slice
     if chunks is None:
-        # Any out or operand with `chunks` will be used to get the chunks
-        operands_ = [o for o in operands.values() if hasattr(o, "chunks")]
+        # Either out or an operand with `chunks` can be used to get the chunks
+        operands_ = [o for o in operands.values() if hasattr(o, "chunks") and o.shape == shape]
         if out is not None and hasattr(out, "chunks"):
             chunks = out.chunks
-        elif out is None or len(operands_) == 0:
+        elif len(operands_) > 0:
+            # Use the first operand with chunks to get the necessary chunking information
+            chunks = operands_[0].chunks
+        else:
+            # Typically, we enter here when using UDFs, and out is a NumPy array.
+            # Use operands to get the shape and chunks
             # operand will be a 'fake' NDArray just to get the necessary chunking information
             temp = blosc2.empty(shape, dtype=dtype)
             chunks = temp.chunks
             del temp
-        else:
-            # Typically, we enter here when using UDFs, and out is a NumPy array.
-            # Use operands to get the shape and chunks
-            chunks = operands_[0].chunks
 
     # Get the indexes for chunks
     chunks_idx, nchunks = get_chunks_idx(shape, chunks)
@@ -1109,9 +1100,10 @@ def slices_eval(  # noqa: C901
         # Now, use only the fields that are necessary for the sorting
         dtype_ = np.dtype([(f, dtype_[f]) for f in _order])
 
+    # Iterate over the operands and get the chunks
+    chunk_operands = {}
     for nchunk in range(nchunks):
         coords = tuple(np.unravel_index(nchunk, chunks_idx))
-        chunk_operands = {}
         # Calculate the shape of the (chunk) slice_ (specially at the end of the array)
         slice_ = tuple(
             slice(c * s, min((c + 1) * s, shape[i]))
@@ -1135,6 +1127,7 @@ def slices_eval(  # noqa: C901
         )
         slice_shape = tuple(s.stop - s.start for s in slice_)
         len_chunk = math.prod(slice_shape)
+
         # Get the slice of each operand
         for key, value in operands.items():
             if np.isscalar(value):
@@ -1339,8 +1332,6 @@ def reduce_slices(  # noqa: C901
         # Choose the array with the largest shape as the reference for chunks
         operand = max((o for o in operands.values() if hasattr(o, "chunks")), key=lambda x: len(x.shape))
         chunks = operand.chunks
-        aligned = blosc2.are_partitions_aligned(shape, chunks, operand.blocks)
-        behaved = blosc2.are_partitions_behaved(shape, chunks, operand.blocks)
 
     # Check if the partitions are aligned (i.e. all operands have the same shape,
     # chunks and blocks, and have no padding). This will allow us to take the fast path.
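A theme of the hunks above is hoisting `chunk_operands = {}` out of the per-chunk loops and dropping the `chunk_operands.clear()` calls: entries are simply overwritten on each iteration, so the dict (and the buffers it references) can be reused instead of rebuilt. The iteration pattern itself is unchanged; here is a standalone sketch of it, with a hand-rolled stand-in for `get_chunks_idx` (only its call sites appear in this patch, so the body below is an assumption):

```python
import math

import numpy as np


def get_chunks_idx(shape, chunks):
    # Hypothetical stand-in for the blosc2 helper: number of chunks along
    # each dimension (the last chunk of an axis may be partial).
    chunks_idx = tuple(math.ceil(sh / ch) for sh, ch in zip(shape, chunks))
    return chunks_idx, math.prod(chunks_idx)


shape, chunks = (10, 6), (4, 4)
chunks_idx, nchunks = get_chunks_idx(shape, chunks)
chunk_operands = {}  # created once, entries overwritten on every iteration
for nchunk in range(nchunks):
    coords = tuple(np.unravel_index(nchunk, chunks_idx))
    # Clip each slice at the array boundary, as in the loops above
    slice_ = tuple(
        slice(c * ch, min((c + 1) * ch, sh))
        for c, ch, sh in zip(coords, chunks, shape)
    )
    chunks_ = tuple(s.stop - s.start for s in slice_)
    assert all(0 < c <= ch for c, ch in zip(chunks_, chunks))
```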
@@ -1368,8 +1359,6 @@ def reduce_slices(  # noqa: C901
     # Iterate over the operands and get the chunks
     chunks_idx, nchunks = get_chunks_idx(shape, chunks)
     chunk_operands = {}
-
-    # Iterate over the operands and get the chunks
     for nchunk in range(nchunks):
         coords = tuple(np.unravel_index(nchunk, chunks_idx))
         # Calculate the shape of the (chunk) slice_ (specially at the end of the array)
@@ -1397,8 +1386,6 @@ def reduce_slices(  # noqa: C901
         )
         chunks_ = tuple(s.stop - s.start for s in slice_)
 
-        # To avoid overbooking memory, we need to clear the chunk_operands dict
-        chunk_operands.clear()
         if _slice in (None, ()) and fast_path:
             # Fast path
             full_chunk = chunks_ == chunks
@@ -1407,8 +1394,6 @@ def reduce_slices(  # noqa: C901
             )
         else:
             # Get the slice of each operand
-            chunk_operands = {}
-
             for key, value in operands.items():
                 if np.isscalar(value):
                     chunk_operands[key] = value
@@ -1421,13 +1406,6 @@ def reduce_slices(  # noqa: C901
                     smaller_slice = compute_smaller_slice(operand.shape, value.shape, slice_)
                     chunk_operands[key] = value[smaller_slice]
                     continue
-                if isinstance(value, blosc2.NDArray):
-                    if aligned and behaved:
-                        # Decompress the whole chunk
-                        buff = value.schunk.decompress_chunk(nchunk)
-                        bsize = value.dtype.itemsize * math.prod(chunks_)
-                        chunk_operands[key] = np.frombuffer(buff[:bsize], dtype=value.dtype).reshape(chunks_)
-                        continue
                 chunk_operands[key] = value[slice_]
 
         # Evaluate and reduce the expression using chunks of operands
diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py
index de7ab818..e06a9c99 100644
--- a/src/blosc2/proxy.py
+++ b/src/blosc2/proxy.py
@@ -625,9 +625,9 @@ def wrapper(*args, **kwargs):
         val = func(*new_args, **kwargs)
         # Always return NumPy array(s)
         if isinstance(val, tuple):
-            return tuple([v[:] if not isinstance(v, np.ndarray) else v for v in val])
+            return tuple([v[()] if not isinstance(v, np.ndarray) else v for v in val])
         elif not isinstance(val, np.ndarray):
-            return val[:]
+            return val[()]
         return val
 
     return wrapper
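The `val[:]` to `val[()]` change in proxy.py matters for 0-dim results (e.g. a full reduction): slicing with `[:]` requires at least one axis, while indexing with an empty tuple works for any rank. Plain NumPy shows the difference; the wrapped blosc2 objects are assumed to follow the same indexing convention:

```python
import numpy as np

vector = np.arange(4)
scalar = np.asarray(vector.sum())  # 0-dim array, like a full reduction

assert np.array_equal(vector[()], vector)  # [()] is a no-op for ndim >= 1
assert scalar[()] == 6                     # ...and it extracts the 0-dim value

try:
    scalar[:]  # [:] needs at least one axis
except IndexError as exc:
    print(exc)  # too many indices for array: array is 0-dimensional, ...
```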
diff --git a/tests/ndarray/test_lazyexpr_fields.py b/tests/ndarray/test_lazyexpr_fields.py
index 20e1f751..e20fdcd8 100644
--- a/tests/ndarray/test_lazyexpr_fields.py
+++ b/tests/ndarray/test_lazyexpr_fields.py
@@ -224,11 +224,19 @@ def test_where_one_param(array_fixture):
     # Test with eval
     res = expr.where(a1).compute()
     nres = na1[ne.evaluate("na1**2 + na2**2 > 2 * na1 * na2 + 1")]
+    # On general chunked ndim arrays, we cannot guarantee the order of the results
+    if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
+        res = np.sort(res)
+        nres = np.sort(nres)
     np.testing.assert_allclose(res[:], nres)
 
     # Test with getitem
     sl = slice(100)
     res = expr.where(a1)[sl]
-    np.testing.assert_allclose(res, nres[sl])
+    if len(a1.shape) == 1 or a1.chunks == a1.shape:
+        np.testing.assert_allclose(res, nres[sl])
+    else:
+        # In this case, we cannot compare results, only the length
+        assert len(res) == len(nres[sl])
 
 # Test where indirectly via a condition in getitem in a NDArray
@@ -238,22 +246,53 @@ def test_where_getitem(array_fixture):
     # Test with complete slice
     res = sa1[a1**2 + a2**2 > 2 * a1 * a2 + 1].compute()
     nres = nsa1[ne.evaluate("na1**2 + na2**2 > 2 * na1 * na2 + 1")]
-    np.testing.assert_allclose(res["a"][:], nres["a"])
-    np.testing.assert_allclose(res["b"][:], nres["b"])
+    resa = res["a"][:]
+    resb = res["b"][:]
+    nresa = nres["a"]
+    nresb = nres["b"]
+    # On general chunked ndim arrays, we cannot guarantee the order of the results
+    if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
+        resa = np.sort(resa)
+        resb = np.sort(resb)
+        nresa = np.sort(nresa)
+        nresb = np.sort(nresb)
+    np.testing.assert_allclose(resa, nresa)
+    np.testing.assert_allclose(resb, nresb)
+
     # string version
     res = sa1["a**2 + b**2 > 2 * a * b + 1"].compute()
-    np.testing.assert_allclose(res["a"][:], nres["a"])
-    np.testing.assert_allclose(res["b"][:], nres["b"])
+    resa = res["a"][:]
+    resb = res["b"][:]
+    nresa = nres["a"]
+    nresb = nres["b"]
+    # On general chunked ndim arrays, we cannot guarantee the order of the results
+    if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
+        resa = np.sort(resa)
+        resb = np.sort(resb)
+        nresa = np.sort(nresa)
+        nresb = np.sort(nresb)
+    np.testing.assert_allclose(resa, nresa)
+    np.testing.assert_allclose(resb, nresb)
     # Test with partial slice
     sl = slice(100)
     res = sa1[a1**2 + a2**2 > 2 * a1 * a2 + 1][sl]
-    np.testing.assert_allclose(res["a"], nres[sl]["a"])
-    np.testing.assert_allclose(res["b"], nres[sl]["b"])
+    if len(a1.shape) == 1 or a1.chunks == a1.shape:
+        np.testing.assert_allclose(res["a"], nres[sl]["a"])
+        np.testing.assert_allclose(res["b"], nres[sl]["b"])
+    else:
+        # In this case, we cannot compare results, only the length
+        assert len(res["a"]) == len(nres[sl]["a"])
+        assert len(res["b"]) == len(nres[sl]["b"])
     # string version
     res = sa1["a**2 + b**2 > 2 * a * b + 1"][sl]
-    np.testing.assert_allclose(res["a"], nres[sl]["a"])
-    np.testing.assert_allclose(res["b"], nres[sl]["b"])
+    if len(a1.shape) == 1 or a1.chunks == a1.shape:
+        np.testing.assert_allclose(res["a"], nres[sl]["a"])
+        np.testing.assert_allclose(res["b"], nres[sl]["b"])
+    else:
+        # We cannot compare the results here, other than the length
+        assert len(res["a"]) == len(nres[sl]["a"])
+        assert len(res["b"]) == len(nres[sl]["b"])
 
 
 # Test where indirectly via a condition in getitem in a NDField
@@ -274,13 +313,21 @@ def test_where_getitem_field(array_fixture, npflavor, lazystr):
     expr = (a2 < 0) | ~((a1**2 > a2**2) & ~(a1 * a2 > 1))
     assert expr.dtype == np.bool_
     # Compute and check
-    res = a1[expr]
+    res = a1[expr][:]
     nres = na1[ne.evaluate("(na2 < 0) | ~((na1**2 > na2**2) & ~(na1 * na2 > 1))")]
-    np.testing.assert_allclose(res[:], nres)
+    # On general chunked ndim arrays, we cannot guarantee the order of the results
+    if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
+        res = np.sort(res)
+        nres = np.sort(nres)
+    np.testing.assert_allclose(res, nres)
     # Test with getitem
     sl = slice(100)
     ressl = res[sl]
-    np.testing.assert_allclose(ressl, nres[sl])
+    if len(a1.shape) == 1 or a1.chunks == a1.shape:
+        np.testing.assert_allclose(ressl, nres[sl])
+    else:
+        # In this case, we cannot compare results, only the length
+        assert len(ressl) == len(nres[sl])
 
 
 # Test where combined with a reduction
@@ -296,12 +343,15 @@ def test_where_reduction1(array_fixture):
 # Test *implicit* where (a query) combined with a reduction
 def test_where_reduction2(array_fixture):
     sa1, sa2, nsa1, nsa2, a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture
-    axis = None if sa1.ndim == 1 else 1
     # We have to use the original names in fields here
-    expr = sa1[f"(b * a.sum(axis={axis})) > 0"]
+    expr = sa1["(b * a.sum()) > 0"]
     res = expr[:]
-    nres = nsa1[(na2 * na1.sum(axis=axis)) > 0]
-    np.testing.assert_allclose(res["a"], nres["a"])
+    nres = nsa1[(na2 * na1.sum()) > 0]
+    # On general chunked ndim arrays, we cannot guarantee the order of the results
+    if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
+        np.testing.assert_allclose(np.sort(res["a"]), np.sort(nres["a"]))
+    else:
+        np.testing.assert_allclose(res["a"], nres["a"])
 
 
 # More complex cases with where() calls combined with reductions,
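The guard `not (len(a1.shape) == 1 or a1.chunks == a1.shape)` that recurs above marks the cases where a boolean selection is assembled chunk by chunk, so the element order relative to NumPy's C-order traversal is not guaranteed; the tests therefore compare sorted copies, or only lengths for partial slices. A toy reproduction of the effect, with 2x2 blocks standing in for chunks (shapes and seed are illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
a = rng.random((4, 4))
mask = a > 0.5

expected = a[mask]  # NumPy selects in C (row-major) order
# A chunkwise evaluator may instead visit 2x2 blocks one at a time:
got = np.concatenate(
    [a[i:i + 2, j:j + 2][mask[i:i + 2, j:j + 2]] for i in (0, 2) for j in (0, 2)]
)

# Same elements, possibly in a different order: compare as sorted multisets
assert len(got) == len(expected)
np.testing.assert_allclose(np.sort(got), np.sort(expected))
```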
@@ -326,9 +376,8 @@ def test_where_fusion2(array_fixture):
     expr = a1**2 + a2**2 > 2 * a1 * a2 + 1
     npexpr = ne.evaluate("na1**2 + na2**2 > 2 * na1 * na2 + 1")
-    axis = None if sa1.ndim == 1 else 1
-    res = expr.where(0.5, 0.2) + expr.where(0.3, 0.6).sum(axis=axis)
-    nres = np.where(npexpr, 0.5, 0.2) + np.where(npexpr, 0.3, 0.6).sum(axis=axis)
+    res = expr.where(0.5, 0.2) + expr.where(0.3, 0.6).sum()
+    nres = np.where(npexpr, 0.5, 0.2) + np.where(npexpr, 0.3, 0.6).sum()
     np.testing.assert_allclose(res[:], nres)
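Note that both reduction-related test hunks drop the `axis` parametrization and keep only full (`axis=None`) reductions. A plausible reading, in line with the ordering caveat above: a scalar like `.sum()` is independent of chunk traversal order and broadcasts cleanly against the elementwise `where()` part, whereas a partial `axis=1` sum would have to line up positionally with results whose order is no longer guaranteed. The NumPy side of the fused expectation, for reference (input values are illustrative):

```python
import numpy as np

na1 = np.linspace(0.0, 1.0, 16).reshape(4, 4)
na2 = np.linspace(1.0, 2.0, 16).reshape(4, 4)
npexpr = na1**2 + na2**2 > 2 * na1 * na2 + 1

# A full reduction yields a scalar, so it broadcasts against the
# elementwise where() result regardless of evaluation order:
nres = np.where(npexpr, 0.5, 0.2) + np.where(npexpr, 0.3, 0.6).sum()
assert nres.shape == npexpr.shape
```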