More fine tuning for partition sizes and test fixes
FrancescAlted committed Jan 14, 2025
1 parent 286d665 commit 36bf1b4
Showing 4 changed files with 95 additions and 63 deletions.
13 changes: 9 additions & 4 deletions src/blosc2/core.py
@@ -1229,7 +1229,7 @@ def get_cbuffer_sizes(src: object) -> tuple[(int, int, int)]:


# Compute a decent value for chunksize based on L3 and/or heuristics
def get_chunksize(blocksize, l3_minimum=2**20, l3_maximum=2**26):
def get_chunksize(blocksize, l3_minimum=16*2**20, l3_maximum=2**26):
# Find a decent default when L3 cannot be detected by cpuinfo
# Based mainly in heuristics
chunksize = blocksize
@@ -1406,10 +1406,15 @@ def compute_chunks_blocks( # noqa: C901
max_blocksize = blocksize
if platform.machine() == "x86_64":
# For modern Intel/AMD archs, experiments say to use half of the L2 cache size
max_blocksize = blosc2.cpu_info["l2_cache_size"] // 2
# max_blocksize = blosc2.cpu_info["l2_cache_size"] // 2
# New experiments say that using the 2x of the L1 cache size is also good
max_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 2
elif platform.system() == "Darwin" and "arm" in platform.machine():
# For Apple Silicon, experiments say we can use the full L1 data cache size
max_blocksize = blosc2.cpu_info["l1_data_cache_size"]
# For Apple Silicon, experiments say we can use 2x the L1 data cache size
max_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 2
else:
# For other archs, use 2x the L1 cache size
max_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 2
if "clevel" in cparams and cparams["clevel"] == 0:
# Experiments show that, when no compression is used, it is not a good idea
# to exceed half of private cache for the blocksize because speed suffers
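(Aside, not part of the diff: the hunks above raise the L3-based chunksize floor to 16 MiB (16*2**20 bytes) and cap the automatic blocksize at 2x the L1 data cache on all architectures. A quick, hedged way to observe the resulting partitions on a given machine; the printed values depend on the detected caches, so they are not portable.)

import numpy as np
import blosc2

# Let compute_chunks_blocks() pick the partitions automatically and inspect them.
# Values differ between CPUs; this only shows the heuristic in action.
a = blosc2.zeros((4_000, 4_000), dtype=np.float64)
print("chunks:", a.chunks, "blocks:", a.blocks)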
54 changes: 16 additions & 38 deletions src/blosc2/lazyexpr.py
@@ -439,7 +439,7 @@ def _compute_smaller_slice(larger_shape, smaller_shape, larger_slice):
if smaller_shape[i - diff_dims] != 1:
smaller_slice.append(larger_slice[i])
else:
smaller_slice.append(slice(None))
smaller_slice.append(slice(0, larger_shape[i]))

return tuple(smaller_slice)

@@ -451,7 +451,7 @@ def compute_smaller_slice(larger_shape, smaller_shape, larger_slice):
"""
diff_dims = len(larger_shape) - len(smaller_shape)
return tuple(
larger_slice[i] if smaller_shape[i - diff_dims] != 1 else slice(None)
larger_slice[i] if smaller_shape[i - diff_dims] != 1 else slice(0, larger_shape[i])
for i in range(diff_dims, len(larger_shape))
)
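(Aside, not part of the diff: a standalone copy of the patched compute_smaller_slice(), verbatim from the hunk above, plus a made-up usage example showing that broadcast dimensions now come back with explicit bounds instead of slice(None).)

def compute_smaller_slice(larger_shape, smaller_shape, larger_slice):
    # Dimensions of size 1 (broadcast dims) now get explicit bounds
    diff_dims = len(larger_shape) - len(smaller_shape)
    return tuple(
        larger_slice[i] if smaller_shape[i - diff_dims] != 1 else slice(0, larger_shape[i])
        for i in range(diff_dims, len(larger_shape))
    )

# A (100, 1) operand broadcasting against a (100, 80) one:
print(compute_smaller_slice((100, 80), (100, 1), (slice(0, 10), slice(20, 40))))
# -> (slice(0, 10, None), slice(0, 80, None))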

@@ -601,6 +601,7 @@ def validate_inputs(inputs: dict, out=None) -> tuple: # noqa: C901
return out.shape, None, None, True

inputs = [input for input in inputs.values() if hasattr(input, "shape") and input is not np]
# This will raise an exception if the input shapes are not compatible
shape = compute_broadcast_shape(inputs)

if not all(np.array_equal(shape, input.shape) for input in inputs):
@@ -833,14 +834,6 @@ def fill_chunk_operands( # noqa: C901
chunk_operands[key] = value[slice_]
continue

# TODO: broadcast is not in the fast path yet, so no need to check for it
# slice_shape = tuple(s.stop - s.start for s in slice_)
# if check_smaller_shape(value, shape, slice_shape):
# # We need to fetch the part of the value that broadcasts with the operand
# smaller_slice = compute_smaller_slice(shape, value.shape, slice_)
# chunk_operands[key] = value[smaller_slice]
# continue

if not full_chunk or not isinstance(value, blosc2.NDArray):
# The chunk is not a full one, or has padding, or is not a blosc2.NDArray,
# so we need to go the slow path
@@ -919,10 +912,9 @@ def fast_eval( # noqa: C901
)
iter_disk = all_ndarray and any_persisted

chunk_operands = {}
chunks_idx, nchunks = get_chunks_idx(shape, chunks)

# Iterate over the chunks and evaluate the expression
chunks_idx, nchunks = get_chunks_idx(shape, chunks)
chunk_operands = {}
for nchunk in range(nchunks):
coords = tuple(np.unravel_index(nchunk, chunks_idx))
slice_ = tuple(
@@ -933,8 +925,6 @@
chunks_ = tuple(s.stop - s.start for s in slice_)

full_chunk = chunks_ == chunks
# To avoid overbooking memory, we need to clear the chunk_operands dict
chunk_operands.clear()
fill_chunk_operands(
operands, slice_, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands
)
@@ -1080,19 +1070,20 @@ def slices_eval( # noqa: C901
orig_slice = _slice

if chunks is None:
# Any out or operand with `chunks` will be used to get the chunks
operands_ = [o for o in operands.values() if hasattr(o, "chunks")]
# Either out, or operand with `chunks`, can be used to get the chunks
operands_ = [o for o in operands.values() if hasattr(o, "chunks") and o.shape == shape]
if out is not None and hasattr(out, "chunks"):
chunks = out.chunks
elif out is None or len(operands_) == 0:
elif len(operands_) > 0:
# Use the first operand with chunks to get the necessary chunking information
chunks = operands_[0].chunks
else:
# Typically, we enter here when using UDFs, and out is a NumPy array.
# Use operands to get the shape and chunks
# operand will be a 'fake' NDArray just to get the necessary chunking information
temp = blosc2.empty(shape, dtype=dtype)
chunks = temp.chunks
del temp
else:
# Typically, we enter here when using UDFs, and out is a NumPy array.
# Use operands to get the shape and chunks
chunks = operands_[0].chunks

# Get the indexes for chunks
chunks_idx, nchunks = get_chunks_idx(shape, chunks)
@@ -1109,9 +1100,10 @@
# Now, use only the fields that are necessary for the sorting
dtype_ = np.dtype([(f, dtype_[f]) for f in _order])

# Iterate over the operands and get the chunks
chunk_operands = {}
for nchunk in range(nchunks):
coords = tuple(np.unravel_index(nchunk, chunks_idx))
chunk_operands = {}
# Calculate the shape of the (chunk) slice_ (specially at the end of the array)
slice_ = tuple(
slice(c * s, min((c + 1) * s, shape[i]))
@@ -1135,6 +1127,7 @@
)
slice_shape = tuple(s.stop - s.start for s in slice_)
len_chunk = math.prod(slice_shape)

# Get the slice of each operand
for key, value in operands.items():
if np.isscalar(value):
@@ -1339,8 +1332,6 @@ def reduce_slices( # noqa: C901
# Choose the array with the largest shape as the reference for chunks
operand = max((o for o in operands.values() if hasattr(o, "chunks")), key=lambda x: len(x.shape))
chunks = operand.chunks
aligned = blosc2.are_partitions_aligned(shape, chunks, operand.blocks)
behaved = blosc2.are_partitions_behaved(shape, chunks, operand.blocks)

# Check if the partitions are aligned (i.e. all operands have the same shape,
# chunks and blocks, and have no padding). This will allow us to take the fast path.
@@ -1368,8 +1359,6 @@ def reduce_slices( # noqa: C901
# Iterate over the operands and get the chunks
chunks_idx, nchunks = get_chunks_idx(shape, chunks)
chunk_operands = {}

# Iterate over the operands and get the chunks
for nchunk in range(nchunks):
coords = tuple(np.unravel_index(nchunk, chunks_idx))
# Calculate the shape of the (chunk) slice_ (specially at the end of the array)
@@ -1397,8 +1386,6 @@ def reduce_slices( # noqa: C901
)
chunks_ = tuple(s.stop - s.start for s in slice_)

# To avoid overbooking memory, we need to clear the chunk_operands dict
chunk_operands.clear()
if _slice in (None, ()) and fast_path:
# Fast path
full_chunk = chunks_ == chunks
@@ -1407,8 +1394,6 @@ def reduce_slices( # noqa: C901
)
else:
# Get the slice of each operand
chunk_operands = {}

for key, value in operands.items():
if np.isscalar(value):
chunk_operands[key] = value
@@ -1421,13 +1406,6 @@ def reduce_slices( # noqa: C901
smaller_slice = compute_smaller_slice(operand.shape, value.shape, slice_)
chunk_operands[key] = value[smaller_slice]
continue
if isinstance(value, blosc2.NDArray):
if aligned and behaved:
# Decompress the whole chunk
buff = value.schunk.decompress_chunk(nchunk)
bsize = value.dtype.itemsize * math.prod(chunks_)
chunk_operands[key] = np.frombuffer(buff[:bsize], dtype=value.dtype).reshape(chunks_)
continue
chunk_operands[key] = value[slice_]

# Evaluate and reduce the expression using chunks of operands
4 changes: 2 additions & 2 deletions src/blosc2/proxy.py
@@ -625,9 +625,9 @@ def wrapper(*args, **kwargs):
val = func(*new_args, **kwargs)
# Always return NumPy array(s)
if isinstance(val, tuple):
return tuple([v[:] if not isinstance(v, np.ndarray) else v for v in val])
return tuple([v[()] if not isinstance(v, np.ndarray) else v for v in val])
elif not isinstance(val, np.ndarray):
return val[:]
return val[()]
return val

return wrapper
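(Aside: the commit does not state why val[:] becomes val[()]; a plausible reading, offered here as an assumption, is that [()] also works when the wrapped result is 0-dimensional, e.g. a full reduction, while [:] raises. A NumPy analogy:)

import numpy as np

scalar_like = np.array(3.14)   # 0-dim array, like a reduction collapsed to a scalar
print(scalar_like[()])         # -> 3.14; [()] materializes any dimensionality
try:
    scalar_like[:]             # slicing a 0-dim array is an error
except IndexError as exc:
    print("[:] fails on 0-dim arrays:", exc)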
87 changes: 68 additions & 19 deletions tests/ndarray/test_lazyexpr_fields.py
@@ -224,11 +224,19 @@ def test_where_one_param(array_fixture):
# Test with eval
res = expr.where(a1).compute()
nres = na1[ne.evaluate("na1**2 + na2**2 > 2 * na1 * na2 + 1")]
# On general chunked ndim arrays, we cannot guarantee the order of the results
if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
res = np.sort(res)
nres = np.sort(nres)
np.testing.assert_allclose(res[:], nres)
# Test with getitem
sl = slice(100)
res = expr.where(a1)[sl]
np.testing.assert_allclose(res, nres[sl])
if len(a1.shape) == 1 or a1.chunks == a1.shape:
np.testing.assert_allclose(res, nres[sl])
else:
# In this case, we cannot compare results, only the length
assert len(res) == len(nres[sl])


# Test where indirectly via a condition in getitem in a NDArray
@@ -238,22 +246,53 @@ def test_where_getitem(array_fixture):
# Test with complete slice
res = sa1[a1**2 + a2**2 > 2 * a1 * a2 + 1].compute()
nres = nsa1[ne.evaluate("na1**2 + na2**2 > 2 * na1 * na2 + 1")]
np.testing.assert_allclose(res["a"][:], nres["a"])
np.testing.assert_allclose(res["b"][:], nres["b"])
resa = res["a"][:]
resb = res["b"][:]
nresa = nres["a"]
nresb = nres["b"]
# On general chunked ndim arrays, we cannot guarantee the order of the results
if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
resa = np.sort(resa)
resb = np.sort(resb)
nresa = np.sort(nresa)
nresb = np.sort(nresb)
np.testing.assert_allclose(resa, nresa)
np.testing.assert_allclose(resb, nresb)

# string version
res = sa1["a**2 + b**2 > 2 * a * b + 1"].compute()
np.testing.assert_allclose(res["a"][:], nres["a"])
np.testing.assert_allclose(res["b"][:], nres["b"])
resa = res["a"][:]
resb = res["b"][:]
nresa = nres["a"]
nresb = nres["b"]
# On general chunked ndim arrays, we cannot guarantee the order of the results
if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
resa = np.sort(resa)
resb = np.sort(resb)
nresa = np.sort(nresa)
nresb = np.sort(nresb)
np.testing.assert_allclose(resa, nresa)
np.testing.assert_allclose(resb, nresb)

# Test with partial slice
sl = slice(100)
res = sa1[a1**2 + a2**2 > 2 * a1 * a2 + 1][sl]
np.testing.assert_allclose(res["a"], nres[sl]["a"])
np.testing.assert_allclose(res["b"], nres[sl]["b"])
if len(a1.shape) == 1 or a1.chunks == a1.shape:
np.testing.assert_allclose(res["a"], nres[sl]["a"])
np.testing.assert_allclose(res["b"], nres[sl]["b"])
else:
# In this case, we cannot compare results, only the length
assert len(res["a"]) == len(nres[sl]["a"])
assert len(res["b"]) == len(nres[sl]["b"])
# string version
res = sa1["a**2 + b**2 > 2 * a * b + 1"][sl]
np.testing.assert_allclose(res["a"], nres[sl]["a"])
np.testing.assert_allclose(res["b"], nres[sl]["b"])
if len(a1.shape) == 1 or a1.chunks == a1.shape:
np.testing.assert_allclose(res["a"], nres[sl]["a"])
np.testing.assert_allclose(res["b"], nres[sl]["b"])
else:
# We cannot compare the results here, other than the length
assert len(res["a"]) == len(nres[sl]["a"])
assert len(res["b"]) == len(nres[sl]["b"])


# Test where indirectly via a condition in getitem in a NDField
@@ -274,13 +313,21 @@ def test_where_getitem_field(array_fixture, npflavor, lazystr):
expr = (a2 < 0) | ~((a1**2 > a2**2) & ~(a1 * a2 > 1))
assert expr.dtype == np.bool_
# Compute and check
res = a1[expr]
res = a1[expr][:]
nres = na1[ne.evaluate("(na2 < 0) | ~((na1**2 > na2**2) & ~(na1 * na2 > 1))")]
np.testing.assert_allclose(res[:], nres)
# On general chunked ndim arrays, we cannot guarantee the order of the results
if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
res = np.sort(res)
nres = np.sort(nres)
np.testing.assert_allclose(res, nres)
# Test with getitem
sl = slice(100)
ressl = res[sl]
np.testing.assert_allclose(ressl, nres[sl])
if len(a1.shape) == 1 or a1.chunks == a1.shape:
np.testing.assert_allclose(ressl, nres[sl])
else:
# In this case, we cannot compare results, only the length
assert len(ressl) == len(nres[sl])


# Test where combined with a reduction
@@ -296,12 +343,15 @@ def test_where_reduction1(array_fixture):
# Test *implicit* where (a query) combined with a reduction
def test_where_reduction2(array_fixture):
sa1, sa2, nsa1, nsa2, a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture
axis = None if sa1.ndim == 1 else 1
# We have to use the original names in fields here
expr = sa1[f"(b * a.sum(axis={axis})) > 0"]
expr = sa1[f"(b * a.sum()) > 0"]
res = expr[:]
nres = nsa1[(na2 * na1.sum(axis=axis)) > 0]
np.testing.assert_allclose(res["a"], nres["a"])
nres = nsa1[(na2 * na1.sum()) > 0]
# On general chunked ndim arrays, we cannot guarantee the order of the results
if not (len(a1.shape) == 1 or a1.chunks == a1.shape):
np.testing.assert_allclose(np.sort(res["a"]), np.sort(nres["a"]))
else:
np.testing.assert_allclose(res["a"], nres["a"])


# More complex cases with where() calls combined with reductions,
@@ -326,9 +376,8 @@ def test_where_fusion2(array_fixture):
expr = a1**2 + a2**2 > 2 * a1 * a2 + 1
npexpr = ne.evaluate("na1**2 + na2**2 > 2 * na1 * na2 + 1")

axis = None if sa1.ndim == 1 else 1
res = expr.where(0.5, 0.2) + expr.where(0.3, 0.6).sum(axis=axis)
nres = np.where(npexpr, 0.5, 0.2) + np.where(npexpr, 0.3, 0.6).sum(axis=axis)
res = expr.where(0.5, 0.2) + expr.where(0.3, 0.6).sum()
nres = np.where(npexpr, 0.5, 0.2) + np.where(npexpr, 0.3, 0.6).sum()
np.testing.assert_allclose(res[:], nres)


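(Aside: the recurring pattern in the updated tests, sorting both results before comparing whenever the array is multidimensional and chunked, is an order-insensitive comparison, since chunk traversal order need not match NumPy's row-major order for boolean selections. A minimal illustration with synthetic data:)

import numpy as np

res = np.array([3.0, 1.0, 2.0])    # e.g. values gathered in chunk order
nres = np.array([1.0, 2.0, 3.0])   # same values in NumPy's row-major order
np.testing.assert_allclose(np.sort(res), np.sort(nres))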
