Relax timeout for sending activities (#4864)

* Relax timeout for sending activities

Lemmy considers timeouts during activity sending as retryable errors.
While it is frequently enough to retry sending the same activity again after
the original submission attempt resulted in a timeout, allowing the receiving
side to use more time for synchronous processing should reduce the number of
retries needed overall and improve overall compatibility.

Some ActivityPub software, such as Mastodon, implements a queue for processing
received activities asynchronously, which allows immediately returning a
response for activity submissions. Other software, such as Lemmy or Hubzilla
implement synchronous processing of activities before returning a response.

ActivityPub does not specify specific timeouts to be used:
https://github.com/w3c/activitypub/issues/365

* Simplify usage of federation_sender_config Option
This commit is contained in:
Richard Schwab 2024-07-02 15:30:13 +02:00 committed by GitHub
parent 9120207314
commit 2c57f42022
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -55,7 +55,7 @@ use prometheus_metrics::serve_prometheus;
use reqwest_middleware::ClientBuilder; use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware; use reqwest_tracing::TracingMiddleware;
use serde_json::json; use serde_json::json;
use std::{env, ops::Deref}; use std::{env, ops::Deref, time::Duration};
use tokio::signal::unix::SignalKind; use tokio::signal::unix::SignalKind;
use tracing::subscriber::set_global_default; use tracing::subscriber::set_global_default;
use tracing_actix_web::TracingLogger; use tracing_actix_web::TracingLogger;
@ -64,6 +64,13 @@ use tracing_log::LogTracer;
use tracing_subscriber::{filter::Targets, layer::SubscriberExt, Layer, Registry}; use tracing_subscriber::{filter::Targets, layer::SubscriberExt, Layer, Registry};
use url::Url; use url::Url;
/// Timeout for HTTP requests while sending activities. A longer timeout provides better
/// compatibility with other ActivityPub software that might allocate more time for synchronous
/// processing of incoming activities. This timeout should be slightly longer than the time we
/// expect a remote server to wait before aborting processing on its own to account for delays from
/// establishing the HTTP connection and sending the request itself.
const ACTIVITY_SENDING_TIMEOUT: Duration = Duration::from_secs(125);
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command( #[command(
version, version,
@ -173,8 +180,8 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
serve_prometheus(prometheus, context.clone())?; serve_prometheus(prometheus, context.clone())?;
} }
let mut federation_config = FederationConfig::builder(); let mut federation_config_builder = FederationConfig::builder();
federation_config federation_config_builder
.domain(SETTINGS.hostname.clone()) .domain(SETTINGS.hostname.clone())
.app_data(context.clone()) .app_data(context.clone())
.client(client.clone()) .client(client.clone())
@ -184,9 +191,9 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
.url_verifier(Box::new(VerifyUrlData(context.inner_pool().clone()))); .url_verifier(Box::new(VerifyUrlData(context.inner_pool().clone())));
if local_site.federation_signed_fetch { if local_site.federation_signed_fetch {
let site: ApubSite = site_view.site.into(); let site: ApubSite = site_view.site.into();
federation_config.signed_fetch_actor(&site); federation_config_builder.signed_fetch_actor(&site);
} }
let federation_config = federation_config.build().await?; let federation_config = federation_config_builder.build().await?;
MATCH_OUTGOING_ACTIVITIES MATCH_OUTGOING_ACTIVITIES
.set(Box::new(move |d, c| { .set(Box::new(move |d, c| {
@ -209,13 +216,23 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
} else { } else {
None None
}; };
let federate = (!args.disable_activity_sending).then(|| {
// This FederationConfig instance is exclusively used to send activities, so we can safely
// increase the timeout without affecting timeouts for resolving objects anywhere.
let federation_sender_config = if !args.disable_activity_sending {
let mut federation_sender_config = federation_config_builder.clone();
federation_sender_config.request_timeout(ACTIVITY_SENDING_TIMEOUT);
Some(federation_sender_config.build().await?)
} else {
None
};
let federate = federation_sender_config.map(|cfg| {
SendManager::run( SendManager::run(
Opts { Opts {
process_index: args.federate_process_index, process_index: args.federate_process_index,
process_count: args.federate_process_count, process_count: args.federate_process_count,
}, },
federation_config, cfg,
) )
}); });
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;