This commit is contained in:
dull b 2023-12-16 20:52:19 +00:00
parent fdebc85753
commit 9b9314a1d6
7 changed files with 174 additions and 66 deletions

View file

@ -172,6 +172,7 @@ impl Object for ApubComment {
deleted: Some(false), deleted: Some(false),
ap_id: Some(note.id.into()), ap_id: Some(note.id.into()),
distinguished: note.distinguished, distinguished: note.distinguished,
path: None,
local: Some(false), local: Some(false),
language_id, language_id,
}; };

View file

@ -166,16 +166,18 @@ impl SiteLanguage {
.execute(conn) .execute(conn)
.await?; .await?;
for l in lang_ids { let forms = lang_ids
let form = SiteLanguageForm { .into_iter()
.map(|l| SiteLanguageForm {
site_id: for_site_id, site_id: for_site_id,
language_id: l, language_id: l,
}; })
insert_into(site_language) .collect::<Vec<_>>();
.values(form)
.get_result::<Self>(conn) insert_into(site_language)
.await?; .values(forms)
} .get_result::<Self>(conn)
.await?;
CommunityLanguage::limit_languages(conn, instance_id).await?; CommunityLanguage::limit_languages(conn, instance_id).await?;

View file

@ -1,6 +1,16 @@
use crate::{ use crate::{
newtypes::{CommentId, DbUrl, PersonId}, newtypes::{CommentId, DbUrl, PersonId},
schema::comment::dsl::{ap_id, comment, content, creator_id, deleted, path, removed, updated}, schema::comment::dsl::{
ap_id,
comment,
content,
creator_id,
deleted,
id,
path,
removed,
updated,
},
source::comment::{ source::comment::{
Comment, Comment,
CommentInsertForm, CommentInsertForm,
@ -11,16 +21,24 @@ use crate::{
CommentUpdateForm, CommentUpdateForm,
}, },
traits::{Crud, Likeable, Saveable}, traits::{Crud, Likeable, Saveable},
utils::{get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT}, utils::{functions::AsText, get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT},
}; };
use diesel::{ use diesel::{
dsl::{insert_into, sql_query}, dsl::{insert_into, sql_query},
result::Error, result::Error,
ExpressionMethods, ExpressionMethods,
QueryDsl, QueryDsl,
TextExpressionMethods,
}; };
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
use diesel_ltree::Ltree; use diesel_ltree::{
functions::{ltree2text, text2ltree},
Ltree,
};
use futures_util::{
future::TryFutureExt,
stream::{self, StreamExt, TryStreamExt},
};
use url::Url; use url::Url;
impl Comment { impl Comment {
@ -57,12 +75,16 @@ impl Comment {
comment_form: &CommentInsertForm, comment_form: &CommentInsertForm,
parent_path: Option<&Ltree>, parent_path: Option<&Ltree>,
) -> Result<Comment, Error> { ) -> Result<Comment, Error> {
Comment::create_batch(pool, &[(comment_form, parent_path)]).await?.into_iter().next().ok_or(Error::NotFound) Comment::create_batch(pool, &[(comment_form.clone(), parent_path.cloned())])
.await?
.into_iter()
.next()
.ok_or(Error::NotFound)
} }
pub async fn create_batch( pub async fn create_batch(
pool: &mut DbPool<'_>, pool: &mut DbPool<'_>,
items: &[(&CommentInsertForm, Option<&Ltree>)], items: &[(CommentInsertForm, Option<Ltree>)],
) -> Result<Vec<Comment>, Error> { ) -> Result<Vec<Comment>, Error> {
let conn = &mut get_conn(pool).await?; let conn = &mut get_conn(pool).await?;
@ -72,43 +94,44 @@ impl Comment {
Box::pin(async move { Box::pin(async move {
let forms = items let forms = items
.iter() .iter()
.map(|&(form, _)| form) .map(|(comment_form, parent_path)| CommentInsertForm {
.collect::<Vec<_>>(); path: Some(parent_path.clone().unwrap_or(Ltree("0".to_owned()))),
..comment_form.clone()
});
// Insert, to get the ids // Insert, to get the ids
let inserted_comments = insert_into(comment) let inserted_comments = insert_into(comment)
.values(&forms) .values(forms.clone().collect::<Vec<_>>())
/*.on_conflict(ap_id) .load::<Self>(conn)
.do_update() .or_else(|_| {
.set()*/ // `ap_id` unique constraint violation is handled individually for each row
.get_result::<Self>(conn) // because batched upsert requires having the same `set` argument for all rows
stream::iter(forms)
.then(|form| {
insert_into(comment)
.values(form.clone())
.on_conflict(ap_id)
.do_update()
.set(form)
.get_result::<Self>(conn)
})
.try_collect::<Vec<_>>()
})
.await?; .await?;
// `ap_id` unique constraint violation is handled individually for each row // For each comment, append its id to its path
// because batch upsert requires having the same `set` argument for all rows let updated_comments = diesel::update(comment)
.filter(id.eq_any(inserted_comments.into_iter().map(|c| c.id)))
.set(path.eq(text2ltree(
let comment_id = inserted_comment.id; ltree2text(path).concat(".").concat(AsText::new(id)),
)))
// You need to update the ltree column .load::<Self>(conn)
let ltree = Ltree(if let Some(parent_path) = parent_path { .await?;
// The previous parent will already have 0 in it
// Append this comment id
format!("{}.{}", parent_path.0, comment_id)
} else {
// '0' is always the first path, append to that
format!("{}.{}", 0, comment_id)
});
let updated_comment = diesel::update(comment.find(comment_id))
.set(path.eq(ltree))
.get_result::<Self>(conn)
.await;
// Update the child count for the parent comment_aggregates // Update the child count for the parent comment_aggregates
// You could do this with a trigger, but since you have to do this manually anyway, // You could do this with a trigger, but since you have to do this manually anyway,
// you can just have it here // you can just have it here
if let Some(parent_path) = parent_path { for parent_path in items.iter().filter_map(|(_, p)| p.as_ref()) {
// You have to update counts for all parents, not just the immediate one // You have to update counts for all parents, not just the immediate one
// TODO if the performance of this is terrible, it might be better to do this as part of a // TODO if the performance of this is terrible, it might be better to do this as part of a
// scheduled query... although the counts would often be wrong. // scheduled query... although the counts would often be wrong.
@ -137,7 +160,7 @@ where ca.comment_id = c.id"
sql_query(update_child_count_stmt).execute(conn).await?; sql_query(update_child_count_stmt).execute(conn).await?;
} }
} }
updated_comment Ok(updated_comments)
}) as _ }) as _
}) })
.await .await

View file

@ -67,6 +67,10 @@ pub struct CommentInsertForm {
pub deleted: Option<bool>, pub deleted: Option<bool>,
pub ap_id: Option<DbUrl>, pub ap_id: Option<DbUrl>,
pub local: Option<bool>, pub local: Option<bool>,
#[cfg(feature = "full")]
pub path: Option<Ltree>,
#[cfg(not(feature = "full"))]
pub path: Option<String>,
pub distinguished: Option<bool>, pub distinguished: Option<bool>,
pub language_id: Option<LanguageId>, pub language_id: Option<LanguageId>,
} }

View file

@ -13,13 +13,15 @@ use diesel::{
deserialize::FromSql, deserialize::FromSql,
helper_types::AsExprOf, helper_types::AsExprOf,
pg::Pg, pg::Pg,
query_builder::{Query, QueryFragment},
query_dsl::methods::LimitDsl,
result::{ConnectionError, ConnectionResult, Error as DieselError, Error::QueryBuilderError}, result::{ConnectionError, ConnectionResult, Error as DieselError, Error::QueryBuilderError},
serialize::{Output, ToSql}, serialize::{Output, ToSql},
sql_query, sql_query,
sql_types::{Text, Timestamptz}, sql_types::{Text, Timestamptz},
IntoSql, IntoSql,
PgConnection, PgConnection,
RunQueryDsl, Insertable, Table, Column, AsChangeset, Expression, SelectableExpression, expression::NonAggregate, query_builder::QueryFragment, RunQueryDsl,
}; };
use diesel_async::{ use diesel_async::{
pg::AsyncPgConnection, pg::AsyncPgConnection,
@ -153,17 +155,65 @@ macro_rules! try_join_with_pool {
}}; }};
} }
pub async fn batch_upsert<T, U, Target, R>(conn: &mut AsyncPgConnection, target: T, records: U, conflict_target: Target) -> Result<Vec<R>, DieselError> /// Includes an SQL comment before `T`, which can be used to label auto_explain output
where #[derive(QueryId)]
T: Table, pub struct Commented<T> {
T::AllColumns: Expression + SelectableExpression<T> + NonAggregate + QueryFragment<Pg>, comment: String,
U: IntoIterator + Clone, inner: T,
Vec<U::Item>: Insertable<T>, }
U::Item: Insertable<T> + AsChangeset<Target = T>,
Target: Column<Table = T>, impl<T> Commented<T> {
{ pub fn new(inner: T) -> Self {
let result = diesel::insert_into(target).values(records.clone().into_iter().collect::<Vec<_>>()).load::<R>(conn).await; Commented {
comment: String::new(),
inner,
}
}
/// Adds `text` to the comment if `condition` is true
pub fn text_if(mut self, text: &str, condition: bool) -> Self {
if condition {
if !self.comment.is_empty() {
self.comment.push_str(", ");
}
self.comment.push_str(text);
}
self
}
/// Adds `text` to the comment
pub fn text(self, text: &str) -> Self {
self.text_if(text, true)
}
}
impl<T: Query> Query for Commented<T> {
type SqlType = T::SqlType;
}
impl<T: QueryFragment<Pg>> QueryFragment<Pg> for Commented<T> {
fn walk_ast<'b>(
&'b self,
mut out: diesel::query_builder::AstPass<'_, 'b, Pg>,
) -> Result<(), DieselError> {
for line in self.comment.lines() {
out.push_sql("\n-- ");
out.push_sql(line);
}
out.push_sql("\n");
self.inner.walk_ast(out.reborrow())
}
}
impl<T: LimitDsl> LimitDsl for Commented<T> {
type Output = Commented<T::Output>;
fn limit(self, limit: i64) -> Self::Output {
Commented {
comment: self.comment,
inner: self.inner.limit(limit),
}
}
} }
pub fn fuzzy_search(q: &str) -> String { pub fn fuzzy_search(q: &str) -> String {
@ -368,7 +418,10 @@ static EMAIL_REGEX: Lazy<Regex> = Lazy::new(|| {
}); });
pub mod functions { pub mod functions {
use diesel::sql_types::{BigInt, Text, Timestamptz}; use diesel::{
pg::Pg,
sql_types::{BigInt, Text, Timestamptz},
};
sql_function! { sql_function! {
fn hot_rank(score: BigInt, time: Timestamptz) -> Double; fn hot_rank(score: BigInt, time: Timestamptz) -> Double;
@ -386,6 +439,9 @@ pub mod functions {
// really this function is variadic, this just adds the two-argument version // really this function is variadic, this just adds the two-argument version
sql_function!(fn coalesce<T: diesel::sql_types::SqlType + diesel::sql_types::SingleValue>(x: diesel::sql_types::Nullable<T>, y: T) -> T); sql_function!(fn coalesce<T: diesel::sql_types::SqlType + diesel::sql_types::SingleValue>(x: diesel::sql_types::Nullable<T>, y: T) -> T);
// Use `AsText::new`
postfix_operator!(AsText, "::text", Text, backend: Pg);
} }
pub const DELETED_REPLACEMENT_TEXT: &str = "*Permanently Deleted*"; pub const DELETED_REPLACEMENT_TEXT: &str = "*Permanently Deleted*";

View file

@ -43,6 +43,7 @@ use lemmy_db_schema::{
get_conn, get_conn,
limit_and_offset, limit_and_offset,
now, now,
Commented,
DbConn, DbConn,
DbPool, DbPool,
ListFn, ListFn,
@ -282,7 +283,10 @@ fn queries<'a>() -> Queries<
); );
} }
query.first::<PostView>(&mut conn).await Commented::new(query)
.text("PostView::read")
.first::<PostView>(&mut conn)
.await
}; };
let list = move |mut conn: DbConn<'a>, options: PostQuery<'a>| async move { let list = move |mut conn: DbConn<'a>, options: PostQuery<'a>| async move {
@ -557,7 +561,14 @@ fn queries<'a>() -> Queries<
debug!("Post View Query: {:?}", debug_query::<Pg, _>(&query)); debug!("Post View Query: {:?}", debug_query::<Pg, _>(&query));
query.load::<PostView>(&mut conn).await Commented::new(query)
.text("PostQuery::list")
.text_if(
"getting upper bound for next query",
options.community_id_just_for_prefetch,
)
.load::<PostView>(&mut conn)
.await
}; };
Queries::new(read, list) Queries::new(read, list)
@ -1457,12 +1468,13 @@ mod tests {
.post_id(inserted_post.id) .post_id(inserted_post.id)
.content("yes".to_owned()) .content("yes".to_owned())
.build(); .build();
let inserted_comment = Comment::create(pool, &comment_form, None).await.unwrap(); comment_forms.push((comment_form, None));
inserted_comment_ids.push(inserted_comment.id);
} }
} }
} }
Comment::create_batch(pool, &comment_forms).await.unwrap();
let mut listed_post_ids = vec![]; let mut listed_post_ids = vec![];
let mut page_after = None; let mut page_after = None;
loop { loop {

View file

@ -17,24 +17,34 @@ then
rm -rf $PGDATA rm -rf $PGDATA
fi fi
# Create cluster config_args=(
initdb --username=postgres --auth=trust --no-instructions \
# Only listen to socket in current directory # Only listen to socket in current directory
-c listen_addresses= -c unix_socket_directories=$PWD \ -c listen_addresses=
-c unix_socket_directories=$PWD
# Write logs to a file in $PGDATA/log # Write logs to a file in $PGDATA/log
-c logging_collector=on \ -c logging_collector=on
# Log all query plans by default # Log all query plans by default
-c session_preload_libraries=auto_explain -c auto_explain.log_min_duration=0 -c session_preload_libraries=auto_explain
-c auto_explain.log_min_duration=0
# Include actual row amounts and run times for query plan nodes # Include actual row amounts and run times for query plan nodes
-c auto_explain.log_analyze=on -c auto_explain.log_analyze=on
# Avoid sequential scans so query plans show what index scans can be done # Avoid sequential scans so query plans show what index scans can be done
# (index scan is normally avoided in some cases, such as the table being small enough) # (index scan is normally avoided in some cases, such as the table being small enough)
-c enable_seqscan=off -c enable_seqscan=off
# Don't log parameter values # Don't log parameter values
-c auto_explain.log_parameter_max_length=0 -c auto_explain.log_parameter_max_length=0
)
# Create cluster
initdb --username=postgres --auth=trust --no-instructions
# Start server that only listens to socket in current directory # Start server that only listens to socket in current directory
pg_ctl start pg_ctl start --options="${config_args[*]}"
# Setup database # Setup database
psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres psql -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres