Skip to content

Commit

Permalink
feat(#41): tackled thread pool starving situations
Browse files Browse the repository at this point in the history
Also corrected the backup logic, that takes heavy indexing into an account.
  • Loading branch information
novoj committed Jun 20, 2024
1 parent de2e49b commit 34bff88
Show file tree
Hide file tree
Showing 16 changed files with 595 additions and 348 deletions.
120 changes: 15 additions & 105 deletions evita_engine/src/main/java/io/evitadb/core/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@
import io.evitadb.core.transaction.memory.TransactionalLayerMaintainer;
import io.evitadb.core.transaction.memory.TransactionalLayerProducer;
import io.evitadb.core.transaction.memory.TransactionalObjectVersion;
import io.evitadb.core.transaction.stage.AbstractTransactionStage;
import io.evitadb.core.transaction.stage.ConflictResolutionTransactionStage.ConflictResolutionTransactionTask;
import io.evitadb.core.transaction.stage.TrunkIncorporationTransactionStage;
import io.evitadb.dataType.PaginatedList;
import io.evitadb.exception.GenericEvitaInternalError;
import io.evitadb.index.CatalogIndex;
Expand Down Expand Up @@ -132,8 +129,6 @@
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand Down Expand Up @@ -745,7 +740,7 @@ public CatalogContract replace(@Nonnull CatalogSchemaContract updatedSchema, @No
)
).toList();

advanceVersion(catalogVersionAfterRename);
this.transactionManager.advanceVersion(catalogVersionAfterRename);
return new Catalog(
catalogVersionAfterRename,
catalogState,
Expand Down Expand Up @@ -799,6 +794,7 @@ public boolean goLive() {
this.tracingContext
);

this.transactionManager.advanceVersion(newCatalog.getVersion());
this.newCatalogVersionConsumer.accept(newCatalog);

// emit the event
Expand All @@ -815,12 +811,15 @@ public void processWriteAheadLog(@Nonnull Consumer<CatalogContract> updatedCatal
this.persistenceService.getFirstNonProcessedTransactionInWal(getVersion())
.ifPresentOrElse(
transactionMutation -> {
final TrunkIncorporationTransactionStage trunkStage = getTrunkIncorporationStage();
final long start = System.nanoTime();
trunkStage.processTransactions(transactionMutation.getCatalogVersion(), Long.MAX_VALUE, false);
final Catalog catalog = this.transactionManager.processWriteAheadLog(
this.evitaConfiguration.server(), this.evitaConfiguration.transaction(),
this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer,
transactionMutation.getCatalogVersion(), Long.MAX_VALUE, false
);
this.persistenceService.purgeAllObsoleteFiles();
log.info("WAL of `{}` catalog was processed in {}.", this.getName(), StringUtils.formatNano(System.nanoTime() - start));
updatedCatalogConsumer.accept(trunkStage.getCatalog());
updatedCatalogConsumer.accept(catalog);
},
() -> updatedCatalogConsumer.accept(this)
);
Expand Down Expand Up @@ -1095,32 +1094,13 @@ public void commitWal(
@Nonnull CompletableFuture<Long> transactionFinalizationFuture
) {
try {
transactionManager.getTransactionalPublisher(
this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(),
this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer
)
.offer(
new ConflictResolutionTransactionTask(
getName(),
transactionId,
walPersistenceService.getMutationCount(),
walPersistenceService.getMutationSizeInBytes(),
walPersistenceService.getWalReference(),
commitBehaviour,
transactionFinalizationFuture
),
(subscriber, task) -> {
transactionManager.invalidateTransactionalPublisher();
transactionFinalizationFuture.completeExceptionally(
new TransactionException(
"Conflict resolution transaction queue is full! Transaction cannot be processed at the moment."
)
);
return false;
}
);
this.transactionManager.commit(
this.evitaConfiguration.server(), this.evitaConfiguration.transaction(),
this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer,
transactionId, commitBehaviour, walPersistenceService, transactionFinalizationFuture
);
} catch (Exception e) {
transactionManager.invalidateTransactionalPublisher();
this.transactionManager.invalidateTransactionalPublisher();
if (e.getCause() instanceof TransactionException txException) {
throw txException;
} else {
Expand Down Expand Up @@ -1217,23 +1197,7 @@ public long appendWalAndDiscard(
* This method is used to indicate that a catalog is currently available in the live view.
*/
public void notifyCatalogPresentInLiveView() {
SubmissionPublisher<?> current = transactionManager.getTransactionalPublisher(
this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(),
this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer
);
while (current != null && !current.isClosed()) {
//noinspection unchecked
final List<Subscriber<?>> subscribers = (List<Subscriber<?>>) current.getSubscribers();
Assert.isPremiseValid(current.isClosed() || subscribers.size() == 1, "Only one subscriber is expected, " + subscribers.size() + " found!");
for (Subscriber<?> subscriber : subscribers) {
if (subscriber instanceof AbstractTransactionStage<?, ?> stage) {
stage.updateCatalogReference(this);
current = stage;
} else {
current = null;
}
}
}
this.transactionManager.notifyCatalogPresentInLiveView(this);
}

/**
Expand Down Expand Up @@ -1311,60 +1275,6 @@ void flush() {
PRIVATE METHODS
*/

/**
* Retrieves the TrunkIncorporationTransactionStage from the transactional pipeline.
*
* @return The TrunkIncorporationTransactionStage.
*/
@Nonnull
private TrunkIncorporationTransactionStage getTrunkIncorporationStage() {
SubmissionPublisher<?> current = transactionManager.getTransactionalPublisher(
this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(),
this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer
);
while (current != null && !current.isClosed()) {
//noinspection unchecked
final List<Subscriber<?>> subscribers = (List<Subscriber<?>>) current.getSubscribers();
Assert.isPremiseValid(current.isClosed() || subscribers.size() == 1, "Only one subscriber is expected, " + subscribers.size() + " found!");
for (Subscriber<?> subscriber : subscribers) {
if (subscriber instanceof TrunkIncorporationTransactionStage stage) {
return stage;
} else if (subscriber instanceof AbstractTransactionStage<?, ?> transactionStage) {
current = transactionStage;
} else {
current = null;
}
}
}
throw new GenericEvitaInternalError(
"TrunkIncorporationTransactionStage is not present in the transactional pipeline!"
);
}

/**
* Informs transactional pipeline jobs that the catalog version has advanced due to external reasons (such as
* catalog renaming).
*/
private void advanceVersion(long catalogVersion) {
SubmissionPublisher<?> current = transactionManager.getTransactionalPublisher(
this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(),
this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer
);
while (current != null) {
//noinspection unchecked
final List<Subscriber<?>> subscribers = (List<Subscriber<?>>) current.getSubscribers();
Assert.isPremiseValid(subscribers.size() == 1, "Only one subscriber is expected!");
for (Subscriber<?> subscriber : subscribers) {
if (subscriber instanceof AbstractTransactionStage<?, ?> stage) {
stage.advanceVersion(catalogVersion);
current = stage;
} else {
current = null;
}
}
}
}

/**
* Replaces reference to the catalog in this instance. The reference is stored in transactional data structure so
* that it doesn't affect parallel clients until committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,15 @@ private <T extends ObservableTask> T addTaskToQueue(@Nonnull T task) {
*/
private void cancelTimedOutTasks() {
// go through the entire queue, but only once
final ArrayList<ObservableTask> buffer = new ArrayList<>(512);
final int bufferSize = 512;
final ArrayList<ObservableTask> buffer = new ArrayList<>(bufferSize);
final int queueSize = this.queue.size();
int timedOutTasks = 0;
for (int i = 0; i < queueSize; i++) {
// initialize threshold for entire batch only once
final long threshold = System.currentTimeMillis() - this.timeoutInMilliseconds;
// effectively withdraw first block of tasks from the queue
this.queue.drainTo(buffer, buffer.size());
this.queue.drainTo(buffer, bufferSize);
// now go through all of them
final Iterator<ObservableTask> it = buffer.iterator();
while (it.hasNext()) {
Expand Down
Loading

0 comments on commit 34bff88

Please sign in to comment.