Skip to content

Commit 481e792

Browse files
Gradual state reconstruction
Co-authored-by: Michael Sproul <[email protected]>
1 parent 2f6ffff commit 481e792

File tree

5 files changed

+108
-41
lines changed

5 files changed

+108
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/beacon_chain/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ hex = "0.4.2"
6464
exit-future = "0.2.0"
6565
unused_port = {path = "../../common/unused_port"}
6666
oneshot_broadcast = { path = "../../common/oneshot_broadcast" }
67+
crossbeam-channel = "0.5.5"
6768

6869
[[test]]
6970
name = "beacon_chain_tests"

beacon_node/beacon_chain/src/migrate.rs

Lines changed: 88 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
77
use slog::{debug, error, info, trace, warn, Logger};
88
use std::collections::{HashMap, HashSet};
99
use std::mem;
10-
use std::sync::{mpsc, Arc};
10+
use std::sync::Arc;
1111
use std::thread;
1212
use std::time::{Duration, SystemTime, UNIX_EPOCH};
1313
use store::hot_cold_store::{migrate_database, HotColdDBError};
@@ -25,6 +25,7 @@ const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800;
2525
const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200;
2626
/// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`.
2727
const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
28+
const BLOCKS_PER_RECONSTRUCTION: usize = 8192 * 4;
2829

2930
/// Default number of epochs to wait between finalization migrations.
3031
pub const DEFAULT_EPOCHS_PER_RUN: u64 = 4;
@@ -33,10 +34,14 @@ pub const DEFAULT_EPOCHS_PER_RUN: u64 = 4;
3334
/// to the cold database.
3435
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
3536
db: Arc<HotColdDB<E, Hot, Cold>>,
36-
#[allow(clippy::type_complexity)]
37-
tx_thread: Option<Mutex<(mpsc::Sender<Notification>, thread::JoinHandle<()>)>>,
3837
/// Record of when the last migration ran, for enforcing `epochs_per_run`.
3938
prev_migration: Arc<Mutex<PrevMigration>>,
39+
tx_thread: Option<
40+
Mutex<(
41+
crossbeam_channel::Sender<Notification>,
42+
thread::JoinHandle<()>,
43+
)>,
44+
>,
4045
/// Genesis block root, for persisting the `PersistedBeaconChain`.
4146
genesis_block_root: Hash256,
4247
log: Logger,
@@ -112,11 +117,13 @@ pub enum PruningError {
112117
}
113118

114119
/// Message sent to the migration thread containing the information it needs to run.
120+
#[derive(Debug)]
115121
pub enum Notification {
116122
Finalization(FinalizationNotification),
117123
Reconstruction,
118124
}
119125

126+
#[derive(Clone, Debug)]
120127
pub struct FinalizationNotification {
121128
finalized_state_root: BeaconStateHash,
122129
finalized_checkpoint: Checkpoint,
@@ -203,7 +210,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
203210
}
204211

205212
pub fn run_reconstruction(db: Arc<HotColdDB<E, Hot, Cold>>, log: &Logger) {
206-
if let Err(e) = db.reconstruct_historic_states() {
213+
if let Err(e) = db.reconstruct_historic_states(None) {
207214
error!(
208215
log,
209216
"State reconstruction failed";
@@ -359,39 +366,83 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
359366
db: Arc<HotColdDB<E, Hot, Cold>>,
360367
prev_migration: Arc<Mutex<PrevMigration>>,
361368
log: Logger,
362-
) -> (mpsc::Sender<Notification>, thread::JoinHandle<()>) {
363-
let (tx, rx) = mpsc::channel();
369+
) -> (
370+
crossbeam_channel::Sender<Notification>,
371+
thread::JoinHandle<()>,
372+
) {
373+
let (tx, rx) = crossbeam_channel::unbounded();
374+
let tx_thread = tx.clone();
364375
let thread = thread::spawn(move || {
365-
while let Ok(notif) = rx.recv() {
366-
// Read the rest of the messages in the channel, preferring any reconstruction
367-
// notification, or the finalization notification with the greatest finalized epoch.
368-
let notif =
369-
rx.try_iter()
370-
.fold(notif, |best, other: Notification| match (&best, &other) {
371-
(Notification::Reconstruction, _)
372-
| (_, Notification::Reconstruction) => Notification::Reconstruction,
373-
(
374-
Notification::Finalization(fin1),
375-
Notification::Finalization(fin2),
376-
) => {
377-
if fin2.finalized_checkpoint.epoch > fin1.finalized_checkpoint.epoch
378-
{
379-
other
380-
} else {
381-
best
382-
}
383-
}
384-
});
376+
let mut sel = crossbeam_channel::Select::new();
377+
sel.recv(&rx);
378+
379+
loop {
380+
// Block until at least one notification is queued.
381+
let _queue_size = sel.ready();
382+
let queue: Vec<Notification> = rx.try_iter().collect();
383+
debug!(
384+
log,
385+
"New worker thread poll";
386+
"queue" => ?queue
387+
);
388+
389+
// Find a reconstruction notification and best finalization notification.
390+
let reconstruction_notif = queue
391+
.iter()
392+
.find(|n| matches!(n, Notification::Reconstruction));
393+
let migrate_notif = queue
394+
.iter()
395+
.filter_map(|n| match n {
396+
// Reconstruction notifications are picked up separately above; skip them here.
397+
Notification::Reconstruction => None,
398+
Notification::Finalization(f) => Some(f),
399+
})
400+
.max_by_key(|f| f.finalized_checkpoint.epoch);
401+
402+
// Do a bit of state reconstruction first if required.
403+
if let Some(_) = reconstruction_notif {
404+
let timer = std::time::Instant::now();
405+
406+
match db.reconstruct_historic_states(Some(BLOCKS_PER_RECONSTRUCTION)) {
407+
Err(Error::StateReconstructionDidNotComplete) => {
408+
info!(
409+
log,
410+
"Finished reconstruction batch";
411+
"batch_time_ms" => timer.elapsed().as_millis()
412+
);
413+
// Re-queue a reconstruction notification so the next batch runs. TODO: handle the send error (currently ignored).
414+
let _ = tx_thread.send(Notification::Reconstruction);
415+
}
416+
Err(e) => {
417+
error!(
418+
log,
419+
"State reconstruction failed";
420+
"error" => ?e,
421+
);
422+
}
423+
Ok(()) => {
424+
info!(
425+
log,
426+
"Finished state reconstruction";
427+
"batch_time_ms" => timer.elapsed().as_millis()
428+
);
429+
}
430+
}
431+
}
432+
433+
// Do the finalization migration.
434+
if let Some(notif) = migrate_notif {
435+
let timer = std::time::Instant::now();
385436

386-
// Do not run too frequently.
387-
if let Some(epoch) = notif.epoch() {
388437
let mut prev_migration = prev_migration.lock();
389438

439+
// Do not run too frequently.
440+
let epoch = notif.finalized_checkpoint.epoch;
390441
if let Some(prev_epoch) = prev_migration.epoch {
391442
if epoch < prev_epoch + prev_migration.epochs_per_run {
392443
debug!(
393444
log,
394-
"Database consolidation deferred";
445+
"Finalization migration deferred";
395446
"last_finalized_epoch" => prev_epoch,
396447
"new_finalized_epoch" => epoch,
397448
"epochs_per_run" => prev_migration.epochs_per_run,
@@ -404,11 +455,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
404455
// at which we ran. This value isn't tracked on disk so we will always migrate
405456
// on the first finalization after startup.
406457
prev_migration.epoch = Some(epoch);
407-
}
408458

409-
match notif {
410-
Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log),
411-
Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log),
459+
Self::run_migration(db.clone(), notif.to_owned(), &log);
460+
461+
info!(
462+
log,
463+
"Finished finalization migration";
464+
"running_time_ms" => timer.elapsed().as_millis()
465+
);
412466
}
413467
}
414468
});
@@ -534,6 +588,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
534588

535589
for maybe_tuple in iter {
536590
let (block_root, state_root, slot) = maybe_tuple?;
591+
537592
let block_root = SignedBeaconBlockHash::from(block_root);
538593
let state_root = BeaconStateHash::from(state_root);
539594

beacon_node/store/src/chunked_vector.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,10 @@ fn range_query<S: KeyValueStore<E>, E: EthSpec, T: Decode + Encode>(
444444

445445
for chunk_index in range {
446446
let key = &chunk_key(chunk_index)[..];
447-
let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing { chunk_index })?;
447+
let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing {
448+
column,
449+
chunk_index,
450+
})?;
448451
result.push(chunk);
449452
}
450453

@@ -675,6 +678,7 @@ pub enum ChunkError {
675678
actual: usize,
676679
},
677680
Missing {
681+
column: DBColumn,
678682
chunk_index: usize,
679683
},
680684
MissingGenesisValue,

beacon_node/store/src/reconstruct.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ where
1616
Hot: KeyValueStore<E> + ItemStore<E>,
1717
Cold: KeyValueStore<E> + ItemStore<E>,
1818
{
19-
pub fn reconstruct_historic_states(self: &Arc<Self>) -> Result<(), Error> {
19+
pub fn reconstruct_historic_states(
20+
self: &Arc<Self>,
21+
num_blocks: Option<usize>,
22+
) -> Result<(), Error> {
2023
let mut anchor = if let Some(anchor) = self.get_anchor_info() {
2124
anchor
2225
} else {
@@ -48,12 +51,15 @@ where
4851
// Use a dummy root, as we never read the block for the upper limit state.
4952
let upper_limit_block_root = Hash256::repeat_byte(0xff);
5053

51-
let block_root_iter = self.forwards_block_roots_iterator(
52-
lower_limit_slot,
53-
upper_limit_state,
54-
upper_limit_block_root,
55-
&self.spec,
56-
)?;
54+
// If `num_blocks` is `None`, iterate over all blocks.
55+
let block_root_iter = self
56+
.forwards_block_roots_iterator(
57+
lower_limit_slot,
58+
upper_limit_state,
59+
upper_limit_block_root,
60+
&self.spec,
61+
)?
62+
.take(num_blocks.unwrap_or(usize::MAX));
5763

5864
// The state to be advanced.
5965
let mut state = self

0 commit comments

Comments
 (0)