Skip to content

Commit

Permalink
Added test util for state build verification
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Balani committed Jan 30, 2025
1 parent 70326f3 commit 216fafb
Showing 1 changed file with 46 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ public void startup() throws InstantiationException {
testE2EFlow();
logger.info("Demo: E2E flow took {} ms", System.currentTimeMillis() - startTimeMs);

// testChunkAggregateWithStateBuildForFileCopy();
testFileChunkAggregationForFileCopy();
testStateBuildPostFileCopy();
// testFileStoreUtils();
} catch (Exception e) {
logger.error("Error during startup", e);
Expand Down Expand Up @@ -594,12 +595,13 @@ private void testFileStoreUtils() throws StoreException, IOException {
System.out.println("Parsed log file contents read for offset=" + offset + ", size=" + size + " is: " + StandardCharsets.UTF_8.decode(byteBuffer));
}

private void testChunkAggregateWithStateBuildForFileCopy() throws IOException {
String partitionName = "0";
private void testFileChunkAggregationForFileCopy() throws IOException {
String chunkPath = "/tmp/0/test_chunk"; // The path to the chunk file
String logFilePath = "/tmp/0/0_log"; // The path to the log file where chunks are written
String outputFilePath = "/tmp/0/output_log_copy"; // New file where the log data will be copied

System.out.println("Testing file chunk aggregation for filecopy with Filestore");

int numChunksToWrite = 10; // Number of times the chunk should be written to the log file
int chunkSize = (int)Files.size(Paths.get(chunkPath)); // Size of the chunk

Expand All @@ -625,6 +627,44 @@ private void testChunkAggregateWithStateBuildForFileCopy() throws IOException {
}


// Step 2: Read from logFilePath chunk by chunk and write to a new file in the same directory
int offset = 0;
try (FileInputStream logInputStream = new FileInputStream(logFilePath);
FileOutputStream outputStream = new FileOutputStream(outputFilePath)) {

byte[] buffer = new byte[chunkSize];
int bytesRead;
while ((bytesRead = logInputStream.read(buffer)) != -1) {
// Write the chunk to the new file
outputStream.write(buffer, 0, bytesRead);
offset += bytesRead;

System.out.println("Writing chunk to new file at offset " + offset);
}

// Verify if contents of both files are same
byte[] content1 = Files.readAllBytes(Paths.get(logFilePath));
byte[] content2 = Files.readAllBytes(Paths.get(outputFilePath));
// Compare the byte arrays
if (Arrays.equals(content1, content2)) {
System.out.println("Input and output files are identical.");
} else {
System.out.println("Input and output files differ.");
}
System.out.println("File copy completed. Data written to " + outputFilePath);
} catch (IOException e) {
System.err.println("An error occurred while reading or writing the log file: " + e.getMessage());
}
}


private void testStateBuildPostFileCopy() throws IOException {
String partitionName = "803";
String logFilePath = "/tmp/803/14_0_log"; // The path to the log file where chunks are written
String outputFilePath = "/tmp/803/15_0_log"; // New file where the log data will be copied
int chunkSize = 100*1024*1024; // Size of the chunk
System.out.println("Testing state build post filecopy for partitionId " + partitionName);

// Step 2: Read from logFilePath chunk by chunk and write to a new file in the same directory
int offset = 0;
try (FileInputStream logInputStream = new FileInputStream(logFilePath);
Expand Down Expand Up @@ -660,8 +700,10 @@ private void testChunkAggregateWithStateBuildForFileCopy() throws IOException {
// rename the output file to act as the new base log file
Files.move(Paths.get(outputFilePath), Paths.get(logFilePath));

storageManager.buildStateForFileCopy(storageManager.getReplica(partitionName));
System.out.println("Renamed log file: " + outputFilePath + " to " + logFilePath );

storageManager.buildStateForFileCopy(storageManager.getReplica(partitionName));
System.out.println("State build successfully for partitionId: " + partitionName);
}

/**
Expand Down

0 comments on commit 216fafb

Please sign in to comment.