Skip to content

Commit

Permalink
feat(#41): create and document backup & restore process
Browse files Browse the repository at this point in the history
Backup & restore done and working. No client implementation and missing integration test.
  • Loading branch information
novoj committed May 4, 2024
1 parent 7ff1216 commit 7430158
Show file tree
Hide file tree
Showing 24 changed files with 1,030 additions and 302 deletions.
11 changes: 11 additions & 0 deletions evita_api/src/main/java/io/evitadb/api/CatalogContract.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import io.evitadb.api.requestResponse.system.TimeFlow;
import io.evitadb.api.requestResponse.transaction.TransactionMutation;
import io.evitadb.dataType.PaginatedList;
import io.evitadb.exception.UnexpectedIOException;

import javax.annotation.Nonnull;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -300,11 +302,20 @@ boolean renameCollectionOfEntity(@Nonnull String entityType, @Nonnull String new
@Nonnull
Stream<Mutation> getReversedCommittedMutationStream(long catalogVersion);

/**
* Creates a backup of the specified catalog and returns an InputStream to read the binary data of the zip file.
*
* @param outputStream an OutputStream to write the binary data of the zip file
* @throws UnexpectedIOException if an I/O error occurs during reading the catalog contents
*/
void backup(OutputStream outputStream) throws UnexpectedIOException;

/**
* Terminates catalog instance and frees all claimed resources. Prepares catalog instance to be garbage collected.
*
* This method is idempotent and may be called multiple times. Only first call is really processed and others are
* ignored.
*/
void terminate();

}
21 changes: 21 additions & 0 deletions evita_api/src/main/java/io/evitadb/api/EvitaContract.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import io.evitadb.api.requestResponse.system.SystemStatus;
import io.evitadb.exception.EvitaInternalError;
import io.evitadb.exception.EvitaInvalidUsageException;
import io.evitadb.exception.UnexpectedIOException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -318,6 +321,24 @@ default void updateCatalog(
}
}

/**
* Creates a backup of the specified catalog and returns an InputStream to read the binary data of the zip file.
*
* @param catalogName the name of the catalog to backup
* @param outputStream an OutputStream to write the binary data of the zip file
* @throws UnexpectedIOException if an I/O error occurs during reading the catalog contents
*/
void backupCatalog(@Nonnull String catalogName, @Nonnull OutputStream outputStream) throws UnexpectedIOException;

/**
* Restores a catalog from the provided InputStream which contains the binary data of a previously backed up zip file.
*
* @param catalogName the name of the catalog to restore
* @param inputStream an InputStream to read the binary data of the zip file
* @throws UnexpectedIOException if an I/O error occurs
*/
void restoreCatalog(@Nonnull String catalogName, @Nonnull InputStream inputStream) throws UnexpectedIOException;

/**
* Overloaded method {@link #updateCatalogAsync(String, Function, CommitBehavior, SessionFlags...)} that returns
* future that is completed when the transaction reaches the processing stage defined by the `commitBehaviour`
Expand Down
10 changes: 10 additions & 0 deletions evita_api/src/main/java/io/evitadb/api/EvitaSessionContract.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@
import io.evitadb.api.requestResponse.schema.mutation.catalog.ModifyCatalogSchemaMutation;
import io.evitadb.api.requestResponse.schema.mutation.catalog.ModifyEntitySchemaMutation;
import io.evitadb.exception.EvitaInvalidUsageException;
import io.evitadb.exception.UnexpectedIOException;
import io.evitadb.utils.ArrayUtils;
import io.evitadb.utils.Assert;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.Closeable;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -984,6 +986,14 @@ <T extends Serializable> DeletedHierarchy<T> deleteEntityAndItsHierarchy(@Nonnul
@Nonnull
SealedEntity[] deleteSealedEntitiesAndReturnBodies(@Nonnull Query query);

/**
* Creates a backup of the specified catalog and returns an InputStream to read the binary data of the zip file.
*
* @param outputStream an OutputStream to write the binary data of the zip file
* @throws UnexpectedIOException if an I/O error occurs during reading the catalog contents
*/
void backupCatalog(@Nonnull OutputStream outputStream) throws UnexpectedIOException;

/**
* Default implementation uses ID for comparing two sessions (and to distinguish one session from another).
*
Expand Down
28 changes: 26 additions & 2 deletions evita_engine/src/main/java/io/evitadb/core/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import io.evitadb.core.transaction.stage.WalAppendingTransactionStage;
import io.evitadb.dataType.PaginatedList;
import io.evitadb.exception.GenericEvitaInternalError;
import io.evitadb.exception.UnexpectedIOException;
import io.evitadb.index.CatalogIndex;
import io.evitadb.index.CatalogIndexKey;
import io.evitadb.index.EntityIndex;
Expand Down Expand Up @@ -122,6 +123,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.*;
Expand Down Expand Up @@ -255,6 +258,23 @@ public final class Catalog implements CatalogContract, CatalogVersionBeyondTheHo
*/
private long lastPersistedSchemaVersion;

/**
* Verifies whether the catalog name could be used for a new catalog.
*
* @param catalogName the name of the catalog
* @param storageOptions the storage options
*/
public static Path restoreCatalogTo(
@Nonnull String catalogName,
@Nonnull StorageOptions storageOptions,
@Nonnull InputStream inputStream
) {
return ServiceLoader.load(CatalogPersistenceServiceFactory.class)
.findFirst()
.map(it -> it.restoreCatalogTo(catalogName, storageOptions, inputStream))
.orElseThrow(() -> new IllegalStateException("IO service is unexpectedly not available!"));
}

/**
* Creates a transaction pipeline for transaction processing consisting of 4 stages:
*
Expand Down Expand Up @@ -344,7 +364,6 @@ public Catalog(

public Catalog(
@Nonnull String catalogName,
@Nonnull Path catalogPath,
@Nonnull CacheSupervisor cacheSupervisor,
@Nonnull StorageOptions storageOptions,
@Nonnull TransactionOptions transactionOptions,
Expand All @@ -356,7 +375,7 @@ public Catalog(
this.tracingContext = tracingContext;
this.persistenceService = ServiceLoader.load(CatalogPersistenceServiceFactory.class)
.findFirst()
.map(it -> it.load(this, catalogName, catalogPath, storageOptions, transactionOptions, scheduler))
.map(it -> it.load(this, catalogName, storageOptions, transactionOptions, scheduler))
.orElseThrow(() -> new IllegalStateException("IO service is unexpectedly not available!"));
final CatalogHeader catalogHeader = this.persistenceService.getCatalogHeader(
this.persistenceService.getLastCatalogVersion()
Expand Down Expand Up @@ -820,6 +839,11 @@ public Stream<Mutation> getReversedCommittedMutationStream(long catalogVersion)
return this.persistenceService.getReversedCommittedMutationStream(catalogVersion);
}

@Override
public void backup(OutputStream outputStream) throws UnexpectedIOException {
this.persistenceService.backup(outputStream);
}

@Override
public void terminate() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
import io.evitadb.api.requestResponse.system.TimeFlow;
import io.evitadb.core.exception.CatalogCorruptedException;
import io.evitadb.dataType.PaginatedList;
import io.evitadb.exception.UnexpectedIOException;
import io.evitadb.utils.FileUtils;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import javax.annotation.Nonnull;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.Map;
Expand Down Expand Up @@ -221,6 +223,11 @@ public boolean goLive() {
throw new CatalogCorruptedException(this);
}

@Override
public void backup(OutputStream outputStream) throws UnexpectedIOException {
throw new CatalogCorruptedException(this);
}

@Override
public void terminate() {
// we don't need to terminate corrupted catalog
Expand Down
94 changes: 63 additions & 31 deletions evita_engine/src/main/java/io/evitadb/core/Evita.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.evitadb.core.maintenance.SessionKiller;
import io.evitadb.core.query.algebra.Formula;
import io.evitadb.exception.EvitaInvalidUsageException;
import io.evitadb.exception.UnexpectedIOException;
import io.evitadb.scheduling.RejectingExecutor;
import io.evitadb.scheduling.Scheduler;
import io.evitadb.thread.TimeoutableThread;
Expand All @@ -80,6 +81,8 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
Expand Down Expand Up @@ -238,34 +241,15 @@ public Evita(@Nonnull EvitaConfiguration configuration) {
this.catalogs = CollectionUtils.createConcurrentHashMap(directories.length);
final CountDownLatch startUpLatch = new CountDownLatch(directories.length);
for (Path directory : directories) {
final String catalogName = directory.toFile().getName();
this.executor.execute(() -> {
try {
final long start = System.nanoTime();
final CatalogContract catalog = new Catalog(
catalogName,
directory,
this.cacheSupervisor,
this.configuration.storage(),
this.configuration.transaction(),
this.reflectionLookup,
this.scheduler,
this::replaceCatalogReference,
this.tracingContext
);
log.info("Catalog {} fully loaded in: {}", catalogName, StringUtils.formatNano(System.nanoTime() - start));
// this will be one day used in more clever way, when entire catalog loading will be split into
// multiple smaller tasks and done asynchronously after the startup (along with catalog loading / unloading feature)
catalog.processWriteAheadLog(
updatedCatalog -> this.catalogs.put(catalogName, updatedCatalog)
);
} catch (Throwable ex) {
log.error("Catalog {} is corrupted!", catalogName, ex);
this.catalogs.put(catalogName, new CorruptedCatalog(catalogName, directory, ex));
} finally {
startUpLatch.countDown();
this.executor.execute(
() -> {
try {
loadCatalog(directory.toFile().getName());
} finally {
startUpLatch.countDown();
}
}
});
);
}

try {
Expand Down Expand Up @@ -474,6 +458,22 @@ public <T> CompletableFuture<T> updateCatalogAsync(
}
}

@Override
public void backupCatalog(@Nonnull String catalogName, @Nonnull OutputStream outputStream) throws UnexpectedIOException {
assertActive();
try (final EvitaSessionContract session = this.createSession(new SessionTraits(catalogName))) {
session.backupCatalog(outputStream);
}
}

@Override
public void restoreCatalog(@Nonnull String catalogName, @Nonnull InputStream inputStream) throws UnexpectedIOException {
assertActive();
Catalog.restoreCatalogTo(catalogName, this.configuration.storage(), inputStream);
// finally, try to load catalog from the disk
loadCatalog(catalogName);
}

@Override
public CompletableFuture<Long> updateCatalogAsync(
@Nonnull String catalogName,
Expand Down Expand Up @@ -600,7 +600,39 @@ public CatalogContract getCatalogInstanceOrThrowException(@Nonnull String catalo

/*
PRIVATE METHODS
*/

/**
* Loads catalog from the designated directory. If the catalog is corrupted, it will be marked as such, but it'll
* still be added to the list of catalogs.
*
* @param catalogName name of the catalog
*/
private void loadCatalog(@Nonnull String catalogName) {
final Path directory = configuration.storage().storageDirectoryOrDefault().resolve(catalogName);
try {
final long start = System.nanoTime();
final CatalogContract catalog = new Catalog(
catalogName,
this.cacheSupervisor,
this.configuration.storage(),
this.configuration.transaction(),
this.reflectionLookup,
this.scheduler,
this::replaceCatalogReference,
this.tracingContext
);
log.info("Catalog {} fully loaded in: {}", catalogName, StringUtils.formatNano(System.nanoTime() - start));
// this will be one day used in more clever way, when entire catalog loading will be split into
// multiple smaller tasks and done asynchronously after the startup (along with catalog loading / unloading feature)
catalog.processWriteAheadLog(
updatedCatalog -> this.catalogs.put(catalogName, updatedCatalog)
);
} catch (Throwable ex) {
log.error("Catalog {} is corrupted!", catalogName);
this.catalogs.put(catalogName, new CorruptedCatalog(catalogName, directory, ex));
}
}

/**
* Creates new catalog in the evitaDB.
Expand Down Expand Up @@ -707,7 +739,7 @@ private void doReplaceCatalogInternal(
final CatalogContract previousCatalog = this.catalogs.put(catalogNameToBeReplaced, replacedCatalog);

// notify callback that it's now a live snapshot
((Catalog)replacedCatalog).notifyCatalogPresentInLiveView();
((Catalog) replacedCatalog).notifyCatalogPresentInLiveView();

structuralChangeObservers.forEach(it -> it.onCatalogDelete(catalogNameToBeReplacedWith));
if (previousCatalog == null) {
Expand Down Expand Up @@ -1041,8 +1073,8 @@ private static class TimeoutThreadKiller implements Runnable {

public TimeoutThreadKiller(
int timeoutInSeconds,
int checkRateInSeconds,
@Nonnull EnhancedQueueExecutor executor
int checkRateInSeconds,
@Nonnull EnhancedQueueExecutor executor
) {
this.timeoutInSeconds = timeoutInSeconds;
this.executor = executor;
Expand Down Expand Up @@ -1103,7 +1135,7 @@ private record CatalogNameInConvention(
* Represents a created session.
* This class is a record that encapsulates a session and a future for closing the session.
*
* @param session reference to the created session itself
* @param session reference to the created session itself
* @param closeFuture future that gets completed when session is closed
*/
private record CreatedSession(
Expand Down
7 changes: 7 additions & 0 deletions evita_engine/src/main/java/io/evitadb/core/EvitaSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import io.evitadb.core.transaction.TransactionWalFinalizer;
import io.evitadb.exception.EvitaInternalError;
import io.evitadb.exception.EvitaInvalidUsageException;
import io.evitadb.exception.UnexpectedIOException;
import io.evitadb.utils.ArrayUtils;
import io.evitadb.utils.Assert;
import io.evitadb.utils.ReflectionLookup;
Expand All @@ -89,6 +90,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.OutputStream;
import java.io.Serializable;
import java.time.OffsetDateTime;
import java.util.Arrays;
Expand Down Expand Up @@ -1021,6 +1023,11 @@ public SealedEntity[] deleteSealedEntitiesAndReturnBodies(@Nonnull Query query)
});
}

@Override
public void backupCatalog(@Nonnull OutputStream outputStream) throws UnexpectedIOException {
getCatalog().backup(outputStream);
}

@Nonnull
@Override
public Optional<UUID> getOpenedTransactionId() {
Expand Down
Loading

0 comments on commit 7430158

Please sign in to comment.