From e27b0c10e3d29ca7d9b47b3a0a4f6af9df250598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Novotn=C3=BD?= Date: Sun, 23 Jun 2024 16:49:25 +0200 Subject: [PATCH] feat(#41): tackled thread pool starving situations Automatic re-scheduling and transaction pipeline recreation. --- documentation/user/en/operate/configure.md | 7 ------ .../user/en/operate/reference/metrics.md | 12 +++++----- .../core/transaction/TransactionManager.java | 24 ++++++++++--------- .../CatalogWriteAheadLogIntegrationTest.java | 12 ++++++---- .../main/resources/evita-configuration.yaml | 1 - .../store/wal/CatalogWriteAheadLog.java | 2 +- .../main/resources/evita-configuration.yaml | 1 - 7 files changed, 27 insertions(+), 32 deletions(-) diff --git a/documentation/user/en/operate/configure.md b/documentation/user/en/operate/configure.md index b289027a50..1f137cbe3a 100644 --- a/documentation/user/en/operate/configure.md +++ b/documentation/user/en/operate/configure.md @@ -39,7 +39,6 @@ transaction: # [see Transaction configurati transactionMemoryRegionCount: 256 walFileSizeBytes: 16MB walFileCountKept: 8 - maxQueueSize: 1K flushFrequencyInMillis: 1s cache: # [see Cache configuration](#cache-configuration) @@ -489,12 +488,6 @@ This section contains configuration options for the storage layer of the databas

Number of WAL files to keep. Increase this number in combination with `walFileSizeBytes` if you want to keep longer history of changes.

-
maxQueueSize
-
-

**Default:** `1K`

-

Size of the catalog queue for parallel transaction. If there are more transaction than the number of free - threads in the pool, the transaction are queued. If the queue is full, the transaction is rejected.

-
flushFrequencyInMillis

**Default:** `1s`

diff --git a/documentation/user/en/operate/reference/metrics.md b/documentation/user/en/operate/reference/metrics.md index d899d75427..75842d9cd2 100644 --- a/documentation/user/en/operate/reference/metrics.md +++ b/documentation/user/en/operate/reference/metrics.md @@ -3,12 +3,6 @@

Labels used in metrics

-
taskName
-
N/A: N/A
-
fileType
-
File type: N/A
-
name
-
Logical file name: N/A
fileType
File type: N/A
name
@@ -99,6 +93,12 @@
Logical file name: N/A
recordType
Record type: N/A
+
taskName
+
N/A: N/A
+
fileType
+
File type: N/A
+
name
+
Logical file name: N/A
diff --git a/evita_engine/src/main/java/io/evitadb/core/transaction/TransactionManager.java b/evita_engine/src/main/java/io/evitadb/core/transaction/TransactionManager.java index 16f1708b81..07c2d3aab4 100644 --- a/evita_engine/src/main/java/io/evitadb/core/transaction/TransactionManager.java +++ b/evita_engine/src/main/java/io/evitadb/core/transaction/TransactionManager.java @@ -470,17 +470,19 @@ public void updateLastFinalizedCatalog(@Nonnull Catalog lastFinalizedCatalog, lo */ public void notifyCatalogPresentInLiveView(@Nonnull Catalog livingCatalog) { final Catalog theLivingCatalog = getLivingCatalog(); - Assert.isPremiseValid( - theLivingCatalog.getVersion() < livingCatalog.getVersion(), - "Catalog versions must be in order! " + - "Expected " + theLivingCatalog.getVersion() + ", got " + livingCatalog.getVersion() + "." - ); - final long theLastFinalizedVersion = getLastFinalizedCatalogVersion(); - Assert.isPremiseValid( - theLastFinalizedVersion >= livingCatalog.getVersion(), - "Catalog versions must be in order! " + - "Expected " + theLastFinalizedVersion + ", got " + livingCatalog.getVersion() + "." - ); + if (livingCatalog.getVersion() > 0L) { + Assert.isPremiseValid( + theLivingCatalog.getVersion() < livingCatalog.getVersion(), + "Catalog versions must be in order! " + + "Expected " + theLivingCatalog.getVersion() + ", got " + livingCatalog.getVersion() + "." + ); + final long theLastFinalizedVersion = getLastFinalizedCatalogVersion(); + Assert.isPremiseValid( + theLastFinalizedVersion >= livingCatalog.getVersion(), + "Catalog versions must be in order! " + + "Expected " + theLastFinalizedVersion + ", got " + livingCatalog.getVersion() + "." + ); + } this.lastAssignedCatalogVersion.updateAndGet(current -> Math.max(current, livingCatalog.getVersion())); this.livingCatalog.set(livingCatalog); diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java index e61eeffb35..50a1c1ddae 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/wal/CatalogWriteAheadLogIntegrationTest.java @@ -251,10 +251,12 @@ void shouldCorrectlyReportFirstAvailableTimestamp() { final OffsetDateTime initialTimestamp = OffsetDateTime.now(); writeWal(bigOffHeapMemoryManager, transactionSizes, initialTimestamp); + this.wal.walProcessedUntil(Long.MAX_VALUE); + this.wal.removeWalFiles(); - assertEquals(2, offsetConsumer.getCatalogVersions().size()); - assertEquals(initialTimestamp.plusMinutes(1), offsetConsumer.getCatalogVersions().get(0)); - assertEquals(initialTimestamp.plusMinutes(2), offsetConsumer.getCatalogVersions().get(1)); + // only one call would occur with the latest version possible + assertEquals(1, offsetConsumer.getCatalogVersions().size()); + assertEquals(3, offsetConsumer.getCatalogVersions().get(0)); } @Nonnull @@ -271,7 +273,7 @@ private CatalogWriteAheadLog createCatalogWriteAheadLogOfSmallSize() { .build(), Mockito.mock(Scheduler.class), offsetConsumer, - null + firstActiveCatalogVersion -> {} ); } @@ -286,7 +288,7 @@ private CatalogWriteAheadLog createCatalogWriteAheadLogOfLargeEnoughSize() { TransactionOptions.builder().walFileSizeBytes(Long.MAX_VALUE).build(), Mockito.mock(Scheduler.class), offsetConsumer, - null + firstActiveCatalogVersion -> {} ); } diff --git a/evita_server/src/main/resources/evita-configuration.yaml b/evita_server/src/main/resources/evita-configuration.yaml index b7bded418d..395e5f44b6 100644 --- a/evita_server/src/main/resources/evita-configuration.yaml +++ b/evita_server/src/main/resources/evita-configuration.yaml @@ -38,7 +38,6 @@ transaction: transactionMemoryRegionCount: ${transaction.memoryRegionCount:256} walFileSizeBytes: ${transaction.walFilSize:16M} walFileCountKept: ${transaction.walFileCountKept:8} - maxQueueSize: ${transaction.maxQueueSize:1024} flushFrequencyInMillis: ${transaction.flushFrequencyInMillis:1000} cache: diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/CatalogWriteAheadLog.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/CatalogWriteAheadLog.java index 6caa3878e8..bf24347192 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/CatalogWriteAheadLog.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/CatalogWriteAheadLog.java @@ -1036,7 +1036,7 @@ MutationSupplier createSupplier(long startCatalogVersion, @Nullable Long request /** * Removes the obsolete WAL files from the catalog storage path. */ - private long removeWalFiles() { + long removeWalFiles() { synchronized (this.pendingRemovals) { final long catalogVersion = this.processedCatalogVersion.get(); final Set toRemove = new HashSet<>(64); diff --git a/evita_test_support/src/main/resources/evita-configuration.yaml b/evita_test_support/src/main/resources/evita-configuration.yaml index 4903599d8a..1d1c21d85d 100644 --- a/evita_test_support/src/main/resources/evita-configuration.yaml +++ b/evita_test_support/src/main/resources/evita-configuration.yaml @@ -38,7 +38,6 @@ transaction: transactionMemoryRegionCount: ${transaction.memoryRegionCount:256} walFileSizeBytes: ${transaction.walFilSize:16M} walFileCountKept: ${transaction.walFileCountKept:8} - maxQueueSize: ${transaction.maxQueueSize:1024} flushFrequencyInMillis: ${transaction.flushFrequencyInMillis:1000} cache: