Skip to content

Commit f914287

Browse files
committed
store: Do not hold lock on batch across await point
1 parent 275ce0c commit f914287

File tree

1 file changed

+77
-4
lines changed

1 file changed

+77
-4
lines changed

store/postgres/src/writable.rs

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,72 @@ impl BlockTracker {
638638
}
639639
}
640640

641+
/// A batch that is queued for writing. The read methods on the `Queue` like
642+
/// `get` and `get_many` need to be able to read from a batch until the
643+
/// database changes that the batch contains have been committed. At the
644+
/// same time, once the background writer starts processing a batch, it must
645+
/// not be modified. This enum makes it possible to do this without cloning
646+
/// the batch or holding a lock across an await point.
647+
///
648+
/// When a batch is first added to the queue, it can still be appended to
649+
/// (`Open`). Once the background writer starts processing the batch, it is
650+
/// closed (`Closed`) and can no longer be modified.
651+
///
652+
/// The `processed` flag in the `Request` is a shortcut to determine whether
653+
/// a batch can still be appended to
654+
enum QueuedBatch {
655+
/// An open batch that can still be appended to
656+
Open(Batch),
657+
/// A closed batch that can no longer be modified
658+
Closed(Arc<Batch>),
659+
/// Temporary placeholder during state transitions. Must never be
660+
/// observed outside of `QueuedBatch::close`.
661+
Invalid,
662+
}
663+
664+
impl Deref for QueuedBatch {
665+
type Target = Batch;
666+
667+
fn deref(&self) -> &Self::Target {
668+
match self {
669+
QueuedBatch::Open(batch) => batch,
670+
QueuedBatch::Closed(batch) => batch.as_ref(),
671+
QueuedBatch::Invalid => unreachable!("deref is never called on a QueuedBatch::Invalid"),
672+
}
673+
}
674+
}
675+
676+
impl QueuedBatch {
677+
/// Append another batch to this one. Returns an error if this batch is
678+
/// closed
679+
fn append(&mut self, other: Batch) -> Result<(), StoreError> {
680+
match self {
681+
QueuedBatch::Open(batch) => batch.append(other),
682+
QueuedBatch::Closed(_) => Err(internal_error!("attempt to append to closed batch")),
683+
QueuedBatch::Invalid => {
684+
unreachable!("append is never called on a QueuedBatch::Invalid")
685+
}
686+
}
687+
}
688+
689+
/// Close the current batch, i.e., replace it by a `Closed` batch and
690+
/// return a clone of the `Arc<Batch>` that the closed batch now holds
691+
fn close(&mut self) -> Arc<Batch> {
692+
let old = std::mem::replace(self, QueuedBatch::Invalid);
693+
*self = match old {
694+
QueuedBatch::Open(batch) => QueuedBatch::Closed(Arc::new(batch)),
695+
closed @ QueuedBatch::Closed(_) => closed,
696+
QueuedBatch::Invalid => unreachable!("close is never called on a QueuedBatch::Invalid"),
697+
};
698+
match self {
699+
QueuedBatch::Closed(batch) => batch.cheap_clone(),
700+
QueuedBatch::Open(_) | QueuedBatch::Invalid => {
701+
unreachable!("close must have set self to Closed")
702+
}
703+
}
704+
}
705+
}
706+
641707
/// A write request received from the `WritableStore` frontend that gets
642708
/// queued
643709
///
@@ -654,7 +720,12 @@ enum Request {
654720
// will try to read the batch. The batch only becomes truly readonly
655721
// when we decide to process it at which point we set `processed` to
656722
// `true`
657-
batch: RwLock<Batch>,
723+
batch: RwLock<QueuedBatch>,
724+
/// True if the background writer has started processing this
725+
/// request. It is guaranteed that once the batch is a
726+
/// `QueuedBatch::Closed`, this flag is true. This flag serves as a
727+
/// shortcut to check that without having to acquire the lock around
728+
/// the batch.
658729
processed: AtomicBool,
659730
},
660731
RevertTo {
@@ -699,7 +770,7 @@ impl Request {
699770
queued: Instant::now(),
700771
store,
701772
stopwatch,
702-
batch: RwLock::new(batch),
773+
batch: RwLock::new(QueuedBatch::Open(batch)),
703774
processed: AtomicBool::new(false),
704775
}
705776
}
@@ -741,15 +812,17 @@ impl Request {
741812
processed: _,
742813
} => {
743814
let start = Instant::now();
744-
let batch = batch.read().unwrap();
815+
816+
let batch = batch.write().unwrap().close();
817+
745818
if let Some(err) = &batch.error {
746819
// This can happen when appending to the batch failed
747820
// because of an internal error. Returning an `Err` here
748821
// will poison and shut down the queue
749822
return Err(err.clone());
750823
}
751824
let res = store
752-
.transact_block_operations(batch.deref(), stopwatch)
825+
.transact_block_operations(&batch, stopwatch)
753826
.await
754827
.map(|()| ExecResult::Continue);
755828
info!(store.logger, "Committed write batch";

0 commit comments

Comments
 (0)