2023-08-24 15:27:00 +00:00
use chrono ::{ DateTime , TimeZone , Utc } ;
2023-04-25 23:28:06 +00:00
use clokwerk ::{ Scheduler , TimeUnits as CTimeUnits } ;
use diesel ::{
2023-08-24 15:27:00 +00:00
dsl ::IntervalDsl ,
sql_types ::{ Integer , Timestamptz } ,
2023-04-25 23:28:06 +00:00
Connection ,
ExpressionMethods ,
2023-06-20 06:17:54 +00:00
NullableExpressionMethods ,
2023-04-25 23:28:06 +00:00
QueryDsl ,
2023-06-27 08:13:51 +00:00
QueryableByName ,
2023-04-25 23:28:06 +00:00
} ;
2021-01-29 16:38:27 +00:00
// Import week days and WeekDay
use diesel ::{ sql_query , PgConnection , RunQueryDsl } ;
2023-06-21 08:28:20 +00:00
use lemmy_api_common ::context ::LemmyContext ;
2023-02-18 14:36:12 +00:00
use lemmy_db_schema ::{
2023-07-14 15:17:06 +00:00
schema ::{
captcha_answer ,
comment ,
community_person_ban ,
instance ,
person ,
post ,
received_activity ,
sent_activity ,
} ,
2023-02-18 14:36:12 +00:00
source ::instance ::{ Instance , InstanceForm } ,
2023-08-24 15:27:00 +00:00
utils ::{ naive_now , now , DELETED_REPLACEMENT_TEXT } ,
2023-02-18 14:36:12 +00:00
} ;
use lemmy_routes ::nodeinfo ::NodeInfo ;
2023-07-13 14:12:01 +00:00
use lemmy_utils ::{
error ::{ LemmyError , LemmyResult } ,
REQWEST_TIMEOUT ,
} ;
2023-02-18 14:36:12 +00:00
use reqwest ::blocking ::Client ;
2021-01-29 16:38:27 +00:00
use std ::{ thread , time ::Duration } ;
2023-07-13 14:12:01 +00:00
use tracing ::{ error , info , warn } ;
2021-01-29 16:38:27 +00:00
/// Schedules various cleanup tasks for lemmy in a background thread
2023-06-21 08:28:20 +00:00
pub fn setup (
db_url : String ,
user_agent : String ,
context_1 : LemmyContext ,
) -> Result < ( ) , LemmyError > {
2022-11-09 10:05:00 +00:00
// Setup the connections
2021-01-29 16:38:27 +00:00
let mut scheduler = Scheduler ::new ( ) ;
2023-06-20 09:33:03 +00:00
startup_jobs ( & db_url ) ;
2021-08-26 11:49:16 +00:00
2023-06-08 20:15:15 +00:00
// Update active counts every hour
2023-06-20 09:33:03 +00:00
let url = db_url . clone ( ) ;
2023-04-25 23:28:06 +00:00
scheduler . every ( CTimeUnits ::hour ( 1 ) ) . run ( move | | {
2023-07-19 13:00:44 +00:00
PgConnection ::establish ( & url )
. map ( | mut conn | {
active_counts ( & mut conn ) ;
update_banned_when_expired ( & mut conn ) ;
} )
. map_err ( | e | {
error! ( " Failed to establish db connection for active counts update: {e} " ) ;
} )
. ok ( ) ;
2023-06-08 20:15:15 +00:00
} ) ;
2023-06-27 08:13:51 +00:00
// Update hot ranks every 15 minutes
2023-06-20 09:33:03 +00:00
let url = db_url . clone ( ) ;
2023-06-27 08:13:51 +00:00
scheduler . every ( CTimeUnits ::minutes ( 15 ) ) . run ( move | | {
2023-07-19 13:00:44 +00:00
PgConnection ::establish ( & url )
. map ( | mut conn | {
update_hot_ranks ( & mut conn ) ;
} )
. map_err ( | e | {
error! ( " Failed to establish db connection for hot ranks update: {e} " ) ;
} )
. ok ( ) ;
2021-01-29 16:38:27 +00:00
} ) ;
2023-06-27 10:38:53 +00:00
// Delete any captcha answers older than ten minutes, every ten minutes
let url = db_url . clone ( ) ;
scheduler . every ( CTimeUnits ::minutes ( 10 ) ) . run ( move | | {
2023-07-19 13:00:44 +00:00
PgConnection ::establish ( & url )
. map ( | mut conn | {
delete_expired_captcha_answers ( & mut conn ) ;
} )
. map_err ( | e | {
error! ( " Failed to establish db connection for captcha cleanup: {e} " ) ;
} )
. ok ( ) ;
2023-06-27 10:38:53 +00:00
} ) ;
2023-06-08 20:15:15 +00:00
// Clear old activities every week
2023-06-20 09:33:03 +00:00
let url = db_url . clone ( ) ;
2023-04-25 23:28:06 +00:00
scheduler . every ( CTimeUnits ::weeks ( 1 ) ) . run ( move | | {
2023-07-19 13:00:44 +00:00
PgConnection ::establish ( & url )
. map ( | mut conn | {
clear_old_activities ( & mut conn ) ;
} )
. map_err ( | e | {
error! ( " Failed to establish db connection for activity cleanup: {e} " ) ;
} )
. ok ( ) ;
2021-01-29 16:38:27 +00:00
} ) ;
2023-06-21 08:28:20 +00:00
// Remove old rate limit buckets after 1 to 2 hours of inactivity
scheduler . every ( CTimeUnits ::hour ( 1 ) ) . run ( move | | {
let hour = Duration ::from_secs ( 3600 ) ;
context_1 . settings_updated_channel ( ) . remove_older_than ( hour ) ;
} ) ;
2023-06-20 06:17:54 +00:00
// Overwrite deleted & removed posts and comments every day
let url = db_url . clone ( ) ;
scheduler . every ( CTimeUnits ::days ( 1 ) ) . run ( move | | {
2023-07-19 13:00:44 +00:00
PgConnection ::establish ( & db_url )
. map ( | mut conn | {
overwrite_deleted_posts_and_comments ( & mut conn ) ;
} )
. map_err ( | e | {
error! ( " Failed to establish db connection for deleted content cleanup: {e} " ) ;
} )
. ok ( ) ;
2023-06-20 06:17:54 +00:00
} ) ;
2023-06-20 09:33:03 +00:00
// Update the Instance Software
2023-04-25 23:28:06 +00:00
scheduler . every ( CTimeUnits ::days ( 1 ) ) . run ( move | | {
2023-07-19 13:00:44 +00:00
PgConnection ::establish ( & url )
. map ( | mut conn | {
update_instance_software ( & mut conn , & user_agent )
. map_err ( | e | warn! ( " Failed to update instance software: {e} " ) )
. ok ( ) ;
} )
. map_err ( | e | {
error! ( " Failed to establish db connection for instance software update: {e} " ) ;
} )
2023-07-13 14:12:01 +00:00
. ok ( ) ;
2023-02-18 14:36:12 +00:00
} ) ;
2021-01-29 16:38:27 +00:00
// Manually run the scheduler in an event loop
loop {
scheduler . run_pending ( ) ;
thread ::sleep ( Duration ::from_millis ( 1000 ) ) ;
}
}
2023-06-20 09:33:03 +00:00
/// Run these on server startup
fn startup_jobs ( db_url : & str ) {
let mut conn = PgConnection ::establish ( db_url ) . expect ( " could not establish connection " ) ;
active_counts ( & mut conn ) ;
2023-07-17 09:05:55 +00:00
update_hot_ranks ( & mut conn ) ;
2023-06-20 09:33:03 +00:00
update_banned_when_expired ( & mut conn ) ;
clear_old_activities ( & mut conn ) ;
2023-06-20 06:17:54 +00:00
overwrite_deleted_posts_and_comments ( & mut conn ) ;
2023-06-20 09:33:03 +00:00
}
2023-06-08 20:15:15 +00:00
/// Update the hot_rank columns for the aggregates tables
2023-06-27 08:13:51 +00:00
/// Runs in batches until all necessary rows are updated once
2023-07-17 09:05:55 +00:00
fn update_hot_ranks ( conn : & mut PgConnection ) {
info! ( " Updating hot ranks for all history... " ) ;
2023-06-27 08:13:51 +00:00
process_hot_ranks_in_batches (
conn ,
" post_aggregates " ,
2023-07-17 09:05:55 +00:00
" a.hot_rank != 0 OR a.hot_rank_active != 0 " ,
2023-06-27 08:13:51 +00:00
" SET hot_rank = hot_rank(a.score, a.published),
hot_rank_active = hot_rank ( a . score , a . newest_comment_time_necro ) " ,
) ;
process_hot_ranks_in_batches (
conn ,
" comment_aggregates " ,
2023-07-17 09:05:55 +00:00
" a.hot_rank != 0 " ,
2023-06-27 08:13:51 +00:00
" SET hot_rank = hot_rank(a.score, a.published) " ,
) ;
process_hot_ranks_in_batches (
conn ,
" community_aggregates " ,
2023-07-17 09:05:55 +00:00
" a.hot_rank != 0 " ,
2023-06-27 08:13:51 +00:00
" SET hot_rank = hot_rank(a.subscribers, a.published) " ,
) ;
info! ( " Finished hot ranks update! " ) ;
}
2023-06-08 20:15:15 +00:00
2023-06-27 08:13:51 +00:00
#[ derive(QueryableByName) ]
struct HotRanksUpdateResult {
2023-08-24 15:27:00 +00:00
#[ diesel(sql_type = Timestamptz) ]
published : DateTime < Utc > ,
2023-06-27 08:13:51 +00:00
}
2023-06-08 20:15:15 +00:00
2023-07-17 09:05:55 +00:00
/// Runs the hot rank update query in batches until all rows have been processed.
/// In `where_clause` and `set_clause`, "a" will refer to the current aggregates table.
2023-06-27 08:13:51 +00:00
/// Locked rows are skipped in order to prevent deadlocks (they will likely get updated on the next
/// run)
fn process_hot_ranks_in_batches (
conn : & mut PgConnection ,
table_name : & str ,
2023-07-17 09:05:55 +00:00
where_clause : & str ,
2023-06-27 08:13:51 +00:00
set_clause : & str ,
) {
2023-08-24 15:27:00 +00:00
let process_start_time : DateTime < Utc > = Utc
. timestamp_opt ( 0 , 0 )
. single ( )
. expect ( " 0 timestamp creation " ) ;
2023-07-17 09:05:55 +00:00
2023-06-27 08:13:51 +00:00
let update_batch_size = 1000 ; // Bigger batches than this tend to cause seq scans
2023-07-17 09:05:55 +00:00
let mut processed_rows_count = 0 ;
2023-06-27 08:13:51 +00:00
let mut previous_batch_result = Some ( process_start_time ) ;
while let Some ( previous_batch_last_published ) = previous_batch_result {
// Raw `sql_query` is used as a performance optimization - Diesel does not support doing this
// in a single query (neither as a CTE, nor using a subquery)
let result = sql_query ( format! (
r #" WITH batch AS (SELECT a.id
FROM { aggregates_table } a
2023-07-17 09:05:55 +00:00
WHERE a . published > $ 1 AND ( { where_clause } )
2023-06-27 08:13:51 +00:00
ORDER BY a . published
LIMIT $ 2
FOR UPDATE SKIP LOCKED )
UPDATE { aggregates_table } a { set_clause }
FROM batch WHERE a . id = batch . id RETURNING a . published ;
" #,
aggregates_table = table_name ,
2023-07-17 09:05:55 +00:00
set_clause = set_clause ,
where_clause = where_clause
2023-06-27 08:13:51 +00:00
) )
2023-08-24 15:27:00 +00:00
. bind ::< Timestamptz , _ > ( previous_batch_last_published )
2023-06-27 08:13:51 +00:00
. bind ::< Integer , _ > ( update_batch_size )
. get_results ::< HotRanksUpdateResult > ( conn ) ;
match result {
2023-07-17 09:05:55 +00:00
Ok ( updated_rows ) = > {
processed_rows_count + = updated_rows . len ( ) ;
previous_batch_result = updated_rows . last ( ) . map ( | row | row . published ) ;
}
2023-06-27 08:13:51 +00:00
Err ( e ) = > {
error! ( " Failed to update {} hot_ranks: {} " , table_name , e ) ;
break ;
}
2023-06-15 09:29:12 +00:00
}
}
2023-06-27 08:13:51 +00:00
info! (
2023-07-17 09:05:55 +00:00
" Finished process_hot_ranks_in_batches execution for {} (processed {} rows) " ,
table_name , processed_rows_count
2023-06-27 08:13:51 +00:00
) ;
2021-01-29 16:38:27 +00:00
}
2023-06-27 10:38:53 +00:00
fn delete_expired_captcha_answers ( conn : & mut PgConnection ) {
2023-07-19 13:00:44 +00:00
diesel ::delete (
2023-08-24 15:27:00 +00:00
captcha_answer ::table . filter ( captcha_answer ::published . lt ( now ( ) - IntervalDsl ::minutes ( 10 ) ) ) ,
2023-06-27 10:38:53 +00:00
)
. execute ( conn )
2023-07-19 13:00:44 +00:00
. map ( | _ | {
info! ( " Done. " ) ;
} )
. map_err ( | e | error! ( " Failed to clear old captcha answers: {e} " ) )
. ok ( ) ;
2023-06-27 10:38:53 +00:00
}
2021-01-29 16:38:27 +00:00
/// Clear old activities (this table gets very large)
2022-09-26 14:09:32 +00:00
fn clear_old_activities ( conn : & mut PgConnection ) {
2021-01-29 16:38:27 +00:00
info! ( " Clearing old activities... " ) ;
2023-08-24 15:27:00 +00:00
diesel ::delete ( sent_activity ::table . filter ( sent_activity ::published . lt ( now ( ) - 3. months ( ) ) ) )
2022-11-09 10:05:00 +00:00
. execute ( conn )
2023-07-19 13:00:44 +00:00
. map_err ( | e | error! ( " Failed to clear old sent activities: {e} " ) )
2023-07-14 15:17:06 +00:00
. ok ( ) ;
diesel ::delete (
2023-08-24 15:27:00 +00:00
received_activity ::table . filter ( received_activity ::published . lt ( now ( ) - 3. months ( ) ) ) ,
2023-07-14 15:17:06 +00:00
)
. execute ( conn )
2023-07-19 13:00:44 +00:00
. map ( | _ | info! ( " Done. " ) )
. map_err ( | e | error! ( " Failed to clear old received activities: {e} " ) )
2023-07-14 15:17:06 +00:00
. ok ( ) ;
2021-01-29 16:38:27 +00:00
}
2023-06-20 06:17:54 +00:00
/// overwrite posts and comments 30d after deletion
fn overwrite_deleted_posts_and_comments ( conn : & mut PgConnection ) {
info! ( " Overwriting deleted posts... " ) ;
2023-07-19 13:00:44 +00:00
diesel ::update (
2023-06-20 06:17:54 +00:00
post ::table
. filter ( post ::deleted . eq ( true ) )
2023-08-24 15:27:00 +00:00
. filter ( post ::updated . lt ( now ( ) . nullable ( ) - 1. months ( ) ) )
2023-06-20 06:17:54 +00:00
. filter ( post ::body . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( (
post ::body . eq ( DELETED_REPLACEMENT_TEXT ) ,
post ::name . eq ( DELETED_REPLACEMENT_TEXT ) ,
) )
. execute ( conn )
2023-07-19 13:00:44 +00:00
. map ( | _ | {
info! ( " Done. " ) ;
} )
. map_err ( | e | error! ( " Failed to overwrite deleted posts: {e} " ) )
. ok ( ) ;
2023-06-20 06:17:54 +00:00
info! ( " Overwriting deleted comments... " ) ;
2023-07-19 13:00:44 +00:00
diesel ::update (
2023-06-20 06:17:54 +00:00
comment ::table
. filter ( comment ::deleted . eq ( true ) )
2023-08-24 15:27:00 +00:00
. filter ( comment ::updated . lt ( now ( ) . nullable ( ) - 1. months ( ) ) )
2023-06-20 06:17:54 +00:00
. filter ( comment ::content . ne ( DELETED_REPLACEMENT_TEXT ) ) ,
)
. set ( comment ::content . eq ( DELETED_REPLACEMENT_TEXT ) )
. execute ( conn )
2023-07-19 13:00:44 +00:00
. map ( | _ | {
info! ( " Done. " ) ;
} )
. map_err ( | e | error! ( " Failed to overwrite deleted comments: {e} " ) )
. ok ( ) ;
2023-06-20 06:17:54 +00:00
}
2021-01-29 16:38:27 +00:00
/// Re-calculate the site and community active counts every 12 hours
2022-09-26 14:09:32 +00:00
fn active_counts ( conn : & mut PgConnection ) {
2021-01-29 16:38:27 +00:00
info! ( " Updating active site and community aggregates ... " ) ;
let intervals = vec! [
( " 1 day " , " day " ) ,
( " 1 week " , " week " ) ,
( " 1 month " , " month " ) ,
( " 6 months " , " half_year " ) ,
] ;
for i in & intervals {
let update_site_stmt = format! (
2023-07-10 15:20:39 +00:00
" update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}')) where site_id = 1 " ,
2021-01-29 16:38:27 +00:00
i . 1 , i . 0
) ;
2023-07-19 13:00:44 +00:00
sql_query ( update_site_stmt )
. execute ( conn )
. map_err ( | e | error! ( " Failed to update site stats: {e} " ) )
. ok ( ) ;
2021-01-29 16:38:27 +00:00
let update_community_stmt = format! ( " update community_aggregates ca set users_active_ {} = mv.count_ from community_aggregates_activity(' {} ') mv where ca.community_id = mv.community_id_ " , i . 1 , i . 0 ) ;
2023-07-19 13:00:44 +00:00
sql_query ( update_community_stmt )
. execute ( conn )
. map_err ( | e | error! ( " Failed to update community stats: {e} " ) )
. ok ( ) ;
2021-01-29 16:38:27 +00:00
}
info! ( " Done. " ) ;
}
2022-03-30 13:56:23 +00:00
/// Set banned to false after ban expires
2022-09-26 14:09:32 +00:00
fn update_banned_when_expired ( conn : & mut PgConnection ) {
2022-03-30 13:56:23 +00:00
info! ( " Updating banned column if it expires ... " ) ;
2023-04-25 23:28:06 +00:00
2023-07-19 13:00:44 +00:00
diesel ::update (
2023-04-25 23:28:06 +00:00
person ::table
. filter ( person ::banned . eq ( true ) )
2023-08-24 15:27:00 +00:00
. filter ( person ::ban_expires . lt ( now ( ) . nullable ( ) ) ) ,
2023-04-25 23:28:06 +00:00
)
. set ( person ::banned . eq ( false ) )
. execute ( conn )
2023-07-19 13:00:44 +00:00
. map_err ( | e | error! ( " Failed to update person.banned when expires: {e} " ) )
. ok ( ) ;
2023-08-24 15:27:00 +00:00
diesel ::delete (
community_person_ban ::table . filter ( community_person_ban ::expires . lt ( now ( ) . nullable ( ) ) ) ,
)
. execute ( conn )
. map_err ( | e | error! ( " Failed to remove community_ban expired rows: {e} " ) )
. ok ( ) ;
2022-03-30 13:56:23 +00:00
}
2022-09-07 12:12:51 +00:00
2023-02-18 14:36:12 +00:00
/// Updates the instance software and version
2023-07-13 14:12:01 +00:00
///
/// TODO: this should be async
/// TODO: if instance has been dead for a long time, it should be checked less frequently
fn update_instance_software ( conn : & mut PgConnection , user_agent : & str ) -> LemmyResult < ( ) > {
2023-02-18 14:36:12 +00:00
info! ( " Updating instances software and versions... " ) ;
2023-07-13 14:12:01 +00:00
let client = Client ::builder ( )
2023-02-18 14:36:12 +00:00
. user_agent ( user_agent )
. timeout ( REQWEST_TIMEOUT )
2023-08-21 08:53:35 +00:00
. connect_timeout ( REQWEST_TIMEOUT )
2023-07-13 14:12:01 +00:00
. build ( ) ? ;
2023-02-18 14:36:12 +00:00
2023-07-13 14:12:01 +00:00
let instances = instance ::table . get_results ::< Instance > ( conn ) ? ;
2023-02-18 14:36:12 +00:00
for instance in instances {
let node_info_url = format! ( " https:// {} /nodeinfo/2.0.json " , instance . domain ) ;
2023-07-13 14:12:01 +00:00
// The `updated` column is used to check if instances are alive. If it is more than three days
// in the past, no outgoing activities will be sent to that instance. However not every
// Fediverse instance has a valid Nodeinfo endpoint (its not required for Activitypub). That's
// why we always need to mark instances as updated if they are alive.
let default_form = InstanceForm ::builder ( )
. domain ( instance . domain . clone ( ) )
. updated ( Some ( naive_now ( ) ) )
. build ( ) ;
let form = match client . get ( & node_info_url ) . send ( ) {
Ok ( res ) if res . status ( ) . is_client_error ( ) = > {
// Instance doesnt have nodeinfo but sent a response, consider it alive
Some ( default_form )
}
Ok ( res ) = > match res . json ::< NodeInfo > ( ) {
Ok ( node_info ) = > {
// Instance sent valid nodeinfo, write it to db
2023-08-01 08:55:28 +00:00
let software = node_info . software . as_ref ( ) ;
2023-07-13 14:12:01 +00:00
Some (
InstanceForm ::builder ( )
. domain ( instance . domain )
. updated ( Some ( naive_now ( ) ) )
2023-08-01 08:55:28 +00:00
. software ( software . and_then ( | s | s . name . clone ( ) ) )
. version ( software . and_then ( | s | s . version . clone ( ) ) )
2023-07-13 14:12:01 +00:00
. build ( ) ,
)
2023-06-15 09:29:12 +00:00
}
2023-07-13 14:12:01 +00:00
Err ( _ ) = > {
// No valid nodeinfo but valid HTTP response, consider instance alive
Some ( default_form )
2023-06-15 09:29:12 +00:00
}
2023-07-13 14:12:01 +00:00
} ,
Err ( _ ) = > {
// dead instance, do nothing
None
2023-06-15 09:29:12 +00:00
}
2023-07-13 14:12:01 +00:00
} ;
if let Some ( form ) = form {
diesel ::update ( instance ::table . find ( instance . id ) )
. set ( form )
. execute ( conn ) ? ;
2023-02-18 14:36:12 +00:00
}
}
2023-07-13 14:12:01 +00:00
info! ( " Finished updating instances software and versions... " ) ;
Ok ( ( ) )
2023-02-18 14:36:12 +00:00
}
#[ cfg(test) ]
mod tests {
2023-07-17 15:04:14 +00:00
#![ allow(clippy::unwrap_used) ]
#![ allow(clippy::indexing_slicing) ]
2023-02-18 14:36:12 +00:00
use lemmy_routes ::nodeinfo ::NodeInfo ;
use reqwest ::Client ;
#[ tokio::test ]
2023-02-28 21:45:37 +00:00
#[ ignore ]
2023-02-18 14:36:12 +00:00
async fn test_nodeinfo ( ) {
let client = Client ::builder ( ) . build ( ) . unwrap ( ) ;
let lemmy_ml_nodeinfo = client
. get ( " https://lemmy.ml/nodeinfo/2.0.json " )
. send ( )
. await
. unwrap ( )
. json ::< NodeInfo > ( )
. await
. unwrap ( ) ;
assert_eq! ( lemmy_ml_nodeinfo . software . unwrap ( ) . name . unwrap ( ) , " lemmy " ) ;
}
}