Skip to content

Commit

Permalink
fix(#41): merge with dev branch
Browse files Browse the repository at this point in the history
  • Loading branch information
novoj committed Jun 2, 2024
1 parent c040ddc commit dbd3254
Showing 1 changed file with 61 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import io.evitadb.test.extension.EvitaParameterResolver;
import io.evitadb.test.generator.DataGenerator;
import io.evitadb.utils.CollectionUtils;
import io.evitadb.utils.StringUtils;
import io.evitadb.utils.UUIDUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -87,7 +86,6 @@
import org.mockito.Mockito;

import javax.annotation.Nonnull;
import java.math.BigDecimal;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -370,7 +368,7 @@ private static Set<PkWithCatalogVersion> automaticallyGenerateEntitiesInParallel

System.out.println(
"Created " + primaryKeysWithTxIds.size() + " entities in " + (numberOfThreads * iterations) +
" transactions and " + (System.currentTimeMillis() - initialStart) + " ms."
" transactions in " + (System.currentTimeMillis() - initialStart) + " ms."
);

return primaryKeysWithTxIds;
Expand Down Expand Up @@ -984,153 +982,90 @@ void shouldAutomaticallyGenerateEntitiesInParallel(EvitaContract originalEvita,
.build()
);

final int numberOfThreads = 30;
final int iterations = 100;
final ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
final CountDownLatch latch = new CountDownLatch(numberOfThreads);
final Set<PkWithCatalogVersion> primaryKeysWithTxIds = new ConcurrentSkipListSet<>();

}
);
automaticallyGenerateEntitiesInParallel(evita, productSchema, null);
}

@DisplayName("Verify code has no problems assigning new PK in concurrent environment and simultaneous backup & restore")
@UseDataSet(value = TRANSACTIONAL_DATA_SET, destroyAfterTest = true)
@Tag(LONG_RUNNING_TEST)
@ParameterizedTest(name = "This test verifies, that all data files are correctly rotated and compacted.")
@ArgumentsSource(TimeArgumentProvider.class)
void shouldCorrectlyRotateAllFiles(GenerationalTestInput input) throws Exception {
final Path testDirectory = getTestDirectory().resolve("shouldCorrectlyRotateAllFiles");
cleanTestSubDirectory("shouldCorrectlyRotateAllFiles");
@Test
void shouldBackupAndRestoreCatalogDuringHeavyParallelIndexing(EvitaContract originalEvita, SealedEntitySchema productSchema) throws Exception {
final EvitaConfiguration originalConfiguration = ((Evita) originalEvita).getConfiguration();
originalEvita.close();

// reinitialize evita with a larger queue size
final Evita evita = new Evita(
EvitaConfiguration.builder()
.storage(
StorageOptions.builder()
.storageDirectory(testDirectory)
.minimalActiveRecordShare(0.9)
.fileSizeCompactionThresholdBytes(16_384)
.build()
)
.name(originalConfiguration.name())
.storage(originalConfiguration.storage())
.transaction(
TransactionOptions.builder()
.walFileSizeBytes(4_096)
.walFileCountKept(2)
.build()
)
.server(
ServerOptions.builder()
.killTimedOutShortRunningThreadsEverySeconds(-1)
.closeSessionsAfterSecondsOfInactivity(-1)
.maxQueueSize(16_384)
.build()
)
.server(originalConfiguration.server())
.cache(originalConfiguration.cache())
.build()
);

try {
final String entityProduct = "product";
final String attributeUrl = "url";
final String attributeCode = "code";
final String attributeName = "name";
final String attributePrice = "price";

evita.defineCatalog(TEST_CATALOG)
.updateAndFetchViaNewSession(evita)
.openForWrite()
.withAttribute(attributeUrl, String.class)
.withEntitySchema(entityProduct, productSchema -> productSchema
.withoutGeneratedPrimaryKey()
.withGlobalAttribute(attributeUrl)
.withAttribute(attributeCode, String.class, thatIs -> thatIs.unique().sortable())
.withAttribute(attributeName, String.class, thatIs -> thatIs.filterable().sortable())
.withAttribute(attributePrice, BigDecimal.class, thatIs -> thatIs.filterable().sortable()))
.updateViaNewSession(evita);
evita.updateCatalog(
TEST_CATALOG, session -> {
session.goLiveAndClose();
final Path backupFile = Files.createTempFile(getTestDirectory(), "backup", ".tmp");
final Set<PkWithCatalogVersion> insertedPrimaryKeysAndAssociatedTxs = automaticallyGenerateEntitiesInParallel(
evita, productSchema, theEvita -> {
try (OutputStream outputStream = Files.newOutputStream(backupFile)) {
theEvita.backupCatalog(TEST_CATALOG, outputStream);
} catch (IOException e) {
fail("Backup failed!", e);
}
);
}
);

final Faker faker = new Faker(new Random(input.randomSeed()));
final LocalDateTime initialStart = LocalDateTime.now();
do {
evita.updateCatalog(
TEST_CATALOG,
session -> {
session.getEntity(
entityProduct, 1, attributeContentAll()
)
.map(SealedInstance::openForWrite)
.orElseGet(() -> session.createNewEntity(entityProduct, 1))
.setAttribute(attributeUrl, faker.internet().url())
.setAttribute(attributeCode, faker.code().isbn10())
.setAttribute(attributeName, faker.book().title())
.setAttribute(attributePrice, BigDecimal.valueOf(faker.number().randomDouble(2, 1, 1000)))
.upsertVia(session);
}
);
} while (Duration.between(initialStart, LocalDateTime.now()).toMinutes() < input.intervalInMinutes());
assertTrue(backupFile.toFile().exists(), "Backup file does not exist!");

// close the evita
evita.close();
final String restoredCatalogName = TEST_CATALOG + "_restored";
evita.restoreCatalog(restoredCatalogName, Files.newInputStream(backupFile));

try (final Evita restartedEvita = new Evita(evita.getConfiguration())) {
assertInstanceOf(
Catalog.class, restartedEvita.getCatalogInstance(TEST_CATALOG).orElseThrow(),
"Catalog should be loaded from the disk!"
);
}
final long originalCatalogVersion = evita.queryCatalog(
TEST_CATALOG,
EvitaSessionContract::getCatalogVersion
);

// final solution
/*evita.queryCatalog(
TEST_CATALOG,
session -> {
assertTrue(session.getEntity(entityProduct, 1).isPresent());
log.info("Original catalog finished with version: " + originalCatalogVersion);

// read entire history
DefaultCatalogPersistenceService.getBootstrapRecordStream(
TEST_CATALOG, evita.getConfiguration().storage()
).forEach(
record -> {
log.info("Bootstrap record: " + record);
// TODO JNO - reconstruct the catalog from that moment, which verifies the consistency
evita.queryCatalog(
restoredCatalogName,
session -> {
final long restoredCatalogVersion = session.getCatalogVersion();
log.info("Restored catalog is version: " + restoredCatalogVersion);

assertTrue(
restoredCatalogVersion < originalCatalogVersion,
"Restored catalog version should be lower than the original one!"
);

final AtomicInteger productCount = new AtomicInteger();
insertedPrimaryKeysAndAssociatedTxs
.stream()
.filter(it -> it.catalogVersion() <= restoredCatalogVersion).forEach(
it -> {
final Optional<SealedEntity> entity = session.getEntity(it.getType(), it.getPrimaryKey());
assertTrue(
entity.isPresent(),
"Entity `" + it + "` visible in version `" + it.catalogVersion() +
"` not found in restored catalog with restored version `" + restoredCatalogVersion + "`!"
);
productCount.incrementAndGet();
}
);
}
);*/
} finally {
if (evita.isActive()) {
evita.close();
}
// TODO JNO - UNCOMMENT
// cleanTestDirectory();
}
assertTrue(latch.await(300, TimeUnit.SECONDS), "Timeouted!");

if (thrownException.get() != null) {
throw thrownException.get();
}

// wait until Evita reaches the last version of the catalog
long waitingStart = System.currentTimeMillis();
while (
// cap to one minute
System.currentTimeMillis() - waitingStart < 120_000 &&
// and finish when the last transaction is visible
evita.queryCatalog(TEST_CATALOG, EvitaSessionContract::getCatalogVersion) < numberOfThreads * iterations + 1
) {
Thread.onSpinWait();
}
log.info("Restored catalog has " + productCount.get() + " products.");

assertEquals(primaryKeysWithTxIds.size(), numberOfThreads * iterations);
final Set<Integer> primaryKeys = primaryKeysWithTxIds.stream()
.map(PkWithCatalogVersion::getPrimaryKey)
.collect(Collectors.toSet());
for (int i = 1; i <= numberOfThreads * iterations; i++) {
assertTrue(primaryKeys.contains(i), "Primary key missing: " + (i));
}
assertEquals(
productCount.get(),
session.getEntityCollectionSize(productSchema.getName()),
"Number of products in restored catalog does not match the original catalog!"
);

System.out.println(
"Created " + primaryKeysWithTxIds.size() + " entities in " + (numberOfThreads * iterations) +
" transactions in " + StringUtils.formatDuration(Duration.ofMillis(System.currentTimeMillis() - initialStart)) + "."
}
);
}

Expand Down

0 comments on commit dbd3254

Please sign in to comment.