Skip to content

Commit

Permalink
Chore/upgrade datafusion 44 (#973)
Browse files Browse the repository at this point in the history
* Bump DataFusion version to 44

* Trait definition for plan properties now returns LexOrdering

* find_df_window_func was removed upstream

* Prepare and Execute variants were removed from LogicalPlan

* Substrait functions now take SessionState instead of SessionContext

* Remove unused import

* RuntimeConfig is now deprecated

* Switch from RuntimeConfig to RuntimeEnvBuilder

* Update return types on unit tests

* DF 44 changes the execution plan properties to have boundedness and emission type

* Initcap now returns stringview

* Bump datafusion version in example
  • Loading branch information
timsaucer authored Jan 9, 2025
1 parent 4b262be commit db1bc62
Show file tree
Hide file tree
Showing 19 changed files with 529 additions and 487 deletions.
783 changes: 408 additions & 375 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ tokio = { version = "1.41", features = ["macros", "rt", "rt-multi-thread", "sync
pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
pyo3-async-runtimes = { version = "0.22", features = ["tokio-runtime"]}
arrow = { version = "53", features = ["pyarrow"] }
datafusion = { version = "43.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-substrait = { version = "43.0.0", optional = true }
datafusion-proto = { version = "43.0.0" }
datafusion-ffi = { version = "43.0.0" }
datafusion-functions-window-common = { version = "43.0.0" }
datafusion = { version = "44.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-substrait = { version = "44.0.0", optional = true }
datafusion-proto = { version = "44.0.0" }
datafusion-ffi = { version = "44.0.0" }
prost = "0.13" # keep in line with `datafusion-substrait`
uuid = { version = "1.11", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ It is possible to configure runtime (memory and disk settings) and configuration

```python
runtime = (
RuntimeConfig()
RuntimeEnvBuilder()
.with_disk_manager_os()
.with_fair_spill_pool(10000000)
)
Expand Down
6 changes: 4 additions & 2 deletions benchmarks/db-benchmark/groupby-datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from datafusion import (
col,
functions as f,
RuntimeConfig,
RuntimeEnvBuilder,
SessionConfig,
SessionContext,
)
Expand Down Expand Up @@ -85,7 +85,9 @@ def execute(df):

# create a session context with explicit runtime and config settings
runtime = (
RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(64 * 1024 * 1024 * 1024)
RuntimeEnvBuilder()
.with_disk_manager_os()
.with_fair_spill_pool(64 * 1024 * 1024 * 1024)
)
config = (
SessionConfig()
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/tpch/tpch.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def bench(data_path, query_path):

# create context
# runtime = (
# RuntimeConfig()
# RuntimeEnvBuilder()
# .with_disk_manager_os()
# .with_fair_spill_pool(10000000)
# )
Expand Down
8 changes: 4 additions & 4 deletions docs/source/user-guide/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ Configuration
=============

Let's look at how we can configure DataFusion. When creating a :py:class:`~datafusion.context.SessionContext`, you can pass in
a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.context.RuntimeConfig` object. These two cover a wide range of options.
a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.context.RuntimeEnvBuilder` object. These two cover a wide range of options.

.. code-block:: python
from datafusion import RuntimeConfig, SessionConfig, SessionContext
from datafusion import RuntimeEnvBuilder, SessionConfig, SessionContext
# create a session context with default settings
ctx = SessionContext()
print(ctx)
# create a session context with explicit runtime and config settings
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
Expand All @@ -48,4 +48,4 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
and about :code:`RuntimeConfig` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeConfig.html>`_.
and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
4 changes: 2 additions & 2 deletions examples/create-context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
# specific language governing permissions and limitations
# under the License.

from datafusion import RuntimeConfig, SessionConfig, SessionContext
from datafusion import RuntimeEnvBuilder, SessionConfig, SessionContext

# create a session context with default settings
ctx = SessionContext()
print(ctx)

# create a session context with explicit runtime and config settings
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
Expand Down
4 changes: 2 additions & 2 deletions examples/ffi-table-provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
datafusion = { version = "43.0.0" }
datafusion-ffi = { version = "43.0.0" }
datafusion = { version = "44.0.0" }
datafusion-ffi = { version = "44.0.0" }
pyo3 = { version = "0.22.6", features = ["extension-module", "abi3", "abi3-py38"] }
arrow = { version = "53.2.0" }
arrow-array = { version = "53.2.0" }
Expand Down
4 changes: 2 additions & 2 deletions python/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from .context import (
SessionContext,
SessionConfig,
RuntimeConfig,
RuntimeEnvBuilder,
SQLOptions,
)

Expand Down Expand Up @@ -66,7 +66,7 @@
"SessionContext",
"SessionConfig",
"SQLOptions",
"RuntimeConfig",
"RuntimeEnvBuilder",
"Expr",
"ScalarUDF",
"WindowFrame",
Expand Down
55 changes: 33 additions & 22 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from __future__ import annotations

from ._internal import SessionConfig as SessionConfigInternal
from ._internal import RuntimeConfig as RuntimeConfigInternal
from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
from ._internal import SQLOptions as SQLOptionsInternal
from ._internal import SessionContext as SessionContextInternal

Expand Down Expand Up @@ -265,56 +265,58 @@ def set(self, key: str, value: str) -> SessionConfig:
return self


class RuntimeConfig:
class RuntimeEnvBuilder:
"""Runtime configuration options."""

def __init__(self) -> None:
"""Create a new :py:class:`RuntimeConfig` with default values."""
self.config_internal = RuntimeConfigInternal()
"""Create a new :py:class:`RuntimeEnvBuilder` with default values."""
self.config_internal = RuntimeEnvBuilderInternal()

def with_disk_manager_disabled(self) -> RuntimeConfig:
def with_disk_manager_disabled(self) -> RuntimeEnvBuilder:
"""Disable the disk manager, attempts to create temporary files will error.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
"""
self.config_internal = self.config_internal.with_disk_manager_disabled()
return self

def with_disk_manager_os(self) -> RuntimeConfig:
def with_disk_manager_os(self) -> RuntimeEnvBuilder:
"""Use the operating system's temporary directory for disk manager.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
"""
self.config_internal = self.config_internal.with_disk_manager_os()
return self

def with_disk_manager_specified(self, *paths: str | pathlib.Path) -> RuntimeConfig:
def with_disk_manager_specified(
self, *paths: str | pathlib.Path
) -> RuntimeEnvBuilder:
"""Use the specified paths for the disk manager's temporary files.
Args:
paths: Paths to use for the disk manager's temporary files.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
"""
paths_list = [str(p) for p in paths]
self.config_internal = self.config_internal.with_disk_manager_specified(
paths_list
)
return self

def with_unbounded_memory_pool(self) -> RuntimeConfig:
def with_unbounded_memory_pool(self) -> RuntimeEnvBuilder:
"""Use an unbounded memory pool.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
"""
self.config_internal = self.config_internal.with_unbounded_memory_pool()
return self

def with_fair_spill_pool(self, size: int) -> RuntimeConfig:
def with_fair_spill_pool(self, size: int) -> RuntimeEnvBuilder:
"""Use a fair spill pool with the specified size.
This pool works best when you know beforehand the query has multiple spillable
Expand All @@ -335,16 +337,16 @@ def with_fair_spill_pool(self, size: int) -> RuntimeConfig:
size: Size of the memory pool in bytes.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
Examples usage::
config = RuntimeConfig().with_fair_spill_pool(1024)
config = RuntimeEnvBuilder().with_fair_spill_pool(1024)
"""
self.config_internal = self.config_internal.with_fair_spill_pool(size)
return self

def with_greedy_memory_pool(self, size: int) -> RuntimeConfig:
def with_greedy_memory_pool(self, size: int) -> RuntimeEnvBuilder:
"""Use a greedy memory pool with the specified size.
This pool works well for queries that do not need to spill or have a single
Expand All @@ -355,32 +357,39 @@ def with_greedy_memory_pool(self, size: int) -> RuntimeConfig:
size: Size of the memory pool in bytes.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
Example usage::
config = RuntimeConfig().with_greedy_memory_pool(1024)
config = RuntimeEnvBuilder().with_greedy_memory_pool(1024)
"""
self.config_internal = self.config_internal.with_greedy_memory_pool(size)
return self

def with_temp_file_path(self, path: str | pathlib.Path) -> RuntimeConfig:
def with_temp_file_path(self, path: str | pathlib.Path) -> RuntimeEnvBuilder:
"""Use the specified path to create any needed temporary files.
Args:
path: Path to use for temporary files.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
Example usage::
config = RuntimeConfig().with_temp_file_path("/tmp")
config = RuntimeEnvBuilder().with_temp_file_path("/tmp")
"""
self.config_internal = self.config_internal.with_temp_file_path(str(path))
return self


@deprecated("Use `RuntimeEnvBuilder` instead.")
class RuntimeConfig(RuntimeEnvBuilder):
"""See `RuntimeEnvBuilder`."""

pass


class SQLOptions:
"""Options to be used when performing SQL queries."""

Expand Down Expand Up @@ -454,7 +463,9 @@ class SessionContext:
"""

def __init__(
self, config: SessionConfig | None = None, runtime: RuntimeConfig | None = None
self,
config: SessionConfig | None = None,
runtime: RuntimeEnvBuilder | None = None,
) -> None:
"""Main interface for executing queries with DataFusion.
Expand Down
10 changes: 5 additions & 5 deletions python/tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from datafusion import (
DataFrame,
RuntimeConfig,
RuntimeEnvBuilder,
SessionConfig,
SessionContext,
SQLOptions,
Expand All @@ -43,7 +43,7 @@ def test_create_context_session_config_only():


def test_create_context_runtime_config_only():
SessionContext(runtime=RuntimeConfig())
SessionContext(runtime=RuntimeEnvBuilder())


@pytest.mark.parametrize("path_to_str", (True, False))
Expand All @@ -54,7 +54,7 @@ def test_runtime_configs(tmp_path, path_to_str):
path1 = str(path1) if path_to_str else path1
path2 = str(path2) if path_to_str else path2

runtime = RuntimeConfig().with_disk_manager_specified(path1, path2)
runtime = RuntimeEnvBuilder().with_disk_manager_specified(path1, path2)
config = SessionConfig().with_default_catalog_and_schema("foo", "bar")
ctx = SessionContext(config, runtime)
assert ctx is not None
Expand All @@ -67,7 +67,7 @@ def test_runtime_configs(tmp_path, path_to_str):
def test_temporary_files(tmp_path, path_to_str):
path = str(tmp_path) if path_to_str else tmp_path

runtime = RuntimeConfig().with_temp_file_path(path)
runtime = RuntimeEnvBuilder().with_temp_file_path(path)
config = SessionConfig().with_default_catalog_and_schema("foo", "bar")
ctx = SessionContext(config, runtime)
assert ctx is not None
Expand All @@ -77,7 +77,7 @@ def test_temporary_files(tmp_path, path_to_str):


def test_create_context_with_all_valid_args():
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
Expand Down
18 changes: 12 additions & 6 deletions python/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ def test_lit_arith(df):
result = df.collect()
assert len(result) == 1
result = result[0]

assert result.column(0) == pa.array([5, 6, 7])
assert result.column(1) == pa.array(["Hello!", "World!", "!!"])
assert result.column(1) == pa.array(
["Hello!", "World!", "!!"], type=pa.string_view()
)


def test_math_functions():
Expand Down Expand Up @@ -661,9 +664,12 @@ def test_array_function_obj_tests(stmt, py_expr):
),
(
f.concat(column("a").cast(pa.string()), literal("?")),
pa.array(["Hello?", "World?", "!?"]),
pa.array(["Hello?", "World?", "!?"], type=pa.string_view()),
),
(
f.initcap(column("c")),
pa.array(["Hello ", " World ", " !"], type=pa.string_view()),
),
(f.initcap(column("c")), pa.array(["Hello ", " World ", " !"])),
(f.left(column("a"), literal(3)), pa.array(["Hel", "Wor", "!"])),
(f.length(column("c")), pa.array([6, 7, 2], type=pa.int32())),
(f.lower(column("a")), pa.array(["hello", "world", "!"])),
Expand Down Expand Up @@ -871,8 +877,8 @@ def test_temporal_functions(df):
result = df.collect()
assert len(result) == 1
result = result[0]
assert result.column(0) == pa.array([12, 6, 7], type=pa.float64())
assert result.column(1) == pa.array([2022, 2027, 2020], type=pa.float64())
assert result.column(0) == pa.array([12, 6, 7], type=pa.int32())
assert result.column(1) == pa.array([2022, 2027, 2020], type=pa.int32())
assert result.column(2) == pa.array(
[datetime(2022, 12, 1), datetime(2027, 6, 1), datetime(2020, 7, 1)],
type=pa.timestamp("us"),
Expand Down Expand Up @@ -904,7 +910,7 @@ def test_temporal_functions(df):
assert result.column(9) == pa.array(
[datetime(2023, 9, 7, 5, 6, 14, 523952)] * 3, type=pa.timestamp("us")
)
assert result.column(10) == pa.array([31, 26, 2], type=pa.float64())
assert result.column(10) == pa.array([31, 26, 2], type=pa.int32())


def test_arrow_cast(df):
Expand Down
Loading

0 comments on commit db1bc62

Please sign in to comment.