storcon: signal LSN wait to pageserver during live migration #10452

Open · wants to merge 5 commits into main
7 changes: 7 additions & 0 deletions libs/pageserver_api/src/models.rs
@@ -975,6 +975,13 @@ pub struct TenantConfigPatchRequest {
pub config: TenantConfigPatch, // as we have a flattened field, we should reject all unknown fields in it
}

#[derive(Serialize, Deserialize, Debug)]
pub struct TenantWaitLsnRequest {
#[serde(flatten)]
pub timelines: HashMap<TimelineId, Lsn>,
pub timeout: Duration,
}

/// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "slug", content = "data", rename_all = "snake_case")]
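For context, not part of the diff: a minimal sketch of how a caller could build this request, assuming the usual FromStr impls for TimelineId and Lsn from the utils crate. Because the timelines field is #[serde(flatten)]ed, its entries are serialized as top-level keys of the JSON body, next to timeout.

use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use pageserver_api::models::TenantWaitLsnRequest;
use utils::id::TimelineId;
use utils::lsn::Lsn;

fn example_wait_lsn_request() -> TenantWaitLsnRequest {
    // Hypothetical timeline id and LSN, for illustration only.
    let mut timelines = HashMap::new();
    timelines.insert(
        TimelineId::from_str("11223344556677881122334455667788").expect("valid timeline id"),
        Lsn::from_str("0/16B5A50").expect("valid LSN"),
    );
    TenantWaitLsnRequest {
        timelines,
        timeout: Duration::from_secs(10),
    }
}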
15 changes: 15 additions & 0 deletions pageserver/client/src/mgmt_api.rs
@@ -763,4 +763,19 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}

pub async fn wait_lsn(
&self,
tenant_shard_id: TenantShardId,
request: TenantWaitLsnRequest,
) -> Result<StatusCode> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/wait_lsn",
self.mgmt_api_endpoint,
);

self.request_noerror(Method::POST, uri, request)
.await
.map(|resp| resp.status())
}
}
64 changes: 64 additions & 0 deletions pageserver/src/http/routes.rs
@@ -10,6 +10,7 @@ use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use enumset::EnumSet;
use futures::future::join_all;
use futures::StreamExt;
use futures::TryFutureExt;
use humantime::format_rfc3339;
@@ -40,6 +41,7 @@ use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantSorting;
use pageserver_api::models::TenantState;
use pageserver_api::models::TenantWaitLsnRequest;
use pageserver_api::models::TimelineArchivalConfigRequest;
use pageserver_api::models::TimelineCreateRequestMode;
use pageserver_api::models::TimelineCreateRequestModeImportPgdata;
@@ -94,6 +96,8 @@ use crate::tenant::timeline::CompactOptions;
use crate::tenant::timeline::CompactRequest;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::timeline::WaitLsnTimeout;
use crate::tenant::timeline::WaitLsnWaiter;
use crate::tenant::GetTimelineError;
use crate::tenant::OffloadedTimeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
@@ -2789,6 +2793,63 @@ async fn secondary_download_handler(
json_response(status, progress)
}

async fn wait_lsn_handler(
mut request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let wait_lsn_request: TenantWaitLsnRequest = json_request(&mut request).await?;

let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;

let mut wait_futures = Vec::default();
for timeline in tenant.list_timelines() {
let Some(lsn) = wait_lsn_request.timelines.get(&timeline.timeline_id) else {
continue;
};

let fut = {
let timeline = timeline.clone();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
async move {
timeline
.wait_lsn(
*lsn,
WaitLsnWaiter::HttpEndpoint,
WaitLsnTimeout::Custom(wait_lsn_request.timeout),
&ctx,
)
.await
}
};
wait_futures.push(fut);
}

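// None of the requested timelines are present on this pageserver.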
if wait_futures.is_empty() {
return json_response(StatusCode::NOT_FOUND, ());
}

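// Wait on all requested timelines concurrently; any timeout or error downgrades the response from 200 OK to 202 Accepted.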
let all_done = tokio::select! {
results = join_all(wait_futures) => {
results.iter().all(|res| res.is_ok())
},
_ = cancel.cancelled() => {
return Err(ApiError::Cancelled);
}
};

let status = if all_done {
StatusCode::OK
} else {
StatusCode::ACCEPTED
};

json_response(status, ())
}

async fn secondary_status_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -3569,6 +3630,9 @@ pub fn make_router(
.post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
api_handler(r, secondary_download_handler)
})
.post("/v1/tenant/:tenant_shard_id/wait_lsn", |r| {
api_handler(r, wait_lsn_handler)
})
.put("/v1/tenant/:tenant_shard_id/break", |r| {
testing_api_handler("set tenant state to broken", r, handle_tenant_break)
})
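For illustration, not part of the diff: a caller of the new endpoint could map the handler's status codes like this, with 200 meaning every requested timeline reached its LSN, 202 meaning at least one wait timed out or failed, and 404 meaning none of the requested timelines exist on this pageserver. The reqwest StatusCode re-export and the Display impl on the mgmt_api error are assumptions based on the surrounding code.

use pageserver_api::models::TenantWaitLsnRequest;
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::Client;
use reqwest::StatusCode;

async fn is_caught_up(
    client: &Client,
    tenant_shard_id: TenantShardId,
    request: TenantWaitLsnRequest,
) -> anyhow::Result<bool> {
    let status = client
        .wait_lsn(tenant_shard_id, request)
        .await
        .map_err(|e| anyhow::anyhow!("wait_lsn request failed: {e}"))?;
    match status {
        // All requested timelines have ingested up to the requested LSNs.
        StatusCode::OK => Ok(true),
        // Still catching up, or the timelines are gone; the caller polls again.
        StatusCode::ACCEPTED | StatusCode::NOT_FOUND => Ok(false),
        other => anyhow::bail!("unexpected status {other} from wait_lsn"),
    }
}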
2 changes: 2 additions & 0 deletions pageserver/src/page_service.rs
@@ -1651,6 +1651,7 @@ impl PageServerHandler {
.wait_lsn(
not_modified_since,
crate::tenant::timeline::WaitLsnWaiter::PageService,
timeline::WaitLsnTimeout::Default,
ctx,
)
.await?;
@@ -1985,6 +1986,7 @@ impl PageServerHandler {
.wait_lsn(
lsn,
crate::tenant::timeline::WaitLsnWaiter::PageService,
crate::tenant::timeline::WaitLsnTimeout::Default,
ctx,
)
.await?;
7 changes: 6 additions & 1 deletion pageserver/src/tenant.rs
@@ -2558,7 +2558,12 @@ impl Tenant {
// sizes etc. and that would get confused if the previous page versions
// are not in the repository yet.
ancestor_timeline
.wait_lsn(*lsn, timeline::WaitLsnWaiter::Tenant, ctx)
.wait_lsn(
*lsn,
timeline::WaitLsnWaiter::Tenant,
timeline::WaitLsnTimeout::Default,
ctx,
)
.await
.map_err(|e| match e {
e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState { .. }) => {
1 change: 1 addition & 0 deletions pageserver/src/tenant/mgr.rs
@@ -1643,6 +1643,7 @@ impl TenantManager {
.wait_lsn(
*target_lsn,
crate::tenant::timeline::WaitLsnWaiter::Tenant,
crate::tenant::timeline::WaitLsnTimeout::Default,
ctx,
)
.await
28 changes: 21 additions & 7 deletions pageserver/src/tenant/timeline.rs
@@ -903,10 +903,17 @@ impl From<GetReadyAncestorError> for PageReconstructError {
}
}

pub(crate) enum WaitLsnTimeout {
Custom(Duration),
/// Use the [`PageServerConf::wait_lsn_timeout`] default.
Default,
}

pub(crate) enum WaitLsnWaiter<'a> {
Timeline(&'a Timeline),
Tenant,
PageService,
HttpEndpoint,
}

/// Argument to [`Timeline::shutdown`].
Expand Down Expand Up @@ -1286,6 +1293,7 @@ impl Timeline {
&self,
lsn: Lsn,
who_is_waiting: WaitLsnWaiter<'_>,
timeout: WaitLsnTimeout,
ctx: &RequestContext, /* Prepare for use by cancellation */
) -> Result<(), WaitLsnError> {
let state = self.current_state();
@@ -1302,7 +1310,7 @@
| TaskKind::WalReceiverConnectionPoller => {
let is_myself = match who_is_waiting {
WaitLsnWaiter::Timeline(waiter) => Weak::ptr_eq(&waiter.myself, &self.myself),
WaitLsnWaiter::Tenant | WaitLsnWaiter::PageService => unreachable!("tenant or page_service context are not expected to have task kind {:?}", ctx.task_kind()),
WaitLsnWaiter::Tenant | WaitLsnWaiter::PageService | WaitLsnWaiter::HttpEndpoint => unreachable!("tenant or page_service context are not expected to have task kind {:?}", ctx.task_kind()),
};
if is_myself {
if let Err(current) = self.last_record_lsn.would_wait_for(lsn) {
@@ -1318,13 +1326,14 @@
}
}

let timeout = match timeout {
WaitLsnTimeout::Custom(t) => t,
WaitLsnTimeout::Default => self.conf.wait_lsn_timeout,
};

let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();

match self
.last_record_lsn
.wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
.await
{
match self.last_record_lsn.wait_for_timeout(lsn, timeout).await {
Ok(()) => Ok(()),
Err(e) => {
use utils::seqwait::SeqWaitError::*;
@@ -3473,7 +3482,12 @@ impl Timeline {
}
}
ancestor
.wait_lsn(self.ancestor_lsn, WaitLsnWaiter::Timeline(self), ctx)
.wait_lsn(
self.ancestor_lsn,
WaitLsnWaiter::Timeline(self),
WaitLsnTimeout::Default,
ctx,
)
.await
.map_err(|e| match e {
e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
@@ -273,7 +273,7 @@ pub(super) async fn connection_manager_loop_step(
};

last_discovery_ts = Some(std::time::Instant::now());
debug!("No active connection and no candidates, sending discovery request to the broker");
info!("No active connection and no candidates, sending discovery request to the broker");

// Cancellation safety: we want to send a message to the broker, but publish_one()
// function can get cancelled by the other select! arm. This is absolutely fine, because
18 changes: 16 additions & 2 deletions storage_controller/src/pageserver_client.rs
@@ -2,8 +2,9 @@ use pageserver_api::{
models::{
detach_ancestor::AncestorDetached, LocationConfig, LocationConfigListResponse,
PageserverUtilization, SecondaryProgress, TenantScanRemoteStorageResponse,
TenantShardSplitRequest, TenantShardSplitResponse, TimelineArchivalConfigRequest,
TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, TopTenantShardsResponse,
TenantShardSplitRequest, TenantShardSplitResponse, TenantWaitLsnRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest,
TopTenantShardsResponse,
},
shard::TenantShardId,
};
@@ -299,4 +300,17 @@ impl PageserverClient {
self.inner.top_tenant_shards(request).await
)
}

pub(crate) async fn wait_lsn(
&self,
tenant_shard_id: TenantShardId,
request: TenantWaitLsnRequest,
) -> Result<StatusCode> {
measured_request!(
"wait_lsn",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.wait_lsn(tenant_shard_id, request).await
)
}
}
61 changes: 60 additions & 1 deletion storage_controller/src/reconciler.rs
@@ -3,7 +3,7 @@ use crate::persistence::Persistence;
use crate::{compute_hook, service};
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
@@ -348,6 +348,32 @@ impl Reconciler {
Ok(())
}

async fn wait_lsn(
&self,
node: &Node,
tenant_shard_id: TenantShardId,
timelines: HashMap<TimelineId, Lsn>,
) -> Result<StatusCode, ReconcileError> {
const TIMEOUT: Duration = Duration::from_secs(10);

let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.service_config.jwt_token.as_deref(),
);

client
.wait_lsn(
tenant_shard_id,
TenantWaitLsnRequest {
timelines,
timeout: TIMEOUT,
},
)
.await
.map_err(|e| e.into())
}

async fn get_lsns(
&self,
tenant_shard_id: TenantShardId,
@@ -461,6 +487,39 @@ impl Reconciler {
node: &Node,
baseline: HashMap<TimelineId, Lsn>,
) -> anyhow::Result<()> {
// Signal to the pageserver that it should ingest up to the baseline LSNs.
loop {
match self.wait_lsn(node, tenant_shard_id, baseline.clone()).await {
Ok(StatusCode::OK) => {
// Everything is caught up
return Ok(());
}
Ok(StatusCode::ACCEPTED) => {
// Some timelines are not caught up yet.
// They'll be polled below.
break;
}
Ok(StatusCode::NOT_FOUND) => {
// None of the timelines are present on the pageserver.
// This is correct if they've all been deleted, but
// let the polling loop below cross-check.
break;
}
Ok(status_code) => {
tracing::warn!(
"Unexpected status code ({status_code}) returned by wait_lsn endpoint"
);
break;
}
Err(e) => {
tracing::info!("🕑 Can't trigger LSN wait on {node} yet, waiting ({e})");
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
}
}

// Poll the LSNs until they catch up
loop {
let latest = match self.get_lsns(tenant_shard_id, node).await {
Ok(l) => l,
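The polling loop that follows is collapsed in this view. As a rough sketch of the comparison it needs to make, a hypothetical helper (not code from this PR) could check that every baseline timeline has reached at least its baseline LSN in the most recently polled snapshot:

use std::collections::HashMap;

use utils::id::TimelineId;
use utils::lsn::Lsn;

fn all_caught_up(baseline: &HashMap<TimelineId, Lsn>, latest: &HashMap<TimelineId, Lsn>) -> bool {
    baseline.iter().all(|(timeline_id, baseline_lsn)| {
        // A timeline missing from `latest` (e.g. deleted mid-migration) counts
        // as not caught up here; the real loop decides how to handle that.
        latest
            .get(timeline_id)
            .is_some_and(|latest_lsn| latest_lsn >= baseline_lsn)
    })
}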