Skip to content

Commit

Permalink
feat(#41): bootstrap file vacuuming with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
novoj committed Mar 27, 2024
1 parent c3f5fce commit d526c01
Show file tree
Hide file tree
Showing 9 changed files with 442 additions and 35 deletions.
45 changes: 44 additions & 1 deletion evita_common/src/main/java/io/evitadb/utils/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@

import javax.annotation.Nonnull;
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand All @@ -45,6 +47,7 @@
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class FileUtils {
private static final Path[] EMPTY_PATHS = new Path[0];

/**
* Returns list of folders in Evita directory. Each folder is considered to be Evita catalog - name of the folder
Expand All @@ -62,7 +65,7 @@ public static Path[] listDirectories(@Nonnull Path directory) {
);
}
} else {
return new Path[0];
return EMPTY_PATHS;
}
}

Expand Down Expand Up @@ -116,4 +119,44 @@ public static boolean isDirectoryEmpty(@Nonnull Path path) {
);
}
}

/**
 * Moves a source file to a target file, replacing the target file if it already exists.
 * The move is first attempted atomically ({@link StandardCopyOption#ATOMIC_MOVE}); when the
 * underlying file system doesn't support atomic moves, the method falls back to a plain,
 * non-atomic move (still replacing the existing target).
 *
 * @param sourceFile the path of the source file to be moved
 * @param targetFile the path of the target file where the source file should be moved to
 * @throws UnexpectedIOException if the file cannot be moved either atomically or non-atomically
 */
public static void rewriteTargetFileAtomically(
	@Nonnull Path sourceFile,
	@Nonnull Path targetFile
) {
	try {
		Files.move(
			sourceFile, targetFile,
			StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE
		);
	} catch (AtomicMoveNotSupportedException e) {
		// the file system cannot move atomically - fall back to a best-effort non-atomic move
		try {
			Files.move(
				sourceFile, targetFile,
				StandardCopyOption.REPLACE_EXISTING
			);
		} catch (Exception fallbackException) {
			// include both paths in the internal message - generic utility must not assume caller context
			throw new UnexpectedIOException(
				"Failed to move file `" + sourceFile + "` to `" + targetFile + "`!",
				"Failed to move file to the target location!",
				fallbackException
			);
		}
	} catch (Exception e) {
		throw new UnexpectedIOException(
			"Failed to move file `" + sourceFile + "` to `" + targetFile + "`!",
			"Failed to move file to the target location!",
			e
		);
	}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ void shouldPickUpExistingWalOnStartAndReplayItsContents(Evita evita) {
catalogKryoPool,
StorageOptions.builder().build(),
TransactionOptions.builder().build(),
Mockito.mock(Scheduler.class)
Mockito.mock(Scheduler.class),
timestamp -> {}
);

// create WAL file with a few contents first
Expand Down Expand Up @@ -335,7 +336,8 @@ void shouldBuildCorrectHistory(Evita evita) {
catalogKryoPool,
StorageOptions.builder().build(),
TransactionOptions.builder().build(),
Mockito.mock(Scheduler.class)
Mockito.mock(Scheduler.class),
timestamp -> {}
);

// create WAL file multiple times, start Catalog to crunch the history and close evitaDB again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,48 @@ void shouldTraverseBootstrapRecordsFromNewestToOldest() {
}
}

/**
 * Verifies that trimming the bootstrap file drops all records preceding the requested timestamp
 * while keeping the first record that was still valid at that time.
 */
@Test
void shouldTrimBootstrapRecords() {
	final String catalogName = SEALED_CATALOG_SCHEMA.getName();
	final DefaultCatalogPersistenceService persistenceService = new DefaultCatalogPersistenceService(
		catalogName,
		getStorageOptions(),
		getTransactionOptions(),
		Mockito.mock(Scheduler.class)
	);

	// record twelve bootstrap records, one minute apart from each other
	final OffsetDateTime baseTimestamp = OffsetDateTime.now();
	for (int version = 1; version <= 12; version++) {
		persistenceService.recordBootstrap(
			version, catalogName, 0,
			baseTimestamp.plusMinutes(version - 1).toInstant().toEpochMilli()
		);
	}

	// before trimming all 13 records (initial one + twelve recorded) must be present
	final PaginatedList<CatalogVersion> initialVersions = persistenceService.getCatalogVersions(TimeFlow.FROM_OLDEST_TO_NEWEST, 1, 20);
	assertEquals(0, initialVersions.getData().get(0).version());
	assertEquals(13, initialVersions.getTotalRecordCount());

	// trim progressively and verify the oldest remaining record and the record count each time
	trimAndCheck(persistenceService, baseTimestamp.plusMinutes(3).plusSeconds(1), 4, 9);
	trimAndCheck(persistenceService, baseTimestamp.plusMinutes(6), 7, 6);
	trimAndCheck(persistenceService, baseTimestamp.plusMinutes(8).minusSeconds(1), 8, 5);
}

/**
 * Trims the bootstrap file of the given persistence service up to `toTimestamp` and verifies
 * that the oldest remaining record carries the expected catalog version and that the expected
 * number of records was kept.
 *
 * @param ioService       the persistence service whose bootstrap file is trimmed
 * @param toTimestamp     the timestamp the bootstrap file should be trimmed to
 * @param expectedVersion the catalog version expected on the first (oldest) record after trimming
 * @param expectedCount   the expected total record count after trimming
 */
private static void trimAndCheck(
	@Nonnull DefaultCatalogPersistenceService ioService,
	@Nonnull OffsetDateTime toTimestamp,
	int expectedVersion,
	int expectedCount
) {
	ioService.trimBootstrapFile(toTimestamp);

	final PaginatedList<CatalogVersion> catalogVersions = ioService.getCatalogVersions(TimeFlow.FROM_OLDEST_TO_NEWEST, 1, 20);
	final CatalogVersion firstRecord = catalogVersions.getData().get(0);
	// the first kept record must precede the trim timestamp - it is the record valid at that moment
	assertTrue(toTimestamp.isAfter(firstRecord.timestamp()));
	assertEquals(expectedVersion, firstRecord.version());
	assertEquals(expectedCount, catalogVersions.getTotalRecordCount());
}

/*
PRIVATE METHODS
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,27 @@
import io.evitadb.utils.FileUtils;
import io.evitadb.utils.NamingConvention;
import io.evitadb.utils.UUIDUtil;
import lombok.Getter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static io.evitadb.test.TestConstants.LONG_RUNNING_TEST;
Expand Down Expand Up @@ -100,6 +104,7 @@ protected Kryo create() {
private final OffHeapMemoryManager noOffHeapMemoryManager = new OffHeapMemoryManager(0, 0);
private final OffHeapMemoryManager bigOffHeapMemoryManager = new OffHeapMemoryManager(10_000_000, 128);
private final int[] txSizes = new int[]{2000, 3000, 4000, 5000, 7000, 9000, 1_000};
private final MockOffsetConsumer offsetConsumer = new MockOffsetConsumer();
private CatalogWriteAheadLog wal;

@BeforeEach
Expand Down Expand Up @@ -202,6 +207,22 @@ void shouldReadAllTransactionsUsingFileIsolatedWal() {
readAndVerifyWal(txInMutations, txSizes, 0);
}

/**
 * Writes seven equally sized transactions, one minute apart, into a size-limited WAL and verifies
 * that the registered consumer was notified with the expected first-available timestamps.
 */
@Test
void shouldCorrectlyReportFirstAvailableTimestamp() {
	wal = createCatalogWriteAheadLogOfSmallSize();

	// seven transactions, each sized to just fit the small WAL file limit
	final int[] uniformTxSizes = {20, 20, 20, 20, 20, 20, 20};

	final OffsetDateTime firstTxTimestamp = OffsetDateTime.now();
	writeWal(bigOffHeapMemoryManager, uniformTxSizes, firstTxTimestamp);

	// the consumer must have been notified exactly twice, with the timestamps of the
	// transactions written one and two minutes after the first one
	final List<OffsetDateTime> reportedOffsets = offsetConsumer.getOffsets();
	assertEquals(2, reportedOffsets.size());
	assertEquals(firstTxTimestamp.plusMinutes(1), reportedOffsets.get(0));
	assertEquals(firstTxTimestamp.plusMinutes(2), reportedOffsets.get(1));
}

@Nonnull
private CatalogWriteAheadLog createCatalogWriteAheadLogOfSmallSize() {
return new CatalogWriteAheadLog(
Expand All @@ -213,7 +234,8 @@ private CatalogWriteAheadLog createCatalogWriteAheadLogOfSmallSize() {
.walFileCountKept(5)
.walFileSizeBytes(16_384)
.build(),
Mockito.mock(Scheduler.class)
Mockito.mock(Scheduler.class),
offsetConsumer
);
}

Expand All @@ -225,7 +247,8 @@ private CatalogWriteAheadLog createCatalogWriteAheadLogOfLargeEnoughSize() {
catalogKryoPool,
StorageOptions.builder().build(),
TransactionOptions.builder().walFileSizeBytes(Long.MAX_VALUE).build(),
Mockito.mock(Scheduler.class)
Mockito.mock(Scheduler.class),
offsetConsumer
);
}

Expand All @@ -244,6 +267,18 @@ private void createCachedSupplierReadAndVerifyFrom(Map<Long, List<Mutation>> txI
*/
/**
 * Writes the Write-Ahead Log (WAL) using the provided off-heap memory manager, deriving the
 * transaction timestamps from the current time.
 *
 * @param offHeapMemoryManager the off-heap memory manager to use
 * @param transactionSizes     an array of transaction sizes
 * @return a map of catalog versions to corresponding mutations
 */
@Nonnull
private Map<Long, List<Mutation>> writeWal(@Nonnull OffHeapMemoryManager offHeapMemoryManager, int[] transactionSizes) {
	// delegates to the full variant with no explicit initial timestamp (i.e. "now")
	return writeWal(offHeapMemoryManager, transactionSizes, null);
}

/**
* Writes the Write-Ahead Log (WAL) using the provided off-heap memory manager.
*
* @param offHeapMemoryManager the off-heap memory manager to use
* @param transactionSizes an array of transaction sizes
* @return a map of catalog versions to corresponding mutations
*/
@Nonnull
private Map<Long, List<Mutation>> writeWal(@Nonnull OffHeapMemoryManager offHeapMemoryManager, int[] transactionSizes, @Nullable OffsetDateTime initialTimestamp) {
final DataGenerator dataGenerator = new DataGenerator();
final CatalogSchema catalogSchema = CatalogSchema._internalBuild(
TestConstants.TEST_CATALOG,
Expand All @@ -262,6 +297,7 @@ private Map<Long, List<Mutation>> writeWal(@Nonnull OffHeapMemoryManager offHeap
)
);

OffsetDateTime timestamp = initialTimestamp == null ? OffsetDateTime.now() : initialTimestamp;
final Map<Long, List<Mutation>> txInMutations = CollectionUtils.createHashMap(transactionSizes.length);
for (int i = 0; i < transactionSizes.length; i++) {
int txSize = transactionSizes[i];
Expand All @@ -275,7 +311,7 @@ private Map<Long, List<Mutation>> writeWal(@Nonnull OffHeapMemoryManager offHeap
)
.limit(txSize)
.map(EntityBuilder::toMutation)
.map(Optional::get)
.flatMap(Optional::stream)
.collect(Collectors.toCollection(LinkedList::new));

final long catalogVersion = i + 1;
Expand All @@ -289,7 +325,7 @@ private Map<Long, List<Mutation>> writeWal(@Nonnull OffHeapMemoryManager offHeap
catalogVersion,
mutations.size(),
walReference.getContentLength(),
OffsetDateTime.now()
timestamp
);

final long start = this.wal.getWalFilePath().toFile().length();
Expand All @@ -306,6 +342,8 @@ private Map<Long, List<Mutation>> writeWal(@Nonnull OffHeapMemoryManager offHeap
)
);
txInMutations.put(catalogVersion, mutations);

timestamp = timestamp.plusMinutes(1);
}
return txInMutations;
}
Expand Down Expand Up @@ -377,4 +415,13 @@ private void readAndVerifyWalInReverse(@Nonnull Map<Long, List<Mutation>> txInMu
assertEquals(transactionSizes.length - (transactionSizes.length - startIndex) + 1, txRead);
}

/**
 * Test double that records every {@link OffsetDateTime} notification it receives, in order of
 * arrival, so assertions can later be made on the collected values via {@link #getOffsets()}.
 */
private static class MockOffsetConsumer implements Consumer<OffsetDateTime> {
	// ArrayList over LinkedList: usage is append-then-random-access (tests read get(0)/get(1))
	@Getter private final List<OffsetDateTime> offsets = new ArrayList<>();

	@Override
	public void accept(OffsetDateTime offsetDateTime) {
		this.offsets.add(offsetDateTime);
	}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ protected Kryo create() {
catalogKryoPool,
StorageOptions.builder().build(),
TransactionOptions.builder().build(),
Mockito.mock(Scheduler.class)
Mockito.mock(Scheduler.class),
offsetDateTime -> {}
);
private final Path walFilePath = walDirectory.resolve(getWalFileName(TEST_CATALOG, walFileReference.fileIndex()));
private final int[] txSizes = new int[] {55, 152, 199, 46};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.evitadb.store.kryo.ObservableOutputKeeper;
import io.evitadb.store.offsetIndex.exception.SyncFailedException;
import io.evitadb.utils.Assert;
import lombok.Getter;

import javax.annotation.Nonnull;
import java.io.File;
Expand Down Expand Up @@ -86,7 +87,7 @@ public class WriteOnlyFileHandle implements WriteOnlyHandle {
* The path to the target file that this handle is associated with.
* This handle provides write-only access to the file at this path.
*/
private final Path targetFile;
@Getter private final Path targetFile;
/**
* The variable `observableOutputKeeper` is an instance of the class `ObservableOutputKeeper`. It is used to keep
* references to `ObservableOutput` instances that internally maintain large buffers for serialization. The need for
Expand Down
Loading

0 comments on commit d526c01

Please sign in to comment.