Skip to content

Commit

Permalink
++ Test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
DevenAhluwalia committed Dec 20, 2024
1 parent b1e001d commit bdd7024
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,6 @@
import static com.github.ambry.clustermap.TestUtils.*;
import static org.junit.Assert.*;


// Permits Replica to be constructed with a null Partition
class TestReplica extends Replica {
public TestReplica(HardwareLayout hardwareLayout, JSONObject jsonObject) throws JSONException {
super(hardwareLayout, null, jsonObject);
}

public TestReplica(TestHardwareLayout hardwareLayout, Disk disk) throws JSONException {
super(null, disk, hardwareLayout.clusterMapConfig);
}

@Override
public void validatePartition() {
// Null OK
}
}

/**
* Tests {@link Replica} class.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.github.ambry.clustermap;

import org.json.JSONException;
import org.json.JSONObject;


// Permits Replica to be constructed with a null Partition
public class TestReplica extends Replica {
public TestReplica(HardwareLayout hardwareLayout, JSONObject jsonObject) throws JSONException {
super(hardwareLayout, null, jsonObject);
}

public TestReplica(TestUtils.TestHardwareLayout hardwareLayout, Disk disk) throws JSONException {
super(null, disk, hardwareLayout.clusterMapConfig);
}

@Override
public void validatePartition() {
// Null OK
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.store;

import com.github.ambry.clustermap.ClusterParticipant;
Expand All @@ -11,6 +24,7 @@
import com.github.ambry.config.StoreConfig;
import com.github.ambry.server.StoreManager;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
Expand All @@ -24,7 +38,7 @@ public class BootstrapController {

private final String BOOTSTRAP_IN_PROGRESS_FILE_NAME;
private final String FILECOPY_IN_PROGRESS_FILE_NAME;
private final Pattern allLogSegmentFilesPattern = Pattern.compile("\\d+\\.log");
private final Pattern allLogSegmentFilesPattern = Pattern.compile(".*_log.*");
private final ServerConfig serverConfig;
private final StoreManager storeManager;
private final PartitionStateChangeListener storageManagerListener;
Expand Down Expand Up @@ -60,6 +74,7 @@ public void start() {
}

class BootstrapControllerImpl implements PartitionStateChangeListener {
EnumSet<ReplicationProtocolTransitionType> replicationProtocolTransitionType;

@Override
public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName) {
Expand All @@ -77,11 +92,17 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
// "New partition -> FC"
// This is a new partition placement and FileCopy bootstrap protocol is enabled.
listenerToInvoke = fileCopyManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.NEW_PARTITION_TO_FILE_BASED_BOOTSTRAP);
logStateChange("New partition -> FC", partitionName);
} else {
// "New partition -> R"
// This is a new partition placement and FileCopy bootstrap protocol is disabled.
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.NEW_PARTITION_TO_BLOB_BASED_BOOTSTRAP);
logStateChange("New partition -> R", partitionName);
}
} else {
Expand All @@ -91,20 +112,30 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
// Last attempt with blob based bootstrap protocol had failed for this partition.
// FileCopy bootstrap protocol is enabled but we will still continue with blob based bootstrap protocol.
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.BLOB_BASED_INCOMPLETE_TO_FILE_BASED_BOOTSTRAP);
logStateChange("R.Incomplete -> FC", partitionName);
} else if (isAnyLogSegmentExists(replica.getPartitionId())) {
if (isFileExists(replica.getPartitionId(), FILECOPY_IN_PROGRESS_FILE_NAME)) {
// FC.Incomplete -> FC
// Last attempt with FileCopy bootstrap protocol had failed for this partition.
// We will resume the boostrap with FileCopy bootstrap protocol.
listenerToInvoke = fileCopyManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.FILE_BASED_INCOMPLETE_TO_FILE_BASED_BOOTSTRAP);
logStateChange("FC.Incomplete -> FC", partitionName);
} else {
// R.complete -> FC or FC.complete -> FC
// Last attempt either with blob based or file based bootstrap protocol had succeeded for this partition.
// We'll continue with blob based bootstrap protocol for this partition to catch up with its peers.
// as part of Bootstrap->Standby state transition.
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.BLOB_BASED_COMPLETE_TO_FILE_BASED_BOOTSTRAP,
ReplicationProtocolTransitionType.FILE_BASED_COMPLETE_TO_FILE_BASED_BOOTSTRAP);
logStateChange("R.complete -> FC or FC.complete -> FC", partitionName);
}
}
Expand All @@ -114,6 +145,9 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
// Last attempt with blob based bootstrap protocol had failed for this partition.
// FileCopy bootstrap protocol is disabled and we will continue with blob based bootstrap protocol.
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.BLOB_BASED_INCOMPLETE_TO_BLOB_BASED_BOOTSTRAP);
logStateChange("R.Incomplete -> R", partitionName);
} else if (isAnyLogSegmentExists(replica.getPartitionId())) {
if (isFileExists(replica.getPartitionId(), FILECOPY_IN_PROGRESS_FILE_NAME)) {
Expand All @@ -128,13 +162,20 @@ public void onPartitionBecomeBootstrapFromOffline(@Nonnull String partitionName)
throw new StateTransitionException(message, BootstrapControllerFailure);
}
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.FILE_BASED_INCOMPLETE_TO_BLOB_BASED_BOOTSTRAP);
logStateChange("FC.Incomplete -> R", partitionName);
} else {
// R.complete -> R or FC.complete -> R
// Last attempt either with blob based or file based bootstrap protocol had succeeded for this partition.
// We'll continue with blob based bootstrap protocol for this partition to catch up with its peers.
// as part of Bootstrap->Standby state transition.
listenerToInvoke = storageManagerListener;

replicationProtocolTransitionType = EnumSet.of(
ReplicationProtocolTransitionType.BLOB_BASED_COMPLETE_TO_BLOB_BASED_BOOTSTRAP,
ReplicationProtocolTransitionType.FILE_BASED_COMPLETE_TO_BLOB_BASED_BOOTSTRAP);
logStateChange("R.complete -> R or FC.complete -> R", partitionName);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.github.ambry.store;

public enum ReplicationProtocolTransitionType {
/**
* Pre restart protocol: NA
* Bootstrap status: NA
* Post restart protocol: Blob based
*/
NEW_PARTITION_TO_BLOB_BASED_BOOTSTRAP,

/**
* Pre restart protocol: NA
* Bootstrap status: NA
* Post restart protocol: File based
*/
NEW_PARTITION_TO_FILE_BASED_BOOTSTRAP,

/**
* Pre restart protocol: Blob based
* Bootstrap status: Complete
* Post restart protocol: File based
*/
BLOB_BASED_COMPLETE_TO_FILE_BASED_BOOTSTRAP,

/**
* Pre restart protocol: File based
* Bootstrap status: Complete
* Post restart protocol: Blob based
*/
FILE_BASED_COMPLETE_TO_BLOB_BASED_BOOTSTRAP,

/**
* Pre restart protocol: Blob based
* Bootstrap status: Complete
* Post restart protocol: Blob based
*/
BLOB_BASED_COMPLETE_TO_BLOB_BASED_BOOTSTRAP,

/**
* Pre restart protocol: File based
* Bootstrap status: Complete
* Post restart protocol: File based
*/
FILE_BASED_COMPLETE_TO_FILE_BASED_BOOTSTRAP,

/**
* Pre restart protocol: Blob based
* Bootstrap status: InComplete
* Post restart protocol: Blob based
*/
BLOB_BASED_INCOMPLETE_TO_BLOB_BASED_BOOTSTRAP,

/**
* Pre restart protocol: File based
* Bootstrap status: InComplete
* Post restart protocol: File based
*/
FILE_BASED_INCOMPLETE_TO_FILE_BASED_BOOTSTRAP,

/**
* Pre restart protocol: Blob based
* Bootstrap status: InComplete
* Post restart protocol: File based
*/
BLOB_BASED_INCOMPLETE_TO_FILE_BASED_BOOTSTRAP,

/**
* Pre restart protocol: File based
* Bootstrap status: InComplete
* Post restart protocol: Blob based
*/
FILE_BASED_INCOMPLETE_TO_BLOB_BASED_BOOTSTRAP
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.github.ambry.store;

import com.github.ambry.clustermap.ClusterParticipant;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.PartitionStateChangeListener;
import com.github.ambry.clustermap.TestReplica;
import com.github.ambry.clustermap.ReplicaSyncUpManager;
import com.github.ambry.clustermap.StateModelListenerType;
import com.github.ambry.config.ServerConfig;
import com.github.ambry.config.ServerReplicationMode;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.server.StoreManager;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.mockito.Mockito.*;


public class BootstrapControllerTest {
private static ServerConfig serverConfig;
private static StoreConfig storeConfig;
private static StoreManager storeManager;
private static String partitionName1 = "p1";
private static PartitionStateChangeListener fileCopyManagerListener;
private static PartitionStateChangeListener storageManagerListener;
private static ClusterParticipant primaryClusterParticipant;

@BeforeClass
public static void initialize() {
storeConfig = mock(StoreConfig.class);
storeManager = mock(StoreManager.class);
primaryClusterParticipant = mock(ClusterParticipant.class);

fileCopyManagerListener = mock(PartitionStateChangeListener.class);

storageManagerListener = mock(PartitionStateChangeListener.class);

Map<StateModelListenerType, PartitionStateChangeListener> partitionStateChangeListeners =
new HashMap<StateModelListenerType, PartitionStateChangeListener>() {
{
put(StateModelListenerType.FileCopyManagerListener, fileCopyManagerListener);
put(StateModelListenerType.StorageManagerListener, storageManagerListener);
}
};
when(primaryClusterParticipant.getPartitionStateChangeListeners())
.thenReturn(partitionStateChangeListeners);

when(primaryClusterParticipant.getReplicaSyncUpManager())
.thenReturn(mock(ReplicaSyncUpManager.class));
}

@Test
public void test__ReplicationProtocolTransitionType__NEW_PARTITION_TO_FILE_BASED_BOOTSTRAP() {
// Arrange
final BootstrapController.BootstrapControllerImpl bootstrapControllerImpl =
getBootstrapControllerImpl(ServerReplicationMode.FILE_BASED);

when(storeManager.getReplica(partitionName1)).thenReturn(null);

// Act
bootstrapControllerImpl.onPartitionBecomeBootstrapFromOffline(partitionName1);

// Assert
verify(fileCopyManagerListener, times(1))
.onPartitionBecomeBootstrapFromOffline(partitionName1);
verify(storageManagerListener, never())
.onPartitionBecomeBootstrapFromOffline(partitionName1);

assert bootstrapControllerImpl.replicationProtocolTransitionType.size() == 1;
assert bootstrapControllerImpl.replicationProtocolTransitionType.contains(
ReplicationProtocolTransitionType.NEW_PARTITION_TO_FILE_BASED_BOOTSTRAP);
}

@Test
public void test__ReplicationProtocolTransitionType__NEW_PARTITION_TO_BLOB_BASED_BOOTSTRAP() {
// Arrange
final BootstrapController.BootstrapControllerImpl bootstrapControllerImpl =
getBootstrapControllerImpl(ServerReplicationMode.BLOB_BASED);

when(storeManager.getReplica(partitionName1)).thenReturn(null);

// Act
bootstrapControllerImpl.onPartitionBecomeBootstrapFromOffline(partitionName1);

// Assert
verify(fileCopyManagerListener, never())
.onPartitionBecomeBootstrapFromOffline(partitionName1);
verify(storageManagerListener, times(1))
.onPartitionBecomeBootstrapFromOffline(partitionName1);

assert bootstrapControllerImpl.replicationProtocolTransitionType.size() == 1;
assert bootstrapControllerImpl.replicationProtocolTransitionType.contains(
ReplicationProtocolTransitionType.NEW_PARTITION_TO_BLOB_BASED_BOOTSTRAP);
}

@Test
public void test__ReplicationProtocolTransitionType__BLOB_BASED_COMPLETE_TO_FILE_BASED_BOOTSTRAP() {
// Arrange
final BootstrapController.BootstrapControllerImpl bootstrapControllerImpl =
getBootstrapControllerImpl(ServerReplicationMode.FILE_BASED);

TestReplica testReplica = mock(TestReplica.class);
PartitionId partitionId = mock(PartitionId.class);
when(testReplica.getPartitionId()).thenReturn(partitionId);
when(storeManager.getReplica(partitionName1)).thenReturn(testReplica);
when(storeManager.isFileExists(partitionId, storeConfig.storeBootstrapInProgressFile))
.thenReturn(true);

// Act
bootstrapControllerImpl.onPartitionBecomeBootstrapFromOffline(partitionName1);

// Assert
verify(fileCopyManagerListener, never())
.onPartitionBecomeBootstrapFromOffline(partitionName1);
verify(storageManagerListener, times(1))
.onPartitionBecomeBootstrapFromOffline(partitionName1);

assert bootstrapControllerImpl.replicationProtocolTransitionType.size() == 1;
assert bootstrapControllerImpl.replicationProtocolTransitionType.contains(
ReplicationProtocolTransitionType.BLOB_BASED_INCOMPLETE_TO_FILE_BASED_BOOTSTRAP);
}

private static BootstrapController.BootstrapControllerImpl getBootstrapControllerImpl(
ServerReplicationMode serverReplicationMode) {

Properties props = new Properties();
props.setProperty("server.replication.protocol.for.hydration", String.valueOf(serverReplicationMode));
serverConfig = new ServerConfig(new VerifiableProperties(props));

return new BootstrapController(storeManager, storeConfig, serverConfig, primaryClusterParticipant)
.new BootstrapControllerImpl();
}
}

0 comments on commit bdd7024

Please sign in to comment.