Resolving conflicts
aga9900 committed Jan 29, 2025
1 parent 7bb1dac commit 9cb483e
Showing 14 changed files with 469 additions and 36 deletions.
@@ -83,6 +83,6 @@ public enum TransitionErrorCode {
/**
 * If Bootstrap Controller fails in pre-filecopy steps for a specific replica.
*/
BootstrapControllerFailure
BootstrapControllerFailure,
}
}
@@ -17,13 +17,13 @@
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.store.ChunkResponse;
import com.github.ambry.store.FileStore;
import com.github.ambry.store.LogInfo;
import com.github.ambry.store.Store;
import com.github.ambry.store.StoreException;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.FileStore;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
@@ -55,6 +55,8 @@ public interface StoreManager {
*/
boolean addFileStore(ReplicaId replicaId);

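  /**
   * Set up the replica for the given partition as part of the file copy based bootstrap flow.
   * @param partitionName name of the partition whose replica is to be set up
   */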
void setUpReplica(String partitionName);

/**
 * Build state after filecopy is completed
 * @param replica the {@link ReplicaId} of the {@link Store} for which state needs to be built
257 changes: 257 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/FileStore.java
@@ -0,0 +1,257 @@
package com.github.ambry.store;

import com.github.ambry.clustermap.FileStoreException;
import com.github.ambry.clustermap.FileStoreException.FileStoreErrorCode;
import com.github.ambry.config.FileCopyConfig;
import com.github.ambry.utils.CrcInputStream;
import com.github.ambry.utils.CrcOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class FileStore {
private static final Logger logger = LoggerFactory.getLogger(FileStore.class);
  private volatile boolean isRunning = false;
private final String dataDir;
private final FileMetadataSerde fileMetadataSerde;
private final FileCopyConfig fileCopyConfig;

  public FileStore(String dataDir, FileCopyConfig fileCopyConfig) {
this.dataDir = dataDir;
this.fileMetadataSerde = new FileMetadataSerde();
this.fileCopyConfig = fileCopyConfig;
}

  public void start() throws StoreException {
    isRunning = true;
  }

  public boolean isRunning() {
    return isRunning;
  }

  public void stop() {
    isRunning = false;
  }


// TODO Moved to BlobStore as the bootstrapping node wouldn't have FileStore instantiated.
public FileInputStream getStreamForFileRead(String mountPath, String fileName)
throws IOException {
    if (!isRunning) {
      throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
    }
// TODO: Handle edge cases and validations
String filePath = mountPath + "/" + fileName;
File file = new File(filePath);
// Check if file exists and is readable
if (!file.exists() || !file.canRead()) {
throw new IOException("File doesn't exist or cannot be read: " + filePath);
}
return new FileInputStream(file);
}

  public void putChunkToFile(String outputFilePath, FileInputStream fileInputStream)
      throws IOException {
    if (!isRunning) {
      throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
    }
    if (fileInputStream == null) {
      throw new IllegalArgumentException("fileInputStream is null");
    }
    // TODO: Handle edge cases and validations

    // Append every byte from the source stream to the output file. available() is only
    // an estimate of the remaining bytes, so copy in a loop rather than trusting it as
    // the file size, and never ignore the return value of read().
    try (OutputStream outputStream = Files.newOutputStream(Paths.get(outputFilePath),
        StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
      byte[] buffer = new byte[8192];
      int bytesRead;
      while ((bytesRead = fileInputStream.read(buffer)) != -1) {
        outputStream.write(buffer, 0, bytesRead);
      }
    }
    logger.info("Write successful for chunk to file: {}", outputFilePath);
  }
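
  // Usage sketch (hypothetical paths): the target is opened with CREATE + APPEND,
  // so calling putChunkToFile once per received chunk, in arrival order, rebuilds
  // the original file:
  //   fileStore.putChunkToFile("/mnt/disk0/partition-42/0_log", chunk0Stream);
  //   fileStore.putChunkToFile("/mnt/disk0/partition-42/0_log", chunk1Stream);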

// New class in input: List<FileMetaData>
  public void persistMetaDataToFile(String mountPath, List<LogInfo> logInfoList) throws IOException {
    if (!isRunning) {
      throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
    }
    if (logInfoList == null) {
      throw new IllegalArgumentException("logInfoList is null");
    }

    File temp = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName + ".tmp");
    File actual = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName);
    try (FileOutputStream fileStream = new FileOutputStream(temp)) {
      fileMetadataSerde.persist(logInfoList, fileStream);
      logger.info("FileCopyMetadata file serialized and written to file: {}", actual.getAbsolutePath());
    } catch (IOException e) {
      logger.error("IO error while persisting file copy metadata to disk {}", temp.getAbsoluteFile());
      throw e;
    }
    // Swap the temp file with the actual file so readers never observe a partially written file.
    if (!temp.renameTo(actual)) {
      throw new IOException("Failed to rename " + temp.getAbsolutePath() + " to " + actual.getAbsolutePath());
    }
    logger.debug("Completed writing file copy metadata to file {}", actual.getAbsolutePath());
  }


  public List<LogInfo> readMetaDataFromFile(String mountPath) throws IOException {
    List<LogInfo> logInfoList = new ArrayList<>();
    if (!isRunning) {
      throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
    }

    File fileCopyMetaDataFile = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName);
    if (!fileCopyMetaDataFile.exists()) {
      logger.info("fileCopyMetaDataFile {} not found", fileCopyMetaDataFile.getAbsolutePath());
      return logInfoList;
    }
    try (FileInputStream fileStream = new FileInputStream(fileCopyMetaDataFile)) {
      logger.info("Attempting reading from file: {}", fileCopyMetaDataFile.getAbsolutePath());
      logInfoList = fileMetadataSerde.retrieve(fileStream);
      return logInfoList;
    } catch (IOException e) {
      logger.error("IO error while reading filecopy metadata from disk {}", fileCopyMetaDataFile.getAbsoluteFile());
      throw e;
    }
  }

  public void shutdown() {
    // No-op for now.
  }

  /**
   * Class to serialize and deserialize file copy metadata ({@link LogInfo} entries
   * covering log segments, index segments and bloom filters)
   */
private static class FileMetadataSerde {
private static final short Crc_Size = 8;
private static final short VERSION_0 = 0;
private static final short CURRENT_VERSION = VERSION_0;

public FileMetadataSerde() {
}

    /**
     * Serialize the file copy metadata to the file
     * @param logInfoList the list of log segments with their index segments and bloom filters
     * @param outputStream the file output stream to write to
     */
public void persist(List<LogInfo> logInfoList, OutputStream outputStream)
throws IOException {
CrcOutputStream crcOutputStream = new CrcOutputStream(outputStream);
DataOutputStream writer = new DataOutputStream(crcOutputStream);
      try {
        writer.writeInt(logInfoList.size());
for (LogInfo logInfo : logInfoList) {
// write log segment size and name
writer.writeLong(logInfo.getLogSegment().getFileSize());
writer.writeLong(logInfo.getLogSegment().getFileName().getBytes().length);
writer.write(logInfo.getLogSegment().getFileName().getBytes());
writer.writeInt(logInfo.getIndexSegments().size());
for(FileInfo fileInfo : logInfo.getIndexSegments()){
writer.writeLong(fileInfo.getFileSize());
writer.writeLong(fileInfo.getFileName().getBytes().length);
writer.write(fileInfo.getFileName().getBytes());
}
writer.writeInt(logInfo.getBloomFilters().size());
for(FileInfo fileInfo: logInfo.getBloomFilters()){
writer.writeLong(fileInfo.getFileSize());
writer.writeLong(fileInfo.getFileName().getBytes().length);
writer.write(fileInfo.getFileName().getBytes());
}
}

long crcValue = crcOutputStream.getValue();
writer.writeLong(crcValue);
      } catch (IOException e) {
        logger.error("IO error while serializing file copy metadata", e);
        throw e;
      } finally {
        if (outputStream instanceof FileOutputStream) {
          // Force the written content to disk before closing to make it durable.
          ((FileOutputStream) outputStream).getChannel().force(true);
        }
        writer.close();
      }
}
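
    /*
     * On-disk layout produced by persist() (all values big-endian, as written by
     * DataOutputStream):
     *
     *   int  logInfoCount
     *   per LogInfo:
     *     long logSegmentFileSize
     *     long logSegmentNameLength, byte[] logSegmentName
     *     int  indexSegmentCount,  per entry: long fileSize, long nameLength, byte[] name
     *     int  bloomFilterCount,   per entry: long fileSize, long nameLength, byte[] name
     *   long crc  (CRC of all preceding bytes; verified by retrieve() below)
     */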

    /**
     * Deserialize the file copy metadata from the persisted file
     * @param inputStream the input stream from the persistent file
     * @return the list of {@link LogInfo} entries read from the stream
     */
public List<LogInfo> retrieve(InputStream inputStream) throws IOException {
List<LogInfo> logInfoList = new ArrayList<>();
CrcInputStream crcStream = new CrcInputStream(inputStream);
DataInputStream stream = new DataInputStream(crcStream);
try {
while (stream.available() > Crc_Size) {
int logInfoListSize = stream.readInt();
for(int i = 0; i < logInfoListSize; i++){
// read log segment name
Long logSegmentSize = stream.readLong();
byte[] logSegmentNameBytes = new byte[(int) stream.readLong()];
stream.readFully(logSegmentNameBytes);
String logSegmentName = new String(logSegmentNameBytes);
FileInfo logSegment = new FileInfo(logSegmentName, logSegmentSize);
// read index segments
int indexSegmentsSize = stream.readInt();
List<FileInfo> indexSegments = new ArrayList<>();
for(int j = 0; j < indexSegmentsSize; j++){
Long fileSize = stream.readLong();
byte[] indexSegmentNameBytes = new byte[(int) stream.readLong()];
stream.readFully(indexSegmentNameBytes);
String indexSegmentName = new String(indexSegmentNameBytes);
indexSegments.add(new FileInfo(indexSegmentName, fileSize));
}
// read bloom filters
int bloomFiltersSize = stream.readInt();
List<FileInfo> bloomFilters = new ArrayList<>();
for(int j = 0; j < bloomFiltersSize; j++){
Long fileSize = stream.readLong();
byte[] bloomFilterNameBytes = new byte[(int) stream.readLong()];
stream.readFully(bloomFilterNameBytes);
String bloomFilterName = new String(bloomFilterNameBytes);
bloomFilters.add(new FileInfo(bloomFilterName, fileSize));
}
logInfoList.add(new LogInfo(logSegment, indexSegments, bloomFilters));
}
}

long computedCrc = crcStream.getValue();
long readCrc = stream.readLong();
if (computedCrc != readCrc) {
          logger.error("Crc mismatch during file copy metadata deserialization, computed {}, read {}", computedCrc, readCrc);
return new ArrayList<>();
}
return logInfoList;
} catch (IOException e) {
        logger.error("IO error deserializing file copy metadata", e);
return new ArrayList<>();
} finally {
stream.close();
}
}
}
}
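
A minimal round-trip sketch for the metadata path above. It assumes an already-built FileCopyConfig (its construction is not part of this diff), relies on the FileInfo and LogInfo constructors used in retrieve(), and the mount path and file names are hypothetical:

    FileStore fileStore = new FileStore("/mnt/disk0", fileCopyConfig);
    fileStore.start();

    // One log segment with a single index segment and bloom filter.
    FileInfo logSegment = new FileInfo("0_log", 1024L);
    List<FileInfo> indexSegments = Collections.singletonList(new FileInfo("0_index", 128L));
    List<FileInfo> bloomFilters = Collections.singletonList(new FileInfo("0_bloom", 16L));
    List<LogInfo> toPersist = Collections.singletonList(new LogInfo(logSegment, indexSegments, bloomFilters));

    // Persist via temp-file-plus-rename, then read back the CRC-verified copy.
    fileStore.persistMetaDataToFile("/mnt/disk0", toPersist);
    List<LogInfo> readBack = fileStore.readMetaDataFromFile("/mnt/disk0");
    fileStore.stop();

java.util.Collections is used here only for brevity; any List construction works.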
@@ -23,11 +23,11 @@
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.server.StoreManager;
import com.github.ambry.store.ChunkResponse;
import com.github.ambry.store.FileStore;
import com.github.ambry.store.LogInfo;
import com.github.ambry.store.Store;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.nio.file.FileStore;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -85,6 +85,11 @@ public boolean addFileStore(ReplicaId replicaId) {

}

@Override
public void setUpReplica(String partitionName) {
throw new UnsupportedOperationException("Method not supported");
}

@Override
public boolean shutdownBlobStore(PartitionId id) {
try {
@@ -49,6 +49,8 @@ public class AmbryReplicaSyncUpManager implements ReplicaSyncUpManager {
private final ConcurrentHashMap<ReplicaId, LocalReplicaLagInfos> replicaToLagInfos = new ConcurrentHashMap<>();
private final ClusterMapConfig clusterMapConfig;
private final ReentrantLock updateLock = new ReentrantLock();
private final ConcurrentHashMap<String, CountDownLatch> partitionToFileCopyLatch = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Boolean> partitionToFileCopySuccessLatch = new ConcurrentHashMap<>();

public AmbryReplicaSyncUpManager(ClusterMapConfig clusterMapConfig) {
this.clusterMapConfig = clusterMapConfig;
@@ -65,7 +67,8 @@ public void initiateBootstrap(ReplicaId replicaId) {

@Override
public void initiateFileCopy(ReplicaId replicaId) {
//To Be Added With File Copy Protocol
partitionToFileCopyLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1));
partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), false);
}

@Override
@@ -108,7 +111,18 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedException

@Override
public void waitForFileCopyCompleted(String partitionName) throws InterruptedException {
//To Be Added With File Copy Protocol
    CountDownLatch latch = partitionToFileCopyLatch.get(partitionName);
    if (latch == null) {
      logger.info("Skipping file copy for existing partition {}", partitionName);
    } else {
      logger.info("Waiting for new partition {} to complete FileCopy", partitionName);
      latch.await();
      partitionToFileCopyLatch.remove(partitionName);
      if (!partitionToFileCopySuccessLatch.remove(partitionName)) {
        throw new StateTransitionException("Partition " + partitionName + " failed to copy files.", FileCopyProtocolFailure);
      }
      logger.info("File Copy is complete on partition {}", partitionName);
    }
}

@Override
Expand Down Expand Up @@ -204,7 +218,8 @@ public void onBootstrapComplete(ReplicaId replicaId) {

@Override
public void onFileCopyComplete(ReplicaId replicaId) {
//To Be Added With File Copy Protocol
partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), true);
countDownLatch(partitionToFileCopyLatch, replicaId.getPartitionId().toPathString());
}
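
Taken together, initiateFileCopy, waitForFileCopyCompleted and onFileCopyComplete form a per-partition latch handshake: initiation registers a one-shot CountDownLatch plus a pending success flag, completion flips the flag and counts the latch down, and the waiter unblocks and checks the flag. A minimal calling sketch — replicaSyncUpManager and replicaId are hypothetical handles, and the thread split is an assumption about how the protocol is driven:

    // State transition thread: register the latch before file copy starts.
    replicaSyncUpManager.initiateFileCopy(replicaId);

    // File copy thread: after the last chunk is persisted, record success and
    // release the waiter. A failure path must also count the latch down, or
    // waitForFileCopyCompleted will block forever.
    replicaSyncUpManager.onFileCopyComplete(replicaId);

    // State transition thread: unblocks once the latch is counted down and throws
    // StateTransitionException if the success flag was never set.
    replicaSyncUpManager.waitForFileCopyCompleted(replicaId.getPartitionId().toPathString());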

@Override