diff --git a/crates/db_schema/replaceable_schema/triggers.sql b/crates/db_schema/replaceable_schema/triggers.sql index d869a5e1e..fa5b01018 100644 --- a/crates/db_schema/replaceable_schema/triggers.sql +++ b/crates/db_schema/replaceable_schema/triggers.sql @@ -5,6 +5,12 @@ -- (even if only other columns are updated) because triggers can run after the deletion of referenced rows and -- before the automatic deletion of the row that references it. This is not a problem for insert or delete. -- +-- After a row update begins, a concurrent update on the same row can't begin until the whole +-- transaction that contains the first update is finished. To reduce this locking, statements in +-- triggers should be ordered based on the likelihood of concurrent writers. For example, updating +-- site_aggregates should be done last because the same row is updated for all local stuff. If +-- it were not last, then the locking period for concurrent writers would extend to include the +-- time consumed by statements that come after. -- -- -- Create triggers for both post and comments @@ -38,16 +44,18 @@ BEGIN (thing_like).thing_id, coalesce(sum(count_diff) FILTER (WHERE (thing_like).score = 1), 0) AS upvotes, coalesce(sum(count_diff) FILTER (WHERE (thing_like).score != 1), 0) AS downvotes FROM select_old_and_new_rows AS old_and_new_rows GROUP BY (thing_like).thing_id) AS diff WHERE a.thing_id = diff.thing_id - RETURNING - r.creator_id_from_thing_aggregates (a.*) AS creator_id, diff.upvotes - diff.downvotes AS score) - UPDATE - person_aggregates AS a - SET - thing_score = a.thing_score + diff.score FROM ( - SELECT - creator_id, sum(score) AS score FROM thing_diff GROUP BY creator_id) AS diff - WHERE - a.person_id = diff.creator_id; + AND (diff.upvotes, diff.downvotes) != (0, 0) + RETURNING + r.creator_id_from_thing_aggregates (a.*) AS creator_id, diff.upvotes - diff.downvotes AS score) + UPDATE + person_aggregates AS a + SET + thing_score = a.thing_score + diff.score FROM ( + SELECT + creator_id, sum(score) AS score FROM thing_diff GROUP BY creator_id) AS diff + WHERE + a.person_id = diff.creator_id + AND diff.score != 0; RETURN NULL; END; $$); @@ -62,6 +70,21 @@ CALL r.post_or_comment ('post'); CALL r.post_or_comment ('comment'); -- Create triggers that update counts in parent aggregates +CREATE FUNCTION r.parent_comment_ids (path ltree) + RETURNS SETOF int + LANGUAGE sql + IMMUTABLE parallel safe +BEGIN + ATOMIC + SELECT + comment_id::int + FROM + string_to_table (ltree2text (path), '.') AS comment_id + -- Skip first and last +LIMIT (nlevel (path) - 2) OFFSET 1; + +END; + CALL r.create_triggers ('comment', $$ BEGIN UPDATE @@ -76,60 +99,84 @@ BEGIN r.is_counted (comment) GROUP BY (comment).creator_id) AS diff WHERE - a.person_id = diff.creator_id; + a.person_id = diff.creator_id + AND diff.comment_count != 0; UPDATE - site_aggregates AS a + comment_aggregates AS a SET - comments = a.comments + diff.comments + child_count = a.child_count + diff.child_count FROM ( SELECT - coalesce(sum(count_diff), 0) AS comments - FROM - select_old_and_new_rows AS old_and_new_rows - WHERE - r.is_counted (comment) - AND (comment).local) AS diff; + parent_id, + coalesce(sum(count_diff), 0) AS child_count + FROM ( + -- For each inserted or deleted comment, this outputs 1 row for each parent comment. + -- For example, this: + -- + -- count_diff | (comment).path + -- ------------+---------------- + -- 1 | 0.5.6.7 + -- 1 | 0.5.6.7.8 + -- + -- becomes this: + -- + -- count_diff | parent_id + -- ------------+----------- + -- 1 | 5 + -- 1 | 6 + -- 1 | 5 + -- 1 | 6 + -- 1 | 7 + SELECT + count_diff, + parent_id + FROM + select_old_and_new_rows AS old_and_new_rows, + LATERAL r.parent_comment_ids ((comment).path) AS parent_id) AS expanded_old_and_new_rows + GROUP BY + parent_id) AS diff +WHERE + a.comment_id = diff.parent_id + AND diff.child_count != 0; WITH post_diff AS ( UPDATE post_aggregates AS a SET comments = a.comments + diff.comments, - newest_comment_time = GREATEST (a.newest_comment_time, ( - SELECT - published - FROM select_new_rows AS new_comment - WHERE - a.post_id = new_comment.post_id ORDER BY published DESC LIMIT 1)), - newest_comment_time_necro = GREATEST (a.newest_comment_time_necro, ( - SELECT - published - FROM select_new_rows AS new_comment - WHERE - a.post_id = new_comment.post_id - -- Ignore comments from the post's creator - AND a.creator_id != new_comment.creator_id - -- Ignore comments on old posts - AND a.published > (new_comment.published - '2 days'::interval) - ORDER BY published DESC LIMIT 1)) + newest_comment_time = GREATEST (a.newest_comment_time, diff.newest_comment_time), + newest_comment_time_necro = GREATEST (a.newest_comment_time_necro, diff.newest_comment_time_necro) FROM ( SELECT - (comment).post_id, - coalesce(sum(count_diff), 0) AS comments + post.id AS post_id, + coalesce(sum(count_diff), 0) AS comments, + -- Old rows are excluded using `count_diff = 1` + max((comment).published) FILTER (WHERE count_diff = 1) AS newest_comment_time, + max((comment).published) FILTER (WHERE count_diff = 1 + -- Ignore comments from the post's creator + AND post.creator_id != (comment).creator_id + -- Ignore comments on old posts + AND post.published > ((comment).published - '2 days'::interval)) AS newest_comment_time_necro, + r.is_counted (post.*) AS include_in_community_aggregates FROM select_old_and_new_rows AS old_and_new_rows + LEFT JOIN post ON post.id = (comment).post_id WHERE r.is_counted (comment) GROUP BY - (comment).post_id) AS diff - LEFT JOIN post ON post.id = diff.post_id + post.id) AS diff WHERE a.post_id = diff.post_id + AND (diff.comments, + GREATEST (a.newest_comment_time, diff.newest_comment_time), + GREATEST (a.newest_comment_time_necro, diff.newest_comment_time_necro)) != (0, + a.newest_comment_time, + a.newest_comment_time_necro) RETURNING a.community_id, diff.comments, - r.is_counted (post.*) AS include_in_community_aggregates) + diff.include_in_community_aggregates) UPDATE community_aggregates AS a SET @@ -145,7 +192,23 @@ FROM ( GROUP BY community_id) AS diff WHERE - a.community_id = diff.community_id; + a.community_id = diff.community_id + AND diff.comments != 0; + +UPDATE + site_aggregates AS a +SET + comments = a.comments + diff.comments +FROM ( + SELECT + coalesce(sum(count_diff), 0) AS comments + FROM + select_old_and_new_rows AS old_and_new_rows + WHERE + r.is_counted (comment) + AND (comment).local) AS diff +WHERE + diff.comments != 0; RETURN NULL; @@ -167,20 +230,8 @@ BEGIN r.is_counted (post) GROUP BY (post).creator_id) AS diff WHERE - a.person_id = diff.creator_id; - -UPDATE - site_aggregates AS a -SET - posts = a.posts + diff.posts -FROM ( - SELECT - coalesce(sum(count_diff), 0) AS posts - FROM - select_old_and_new_rows AS old_and_new_rows - WHERE - r.is_counted (post) - AND (post).local) AS diff; + a.person_id = diff.creator_id + AND diff.post_count != 0; UPDATE community_aggregates AS a @@ -197,7 +248,23 @@ FROM ( GROUP BY (post).community_id) AS diff WHERE - a.community_id = diff.community_id; + a.community_id = diff.community_id + AND diff.posts != 0; + +UPDATE + site_aggregates AS a +SET + posts = a.posts + diff.posts +FROM ( + SELECT + coalesce(sum(count_diff), 0) AS posts + FROM + select_old_and_new_rows AS old_and_new_rows + WHERE + r.is_counted (post) + AND (post).local) AS diff +WHERE + diff.posts != 0; RETURN NULL; @@ -217,7 +284,9 @@ BEGIN FROM select_old_and_new_rows AS old_and_new_rows WHERE r.is_counted (community) - AND (community).local) AS diff; + AND (community).local) AS diff +WHERE + diff.communities != 0; RETURN NULL; @@ -235,7 +304,9 @@ BEGIN SELECT coalesce(sum(count_diff), 0) AS users FROM select_old_and_new_rows AS old_and_new_rows - WHERE (person).local) AS diff; + WHERE (person).local) AS diff +WHERE + diff.users != 0; RETURN NULL; @@ -270,7 +341,8 @@ BEGIN GROUP BY old_post.community_id) AS diff WHERE - a.community_id = diff.community_id; + a.community_id = diff.community_id + AND diff.comments != 0; RETURN NULL; END; $$; @@ -296,7 +368,8 @@ BEGIN LEFT JOIN community ON community.id = (community_follower).community_id LEFT JOIN person ON person.id = (community_follower).person_id GROUP BY (community_follower).community_id) AS diff WHERE - a.community_id = diff.community_id; + a.community_id = diff.community_id + AND (diff.subscribers, diff.subscribers_local) != (0, 0); RETURN NULL; @@ -474,3 +547,24 @@ CREATE TRIGGER delete_follow FOR EACH ROW EXECUTE FUNCTION r.delete_follow_before_person (); +-- Triggers that change values before insert or update +CREATE FUNCTION r.comment_change_values () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +DECLARE + id text = NEW.id::text; +BEGIN + -- Make `path` end with `id` if it doesn't already + IF NOT (NEW.path ~ ('*.' || id)::lquery) THEN + NEW.path = NEW.path || id; + END IF; + RETURN NEW; +END +$$; + +CREATE TRIGGER change_values + BEFORE INSERT OR UPDATE ON comment + FOR EACH ROW + EXECUTE FUNCTION r.comment_change_values (); + diff --git a/crates/db_schema/src/impls/comment.rs b/crates/db_schema/src/impls/comment.rs index 30c058b89..eff7da26f 100644 --- a/crates/db_schema/src/impls/comment.rs +++ b/crates/db_schema/src/impls/comment.rs @@ -15,12 +15,7 @@ use crate::{ utils::{functions::coalesce, get_conn, naive_now, DbPool, DELETED_REPLACEMENT_TEXT}, }; use chrono::{DateTime, Utc}; -use diesel::{ - dsl::{insert_into, sql_query}, - result::Error, - ExpressionMethods, - QueryDsl, -}; +use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; use diesel_ltree::Ltree; use url::Url; @@ -72,81 +67,23 @@ impl Comment { parent_path: Option<&Ltree>, ) -> Result { let conn = &mut get_conn(pool).await?; + let comment_form = (comment_form, parent_path.map(|p| comment::path.eq(p))); - conn - .build_transaction() - .run(|conn| { - Box::pin(async move { - // Insert, to get the id - let inserted_comment = if let Some(timestamp) = timestamp { - insert_into(comment::table) - .values(comment_form) - .on_conflict(comment::ap_id) - .filter_target(coalesce(comment::updated, comment::published).lt(timestamp)) - .do_update() - .set(comment_form) - .get_result::(conn) - .await? - } else { - insert_into(comment::table) - .values(comment_form) - .get_result::(conn) - .await? - }; - - let comment_id = inserted_comment.id; - - // You need to update the ltree column - let ltree = Ltree(if let Some(parent_path) = parent_path { - // The previous parent will already have 0 in it - // Append this comment id - format!("{}.{}", parent_path.0, comment_id) - } else { - // '0' is always the first path, append to that - format!("{}.{}", 0, comment_id) - }); - - let updated_comment = diesel::update(comment::table.find(comment_id)) - .set(comment::path.eq(ltree)) - .get_result::(conn) - .await?; - - // Update the child count for the parent comment_aggregates - // You could do this with a trigger, but since you have to do this manually anyway, - // you can just have it here - if let Some(parent_path) = parent_path { - // You have to update counts for all parents, not just the immediate one - // TODO if the performance of this is terrible, it might be better to do this as part of a - // scheduled query... although the counts would often be wrong. - // - // The child_count query for reference: - // select c.id, c.path, count(c2.id) as child_count from comment c - // left join comment c2 on c2.path <@ c.path and c2.path != c.path - // group by c.id - - let parent_id = parent_path.0.split('.').nth(1); - - if let Some(parent_id) = parent_id { - let top_parent = format!("0.{}", parent_id); - let update_child_count_stmt = format!( - " -update comment_aggregates ca set child_count = c.child_count -from ( - select c.id, c.path, count(c2.id) as child_count from comment c - join comment c2 on c2.path <@ c.path and c2.path != c.path - and c.path <@ '{top_parent}' - group by c.id -) as c -where ca.comment_id = c.id" - ); - - sql_query(update_child_count_stmt).execute(conn).await?; - } - } - Ok(updated_comment) - }) as _ - }) - .await + if let Some(timestamp) = timestamp { + insert_into(comment::table) + .values(comment_form) + .on_conflict(comment::ap_id) + .filter_target(coalesce(comment::updated, comment::published).lt(timestamp)) + .do_update() + .set(comment_form) + .get_result::(conn) + .await + } else { + insert_into(comment::table) + .values(comment_form) + .get_result::(conn) + .await + } } pub async fn read_from_apub_id( diff --git a/migrations/2024-05-04-140749_separate_triggers/down.sql b/migrations/2024-05-04-140749_separate_triggers/down.sql new file mode 100644 index 000000000..deb75def2 --- /dev/null +++ b/migrations/2024-05-04-140749_separate_triggers/down.sql @@ -0,0 +1,3 @@ +SELECT + 1; + diff --git a/migrations/2024-05-04-140749_separate_triggers/up.sql b/migrations/2024-05-04-140749_separate_triggers/up.sql new file mode 100644 index 000000000..2e9d05052 --- /dev/null +++ b/migrations/2024-05-04-140749_separate_triggers/up.sql @@ -0,0 +1,4 @@ +-- This migration exists to trigger re-execution of replaceable_schema +SELECT + 1; +