Revert "attempt parallel pg_dump"

This reverts commit 592a127954.
This commit is contained in:
Dull Bananas 2024-05-20 19:35:55 +00:00
parent 47a4c35d8f
commit 9a528fb38a
4 changed files with 34 additions and 123 deletions

5
Cargo.lock generated
View file

@ -1255,9 +1255,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.13" version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95"
dependencies = [ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]
@ -2814,7 +2814,6 @@ dependencies = [
"async-trait", "async-trait",
"bcrypt", "bcrypt",
"chrono", "chrono",
"crossbeam-channel",
"deadpool 0.10.0", "deadpool 0.10.0",
"diesel", "diesel",
"diesel-async", "diesel-async",

View file

@ -86,7 +86,6 @@ moka.workspace = true
serial_test = { workspace = true } serial_test = { workspace = true }
pretty_assertions = { workspace = true } pretty_assertions = { workspace = true }
diff = "0.1.13" diff = "0.1.13"
crossbeam-channel = "0.5.13"
[package.metadata.cargo-machete] [package.metadata.cargo-machete]
ignored = ["strum"] ignored = ["strum"]

View file

@ -48,8 +48,6 @@ const REPLACEABLE_SCHEMA: &[&str] = &[
struct MigrationHarnessWrapper<'a, 'b> { struct MigrationHarnessWrapper<'a, 'b> {
conn: &'a mut PgConnection, conn: &'a mut PgConnection,
options: &'b Options, options: &'b Options,
#[cfg(test)]
diff_checker:diff_check::DiffChecker,
} }
impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> { impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
@ -61,12 +59,13 @@ impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
#[cfg(test)] #[cfg(test)]
if self.options.enable_diff_check { if self.options.enable_diff_check {
let before = self.diff_checker.get_dump(); let before = diff_check::get_dump(&mut self.conn);
self.conn.run_migration(migration)?; self.conn.run_migration(migration)?;
self.conn.revert_migration(migration)?; self.conn.revert_migration(migration)?;
self.diff_checker.check_dump_diff( diff_check::check_dump_diff(
&mut self.conn,
before, before,
format!("migrations/{name}/down.sql"), &format!("migrations/{name}/down.sql"),
); );
} }
@ -76,10 +75,6 @@ impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
let duration = start_time.elapsed().as_millis(); let duration = start_time.elapsed().as_millis();
info!("{duration}ms run {name}"); info!("{duration}ms run {name}");
#[cfg(test)]
if result.is_err() {
self.diff_checker.finish();
}
result result
} }
@ -158,8 +153,6 @@ impl Options {
// TODO return struct with field `ran_replaceable_schema` // TODO return struct with field `ran_replaceable_schema`
pub fn run(options: Options) -> LemmyResult<()> { pub fn run(options: Options) -> LemmyResult<()> {
let db_url = SETTINGS.get_database_url(); let db_url = SETTINGS.get_database_url();
#[cfg(test)]
let mut diff_checker = diff_check::DiffChecker::new(&db_url)?;
// Migrations don't support async connection, and this function doesn't need to be async // Migrations don't support async connection, and this function doesn't need to be async
let mut conn = let mut conn =
@ -205,8 +198,6 @@ pub fn run(options: Options) -> LemmyResult<()> {
let mut wrapper = MigrationHarnessWrapper { let mut wrapper = MigrationHarnessWrapper {
conn, conn,
options: &options, options: &options,
#[cfg(test)]
diff_checker,
}; };
// * Prevent other lemmy_server processes from running this transaction simultaneously by repurposing // * Prevent other lemmy_server processes from running this transaction simultaneously by repurposing
@ -256,7 +247,7 @@ pub fn run(options: Options) -> LemmyResult<()> {
if !(options.revert && !options.redo_after_revert) { if !(options.revert && !options.redo_after_revert) {
#[cfg(test)] #[cfg(test)]
if options.enable_diff_check { if options.enable_diff_check {
let before = wrapper.diff_checker.get_dump(); let before = diff_check::get_dump(&mut wrapper.conn);
// todo move replaceable_schema dir path to let/const? // todo move replaceable_schema dir path to let/const?
wrapper wrapper
.conn .conn
@ -267,7 +258,7 @@ pub fn run(options: Options) -> LemmyResult<()> {
.conn .conn
.batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")?; .batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")?;
// todo use different first output line in this case // todo use different first output line in this case
wrapper.diff_checker.check_dump_diff(before, "replaceable_schema".to_owned()); diff_check::check_dump_diff(&mut wrapper.conn, before, "replaceable_schema");
} }
wrapper wrapper
@ -281,8 +272,6 @@ pub fn run(options: Options) -> LemmyResult<()> {
debug_assert_eq!(num_rows_updated, 1); debug_assert_eq!(num_rows_updated, 1);
} }
#[cfg(test)]
wrapper.diff_checker.finish();
Ok(()) Ok(())
}; };

View file

@ -1,122 +1,46 @@
use diesel::{PgConnection, RunQueryDsl,connection::SimpleConnection,Connection}; use diesel::{PgConnection, RunQueryDsl};
use lemmy_utils::settings::SETTINGS; use lemmy_utils::settings::SETTINGS;
use std::{ use std::{
borrow::Cow, borrow::Cow,
collections::BTreeSet, collections::BTreeSet,
fmt::Write, fmt::Write,
process::{Command, Stdio},thread,cell::OnceCell,sync::{Arc,Mutex},collections::HashMap,any::Any process::{Command, Stdio},
}; };
use crossbeam_channel::{Sender, Receiver};
enum DumpAction {
Send(Sender<String>),
Compare(Receiver<String>, String),
}
pub struct DiffChecker {
snapshot_conn: PgConnection,
handles: Vec<thread::JoinHandle<()>>,
snapshot_sender: Option<Sender<(String, DumpAction)>>,
error: Receiver<Box<dyn Any + Send + 'static>>,
// todo rename to channels
//dump_receivers: Arc<Mutex<HashMap<String, Receiver<String>>>>,
}
diesel::sql_function! { diesel::sql_function! {
fn pg_export_snapshot() -> diesel::sql_types::Text; fn pg_export_snapshot() -> diesel::sql_types::Text;
} }
impl DiffChecker { pub fn get_dump(conn: &mut PgConnection) -> String {
pub fn new(db_url: &str) -> diesel::result::QueryResult<Self> { /*// Required for pg_dump to see uncommitted changes from a different database connection
// todo use settings
let mut snapshot_conn = PgConnection::establish(db_url).expect("conn");
snapshot_conn.batch_execute("BEGIN;")?;
let (tx, rx) = crossbeam_channel::unbounded();
let (error_t, error_r) = crossbeam_channel::unbounded();
//let dump_receivers = Arc::new(Mutex::new(HashMap::new()));
let mut handles = Vec::new(); // The pg_dump command runs independently from `conn`, which means it can't see changes from
let n = usize::from(thread::available_parallelism().expect("parallelism")); // an uncommitted transaction. NASA made each migration run in a separate transaction. When
// todo remove // it was discovered that
assert_eq!(16,n); let snapshot = diesel::select(pg_export_snapshot())
for _ in 0..(n){ .get_result::<String>(conn)
let rx2 = rx.clone(); .expect("pg_export_snapshot failed");
let error_t = error_t.clone(); let snapshot_arg = format!("--snapshot={snapshot}");*/
handles.push(thread::spawn(move || if let Err(e) = std::panic::catch_unwind(move || {
while let Ok((snapshot, action)) = rx2.recv() {
let snapshot_arg = format!("--snapshot={snapshot}");
let output = Command::new("pg_dump") let output = Command::new("pg_dump")
.args(["--schema-only", &snapshot_arg]) .args(["--schema-only"])
.env("DATABASE_URL", SETTINGS.get_database_url()) .env("DATABASE_URL", SETTINGS.get_database_url())
.stderr(Stdio::inherit())
.output() .output()
.expect("failed to start pg_dump process"); .expect("failed to start pg_dump process");
if !output.status.success() { // TODO: use exit_ok method when it's stable
panic!("{}", String::from_utf8(output.stderr).expect("")); assert!(output.status.success());
}
let output_string = String::from_utf8(output.stdout).expect("pg_dump output is not valid UTF-8 text"); String::from_utf8(output.stdout).expect("pg_dump output is not valid UTF-8 text")
match action {
DumpAction::Send(x) => {x.send(output_string).ok();},
DumpAction::Compare(x, name) => {
if let Ok(before) = x.recv() {
if let Some(e) = check_dump_diff(before, output_string, &name) {
panic!("{e}");
} }
}
}
}
}
}){
error_t.send(e).ok();
}));
}
Ok(DiffChecker {snapshot_conn,handles,snapshot_sender:Some(tx),error:error_r})
}
fn check_err(&mut self) {
if let Ok(e) = self.error.try_recv() {
std::panic::resume_unwind(e);
}
}
pub fn finish(&mut self) {
self.snapshot_sender.take(); // stop threads from waiting
for handle in self.handles.drain(..) {
handle.join().expect("");
}
self.check_err();
}
fn get_snapshot(&mut self) -> String {
diesel::select(pg_export_snapshot())
.get_result::<String>(&mut self.snapshot_conn)
.expect("pg_export_snapshot failed")
}
pub fn get_dump(&mut self) -> Receiver<String> {
self.check_err();
let snapshot = self.get_snapshot();
let (tx, rx) = crossbeam_channel::unbounded(); // ::bounded(1);
self.snapshot_sender.as_mut().expect("").send((snapshot, DumpAction::Send(tx))).expect("send msg");
rx
}
pub fn check_dump_diff(&mut self, before: Receiver<String>, name: String) {
self.check_err();
let snapshot = self.get_snapshot();
self.snapshot_sender.as_mut().expect("").send((snapshot, DumpAction::Compare(before, name))).expect("compare msg");
}
}
const PATTERN_LEN: usize = 19; const PATTERN_LEN: usize = 19;
// TODO add unit test for output // TODO add unit test for output
pub fn check_dump_diff(mut before: String, mut after: String, name: &str) -> Option<String> { pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) {
let mut after = get_dump(conn);
if after == before { if after == before {
return None; return;
} }
// Ignore timestamp differences by removing timestamps // Ignore timestamp differences by removing timestamps
for dump in [&mut before, &mut after] { for dump in [&mut before, &mut after] {
@ -173,7 +97,7 @@ pub fn check_dump_diff(mut before: String, mut after: String, name: &str) -> Opt
.collect::<Vec<_>>() .collect::<Vec<_>>()
}); });
if only_in_before.is_empty() && only_in_after.is_empty() { if only_in_before.is_empty() && only_in_after.is_empty() {
return None; return;
} }
let after_has_more = let after_has_more =
only_in_before.len() < only_in_after.len(); only_in_before.len() < only_in_after.len();
@ -218,13 +142,13 @@ pub fn check_dump_diff(mut before: String, mut after: String, name: &str) -> Opt
} }
.expect("failed to build string"); .expect("failed to build string");
} }
write!(&mut output, "\n{most_similar_chunk_filtered}").expect(""); write!(&mut output, "\n{most_similar_chunk_filtered}");
if !chunks_gt.is_empty() { if !chunks_gt.is_empty() {
chunks_gt.swap_remove(most_similar_chunk_index);} chunks_gt.swap_remove(most_similar_chunk_index);}
} }
// should have all been removed // should have all been removed
assert_eq!(chunks_gt.len(), 0); assert_eq!(chunks_gt.len(), 0);
Some(output) panic!("{output}");
} }
// todo inline? // todo inline?