mirror of
https://github.com/LemmyNet/lemmy.git
synced 2025-01-18 20:40:14 +00:00
attempt parallel pg_dump
This commit is contained in:
parent
ced9bb5216
commit
592a127954
5
Cargo.lock
generated
5
Cargo.lock
generated
|
@ -1255,9 +1255,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crossbeam-channel"
|
name = "crossbeam-channel"
|
||||||
version = "0.5.12"
|
version = "0.5.13"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95"
|
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crossbeam-utils",
|
"crossbeam-utils",
|
||||||
]
|
]
|
||||||
|
@ -2814,6 +2814,7 @@ dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"bcrypt",
|
"bcrypt",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
"crossbeam-channel",
|
||||||
"deadpool 0.10.0",
|
"deadpool 0.10.0",
|
||||||
"diesel",
|
"diesel",
|
||||||
"diesel-async",
|
"diesel-async",
|
||||||
|
|
|
@ -86,6 +86,7 @@ moka.workspace = true
|
||||||
serial_test = { workspace = true }
|
serial_test = { workspace = true }
|
||||||
pretty_assertions = { workspace = true }
|
pretty_assertions = { workspace = true }
|
||||||
diff = "0.1.13"
|
diff = "0.1.13"
|
||||||
|
crossbeam-channel = "0.5.13"
|
||||||
|
|
||||||
[package.metadata.cargo-machete]
|
[package.metadata.cargo-machete]
|
||||||
ignored = ["strum"]
|
ignored = ["strum"]
|
||||||
|
|
|
@ -48,6 +48,8 @@ const REPLACEABLE_SCHEMA: &[&str] = &[
|
||||||
struct MigrationHarnessWrapper<'a, 'b> {
|
struct MigrationHarnessWrapper<'a, 'b> {
|
||||||
conn: &'a mut PgConnection,
|
conn: &'a mut PgConnection,
|
||||||
options: &'b Options,
|
options: &'b Options,
|
||||||
|
#[cfg(test)]
|
||||||
|
diff_checker:diff_check::DiffChecker,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
|
impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
|
||||||
|
@ -59,13 +61,12 @@ impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
if self.options.enable_diff_check {
|
if self.options.enable_diff_check {
|
||||||
let before = diff_check::get_dump(&mut self.conn);
|
let before = self.diff_checker.get_dump();
|
||||||
self.conn.run_migration(migration)?;
|
self.conn.run_migration(migration)?;
|
||||||
self.conn.revert_migration(migration)?;
|
self.conn.revert_migration(migration)?;
|
||||||
diff_check::check_dump_diff(
|
self.diff_checker.check_dump_diff(
|
||||||
&mut self.conn,
|
|
||||||
before,
|
before,
|
||||||
&format!("migrations/{name}/down.sql"),
|
format!("migrations/{name}/down.sql"),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,6 +76,10 @@ impl<'a, 'b> MigrationHarness<Pg> for MigrationHarnessWrapper<'a, 'b> {
|
||||||
|
|
||||||
let duration = start_time.elapsed().as_millis();
|
let duration = start_time.elapsed().as_millis();
|
||||||
info!("{duration}ms run {name}");
|
info!("{duration}ms run {name}");
|
||||||
|
#[cfg(test)]
|
||||||
|
if result.is_err() {
|
||||||
|
self.diff_checker.finish();
|
||||||
|
}
|
||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
@ -153,6 +158,8 @@ impl Options {
|
||||||
// TODO return struct with field `ran_replaceable_schema`
|
// TODO return struct with field `ran_replaceable_schema`
|
||||||
pub fn run(options: Options) -> LemmyResult<()> {
|
pub fn run(options: Options) -> LemmyResult<()> {
|
||||||
let db_url = SETTINGS.get_database_url();
|
let db_url = SETTINGS.get_database_url();
|
||||||
|
#[cfg(test)]
|
||||||
|
let mut diff_checker = diff_check::DiffChecker::new(&db_url)?;
|
||||||
|
|
||||||
// Migrations don't support async connection, and this function doesn't need to be async
|
// Migrations don't support async connection, and this function doesn't need to be async
|
||||||
let mut conn =
|
let mut conn =
|
||||||
|
@ -198,6 +205,8 @@ pub fn run(options: Options) -> LemmyResult<()> {
|
||||||
let mut wrapper = MigrationHarnessWrapper {
|
let mut wrapper = MigrationHarnessWrapper {
|
||||||
conn,
|
conn,
|
||||||
options: &options,
|
options: &options,
|
||||||
|
#[cfg(test)]
|
||||||
|
diff_checker,
|
||||||
};
|
};
|
||||||
|
|
||||||
// * Prevent other lemmy_server processes from running this transaction simultaneously by repurposing
|
// * Prevent other lemmy_server processes from running this transaction simultaneously by repurposing
|
||||||
|
@ -247,7 +256,7 @@ pub fn run(options: Options) -> LemmyResult<()> {
|
||||||
if !(options.revert && !options.redo_after_revert) {
|
if !(options.revert && !options.redo_after_revert) {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
if options.enable_diff_check {
|
if options.enable_diff_check {
|
||||||
let before = diff_check::get_dump(&mut wrapper.conn);
|
let before = wrapper.diff_checker.get_dump();
|
||||||
// todo move replaceable_schema dir path to let/const?
|
// todo move replaceable_schema dir path to let/const?
|
||||||
wrapper
|
wrapper
|
||||||
.conn
|
.conn
|
||||||
|
@ -258,7 +267,7 @@ pub fn run(options: Options) -> LemmyResult<()> {
|
||||||
.conn
|
.conn
|
||||||
.batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")?;
|
.batch_execute("DROP SCHEMA IF EXISTS r CASCADE;")?;
|
||||||
// todo use different first output line in this case
|
// todo use different first output line in this case
|
||||||
diff_check::check_dump_diff(&mut wrapper.conn, before, "replaceable_schema");
|
wrapper.diff_checker.check_dump_diff(before, "replaceable_schema".to_owned());
|
||||||
}
|
}
|
||||||
|
|
||||||
wrapper
|
wrapper
|
||||||
|
@ -272,6 +281,8 @@ pub fn run(options: Options) -> LemmyResult<()> {
|
||||||
|
|
||||||
debug_assert_eq!(num_rows_updated, 1);
|
debug_assert_eq!(num_rows_updated, 1);
|
||||||
}
|
}
|
||||||
|
#[cfg(test)]
|
||||||
|
wrapper.diff_checker.finish();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,46 +1,122 @@
|
||||||
use diesel::{PgConnection, RunQueryDsl};
|
use diesel::{PgConnection, RunQueryDsl,connection::SimpleConnection,Connection};
|
||||||
use lemmy_utils::settings::SETTINGS;
|
use lemmy_utils::settings::SETTINGS;
|
||||||
use std::{
|
use std::{
|
||||||
borrow::Cow,
|
borrow::Cow,
|
||||||
collections::BTreeSet,
|
collections::BTreeSet,
|
||||||
fmt::Write,
|
fmt::Write,
|
||||||
process::{Command, Stdio},
|
process::{Command, Stdio},thread,cell::OnceCell,sync::{Arc,Mutex},collections::HashMap,any::Any
|
||||||
};
|
};
|
||||||
|
use crossbeam_channel::{Sender, Receiver};
|
||||||
|
|
||||||
|
enum DumpAction {
|
||||||
|
Send(Sender<String>),
|
||||||
|
Compare(Receiver<String>, String),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DiffChecker {
|
||||||
|
snapshot_conn: PgConnection,
|
||||||
|
handles: Vec<thread::JoinHandle<()>>,
|
||||||
|
snapshot_sender: Option<Sender<(String, DumpAction)>>,
|
||||||
|
error: Receiver<Box<dyn Any + Send + 'static>>,
|
||||||
|
// todo rename to channels
|
||||||
|
//dump_receivers: Arc<Mutex<HashMap<String, Receiver<String>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
diesel::sql_function! {
|
diesel::sql_function! {
|
||||||
fn pg_export_snapshot() -> diesel::sql_types::Text;
|
fn pg_export_snapshot() -> diesel::sql_types::Text;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_dump(conn: &mut PgConnection) -> String {
|
impl DiffChecker {
|
||||||
/*// Required for pg_dump to see uncommitted changes from a different database connection
|
pub fn new(db_url: &str) -> diesel::result::QueryResult<Self> {
|
||||||
|
// todo use settings
|
||||||
|
let mut snapshot_conn = PgConnection::establish(db_url).expect("conn");
|
||||||
|
snapshot_conn.batch_execute("BEGIN;")?;
|
||||||
|
let (tx, rx) = crossbeam_channel::unbounded();
|
||||||
|
let (error_t, error_r) = crossbeam_channel::unbounded();
|
||||||
|
//let dump_receivers = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
// The pg_dump command runs independently from `conn`, which means it can't see changes from
|
let mut handles = Vec::new();
|
||||||
// an uncommitted transaction. NASA made each migration run in a separate transaction. When
|
let n = usize::from(thread::available_parallelism().expect("parallelism"));
|
||||||
// it was discovered that
|
// todo remove
|
||||||
let snapshot = diesel::select(pg_export_snapshot())
|
assert_eq!(16,n);
|
||||||
.get_result::<String>(conn)
|
for _ in 0..(n){
|
||||||
.expect("pg_export_snapshot failed");
|
let rx2 = rx.clone();
|
||||||
let snapshot_arg = format!("--snapshot={snapshot}");*/
|
let error_t = error_t.clone();
|
||||||
|
handles.push(thread::spawn(move || if let Err(e) = std::panic::catch_unwind(move || {
|
||||||
|
while let Ok((snapshot, action)) = rx2.recv() {
|
||||||
|
let snapshot_arg = format!("--snapshot={snapshot}");
|
||||||
let output = Command::new("pg_dump")
|
let output = Command::new("pg_dump")
|
||||||
.args(["--schema-only"])
|
.args(["--schema-only", &snapshot_arg])
|
||||||
.env("DATABASE_URL", SETTINGS.get_database_url())
|
.env("DATABASE_URL", SETTINGS.get_database_url())
|
||||||
.stderr(Stdio::inherit())
|
|
||||||
.output()
|
.output()
|
||||||
.expect("failed to start pg_dump process");
|
.expect("failed to start pg_dump process");
|
||||||
|
|
||||||
// TODO: use exit_ok method when it's stable
|
if !output.status.success() {
|
||||||
assert!(output.status.success());
|
panic!("{}", String::from_utf8(output.stderr).expect(""));
|
||||||
|
}
|
||||||
|
|
||||||
String::from_utf8(output.stdout).expect("pg_dump output is not valid UTF-8 text")
|
let output_string = String::from_utf8(output.stdout).expect("pg_dump output is not valid UTF-8 text");
|
||||||
|
match action {
|
||||||
|
DumpAction::Send(x) => {x.send(output_string).ok();},
|
||||||
|
DumpAction::Compare(x, name) => {
|
||||||
|
if let Ok(before) = x.recv() {
|
||||||
|
if let Some(e) = check_dump_diff(before, output_string, &name) {
|
||||||
|
panic!("{e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}){
|
||||||
|
error_t.send(e).ok();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(DiffChecker {snapshot_conn,handles,snapshot_sender:Some(tx),error:error_r})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_err(&mut self) {
|
||||||
|
if let Ok(e) = self.error.try_recv() {
|
||||||
|
std::panic::resume_unwind(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn finish(&mut self) {
|
||||||
|
self.snapshot_sender.take(); // stop threads from waiting
|
||||||
|
for handle in self.handles.drain(..) {
|
||||||
|
handle.join().expect("");
|
||||||
|
}
|
||||||
|
self.check_err();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_snapshot(&mut self) -> String {
|
||||||
|
diesel::select(pg_export_snapshot())
|
||||||
|
.get_result::<String>(&mut self.snapshot_conn)
|
||||||
|
.expect("pg_export_snapshot failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_dump(&mut self) -> Receiver<String> {
|
||||||
|
self.check_err();
|
||||||
|
let snapshot = self.get_snapshot();
|
||||||
|
let (tx, rx) = crossbeam_channel::unbounded(); // ::bounded(1);
|
||||||
|
self.snapshot_sender.as_mut().expect("").send((snapshot, DumpAction::Send(tx))).expect("send msg");
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn check_dump_diff(&mut self, before: Receiver<String>, name: String) {
|
||||||
|
self.check_err();
|
||||||
|
let snapshot = self.get_snapshot();
|
||||||
|
self.snapshot_sender.as_mut().expect("").send((snapshot, DumpAction::Compare(before, name))).expect("compare msg");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
const PATTERN_LEN: usize = 19;
|
const PATTERN_LEN: usize = 19;
|
||||||
|
|
||||||
// TODO add unit test for output
|
// TODO add unit test for output
|
||||||
pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str) {
|
pub fn check_dump_diff(mut before: String, mut after: String, name: &str) -> Option<String> {
|
||||||
let mut after = get_dump(conn);
|
|
||||||
if after == before {
|
if after == before {
|
||||||
return;
|
return None;
|
||||||
}
|
}
|
||||||
// Ignore timestamp differences by removing timestamps
|
// Ignore timestamp differences by removing timestamps
|
||||||
for dump in [&mut before, &mut after] {
|
for dump in [&mut before, &mut after] {
|
||||||
|
@ -97,7 +173,7 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str)
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
});
|
});
|
||||||
if only_in_before.is_empty() && only_in_after.is_empty() {
|
if only_in_before.is_empty() && only_in_after.is_empty() {
|
||||||
return;
|
return None;
|
||||||
}
|
}
|
||||||
let after_has_more =
|
let after_has_more =
|
||||||
only_in_before.len() < only_in_after.len();
|
only_in_before.len() < only_in_after.len();
|
||||||
|
@ -142,13 +218,13 @@ pub fn check_dump_diff(conn: &mut PgConnection, mut before: String, name: &str)
|
||||||
}
|
}
|
||||||
.expect("failed to build string");
|
.expect("failed to build string");
|
||||||
}
|
}
|
||||||
write!(&mut output, "\n{most_similar_chunk_filtered}");
|
write!(&mut output, "\n{most_similar_chunk_filtered}").expect("");
|
||||||
if !chunks_gt.is_empty() {
|
if !chunks_gt.is_empty() {
|
||||||
chunks_gt.swap_remove(most_similar_chunk_index);}
|
chunks_gt.swap_remove(most_similar_chunk_index);}
|
||||||
}
|
}
|
||||||
// should have all been removed
|
// should have all been removed
|
||||||
assert_eq!(chunks_gt.len(), 0);
|
assert_eq!(chunks_gt.len(), 0);
|
||||||
panic!("{output}");
|
Some(output)
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo inline?
|
// todo inline?
|
||||||
|
|
Loading…
Reference in a new issue