fix both permanent stopping of federation queues and multiple creation of the same federation queues (#4754)

Co-authored-by: Nutomic <me@nutomic.com>
This commit is contained in:
phiresky 2024-05-30 11:08:27 +02:00 committed by GitHub
parent 91e57ff954
commit e8a7bb07a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 52 additions and 22 deletions

View file

@ -41,7 +41,7 @@ pub struct SendManager {
} }
impl SendManager { impl SendManager {
pub fn new(opts: Opts, context: FederationConfig<LemmyContext>) -> Self { fn new(opts: Opts, context: FederationConfig<LemmyContext>) -> Self {
assert!(opts.process_count > 0); assert!(opts.process_count > 0);
assert!(opts.process_index > 0); assert!(opts.process_index > 0);
assert!(opts.process_index <= opts.process_count); assert!(opts.process_index <= opts.process_count);
@ -59,11 +59,27 @@ impl SendManager {
} }
} }
pub fn run(mut self) -> CancellableTask { pub fn run(opts: Opts, context: FederationConfig<LemmyContext>) -> CancellableTask {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| async move { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| {
self.do_loop(cancel).await?; let opts = opts.clone();
self.cancel().await?; let context = context.clone();
Ok(()) let mut manager = Self::new(opts, context);
async move {
let result = manager.do_loop(cancel).await;
// the loop function will only return if there is (a) an internal error (e.g. db connection
// failure) or (b) it was cancelled from outside.
if let Err(e) = result {
// don't let this error bubble up, just log it, so the below cancel function will run
// regardless
tracing::error!("SendManager failed: {e}");
}
// cancel all the dependent workers as well to ensure they don't get orphaned and keep
// running.
manager.cancel().await?;
LemmyResult::Ok(())
// if the task was not intentionally cancelled, then this whole lambda will be run again by
// CancellableTask after this
}
}) })
} }
@ -102,14 +118,24 @@ impl SendManager {
continue; continue;
} }
// create new worker // create new worker
let instance = instance.clone(); let context = self.context.clone();
let req_data = self.context.to_request_data();
let stats_sender = self.stats_sender.clone(); let stats_sender = self.stats_sender.clone();
self.workers.insert( self.workers.insert(
instance.id, instance.id,
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| async move { CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
InstanceWorker::init_and_loop(instance, req_data, stop, stats_sender).await?; // if the instance worker ends unexpectedly due to internal/db errors, this lambda is rerun by cancellabletask.
Ok(()) let instance = instance.clone();
let req_data = context.to_request_data();
let stats_sender = stats_sender.clone();
async move {
InstanceWorker::init_and_loop(
instance,
req_data,
stop,
stats_sender,
)
.await
}
}), }),
); );
} else if !should_federate { } else if !should_federate {

View file

@ -18,7 +18,6 @@ use lemmy_db_schema::{
traits::ApubActor, traits::ApubActor,
utils::{get_conn, DbPool}, utils::{get_conn, DbPool},
}; };
use lemmy_utils::error::LemmyResult;
use moka::future::Cache; use moka::future::Cache;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use reqwest::Url; use reqwest::Url;
@ -26,7 +25,6 @@ use serde_json::Value;
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration};
use tokio::{task::JoinHandle, time::sleep}; use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::error;
/// Decrease the delays of the federation queue. /// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the /// Should only be used for federation tests since it significantly increases CPU and DB load of the
@ -62,24 +60,31 @@ impl CancellableTask {
/// spawn a task but with graceful shutdown /// spawn a task but with graceful shutdown
pub fn spawn<F, R>( pub fn spawn<F, R>(
timeout: Duration, timeout: Duration,
task: impl FnOnce(CancellationToken) -> F + Send + 'static, task: impl Fn(CancellationToken) -> F + Send + 'static,
) -> CancellableTask ) -> CancellableTask
where where
F: Future<Output = LemmyResult<R>> + Send + 'static, F: Future<Output = R> + Send + 'static,
R: Send + Debug + 'static, R: Send + Debug + 'static,
{ {
let stop = CancellationToken::new(); let stop = CancellationToken::new();
let stop2 = stop.clone(); let stop2 = stop.clone();
let task: JoinHandle<LemmyResult<R>> = tokio::spawn(task(stop2)); let task: JoinHandle<()> = tokio::spawn(async move {
loop {
let res = task(stop2.clone()).await;
if stop2.is_cancelled() {
return;
} else {
tracing::warn!("task exited, restarting: {res:?}");
}
}
});
let abort = task.abort_handle(); let abort = task.abort_handle();
CancellableTask { CancellableTask {
f: Box::pin(async move { f: Box::pin(async move {
stop.cancel(); stop.cancel();
tokio::select! { tokio::select! {
r = task => { r = task => {
if let Err(ref e) = r? { r.context("CancellableTask failed to cancel cleanly, returned error")?;
error!("CancellableTask threw error: {e}");
}
Ok(()) Ok(())
}, },
_ = sleep(timeout) => { _ = sleep(timeout) => {

View file

@ -210,14 +210,13 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
None None
}; };
let federate = (!args.disable_activity_sending).then(|| { let federate = (!args.disable_activity_sending).then(|| {
let task = SendManager::new( SendManager::run(
Opts { Opts {
process_index: args.federate_process_index, process_index: args.federate_process_index,
process_count: args.federate_process_count, process_count: args.federate_process_count,
}, },
federation_config, federation_config,
); )
task.run()
}); });
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?;