Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(pubsub): Add Pub/Sub ingestion from Kafka samples #14954

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions google/cloud/pubsub/samples/topic_admin_samples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,118 @@ void CreateTopicWithCloudStorageIngestion(
argv.at(4), argv.at(5), argv.at(6));
}

void CreateTopicWithAwsMskIngestion(
google::cloud::pubsub_admin::TopicAdminClient client,
std::vector<std::string> const& argv) {
// [START pubsub_create_topic_with_aws_msk_ingestion]
namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
std::string topic_id, std::string cluster_arn, std::string msk_topic,
std::string aws_role_arn, std::string gcp_service_account) {
google::pubsub::v1::Topic request;
request.set_name(
pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
auto* aws_msk =
request.mutable_ingestion_data_source_settings()->mutable_aws_msk();
aws_msk->set_cluster_arn(cluster_arn);
aws_msk->set_topic(msk_topic);
aws_msk->set_aws_role_arn(aws_role_arn);
aws_msk->set_gcp_service_account(gcp_service_account);

auto topic = client.CreateTopic(request);
// Note that kAlreadyExists is a possible error when the library retries.
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
std::cout << "The topic already exists\n";
return;
}
if (!topic) throw std::move(topic).status();

std::cout << "The topic was successfully created: " << topic->DebugString()
<< "\n";
}
// [END pubsub_create_topic_with_aws_msk_ingestion]
(std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3),
argv.at(4), argv.at(5));
}

void CreateTopicWithConfluentCloudIngestion(
google::cloud::pubsub_admin::TopicAdminClient client,
std::vector<std::string> const& argv) {
// [START pubsub_create_topic_with_confluent_cloud_ingestion]
namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
std::string topic_id, std::string bootstrap_server, std::string cluster_id,
std::string confluent_topic, std::string identity_pool_id,
std::string gcp_service_account) {
google::pubsub::v1::Topic request;
request.set_name(
pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
->mutable_confluent_cloud();
confluent_cloud->set_bootstrap_server(bootstrap_server);
confluent_cloud->set_cluster_id(cluster_id);
confluent_cloud->set_topic(confluent_topic);
confluent_cloud->set_identity_pool_id(identity_pool_id);
confluent_cloud->set_gcp_service_account(gcp_service_account);

auto topic = client.CreateTopic(request);
// Note that kAlreadyExists is a possible error when the library retries.
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
std::cout << "The topic already exists\n";
return;
}
if (!topic) throw std::move(topic).status();

std::cout << "The topic was successfully created: " << topic->DebugString()
<< "\n";
}
// [END pubsub_create_topic_with_confluent_cloud_ingestion]
(std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3),
argv.at(4), argv.at(5), argv.at(6));
}

void CreateTopicWithAzureEventHubsIngestion(
google::cloud::pubsub_admin::TopicAdminClient client,
std::vector<std::string> const& argv) {
// [START pubsub_create_topic_with_azure_event_hubs_ingestion]
namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
std::string topic_id, std::string resource_group,
std::string event_hubs_namespace, std::string event_hub,
std::string client_id, std::string tenant_id, std::string subscription_id,
std::string gcp_service_account) {
google::pubsub::v1::Topic request;
request.set_name(
pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
auto* azure_event_hubs = request.mutable_ingestion_data_source_settings()
->mutable_azure_event_hubs();
azure_event_hubs->set_resource_group(resource_group);
azure_event_hubs->set_namespace_(event_hubs_namespace);
azure_event_hubs->set_event_hub(event_hub);
azure_event_hubs->set_client_id(client_id);
azure_event_hubs->set_tenant_id(tenant_id);
azure_event_hubs->set_subscription_id(subscription_id);
azure_event_hubs->set_gcp_service_account(gcp_service_account);

auto topic = client.CreateTopic(request);
// Note that kAlreadyExists is a possible error when the library retries.
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
std::cout << "The topic already exists\n";
return;
}
if (!topic) throw std::move(topic).status();

std::cout << "The topic was successfully created: " << topic->DebugString()
<< "\n";
}
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
(std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3),
argv.at(4), argv.at(5), argv.at(6), argv.at(7), argv.at(8));
}

void GetTopic(google::cloud::pubsub_admin::TopicAdminClient client,
std::vector<std::string> const& argv) {
namespace pubsub = ::google::cloud::pubsub;
Expand Down Expand Up @@ -626,10 +738,42 @@ void AutoRun(std::vector<std::string> const& argv) {
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";
auto const* const kinesis_updated_gcp_service_account =
"fake-update-service-account@fake-gcp-project.iam.gserviceaccount.com";

auto const cloud_storage_topic_id =
"cloud-storage-" + RandomTopicId(generator) + "_ingestion_topic";
auto const cloud_storage_bucket = project_id + "-pubsub-bucket";

auto const aws_msk_topic_id =
"aws-msk-" + RandomTopicId(generator) + "_ingestion_topic";
auto const* const aws_msk_cluster_arn =
"arn:aws:kafka:us-east-1:1111111111:cluster/fake-cluster-name/11111111";
auto const* const aws_msk_topic = "fake-msk-topic";
auto const* const aws_msk_role_arn =
"arn:aws:iam::111111111111:role/fake-role-name";
auto const* const aws_msk_gcp_service_account =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";

auto const confluent_cloud_topic_id =
"confluent-cloud-" + RandomTopicId(generator) + "_ingestion_topic";
auto const* const confluent_cloud_bootstrap_server =
"fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092";
auto const* const confluent_cloud_cluster_id = "fake-cluster-id";
auto const* const confluent_cloud_topic = "fake-topic";
auto const* const confluent_cloud_identity_pool_id = "fake-pool-id";
auto const* const confluent_cloud_gcp_service_account =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";

auto const azure_event_hubs_topic_id =
"azure-event-hubs-" + RandomTopicId(generator) + "_ingestion_topic";
auto const* const azure_event_hubs_resource_group = "fake-resource-group";
auto const* const azure_event_hubs_namespace = "fake-namespace";
auto const* const azure_event_hubs_event_hub = "fake-event-hub";
auto const* const azure_event_hubs_client_id = "fake-client-id";
auto const* const azure_event_hubs_tenant_id = "fake-tenant-id";
auto const* const azure_event_hubs_subscription_id = "fake-subscription-id";
auto const* const azure_event_hubs_gcp_service_account =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";

using ::google::cloud::StatusCode;
auto ignore_emulator_failures =
[](auto lambda, StatusCode code = StatusCode::kUnimplemented) {
Expand Down Expand Up @@ -696,6 +840,63 @@ void AutoRun(std::vector<std::string> const& argv) {
},
StatusCode::kInvalidArgument);

std::cout << "\nRunning CreateTopicWithAwsMskIngestion() sample" << std::endl;

ignore_emulator_failures(
[&] {
CreateTopicWithAwsMskIngestion(
topic_admin_client,
{project_id, aws_msk_topic_id, aws_msk_cluster_arn, aws_msk_topic,
aws_msk_role_arn, aws_msk_gcp_service_account});
cleanup.Defer(
[topic_admin_client, project_id, aws_msk_topic_id]() mutable {
std::cout << "\nRunning DeleteTopic() sample" << std::endl;
DeleteTopic(topic_admin_client, {project_id, aws_msk_topic_id});
});
},
StatusCode::kInvalidArgument);

std::cout << "\nRunning CreateTopicWithConfluentCloudIngestion() sample"
<< std::endl;

ignore_emulator_failures(
[&] {
CreateTopicWithConfluentCloudIngestion(
topic_admin_client,
{project_id, confluent_cloud_topic_id,
confluent_cloud_bootstrap_server, confluent_cloud_cluster_id,
confluent_cloud_topic, confluent_cloud_identity_pool_id,
confluent_cloud_gcp_service_account});
cleanup.Defer([topic_admin_client, project_id,
confluent_cloud_topic_id]() mutable {
std::cout << "\nRunning DeleteTopic() sample" << std::endl;
DeleteTopic(topic_admin_client,
{project_id, confluent_cloud_topic_id});
});
},
StatusCode::kInvalidArgument);

std::cout << "\nRunning CreateTopicWithAzureEventHubsIngestion() sample"
<< std::endl;

ignore_emulator_failures(
[&] {
CreateTopicWithAzureEventHubsIngestion(
topic_admin_client,
{project_id, azure_event_hubs_topic_id,
azure_event_hubs_resource_group, azure_event_hubs_namespace,
azure_event_hubs_event_hub, azure_event_hubs_client_id,
azure_event_hubs_tenant_id, azure_event_hubs_subscription_id,
azure_event_hubs_gcp_service_account});
cleanup.Defer([topic_admin_client, project_id,
azure_event_hubs_topic_id]() mutable {
std::cout << "\nRunning DeleteTopic() sample" << std::endl;
DeleteTopic(topic_admin_client,
{project_id, azure_event_hubs_topic_id});
});
},
StatusCode::kInvalidArgument);

std::cout << "\nRunning UpdateTopicType() sample" << std::endl;

UpdateTopicType(
Expand Down Expand Up @@ -768,6 +969,21 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape)
{"project-id", "topic-id", "bucket", "input-format", "text-delimiter",
"match-glob", "minimum-object-create-time"},
CreateTopicWithCloudStorageIngestion),
CreateTopicAdminCommand(
"create-topic-with-aws-msk-ingestion",
{"project-id", "topic-id", "cluster-arn", "msk-topic", "aws-role-arn",
"gcp-service-account"},
CreateTopicWithAwsMskIngestion),
CreateTopicAdminCommand(
"create-topic-with-confluent-cloud-ingestion",
{"project-id", "topic-id", "bootstrap-server", "cluster-id",
"confluent-cloud-topic", "identity-pool-id", "gcp-service-account"},
CreateTopicWithConfluentCloudIngestion),
CreateTopicAdminCommand(
"create-topic-with-azure-event-hubs-ingestion",
{"project-id", "topic-id", "resource-group", "namespace", "event-hub",
"client-id", "tenant-id", "subscription-id", "gcp-service-account"},
CreateTopicWithAzureEventHubsIngestion),
CreateTopicAdminCommand(
"create-topic-with-schema",
{"project-id", "topic-id", "schema-id", "encoding"},
Expand Down
Loading