Skip to content

Commit

Permalink
Final step for T3 demo: getBlob operations post state build for T3 de…
Browse files Browse the repository at this point in the history
…mo added
  • Loading branch information
Jai Balani committed Jan 30, 2025
1 parent fda8548 commit 7e25168
Showing 1 changed file with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.clustermap.StaticClusterManager;
import com.github.ambry.clustermap.VcrClusterAgentsFactory;
import com.github.ambry.commons.BlobId;
import com.github.ambry.commons.Callback;
import com.github.ambry.commons.LoggingNotificationSystem;
import com.github.ambry.commons.NettyInternalMetrics;
Expand All @@ -56,8 +57,11 @@
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.messageformat.BlobStoreHardDelete;
import com.github.ambry.messageformat.BlobStoreRecovery;
import com.github.ambry.messageformat.MessageFormatFlags;
import com.github.ambry.network.BlockingChannelConnectionPool;
import com.github.ambry.network.ConnectedChannel;
import com.github.ambry.network.ConnectionPool;
import com.github.ambry.network.ConnectionPoolTimeoutException;
import com.github.ambry.network.LocalNetworkClientFactory;
import com.github.ambry.network.LocalRequestResponseChannel;
import com.github.ambry.network.NettyServerRequestResponseChannel;
Expand All @@ -74,6 +78,10 @@
import com.github.ambry.network.http2.Http2NetworkClientFactory;
import com.github.ambry.network.http2.Http2ServerMetrics;
import com.github.ambry.notification.NotificationSystem;
import com.github.ambry.protocol.GetOption;
import com.github.ambry.protocol.GetRequest;
import com.github.ambry.protocol.GetResponse;
import com.github.ambry.protocol.PartitionRequestInfo;
import com.github.ambry.protocol.RequestHandlerPool;
import com.github.ambry.repair.RepairRequestsDb;
import com.github.ambry.repair.RepairRequestsDbFactory;
Expand Down Expand Up @@ -658,7 +666,7 @@ private void testFileChunkAggregationForFileCopy() throws IOException {
}


private void testStateBuildPostFileCopy() throws IOException {
private void testStateBuildPostFileCopy() throws IOException, ConnectionPoolTimeoutException, InterruptedException {
String partitionName = "803";
String logFilePath = "/tmp/803/14_0_log"; // The path to the log file where chunks are written
String outputFilePath = "/tmp/803/15_0_log"; // New file where the log data will be copied
Expand Down Expand Up @@ -703,6 +711,34 @@ private void testStateBuildPostFileCopy() throws IOException {

storageManager.buildStateForFileCopy(storageManager.getReplica(partitionName));
System.out.println("State build successfully for partitionId: " + partitionName);

// Perform getBlob operations on few blobs to verify is state is built correctly.

List<BlobId> blobIdList = new ArrayList<>(2);
blobIdList.add(new BlobId("AAYQAgZEAAgAAQAAAAAAAAMja1b_H6RbSG2fzSHaZem-SA", clusterMap));
blobIdList.add(new BlobId("AAYQAgZEAAgAAQAAAAAAAAMj48QxOzSxRoKbgGiP59OZFw", clusterMap));
ConnectedChannel connectedChannel =
connectionPool.checkOutConnection("localhost", new Port(clusterMap.getDataNodeIds().get(0).getPort(), PortType.PLAINTEXT),
5000);
for (BlobId blobId : blobIdList) {
System.out.println("Trying getBlob operation for blobId: " + blobId.getID());
GetResponse getResponse = getBlob(blobId, connectedChannel);
System.out.println("BlobId: " + blobId.getID() + " found with GetResponse: " + getResponse);
}
}


/**
* Fetches a single blob from ambry server node
* @param blobId the {@link BlobId} that needs to be fetched
* @param connectedChannel the {@link ConnectedChannel} to use to send and receive data
* @throws IOException
*/
GetResponse getBlob(BlobId blobId, ConnectedChannel connectedChannel) throws IOException {
List<PartitionRequestInfo> partitionRequestInfoList = new ArrayList<>();
partitionRequestInfoList.add(new PartitionRequestInfo(blobId.getPartition(), Collections.singletonList(blobId)));
GetRequest getRequest = new GetRequest(1, "client1", MessageFormatFlags.BlobInfo, partitionRequestInfoList, GetOption.None);
return GetResponse.readFrom(connectedChannel.sendAndReceive(getRequest).getInputStream(), clusterMap);
}

/**
Expand Down

0 comments on commit 7e25168

Please sign in to comment.