Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State Build implementation for File Copy based replication #2954

Merged
merged 8 commits into from
Dec 23, 2024
13 changes: 13 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/server/StoreManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
*/
public interface StoreManager {

/**
 * Add a new BlobStore for FileCopy based replication with given {@link ReplicaId}.
 * This is the FileCopy counterpart of {@link #addBlobStore(ReplicaId)}.
 * @param replica the {@link ReplicaId} of the {@link Store} which would be added.
 * @return {@code true} if adding store was successful. {@code false} if not.
 * @throws UnsupportedOperationException if the implementation does not support FileCopy based replication.
 */
boolean addBlobStoreForFileCopy(ReplicaId replica);

/**
* Add a new BlobStore with given {@link ReplicaId}.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
Expand All @@ -44,6 +51,12 @@ public interface StoreManager {
*/
boolean addFileStore(ReplicaId replicaId);

/**
 * Build the state of the store for the given replica after FileCopy has completed.
 * @param replica the {@link ReplicaId} of the {@link Store} for which state needs to be built.
 * @throws UnsupportedOperationException if the implementation does not support FileCopy based replication.
 */
void buildStateForFileCopy(ReplicaId replica);

/**
* Remove store from storage manager.
* @param id the {@link PartitionId} associated with store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,19 @@ public CloudStorageManager(VerifiableProperties properties, VcrMetrics vcrMetric
lock = new ReentrantReadWriteLock();
}

/**
 * Not supported by this storage manager; the call always fails.
 * @param replica the {@link ReplicaId} of the store to add (ignored).
 * @return never returns normally.
 * @throws UnsupportedOperationException always.
 */
@Override
public boolean addBlobStoreForFileCopy(ReplicaId replica) {
throw new UnsupportedOperationException("Method not supported");
}

/**
 * Add a store for the partition of the given replica.
 * @param replica the {@link ReplicaId} whose partition needs a store.
 * @return {@code true} if {@code createAndStartBlobStoreIfAbsent} yields a non-null store for the partition,
 *         {@code false} otherwise.
 */
@Override
public boolean addBlobStore(ReplicaId replica) {
  PartitionId partitionId = replica.getPartitionId();
  boolean storeAvailable = createAndStartBlobStoreIfAbsent(partitionId) != null;
  return storeAvailable;
}
/**
 * Not supported by this storage manager; the call always fails.
 * @param replica the {@link ReplicaId} for which state would be built (ignored).
 * @throws UnsupportedOperationException always.
 */
@Override
public void buildStateForFileCopy(ReplicaId replica){
throw new UnsupportedOperationException("Method not supported");
}

/**
* Returning false because this will not be used as part of CloudStorageManager Implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,15 @@ public boolean startBlobStore(PartitionId id) {
return returnValueOfStartingBlobStore;
}

/**
 * Not supported by this storage manager; the call always fails.
 * @param replica the {@link ReplicaId} for which state would be built (ignored).
 * @throws UnsupportedOperationException always.
 */
@Override
public void buildStateForFileCopy(ReplicaId replica) {
throw new UnsupportedOperationException("Method not supported");
}
/**
 * Not supported by this storage manager; the call always fails.
 * @param replica the {@link ReplicaId} of the store to add (ignored).
 * @return never returns normally.
 * @throws UnsupportedOperationException always.
 */
@Override
public boolean addBlobStoreForFileCopy(ReplicaId replica) {
throw new UnsupportedOperationException("Method not supported");
}

@Override
public boolean addBlobStore(ReplicaId id) {
updatePartitionToDiskManager(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,43 @@ boolean controlCompactionForBlobStore(PartitionId id, boolean enabled) {
return succeed;
}


/**
* Add a new BlobStore with given {@link ReplicaId}.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
* @return {@code true} if adding store was successful. {@code false} if not.
*/
boolean addBlobStoreForFileCopy(ReplicaId replica) {
rwLock.writeLock().lock();
boolean succeed = false;
try {
if (!running) {
logger.error("Failed to add {} because disk manager is not running", replica.getPartitionId());
} else {
// Directory is already created in prefilecopy steps. So no directory cleanup required.
BlobStore store = new BlobStore(replica, storeConfig, scheduler, longLivedTaskScheduler, this, diskIOScheduler,
diskSpaceAllocator, storeMainMetrics, storeUnderCompactionMetrics, keyFactory, recovery, hardDelete,
replicaStatusDelegates, time, accountService, null, indexPersistScheduler);
store.start();
piyujai marked this conversation as resolved.
Show resolved Hide resolved
// add store into CompactionManager
compactionManager.addBlobStore(store);
// add new created store into in-memory data structures.
stores.put(replica.getPartitionId(), store);
// create a bootstrap-in-progress file to distinguish it from regular stores (the file will be checked during
// BOOTSTRAP -> STANDBY transition)
createBootstrapFileIfAbsent(replica);
piyujai marked this conversation as resolved.
Show resolved Hide resolved
logger.info("New store for partitionId {} is successfully added into DiskManager.", replica.getPartitionId());
succeed = true;
}
} catch (Exception e) {
logger.error("Failed to start new added store for partitionId {} for FileCopy based replication", replica.getPartitionId(),
e);
} finally {
rwLock.writeLock().unlock();
}
return succeed;
}

/**
* Add a new BlobStore with given {@link ReplicaId}.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
Expand Down Expand Up @@ -460,7 +497,7 @@ boolean addBlobStore(ReplicaId replica) {
// create a bootstrap-in-progress file to distinguish it from regular stores (the file will be checked during
// BOOTSTRAP -> STANDBY transition)
createBootstrapFileIfAbsent(replica);
logger.info("New store is successfully added into DiskManager.");
logger.info("New store is successfully added into DiskManager for partitionId {}.", replica.getPartitionId());
succeed = true;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,28 @@ DiskManager addDisk(DiskId diskId) {
});
}

/**
* Add a new store to the storage manager for file copy based replication post filecopy is completed.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
* @return
*/
@Override
public boolean addBlobStoreForFileCopy(ReplicaId replica) {
piyujai marked this conversation as resolved.
Show resolved Hide resolved
if (!partitionToDiskManager.containsKey(replica.getPartitionId())) {
logger.info("PartitionId {} doesn't exist in storage manager during state build, rejecting adding store request", replica.getPartitionId());
return false;
}
// We don't require addDisk since DiskManager is already started during initialization of StorageManager as part
// of prefilecopy steps. We will fetch it from partitionToDiskManager map.
DiskManager diskManager = partitionToDiskManager.get(replica.getPartitionId());
if (diskManager == null || !diskManager.addBlobStoreForFileCopy(replica)) {
logger.error("Failed to add new store into DiskManager");
return false;
}
logger.info("New store is successfully added into StorageManager");
return true;
}

@Override
public boolean addBlobStore(ReplicaId replica) {
if (partitionToDiskManager.containsKey(replica.getPartitionId())) {
Expand All @@ -553,11 +575,51 @@ public boolean addBlobStore(ReplicaId replica) {
return true;
}

/**
 * Placeholder for adding a file store for the given replica. Not implemented yet: the argument is ignored and
 * {@code false} is always returned.
 * @param replicaId the {@link ReplicaId} for which a file store would be added (currently unused).
 * @return {@code false} always, until an implementation is added.
 */
@Override
public boolean addFileStore(ReplicaId replicaId) {
//TODO: Implementation To Be added.
return false;
}
public void buildStateForFileCopy(ReplicaId replica){
if (replica == null) {
logger.error("ReplicaId is null");
throw new StateTransitionException("ReplicaId null is not found in clustermap for " + currentNode, ReplicaNotFound);
}
PartitionId partitionId = replica.getPartitionId();

if (!addBlobStoreForFileCopy(replica)){
piyujai marked this conversation as resolved.
Show resolved Hide resolved
// We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it
// back since addition of store failed.
replica.getDiskId().increaseAvailableSpaceInBytes(replica.getCapacityInBytes());
piyujai marked this conversation as resolved.
Show resolved Hide resolved
if (!clusterMap.isDataNodeInFullAutoMode(currentNode)) {
logger.error("Failed to add store for replica {} into storage manager", partitionId.getId());
throw new StateTransitionException("Failed to add store for replica " + partitionId.getId() + " into storage manager",
ReplicaOperationFailure);
} else {
logger.info("Failed to add store for replica {} at location {}. Cleanup and raise StateTransitionException",
partitionId.getId(), replica.getReplicaPath());
// This will remove the reserved space from diskSpaceAllocator
tryRemoveFailedBootstrapBlobStore(replica);
piyujai marked this conversation as resolved.
Show resolved Hide resolved
// Throwing StateTransitionException here since we cannot retry adding BlobStore since Filecopy has copied data
// into the selected disk itself. Hence, putting the replica into ERROR state via StateTransitionException
throw new StateTransitionException("Failed to add store for replica " + partitionId.getId() + " into storage manager",
ReplicaOperationFailure);
}
}
Store store = getStore(replica.getPartitionId(), false);
// Only update store state if this is a state transition for primary participant. Since replication Manager
// which eventually moves this state to STANDBY/LEADER only listens to primary participant, store state gets
// stuck in BOOTSTRAP if this is updated by second participant listener too
ReplicaState currentState = store.getCurrentState();
if (currentState != ReplicaState.LEADER && currentState != ReplicaState.STANDBY) {
// Only set the current state to BOOTSTRAP when it's not LEADER or STANDBY
store.setCurrentState(ReplicaState.BOOTSTRAP);
}
}

/**
* If a bootstrap replica fails, try to remove all the files and directories associated with it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -269,6 +270,132 @@ public void scheduleAndControlCompactionTest() throws Exception {
shutdownAndAssertStoresInaccessible(storageManager, replicas);
}

/**
* Helper util to add blobs to a given Store
* @param store store to add blob to.
* @param size size in bytes of the randomized blob
* @param expiresAtMs expiry in milliseconds to be set for the blob
* @return {@link MockId} the mock id of the blob added
* @throws StoreException
*/
public MockId addRandomBlobToStore(Store store, long size, long expiresAtMs) throws StoreException {
piyujai marked this conversation as resolved.
Show resolved Hide resolved
final Random random = new Random();
short accountId = Utils.getRandomShort(TestUtils.RANDOM);
short containerId = Utils.getRandomShort(TestUtils.RANDOM);
short lifeVersion = MessageInfo.LIFE_VERSION_FROM_FRONTEND;

MockId id = new MockId(TestUtils.getRandomString(MOCK_ID_STRING_LENGTH), accountId, containerId);
long crc = random.nextLong();
MessageInfo info =
new MessageInfo(id, size, false, false, false, expiresAtMs, crc, id.getAccountId(), id.getContainerId(),
Utils.Infinite_Time, lifeVersion);
ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes((int) size));
store.put(new MockMessageWriteSet(Collections.singletonList(info), Collections.singletonList(buffer)));
return id;
}

/**
 * Create and start a {@link StorageManager} for the state build tests, after adding a fresh mount path to the
 * first data node of the cluster map.
 * @param newMountPathIndex index used to derive the name of the new mount path directory.
 * @return the started {@link StorageManager}.
 * @throws Exception if configs, the mount path or the storage manager cannot be set up.
 */
private StorageManager initializeStorageManagerForStateBuildTests(int newMountPathIndex) throws Exception {
  generateConfigs(true, false);
  MockDataNodeId localNode = clusterMap.getDataNodes().get(0);
  // add new MountPath to local node. The temp file is only used to discover the temp directory.
  File tempDirProbe = File.createTempFile("ambry", ".tmp");
  File mountFile = new File(tempDirProbe.getParent(),
      "mountpathfile" + MockClusterMap.PLAIN_TEXT_PORT_START_NUMBER + newMountPathIndex);
  // Don't leave the probe file behind; fall back to deletion at JVM exit if it can't be removed right away.
  if (!tempDirProbe.delete()) {
    tempDirProbe.deleteOnExit();
  }
  MockClusterMap.deleteFileOrDirectory(mountFile);
  assertTrue("Couldn't create mount path directory", mountFile.mkdir());
  localNode.addMountPaths(Collections.singletonList(mountFile.getAbsolutePath()));

  StorageManager storageManager = createStorageManager(localNode, metricRegistry, null);
  storageManager.start();
  return storageManager;
}

// TODO: Add additional negative tests for StateBuild exception handling.
/**
* Test buildStateForFileCopy with newly created {@link ReplicaId}.
* @throws Exception
*/
@Test
public void buildStateForFileCopyTest() throws Exception {
int newMountPathIndex = 3;
int newPartitionId = 803;
StorageManager storageManager = initializeStorageManagerForStateBuildTests(newMountPathIndex);
PartitionId newPartition =
new MockPartitionId(newPartitionId, MockClusterMap.DEFAULT_PARTITION_CLASS, clusterMap.getDataNodes(), newMountPathIndex);
MockDataNodeId localNode = clusterMap.getDataNodes().get(0);
// test add store onto a new disk, which should succeed
assertTrue("Add new store should succeed", storageManager.addBlobStore(newPartition.getReplicaIds().get(0)));
assertNotNull("The store shouldn't be null because new store is successfully added",
storageManager.getStore(newPartition, false));
DiskManager dm = storageManager.getDiskManager(newPartition);
Store store = dm.getStore(newPartition, false);
MockId id1 = addRandomBlobToStore(store, 100, Utils.Infinite_Time);
MockId id2 = addRandomBlobToStore(store, 200, Utils.Infinite_Time);

// Shutdown store and try to build state using buildStateForFileCopy assuming state has to be built for the same
// store with 2 blobs present on the partition's file on disk.
store.shutdown();
storageManager.buildStateForFileCopy(newPartition.getReplicaIds().get(0));
dm = storageManager.getDiskManager(newPartition);
store = dm.getStore(newPartition, false);
assertNotNull(store.get(Collections.singletonList(id1), EnumSet.noneOf(StoreGetOptions.class)));
piyujai marked this conversation as resolved.
Show resolved Hide resolved
piyujai marked this conversation as resolved.
Show resolved Hide resolved
assertNotNull(store.get(Collections.singletonList(id2), EnumSet.noneOf(StoreGetOptions.class)));
}


/**
* Test buildStateForFileCopy with newly created {@link ReplicaId} for failure to add an already started blob store.
* @throws Exception
*/
@Test
public void buildStateForFileCopyDuplicateBlobStoreFailureTest() throws Exception {
int newMountPathIndex = 3;
int newPartitionId = 803;
StorageManager storageManager = initializeStorageManagerForStateBuildTests(newMountPathIndex);

PartitionId newPartition =
new MockPartitionId(newPartitionId, MockClusterMap.DEFAULT_PARTITION_CLASS, clusterMap.getDataNodes(), newMountPathIndex);

// test add store onto a new disk, which should succeed
assertTrue("Add new store should succeed", storageManager.addBlobStore(newPartition.getReplicaIds().get(0)));
assertNotNull("The store shouldn't be null because new store is successfully added",
storageManager.getStore(newPartition, false));
// Attempting to add store via addBlobStoreForFileCopy should fail since the newPartition already has store started.
assertFalse("Add store which is already existing should fail", storageManager.addBlobStoreForFileCopy(newPartition.getReplicaIds().get(0)));
piyujai marked this conversation as resolved.
Show resolved Hide resolved
storageManager.getStore(newPartition, false).shutdown();

// Testing flow where addBlobStoreForFileCopy is called before addBlobStore
// test add store onto a new disk, which should succeed
assertTrue("Add store using addBlobStoreForFileCopy should succeed", storageManager.addBlobStoreForFileCopy(newPartition.getReplicaIds().get(0)));
assertNotNull("The store shouldn't be null because new store is successfully added",
storageManager.getStore(newPartition, false));
// This should fail since the newPartition already has store started.
assertFalse("Add the duplicate store using addBlobStore should fail", storageManager.addBlobStore(newPartition.getReplicaIds().get(0)));
storageManager.getStore(newPartition, false).shutdown();
}

/**
* Test buildStateForFileCopy with {@link ReplicaId} as null.
* @throws Exception
*/
@Test(expected = StateTransitionException.class)
public void buildStateForFileCopyReplicaNullFailureTest() throws Exception {
int newMountPathIndex = 3;
int newPartitionId = 803;
StorageManager storageManager = initializeStorageManagerForStateBuildTests(newMountPathIndex);

PartitionId newPartition =
new MockPartitionId(newPartitionId, MockClusterMap.DEFAULT_PARTITION_CLASS, clusterMap.getDataNodes(), newMountPathIndex);
// test add store onto a new disk, which should succeed
assertTrue("Add new store should succeed", storageManager.addBlobStore(newPartition.getReplicaIds().get(0)));
assertNotNull("The store shouldn't be null because new store is successfully added",
storageManager.getStore(newPartition, false));
// This should fail since the newPartition already has store started.
assertFalse("Add store which is already existing should fail", storageManager.addBlobStoreForFileCopy(newPartition.getReplicaIds().get(0)));
storageManager.buildStateForFileCopy(null);
piyujai marked this conversation as resolved.
Show resolved Hide resolved
}


/**
* Test add new BlobStore with given {@link ReplicaId}.
*/
Expand Down
Loading