diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java index 35924d7b84..df6a84fdf0 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java @@ -34,6 +34,7 @@ import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.clustermap.StaticClusterManager; import com.github.ambry.clustermap.VcrClusterAgentsFactory; +import com.github.ambry.commons.BlobId; import com.github.ambry.commons.Callback; import com.github.ambry.commons.LoggingNotificationSystem; import com.github.ambry.commons.NettyInternalMetrics; @@ -56,8 +57,11 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.messageformat.BlobStoreHardDelete; import com.github.ambry.messageformat.BlobStoreRecovery; +import com.github.ambry.messageformat.MessageFormatFlags; import com.github.ambry.network.BlockingChannelConnectionPool; +import com.github.ambry.network.ConnectedChannel; import com.github.ambry.network.ConnectionPool; +import com.github.ambry.network.ConnectionPoolTimeoutException; import com.github.ambry.network.LocalNetworkClientFactory; import com.github.ambry.network.LocalRequestResponseChannel; import com.github.ambry.network.NettyServerRequestResponseChannel; @@ -74,6 +78,10 @@ import com.github.ambry.network.http2.Http2NetworkClientFactory; import com.github.ambry.network.http2.Http2ServerMetrics; import com.github.ambry.notification.NotificationSystem; +import com.github.ambry.protocol.GetOption; +import com.github.ambry.protocol.GetRequest; +import com.github.ambry.protocol.GetResponse; +import com.github.ambry.protocol.PartitionRequestInfo; import com.github.ambry.protocol.RequestHandlerPool; import com.github.ambry.repair.RepairRequestsDb; import com.github.ambry.repair.RepairRequestsDbFactory; @@ -658,7 +666,7 @@ private void testFileChunkAggregationForFileCopy() throws IOException { } - private void testStateBuildPostFileCopy() throws IOException { + private void testStateBuildPostFileCopy() throws IOException, ConnectionPoolTimeoutException, InterruptedException { String partitionName = "803"; String logFilePath = "/tmp/803/14_0_log"; // The path to the log file where chunks are written String outputFilePath = "/tmp/803/15_0_log"; // New file where the log data will be copied @@ -703,6 +711,34 @@ private void testStateBuildPostFileCopy() throws IOException { storageManager.buildStateForFileCopy(storageManager.getReplica(partitionName)); System.out.println("State build successfully for partitionId: " + partitionName); + + // Perform getBlob operations on few blobs to verify is state is built correctly. + + List blobIdList = new ArrayList<>(2); + blobIdList.add(new BlobId("AAYQAgZEAAgAAQAAAAAAAAMja1b_H6RbSG2fzSHaZem-SA", clusterMap)); + blobIdList.add(new BlobId("AAYQAgZEAAgAAQAAAAAAAAMj48QxOzSxRoKbgGiP59OZFw", clusterMap)); + ConnectedChannel connectedChannel = + connectionPool.checkOutConnection("localhost", new Port(clusterMap.getDataNodeIds().get(0).getPort(), PortType.PLAINTEXT), + 5000); + for (BlobId blobId : blobIdList) { + System.out.println("Trying getBlob operation for blobId: " + blobId.getID()); + GetResponse getResponse = getBlob(blobId, connectedChannel); + System.out.println("BlobId: " + blobId.getID() + " found with GetResponse: " + getResponse); + } + } + + + /** + * Fetches a single blob from ambry server node + * @param blobId the {@link BlobId} that needs to be fetched + * @param connectedChannel the {@link ConnectedChannel} to use to send and receive data + * @throws IOException + */ + GetResponse getBlob(BlobId blobId, ConnectedChannel connectedChannel) throws IOException { + List partitionRequestInfoList = new ArrayList<>(); + partitionRequestInfoList.add(new PartitionRequestInfo(blobId.getPartition(), Collections.singletonList(blobId))); + GetRequest getRequest = new GetRequest(1, "client1", MessageFormatFlags.BlobInfo, partitionRequestInfoList, GetOption.None); + return GetResponse.readFrom(connectedChannel.sendAndReceive(getRequest).getInputStream(), clusterMap); } /**