Merge branch 'main' into add_diesel_migration_check

This commit is contained in:
Dessalines 2023-07-20 13:25:49 -04:00 committed by GitHub
commit 957fde7992
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 38 deletions

16
Cargo.lock generated
View file

@ -10,9 +10,9 @@ checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "activitypub_federation"
version = "0.4.5"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ab3ac148d9c0b4163a6d41040c17de7558a42224b9ecbd4e8f033aef6c254d9"
checksum = "4e6e7fefba6602240fcf612931b70640ad1e249dff833551ebc218f1c96a4193"
dependencies = [
"activitystreams-kinds",
"actix-web",
@ -22,7 +22,6 @@ dependencies = [
"bytes",
"chrono",
"derive_builder",
"displaydoc",
"dyn-clone",
"enum_delegate",
"futures-core",
@ -1548,17 +1547,6 @@ dependencies = [
"chrono",
]
[[package]]
name = "displaydoc"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.25",
]
[[package]]
name = "dlv-list"
version = "0.3.0"

View file

@ -67,7 +67,7 @@ lemmy_routes = { version = "=0.18.1", path = "./crates/routes" }
lemmy_db_views = { version = "=0.18.1", path = "./crates/db_views" }
lemmy_db_views_actor = { version = "=0.18.1", path = "./crates/db_views_actor" }
lemmy_db_views_moderator = { version = "=0.18.1", path = "./crates/db_views_moderator" }
activitypub_federation = { version = "0.4.5", default-features = false, features = [
activitypub_federation = { version = "0.4.6", default-features = false, features = [
"actix-web",
] }
diesel = "2.1.0"

View file

@ -4,10 +4,13 @@ use futures::future::BoxFuture;
use lemmy_db_schema::source::post::Post;
use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
use once_cell::sync::{Lazy, OnceCell};
use tokio::sync::{
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender},
Mutex,
use tokio::{
sync::{
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
Mutex,
},
task::JoinHandle,
};
type MatchOutgoingActivitiesBoxed =
@ -21,17 +24,22 @@ pub enum SendActivityData {
CreatePost(Post),
}
// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
// ctrl+c still works.
static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
let (sender, receiver) = mpsc::unbounded_channel();
let weak_sender = sender.downgrade();
ActivityChannel {
sender,
weak_sender,
receiver: Mutex::new(receiver),
keepalive_sender: Mutex::new(Some(sender)),
}
});
pub struct ActivityChannel {
sender: UnboundedSender<SendActivityData>,
weak_sender: WeakUnboundedSender<SendActivityData>,
receiver: Mutex<UnboundedReceiver<SendActivityData>>,
keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
}
impl ActivityChannel {
@ -49,10 +57,18 @@ impl ActivityChannel {
.get()
.expect("retrieve function pointer")(data, context)
.await?;
} else {
let lock = &ACTIVITY_CHANNEL.sender;
lock.send(data)?;
}
// could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
// not sure which way is more efficient
else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
sender.send(data)?;
}
Ok(())
}
pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
outgoing_activities_task.await??;
Ok(())
}
}

View file

@ -78,18 +78,20 @@ impl Collection for ApubCommunityModerators {
// Add new mods to database which have been added to moderators collection
for mod_id in apub.ordered_items {
let mod_user: ApubPerson = mod_id.dereference(data).await?;
if !current_moderators
.iter()
.map(|c| c.moderator.actor_id.clone())
.any(|x| x == mod_user.actor_id)
{
let community_moderator_form = CommunityModeratorForm {
community_id: owner.id,
person_id: mod_user.id,
};
CommunityModerator::join(&mut data.pool(), &community_moderator_form).await?;
// Ignore errors as mod accounts might be deleted or instances unavailable.
let mod_user: Option<ApubPerson> = mod_id.dereference(data).await.ok();
if let Some(mod_user) = mod_user {
if !current_moderators
.iter()
.map(|c| c.moderator.actor_id.clone())
.any(|x| x == mod_user.actor_id)
{
let community_moderator_form = CommunityModeratorForm {
community_id: owner.id,
person_id: mod_user.id,
};
CommunityModerator::join(&mut data.pool(), &community_moderator_form).await?;
}
}
}

View file

@ -21,7 +21,7 @@ use lemmy_api_common::{
context::LemmyContext,
lemmy_db_views::structs::SiteView,
request::build_user_agent,
send_activity::MATCH_OUTGOING_ACTIVITIES,
send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES},
utils::{
check_private_instance_and_federation_enabled,
local_site_rate_limit_to_rate_limit_config,
@ -227,7 +227,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
.await?;
// Wait for outgoing apub sends to complete
outgoing_activities_task.await??;
ActivityChannel::close(outgoing_activities_task).await?;
Ok(())
}