Skip to content

Commit

Permalink
Added initial changes for state build
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Balani committed Nov 29, 2024
1 parent 196590e commit d186728
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,17 @@ default void onPartitionBecomeDroppedFromError(String partitionName) {
default void onPartitionBecomeOfflineFromError(String partitionName) {
}

/**
* Action to take when state building is triggered for filecopy
* @param partitionName of the partition.
*/
void buildStateForFileCopy(String partitionName);

/**
* Action to take when reset method is called on certain partition.
* @param partitionName of the partition.
*/
default void onReset(String partitionName) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public enum TransitionErrorCode {
* If replica is not present in Helix and not found on current node.
*/
ReplicaNotFound,

/**
* If disk manager not found during state build.
*/
DiskManagerNotFoundForFileCopyStateBuild,
/**
* If failure occurs during replica operation (i.e. replica addition/removal in StoreManager, ReplicationManager).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,6 @@ public class ClusterMapConfig {
public final boolean clusterMapIgnoreDownwardStateTransition;

public static final String ENABLE_FILE_COPY_FOR_BOOTSTRAP = "clustermap.enable.file.copy.for.bootstrap";
@Config(ENABLE_FILE_COPY_FOR_BOOTSTRAP)
@Default("false")
public final boolean enableFileCopyForBootstrap;

public ClusterMapConfig(VerifiableProperties verifiableProperties) {
clusterMapFixedTimeoutDatanodeErrorThreshold =
Expand Down Expand Up @@ -460,6 +457,5 @@ public ClusterMapConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getLongInRange("clustermap.default.replica.capacity.in.bytes", 384L * 1024 * 1024 * 1024,
0, Long.MAX_VALUE);
clusterMapIgnoreDownwardStateTransition = verifiableProperties.getBoolean(IGNORE_DOWNWARD_STATE_TRANSITION, false);
enableFileCopyForBootstrap = verifiableProperties.getBoolean(ENABLE_FILE_COPY_FOR_BOOTSTRAP, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -675,10 +675,10 @@ public class StoreConfig {
public final static String storeBlockStaleBlobStoreToStartName = "store.block.stale.blob.store.to.start";


@Config(ENABLE_FILE_COPY_FOR_BOOTSTRAP)
@Config(STORE_ENABLE_FILE_COPY_FOR_BOOTSTRAP)
@Default("false")
public final boolean enableFileCopyForBootstrap;
public static final String ENABLE_FILE_COPY_FOR_BOOTSTRAP = "clustermap.enable.file.copy.for.bootstrap";
public final boolean storeEnableFileCopyForBootstrap;
public static final String STORE_ENABLE_FILE_COPY_FOR_BOOTSTRAP = "store.enable.file.copy.for.bootstrap";


/**
Expand Down Expand Up @@ -878,6 +878,6 @@ public StoreConfig(VerifiableProperties verifiableProperties) {
storeStaleTimeInDays = verifiableProperties.getIntInRange(storeStaleTimeInDaysName, 7, 0, Integer.MAX_VALUE);
storeBlockStaleBlobStoreToStart = verifiableProperties.getBoolean(storeBlockStaleBlobStoreToStartName, false);
storeReshuffleDisksOnReorder = verifiableProperties.getBoolean(storeReshuffleDisksOnReorderName, false);
enableFileCopyForBootstrap = verifiableProperties.getBoolean(ENABLE_FILE_COPY_FOR_BOOTSTRAP, false);
storeEnableFileCopyForBootstrap = verifiableProperties.getBoolean(STORE_ENABLE_FILE_COPY_FOR_BOOTSTRAP, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,11 @@ public void onPartitionBecomeDroppedFromOffline(String partitionName) {
partitionName);
}

@Override
public void buildStateForFileCopy(String partitionName) {
// no op
}

/**
* If only config specified list of partitions are being replicated from cloud, then check that the partition
* belongs to the specified list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
*/
public class HelixParticipant implements ClusterParticipant, PartitionStateChangeListener {
public static final String DISK_KEY = "DISK";

public static final boolean ENABLE_FILE_COPY = false;
final HelixParticipantMetrics participantMetrics;
private final HelixClusterManager clusterManager;
private final String clusterName;
Expand Down Expand Up @@ -1074,6 +1076,11 @@ public void onPartitionBecomeOfflineFromError(String partitionName) {
localPartitionAndState.put(partitionName, ReplicaState.OFFLINE);
}

@Override
public void buildStateForFileCopy(String partitionName) {
// no op
}

@Override
public void onReset(String partitionName) {
localPartitionAndState.put(partitionName, ReplicaState.OFFLINE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public void onPartitionBecomeOfflineFromInactive(String partitionName) {
public void onPartitionBecomeDroppedFromOffline(String partitionName) {
// no op
}

@Override
public void buildStateForFileCopy(String partitionName) {
// no op
}
}, mock(HelixClusterManager.class));
StateModel stateModel;
switch (config.clustermapStateModelDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,5 +412,10 @@ public void onPartitionBecomeDroppedFromOffline(String partitionName) {
// the transition
removeReplica(replica);
}

@Override
public void buildStateForFileCopy(String partitionName) {
// no op
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -532,5 +532,10 @@ public void onPartitionBecomeDroppedFromOffline(String partitionName) {
// remove replica from in-mem data structure. If replica doesn't exist, log info but don't fail the transition
removeReplica(replica);
}

@Override
public void buildStateForFileCopy(String partitionName) {
// no op
}
}
}
23 changes: 10 additions & 13 deletions ambry-store/src/main/java/com/github/ambry/store/DiskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ boolean controlCompactionForBlobStore(PartitionId id, boolean enabled) {
return succeed;
}

/**
* Add store to compaction manager
*/
void addBlobStoreToCompactionManager(BlobStore store){
compactionManager.addBlobStore(store);
}

/**
* Add a new BlobStore with given {@link ReplicaId}.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
Expand Down Expand Up @@ -447,22 +454,12 @@ boolean addBlobStore(ReplicaId replica) {
BlobStore store = new BlobStore(replica, storeConfig, scheduler, longLivedTaskScheduler, this, diskIOScheduler,
diskSpaceAllocator, storeMainMetrics, storeUnderCompactionMetrics, keyFactory, recovery, hardDelete,
replicaStatusDelegates, time, accountService, null, indexPersistScheduler);
// For Filecopy, we do not init the BlobStore since new log/index isn't required, the files will be directly
// copied from remote node.
if (!storeConfig.enableFileCopyForBootstrap) {
store.start();
}

store.start();
// collect store segment requirements and add into DiskSpaceAllocator
List<DiskSpaceRequirements> storeRequirements = Collections.singletonList(store.getDiskSpaceRequirements());
diskSpaceAllocator.addRequiredSegments(diskSpaceAllocator.getOverallRequirements(storeRequirements), false);

// We don't need to add the store into compaction manager for filecopy approach
if (!storeConfig.enableFileCopyForBootstrap) {
// add store into CompactionManager
compactionManager.addBlobStore(store);
}

// add store into CompactionManager
compactionManager.addBlobStore(store);
// add new created store into in-memory data structures.
stores.put(replica.getPartitionId(), store);
partitionToReplicaMap.put(replica.getPartitionId(), replica);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,6 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
}
} while (!replicaAdded);



if (primaryClusterParticipant != null) {
// update InstanceConfig in Helix
try {
Expand Down Expand Up @@ -1012,6 +1010,13 @@ public void onPartitionBecomeDroppedFromOffline(String partitionName) {
logger.info("Partition {} is successfully dropped on current node", partitionName);
}



@Override
public void buildStateForFileCopy(String partitionName) {
// no op
}

/**
* Return true if the blob store should resume the decommission
* @param store The {@link BlobStore}
Expand Down

0 comments on commit d186728

Please sign in to comment.