From 9cb483e5bb2a779f8d84dc8e2f6ad62b8a35332f Mon Sep 17 00:00:00 2001 From: aga9900 Date: Wed, 29 Jan 2025 14:20:55 +0530 Subject: [PATCH] Resolving conflicts --- .../clustermap/StateTransitionException.java | 2 +- .../com/github/ambry/server/StoreManager.java | 4 +- .../com/github/ambry/store/FileStore.java | 257 ++++++++++++++++++ .../ambry/cloud/CloudStorageManager.java | 7 +- .../clustermap/AmbryReplicaSyncUpManager.java | 21 +- .../ambry/clustermap/HelixParticipant.java | 45 +-- .../filetransfer/FileCopyController.java | 4 + ...ationManager.java => FileCopyManager.java} | 49 +++- .../filetransfer/FileCopyOperationState.java | 9 + .../FileCopyOperationTracker.java | 45 +++ .../ambry/store/BootstrapController.java | 2 +- .../com/github/ambry/store/DiskManager.java | 4 +- .../github/ambry/store/StorageManager.java | 55 +++- build.gradle | 1 + 14 files changed, 469 insertions(+), 36 deletions(-) create mode 100644 ambry-api/src/main/java/com/github/ambry/store/FileStore.java create mode 100644 ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyController.java rename ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/{FileBasedReplicationManager.java => FileCopyManager.java} (63%) create mode 100644 ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationState.java create mode 100644 ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationTracker.java diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java b/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java index 8bb0b73199..4d7a3580d8 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java @@ -83,6 +83,6 @@ public enum TransitionErrorCode { /** * If Bootstap Controller fails in pre-filecopy steps for specific replica. 
*/ - BootstrapControllerFailure + BootstrapControllerFailure, } } diff --git a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java index 951b9fdbf5..f11a9fd128 100644 --- a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java +++ b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java @@ -17,13 +17,13 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.store.ChunkResponse; +import com.github.ambry.store.FileStore; import com.github.ambry.store.LogInfo; import com.github.ambry.store.Store; import com.github.ambry.store.StoreException; import java.io.DataInputStream; import java.io.FileInputStream; import java.io.IOException; -import java.nio.file.FileStore; import java.util.Collection; import java.util.List; import java.util.regex.Pattern; @@ -55,6 +55,8 @@ public interface StoreManager { */ boolean addFileStore(ReplicaId replicaId); + void setUpReplica(String partitionName); + /** * Build state after filecopy is completed * @param replica the {@link ReplicaId} of the {@link Store} for which store needs to be built diff --git a/ambry-api/src/main/java/com/github/ambry/store/FileStore.java b/ambry-api/src/main/java/com/github/ambry/store/FileStore.java new file mode 100644 index 0000000000..861ba585d4 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/store/FileStore.java @@ -0,0 +1,257 @@ +package com.github.ambry.store; + +import com.github.ambry.clustermap.FileStoreException; +import com.github.ambry.clustermap.FileStoreException.FileStoreErrorCode; +import com.github.ambry.config.FileCopyConfig; +import com.github.ambry.replication.FindToken; +import com.github.ambry.utils.CrcInputStream; +import com.github.ambry.utils.CrcOutputStream; +import com.github.ambry.utils.Pair; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import 
java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class FileStore { + private static final Logger logger = LoggerFactory.getLogger(FileStore.class); + private boolean isRunning = false; // per-instance lifecycle flag: as a static, start()/stop() on one FileStore flipped the state of every FileStore + private final String dataDir; + private final FileMetadataSerde fileMetadataSerde; + private final FileCopyConfig fileCopyConfig; + + public FileStore(String dataDir, FileCopyConfig fileCopyConfig){ + this.dataDir = dataDir; + this.fileMetadataSerde = new FileMetadataSerde(); + this.fileCopyConfig = fileCopyConfig; + } + + public void start() throws StoreException { + isRunning = true; + } + public boolean isRunning() { + return isRunning; + } + public void stop() { + isRunning = false; + } + + + // TODO Moved to BlobStore as the bootstrapping node wouldn't have FileStore instantiated. 
+ public FileInputStream getStreamForFileRead(String mountPath, String fileName) + throws IOException { + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + // TODO: Handle edge cases and validations + String filePath = mountPath + "/" + fileName; + File file = new File(filePath); + // Check if file exists and is readable + if (!file.exists() || !file.canRead()) { + throw new IOException("File doesn't exist or cannot be read: " + filePath); + } + return new FileInputStream(file); + } + + public void putChunkToFile(String outputFilePath, FileInputStream fileInputStream) + throws IOException { + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + if(fileInputStream == null){ + throw new IllegalArgumentException("fileInputStream is null"); + } + // TODO: Handle edge cases and validations + + // Determine the number of bytes remaining in the stream + long fileSize = fileInputStream.available(); + + // Read all bytes from the source file and append them to the output file + + byte[] content = new byte[(int) fileSize]; // Buffer sized to the bytes remaining in the source stream + new DataInputStream(fileInputStream).readFully(content); // a single read() may return fewer bytes than requested; readFully loops until the buffer is full + Files.write(Paths.get(outputFilePath), content, StandardOpenOption.CREATE, StandardOpenOption.APPEND); + + logger.info("Write successful for chunk to file: {}", outputFilePath); + } + + // New class in input: List + public void persistMetaDataToFile(String mountPath, List logInfoList) throws IOException { + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + if(logInfoList == null){ + throw new IllegalArgumentException("logInfoList is null"); + } + + File temp = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName + ".tmp"); + File actual = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName); + try { + FileOutputStream 
fileStream = new FileOutputStream(temp); + fileMetadataSerde.persist(logInfoList, fileStream); + logger.info("FileCopyMetadata file serialized and written to file: {}", actual.getAbsolutePath()); + // swap temp file with the original file; renameTo signals failure only through its return value + if (!temp.renameTo(actual)) { throw new IOException("Failed to rename " + temp.getAbsolutePath() + " to " + actual.getAbsolutePath()); } + logger.debug("Completed writing remote tokens to file {}", actual.getAbsolutePath()); + } catch (IOException e) { + logger.error("IO error while persisting tokens to disk {}", temp.getAbsoluteFile()); + throw e; + } + } + + + public List readMetaDataFromFile(String mountPath) throws IOException { + List logInfoList = new ArrayList<>(); + if(!isRunning){ + throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure); + } + + File fileCopyMetaDataFile = new File(mountPath, fileCopyConfig.fileCopyMetaDataFileName); + if (!fileCopyMetaDataFile.exists()) { + logger.info("fileCopyMetaDataFile {} not found", fileCopyMetaDataFile.getAbsolutePath()); + return logInfoList; + } + try { + FileInputStream fileStream = new FileInputStream(fileCopyMetaDataFile); + logger.info("Attempting reading from file: {}", fileCopyMetaDataFile.getAbsolutePath()); + logInfoList = fileMetadataSerde.retrieve(fileStream); + return logInfoList; + } catch (IOException e) { + logger.error("IO error while reading filecopy metadata from disk {}", fileCopyMetaDataFile.getAbsoluteFile()); + throw e; + } + } + + public void shutdown(){ + return; + } + + /** + * Class to serialize and deserialize the file copy metadata (a list of {@link LogInfo} entries) + */ + private static class FileMetadataSerde { + private static final short Crc_Size = 8; + private static final short VERSION_0 = 0; + private static final short CURRENT_VERSION = VERSION_0; + + public FileMetadataSerde() { + } + + /** + * Serialize the file copy metadata to the given stream + * @param logInfoList the {@link LogInfo} entries to serialize + * @param outputStream the output stream to write to + */ + public void persist(List logInfoList, OutputStream outputStream) + 
throws IOException { + CrcOutputStream crcOutputStream = new CrcOutputStream(outputStream); + DataOutputStream writer = new DataOutputStream(crcOutputStream); + try { + + writer.writeInt(logInfoList.size()); + for (LogInfo logInfo : logInfoList) { + // write log segment size and name + writer.writeLong(logInfo.getLogSegment().getFileSize()); + writer.writeLong(logInfo.getLogSegment().getFileName().getBytes().length); + writer.write(logInfo.getLogSegment().getFileName().getBytes()); + writer.writeInt(logInfo.getIndexSegments().size()); + for(FileInfo fileInfo : logInfo.getIndexSegments()){ + writer.writeLong(fileInfo.getFileSize()); + writer.writeLong(fileInfo.getFileName().getBytes().length); + writer.write(fileInfo.getFileName().getBytes()); + } + writer.writeInt(logInfo.getBloomFilters().size()); + for(FileInfo fileInfo: logInfo.getBloomFilters()){ + writer.writeLong(fileInfo.getFileSize()); + writer.writeLong(fileInfo.getFileName().getBytes().length); + writer.write(fileInfo.getFileName().getBytes()); + } + } + + long crcValue = crcOutputStream.getValue(); + writer.writeLong(crcValue); + } catch (IOException e) { + logger.error("IO error while serializing remote peer tokens", e); + throw e; + } finally { + if (outputStream instanceof FileOutputStream) { + // flush and overwrite file + ((FileOutputStream) outputStream).getChannel().force(true); + } + writer.close(); + } + } + + /** + * Deserialize the remote tokens + * @param inputStream the input stream from the persistent file + * @return the mapping from replicas to remote tokens + */ + public List retrieve(InputStream inputStream) throws IOException { + List logInfoList = new ArrayList<>(); + CrcInputStream crcStream = new CrcInputStream(inputStream); + DataInputStream stream = new DataInputStream(crcStream); + ConcurrentMap> peerTokens = new ConcurrentHashMap<>(); + try { + while (stream.available() > Crc_Size) { + int logInfoListSize = stream.readInt(); + for(int i = 0; i < logInfoListSize; i++){ + // read 
log segment name + Long logSegmentSize = stream.readLong(); + byte[] logSegmentNameBytes = new byte[(int) stream.readLong()]; + stream.readFully(logSegmentNameBytes); + String logSegmentName = new String(logSegmentNameBytes); + FileInfo logSegment = new FileInfo(logSegmentName, logSegmentSize); + // read index segments + int indexSegmentsSize = stream.readInt(); + List indexSegments = new ArrayList<>(); + for(int j = 0; j < indexSegmentsSize; j++){ + Long fileSize = stream.readLong(); + byte[] indexSegmentNameBytes = new byte[(int) stream.readLong()]; + stream.readFully(indexSegmentNameBytes); + String indexSegmentName = new String(indexSegmentNameBytes); + indexSegments.add(new FileInfo(indexSegmentName, fileSize)); + } + // read bloom filters + int bloomFiltersSize = stream.readInt(); + List bloomFilters = new ArrayList<>(); + for(int j = 0; j < bloomFiltersSize; j++){ + Long fileSize = stream.readLong(); + byte[] bloomFilterNameBytes = new byte[(int) stream.readLong()]; + stream.readFully(bloomFilterNameBytes); + String bloomFilterName = new String(bloomFilterNameBytes); + bloomFilters.add(new FileInfo(bloomFilterName, fileSize)); + } + logInfoList.add(new LogInfo(logSegment, indexSegments, bloomFilters)); + } + } + + long computedCrc = crcStream.getValue(); + long readCrc = stream.readLong(); + if (computedCrc != readCrc) { + logger.error("Crc mismatch during peer token deserialization, computed " + computedCrc + ", read " + readCrc); + return new ArrayList<>(); + } + return logInfoList; + } catch (IOException e) { + logger.error("IO error deserializing remote peer tokens", e); + return new ArrayList<>(); + } finally { + stream.close(); + } + } + } +} diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java index 4f8fef0e11..294eff93bf 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java +++ 
b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java @@ -23,11 +23,11 @@ import com.github.ambry.server.ServerErrorCode; import com.github.ambry.server.StoreManager; import com.github.ambry.store.ChunkResponse; +import com.github.ambry.store.FileStore; import com.github.ambry.store.LogInfo; import com.github.ambry.store.Store; import java.io.DataInputStream; import java.io.FileInputStream; -import java.nio.file.FileStore; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -85,6 +85,11 @@ public boolean addFileStore(ReplicaId replicaId) { } + @Override + public void setUpReplica(String partitionName) { + throw new UnsupportedOperationException("Method not supported"); + } + @Override public boolean shutdownBlobStore(PartitionId id) { try { diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java index d9de3e9902..cb20cf7366 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java @@ -49,6 +49,8 @@ public class AmbryReplicaSyncUpManager implements ReplicaSyncUpManager { private final ConcurrentHashMap replicaToLagInfos = new ConcurrentHashMap<>(); private final ClusterMapConfig clusterMapConfig; private final ReentrantLock updateLock = new ReentrantLock(); + private final ConcurrentHashMap partitionToFileCopyLatch = new ConcurrentHashMap<>(); + private final ConcurrentHashMap partitionToFileCopySuccessLatch = new ConcurrentHashMap<>(); public AmbryReplicaSyncUpManager(ClusterMapConfig clusterMapConfig) { this.clusterMapConfig = clusterMapConfig; @@ -65,7 +67,8 @@ public void initiateBootstrap(ReplicaId replicaId) { @Override public void initiateFileCopy(ReplicaId replicaId) { - //To Be Added With File Copy Protocol + 
partitionToFileCopyLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1)); + partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), false); } @Override @@ -108,7 +111,18 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedExcep @Override public void waitForFileCopyCompleted(String partitionName) throws InterruptedException { - //To Be Added With File Copy Protocol + CountDownLatch latch = partitionToFileCopyLatch.get(partitionName); + if(latch == null) { + logger.info("Skipping file copy for existing partition {}", partitionName); + } else{ + logger.info("Waiting for new partition to {} to comeplete FileCopy", partitionName); + latch.await(); + partitionToFileCopyLatch.remove(partitionName); + if(!Boolean.TRUE.equals(partitionToFileCopySuccessLatch.remove(partitionName))){ // remove() returns null when no entry is present; treat that as failure instead of unboxing null + throw new StateTransitionException("Partition " + partitionName + " failed to copy files.", FileCopyProtocolFailure); + } + logger.info("File Copy is complete on partition {}", partitionName); + } } @Override @@ -204,7 +218,8 @@ public void onBootstrapComplete(ReplicaId replicaId) { @Override public void onFileCopyComplete(ReplicaId replicaId) { - //To Be Added With File Copy Protocol + partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), true); + countDownLatch(partitionToFileCopyLatch, replicaId.getPartitionId().toPathString()); } @Override diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java index cb82a82e13..084c5ba6b3 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java @@ -858,24 +858,35 @@ private DataNodeConfig getDataNodeConfig() { @Override public void onPartitionBecomeBootstrapFromOffline(String partitionName) { try { - // 1. 
take actions in storage manager (add new replica if necessary) - PartitionStateChangeListener storageManagerListener = - partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener); - if (storageManagerListener != null) { - storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); - } - // 2. take actions in replication manager (add new replica if necessary) - PartitionStateChangeListener replicationManagerListener = - partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener); - if (replicationManagerListener != null) { - replicationManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); - } - // 3. take actions in stats manager (add new replica if necessary) - PartitionStateChangeListener statsManagerListener = - partitionStateChangeListeners.get(StateModelListenerType.StatsManagerListener); - if (statsManagerListener != null) { - statsManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + if("xyz".equals(partitionName)){ // compare by value; == on Strings compares object references + PartitionStateChangeListener partitionStateChangeListener = + partitionStateChangeListeners.get(StateModelListenerType.FileCopyManagerListener); + partitionStateChangeListener.onPartitionBecomeBootstrapFromOffline(partitionName); + replicaSyncUpManager.waitForFileCopyCompleted(partitionName); + } else { + // 1. take actions in storage manager (add new replica if necessary) + PartitionStateChangeListener storageManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.StorageManagerListener); + if (storageManagerListener != null) { + storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + } + // 2. 
take actions in replication manager (add new replica if necessary) + PartitionStateChangeListener replicationManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener); + if (replicationManagerListener != null) { + replicationManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + } + // 3. take actions in stats manager (add new replica if necessary) + PartitionStateChangeListener statsManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.StatsManagerListener); + if (statsManagerListener != null) { + statsManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + } } + } catch (InterruptedException e){ + logger.error("Bootstrap was interrupted on partition {}", partitionName); + localPartitionAndState.put(partitionName, ReplicaState.ERROR); + throw new StateTransitionException("Bootstrap failed or was interrupted", BootstrapFailure); } catch (Exception e) { localPartitionAndState.put(partitionName, ReplicaState.ERROR); throw e; diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyController.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyController.java new file mode 100644 index 0000000000..cbbf8cc6c0 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyController.java @@ -0,0 +1,4 @@ +package com.github.ambry.filetransfer; + +public class FileCopyController { +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileBasedReplicationManager.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyManager.java similarity index 63% rename from ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileBasedReplicationManager.java rename to ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyManager.java index 998f56ca53..8bfff68cb5 100644 --- 
a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileBasedReplicationManager.java +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyManager.java @@ -17,30 +17,42 @@ import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.ClusterParticipant; import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.PartitionStateChangeListener; +import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.clustermap.StateModelListenerType; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.FileCopyBasedReplicationConfig; +import com.github.ambry.config.FileCopyConfig; import com.github.ambry.config.StoreConfig; import com.github.ambry.network.NetworkClientFactory; import com.github.ambry.replica.prioritization.PrioritizationManager; import com.github.ambry.server.StoreManager; +import com.github.ambry.store.FileStore; +import com.github.ambry.store.StoreException; import com.github.ambry.store.StoreKeyFactory; import java.io.IOException; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FileBasedReplicationManager { +public class FileCopyManager { protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final PrioritizationManager prioritizationManager; private final StoreManager storeManager; - public FileBasedReplicationManager(PrioritizationManager prioritizationManager, FileCopyBasedReplicationConfig fileCopyBasedReplicationConfig, ClusterMapConfig clusterMapConfig, - StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, + private final FileCopyConfig fileCopyConfig; + + private final StoreConfig storeConfig; + + public FileCopyManager(PrioritizationManager prioritizationManager, FileCopyBasedReplicationConfig 
fileCopyBasedReplicationConfig, ClusterMapConfig clusterMapConfig, + FileCopyConfig fileCopyConfig, StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, ScheduledExecutorService scheduler, DataNodeId dataNode, NetworkClientFactory networkClientFactory, MetricRegistry metricRegistry, ClusterParticipant clusterParticipant) throws InterruptedException { + this.fileCopyConfig = fileCopyConfig; + this.storeConfig = storeConfig; if (clusterParticipant != null) { clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.FileCopyManagerListener, new PartitionStateChangeListenerImpl()); @@ -55,15 +67,42 @@ public FileBasedReplicationManager(PrioritizationManager prioritizationManager, } public void start() throws InterruptedException, IOException { + } + + public void callMetaDataAPI(){ + } class PartitionStateChangeListenerImpl implements PartitionStateChangeListener { @Override public void onPartitionBecomeBootstrapFromOffline(String partitionName) { if(storeManager.getReplica(partitionName) == null){ - //storeManager.setUpReplica(partitionName); + storeManager.setUpReplica(partitionName); + } + ReplicaId replicaId = storeManager.getReplica(partitionName); + if(storeManager.getFileStore(replicaId.getPartitionId()) == null){ + storeManager.addFileStore(replicaId); + try { + storeManager.getFileStore(replicaId.getPartitionId()).start(); + } catch (StoreException e) { + throw new RuntimeException(e); + } } - prioritizationManager.addReplica(partitionName); + if(!storeManager.isFileExists(replicaId.getPartitionId(), storeConfig.storeFileCopyCompletedFileName)){ + FileStore fileStore = storeManager.getFileStore(replicaId.getPartitionId()); + PartitionId partitionId = replicaId.getPartitionId(); + List replicaIds = (List) partitionId.getReplicaIds(); + //TODO : Find the + String hostName = replicaIds.get(0).getDataNodeId().getHostname(); + + //callMetaData + //store metadata + //chunk API caller + 
//build state + }else{ + storeManager.buildStateForFileCopy(replicaId); + } + } @Override diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationState.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationState.java new file mode 100644 index 0000000000..a37f7e7671 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationState.java @@ -0,0 +1,9 @@ +package com.github.ambry.filetransfer; + +public enum FileCopyOperationState { + Start, + META_DATA_REQUEST_SENT, + META_DATA_RESPONSE_RECEIVED, + CHUNK_DATA_REQUEST_IN_PROGRESS, + CHUNK_DATA_EXCHANGE_COMPLETE, +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationTracker.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationTracker.java new file mode 100644 index 0000000000..a6c78c1e2f --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyOperationTracker.java @@ -0,0 +1,45 @@ +package com.github.ambry.filetransfer; + +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.clustermap.ReplicaId; +import java.util.List; + + +public class FileCopyOperationTracker { + + private final PartitionId partitionId; + private final FileCopyOperationState fileCopyOperationState; + + + public FileCopyOperationTracker(PartitionId partitionId, FileCopyOperationState fileCopyOperationState) { + this.partitionId = partitionId; + this.fileCopyOperationState = fileCopyOperationState; + + } + + void start(){ + while(fileCopyOperationState.equals(FileCopyOperationState.CHUNK_DATA_EXCHANGE_COMPLETE)) { + switch (fileCopyOperationState){ + case Start: + break; + case META_DATA_REQUEST_SENT: + break; + case META_DATA_RESPONSE_RECEIVED: + break; + case CHUNK_DATA_REQUEST_IN_PROGRESS: + break; + case CHUNK_DATA_EXCHANGE_COMPLETE: + break; + } + + if 
(fileCopyOperationState.equals(FileCopyOperationState.Start)){ + + + List replicaIds = (List) partitionId.getReplicaIds(); + String hostName = replicaIds.get(0).getDataNodeId().getHostname(); + String partitionName = String.valueOf(partitionId.getId()); + //fileCopyOperationState = FileCopyOperationState.META_DATA_REQUEST_SENT; + } + } + } +} diff --git a/ambry-store/src/main/java/com/github/ambry/store/BootstrapController.java b/ambry-store/src/main/java/com/github/ambry/store/BootstrapController.java index c0999d1013..83cc584b0d 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BootstrapController.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BootstrapController.java @@ -103,7 +103,7 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName) PartitionStateChangeListener listenerToInvoke = null; if (null == replica) { - if (isFileCopyFeatureEnabled()) { + if (isFileCopyFeatureEnabled() && partitionName.isEmpty()) { // "New partition -> FC" // This is a new partition placement and FileCopy bootstrap protocol is enabled. 
listenerToInvoke = fileCopyManagerListener; diff --git a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java index 38b0f68ec8..1a17871677 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java @@ -824,8 +824,8 @@ public DiskHealthStatus getDiskHealthStatus() { * @param fileName * @return */ - public boolean isFileExists(String fileName) { - String filePath = this.disk.getMountPath() + File.separator + fileName; + public boolean isFileExists(PartitionId partitionId, String fileName) { + String filePath = this.disk.getMountPath() + "/" + partitionId.toPathString() + File.separator + fileName; return new File(filePath).exists(); } diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java index 6b008b7a25..ef469fa266 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java @@ -40,7 +40,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.nio.file.FileStore; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -486,7 +485,7 @@ public boolean controlCompactionForBlobStore(PartitionId id, boolean enabled) { @Override public boolean isFileExists(PartitionId partitionId, String fileName) { - return this.getDiskManager(partitionId).isFileExists(fileName); + return this.getDiskManager(partitionId).isFileExists(partitionId, fileName); } @Override @@ -607,10 +606,56 @@ public boolean addBlobStore(ReplicaId replica) { * @param replica the {@link ReplicaId} of the {@link Store} for which store needs to be built */ @Override - public boolean addFileStore(ReplicaId replicaId) { - //TODO: Implementation To Be added. 
- return false; + public boolean addFileStore(ReplicaId replica) { + if (partitionToDiskManager.containsKey(replica.getPartitionId())) { + logger.info("{} already exists in storage manager, rejecting adding store request", replica.getPartitionId()); + return false; + } + DiskManager diskManager = addDisk(replica.getDiskId()); + if (diskManager == null || !diskManager.addBlobStore(replica)) { + logger.error("Failed to add new store into DiskManager"); + return false; + } + partitionToDiskManager.put(replica.getPartitionId(), diskManager); + partitionNameToReplicaId.put(replica.getPartitionId().toPathString(), replica); + logger.info("New store is successfully added into StorageManager"); + return true; + } + + @Override + public void setUpReplica(String partitionName) { + ReplicaId replica = partitionNameToReplicaId.get(partitionName); + if (replica == null) { + ReplicaId replicaToAdd; + boolean replicaAdded = false; + do { + // there can be two scenarios: + // 1. this is the first time to add new replica onto current node; + // 2. last replica addition failed at some point before updating InstanceConfig in Helix + // In either case, we should add replica to current node by calling "addBlobStore(ReplicaId replica)" + replicaToAdd = clusterMap.getBootstrapReplica(partitionName, currentNode); + if (replicaToAdd == null) { + logger.error("No new replica found for partition {} in cluster map", partitionName); + throw new StateTransitionException( + "New replica " + partitionName + " is not found in clustermap for " + currentNode, ReplicaNotFound); + } + // Attempt to add store into storage manager. If store already exists on disk (but not in clustermap), make + // sure old store of this replica is deleted (this store may be created in previous replica addition but failed + // at some point). Then a brand new store associated with this replica should be created and started. 
+ if (!addFileStore(replicaToAdd)) { + // We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it + // back since addition of store failed. + replicaToAdd.getDiskId().increaseAvailableSpaceInBytes(replicaToAdd.getCapacityInBytes()); + + logger.info("Failed to add store {} at location {}. Retrying bootstrapping replica at different location", + partitionName, replicaToAdd.getReplicaPath()); + }else{ + replicaAdded = true; + } + } while (!replicaAdded); + } } + public void buildStateForFileCopy(ReplicaId replica){ if (replica == null) { logger.error("ReplicaId is null"); diff --git a/build.gradle b/build.gradle index ea929a848c..06edbc7f7a 100644 --- a/build.gradle +++ b/build.gradle @@ -334,6 +334,7 @@ project (':ambry-prioritization') { project (':ambry-file-transfer') { dependencies{ compile project(':ambry-api') + compile project(':ambry-protocol') compile project(':ambry-commons') compile project(':ambry-store') compile project(':ambry-prioritization')