From 93db35a41615a8c3f8e7d3b003874303a5fff1b9 Mon Sep 17 00:00:00 2001 From: Andrey Lepikhov Date: Wed, 17 Nov 2021 11:13:37 +0500 Subject: [PATCH] Add Commit Sequence Number (CSN) machinery into MVCC implementation for a timestamp-based resolving of visibility conflicts. It allows to achieve proper snapshot isolation semantics in the case of distributed transactions involving more than one Postgres instance. Authors: K.Knizhnik, S.Kelvich, A.Sher, A.Lepikhov, M.Usama. Discussion: (2020/05/21 -) https://www.postgresql.org/message-id/flat/CA%2Bfd4k6HE8xLGEvqWzABEg8kkju5MxU%2Bif7bf-md0_2pjzXp9Q%40mail.gmail.com#ed1359340871688bed2e643921f73365 (2018/05/01 - 2019/04/21) https://www.postgresql.org/message-id/flat/21BC916B-80A1-43BF-8650-3363CCDAE09C%40postgrespro.ru --- doc/src/sgml/config.sgml | 50 +- src/backend/access/rmgrdesc/Makefile | 1 + src/backend/access/rmgrdesc/csnlogdesc.c | 95 +++ src/backend/access/rmgrdesc/xlogdesc.c | 6 +- src/backend/access/transam/Makefile | 2 + src/backend/access/transam/csn_log.c | 748 ++++++++++++++++++ src/backend/access/transam/csn_snapshot.c | 687 ++++++++++++++++ src/backend/access/transam/rmgr.c | 1 + src/backend/access/transam/twophase.c | 153 ++++ src/backend/access/transam/varsup.c | 2 + src/backend/access/transam/xact.c | 32 + src/backend/access/transam/xlog.c | 23 +- src/backend/access/transam/xloginsert.c | 2 + src/backend/commands/vacuum.c | 3 +- src/backend/replication/logical/snapbuild.c | 4 + src/backend/storage/ipc/ipci.c | 6 + src/backend/storage/ipc/procarray.c | 87 ++ src/backend/storage/lmgr/lwlock.c | 2 + src/backend/storage/lmgr/lwlocknames.txt | 2 + src/backend/storage/lmgr/proc.c | 6 + src/backend/storage/sync/sync.c | 5 + src/backend/utils/misc/guc_tables.c | 37 + src/backend/utils/probes.d | 2 + src/backend/utils/time/snapmgr.c | 183 ++++- src/bin/initdb/initdb.c | 3 +- src/bin/pg_controldata/pg_controldata.c | 2 + src/bin/pg_upgrade/pg_upgrade.c | 5 + src/bin/pg_upgrade/pg_upgrade.h | 2 + src/bin/pg_waldump/csnlogdesc.c | 1 + src/bin/pg_waldump/rmgrdesc.c | 1 + src/include/access/csn_log.h | 99 +++ src/include/access/csn_snapshot.h | 54 ++ src/include/access/rmgrlist.h | 1 + src/include/access/xlog_internal.h | 2 + src/include/catalog/pg_control.h | 1 + src/include/catalog/pg_proc.dat | 17 + src/include/datatype/timestamp.h | 3 + src/include/fmgr.h | 1 + src/include/storage/lwlock.h | 1 + src/include/storage/proc.h | 14 + src/include/storage/procarray.h | 7 + src/include/storage/sync.h | 1 + src/include/utils/snapmgr.h | 7 +- src/include/utils/snapshot.h | 11 + src/test/modules/Makefile | 1 + src/test/modules/csnsnapshot/Makefile | 22 + .../csnsnapshot/expected/csnsnapshot.out | 1 + src/test/modules/csnsnapshot/t/001_base.pl | 100 +++ src/test/modules/csnsnapshot/t/002_standby.pl | 68 ++ .../csnsnapshot/t/003_parallel_safe.pl | 67 ++ src/test/modules/snapshot_too_old/sto.conf | 1 + src/test/perl/PostgreSQL/Test/Cluster.pm | 28 + src/test/regress/expected/sysviews.out | 4 +- 53 files changed, 2653 insertions(+), 11 deletions(-) create mode 100644 src/backend/access/rmgrdesc/csnlogdesc.c create mode 100644 src/backend/access/transam/csn_log.c create mode 100644 src/backend/access/transam/csn_snapshot.c create mode 120000 src/bin/pg_waldump/csnlogdesc.c create mode 100644 src/include/access/csn_log.h create mode 100644 src/include/access/csn_snapshot.h create mode 100644 src/test/modules/csnsnapshot/Makefile create mode 100644 src/test/modules/csnsnapshot/expected/csnsnapshot.out create mode 100644 src/test/modules/csnsnapshot/t/001_base.pl create mode 100644 src/test/modules/csnsnapshot/t/002_standby.pl create mode 100644 src/test/modules/csnsnapshot/t/003_parallel_safe.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index eaf4dc9bd42..2ea953f5726 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -10295,8 +10295,56 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir' - + + CSN Based Snapshot + + By default, snapshots in PostgreSQL contains a + XID (TransactionID) that allows to identify the status of a transaction + and make arbitrary visibility calculations. + + + + PostgreSQL also provides a CSN (Commit + Sequence Number) based machinery as an additional tool for visibility + calculations. It may be used within distributed transactions when a xid of + a local transaction can't correctly identify order of the distributed one. + + + + + enable_csn_snapshot (boolean) + + enable_csn_snapshot configuration parameter + + + + + + Enable/disable the CSN tracking for the snapshot. + + + + PostgreSQL uses a physical clock timestamp as + a CSN, so enabling the CSN based snapshots can be useful for implementing + cross-instance snapshots and visibility of distributed transaction. + + + + when enabled PostgreSQL creates + pg_csn directory under PGDATA to keep + the track of CSN and XID mappings. + + + + The default value is on. + + + + + + + Version and Platform Compatibility diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index cd95eec37f1..4b36d64eb6f 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ brindesc.o \ clogdesc.o \ + csnlogdesc.o \ committsdesc.o \ dbasedesc.o \ genericdesc.o \ diff --git a/src/backend/access/rmgrdesc/csnlogdesc.c b/src/backend/access/rmgrdesc/csnlogdesc.c new file mode 100644 index 00000000000..f8c644e9064 --- /dev/null +++ b/src/backend/access/rmgrdesc/csnlogdesc.c @@ -0,0 +1,95 @@ +/*------------------------------------------------------------------------- + * + * clogdesc.c + * rmgr descriptor routines for access/transam/csn_log.c + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/csnlogdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/csn_log.h" + + +void +csnlog_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_CSN_ZEROPAGE) + { + int pageno; + + memcpy(&pageno, XLogRecGetData(record), sizeof(int)); + appendStringInfo(buf, "pageno %d", pageno); + } + else if (info == XLOG_CSN_TRUNCATE) + { + int pageno; + + memcpy(&pageno, XLogRecGetData(record), sizeof(int)); + appendStringInfo(buf, "pageno %d", pageno); + } + else if (info == XLOG_CSN_ASSIGNMENT) + { + CSN csn; + + memcpy(&csn, XLogRecGetData(record), sizeof(CSN)); + appendStringInfo(buf, "assign "INT64_FORMAT"", csn); + } + else if (info == XLOG_CSN_SETCSN) + { + xl_csn_set *xlrec = (xl_csn_set *) rec; + int nsubxids; + + appendStringInfo(buf, "set "INT64_FORMAT" for: %u", + xlrec->csn, + xlrec->xtop); + nsubxids = ((XLogRecGetDataLen(record) - MinSizeOfCSNSet) / + sizeof(TransactionId)); + if (nsubxids > 0) + { + int i; + TransactionId *subxids; + + subxids = palloc(sizeof(TransactionId) * nsubxids); + memcpy(subxids, + XLogRecGetData(record) + MinSizeOfCSNSet, + sizeof(TransactionId) * nsubxids); + for (i = 0; i < nsubxids; i++) + appendStringInfo(buf, ", %u", subxids[i]); + pfree(subxids); + } + } +} + +const char * +csnlog_identify(uint8 info) +{ + const char *id = NULL; + + switch (info & ~XLR_INFO_MASK) + { + case XLOG_CSN_ASSIGNMENT: + id = "ASSIGNMENT"; + break; + case XLOG_CSN_SETCSN: + id = "SETCSN"; + break; + case XLOG_CSN_ZEROPAGE: + id = "ZEROPAGE"; + break; + case XLOG_CSN_TRUNCATE: + id = "TRUNCATE"; + break; + } + + return id; +} diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index f390c177e4d..76619c6297c 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -113,7 +113,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "max_connections=%d max_worker_processes=%d " "max_wal_senders=%d max_prepared_xacts=%d " "max_locks_per_xact=%d wal_level=%s " - "wal_log_hints=%s track_commit_timestamp=%s", + "wal_log_hints=%s track_commit_timestamp=%s " + "enable_csn_snapshot=%s", xlrec.MaxConnections, xlrec.max_worker_processes, xlrec.max_wal_senders, @@ -121,7 +122,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record) xlrec.max_locks_per_xact, wal_level_str, xlrec.wal_log_hints ? "on" : "off", - xlrec.track_commit_timestamp ? "on" : "off"); + xlrec.track_commit_timestamp ? "on" : "off", + xlrec.enable_csn_snapshot ? "on" : "off"); } else if (info == XLOG_FPW_CHANGE) { diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 661c55a9db7..73081d4d77b 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -15,6 +15,8 @@ include $(top_builddir)/src/Makefile.global OBJS = \ clog.o \ commit_ts.o \ + csn_log.o \ + csn_snapshot.o \ generic_xlog.o \ multixact.o \ parallel.o \ diff --git a/src/backend/access/transam/csn_log.c b/src/backend/access/transam/csn_log.c new file mode 100644 index 00000000000..6b19551c537 --- /dev/null +++ b/src/backend/access/transam/csn_log.c @@ -0,0 +1,748 @@ +/*----------------------------------------------------------------------------- + * + * csn_log.c + * Track commit sequence numbers of finished transactions + * + * This module provides SLRU to store CSN for each transaction. This + * mapping need to be kept only for xid's greater then oldestXid, but + * that can require arbitrary large amounts of memory in case of long-lived + * transactions. Because of same lifetime and persistancy requirements + * this module is quite similar to subtrans.c + * + * If we switch database from CSN-base snapshot to xid-base snapshot then, + * nothing wrong. But if we switch xid-base snapshot to CSN-base snapshot + * it should decide a new xid which begin csn-base check. It can not be + * oldestActiveXID because of prepared transaction. + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/csn_log.c + * + *----------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/csn_log.h" +#include "access/slru.h" +#include "access/csn_snapshot.h" +#include "access/subtrans.h" +#include "access/transam.h" +#include "access/xlogutils.h" +#include "miscadmin.h" +#include "pg_trace.h" +#include "portability/instr_time.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/snapmgr.h" +#include "access/xlog_internal.h" + +/* + * We use csnSnapshotActive to judge if csn snapshot enabled instead of by + * enable_csn_snapshot, this design is similar to 'track_commit_timestamp'. + * + * Because in process of replication if master changes 'enable_csn_snapshot' + * in a database restart, standby should apply wal record for GUC changed, + * then it's difficult to notice all backends about that. So they can get + * the message by 'csnSnapshotActive' which in shared buffer. It will not + * acquire a lock, so without performance issue. + * last_max_csn - Record the max csn till now. + * last_csn_log_wal - for interval we log the assign csn to wal + * oldestXmin - first sensible Xmin on the first existed page in the CSN Log + */ +typedef struct CSNShared +{ + bool csnSnapshotActive; + pg_atomic_uint32 oldestXmin; + CSN last_max_csn; + CSN last_csn_log_wal; + volatile slock_t lock; +} CSNShared; + +CSNShared *csnShared; + +/* + * Defines for CSNLog page sizes. A page is the same BLCKSZ as is used + * everywhere else in Postgres. + * + * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF, + * CSNLog page numbering also wraps around at + * 0xFFFFFFFF/CSN_LOG_XACTS_PER_PAGE, and CSNLog segment numbering at + * 0xFFFFFFFF/CLOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no + * explicit notice of that fact in this module, except when comparing segment + * and page numbers in TruncateCSNLog (see CSNLogPagePrecedes). + */ + +/* We store the commit CSN for each xid */ +#define CSN_LOG_XACTS_PER_PAGE (BLCKSZ / sizeof(CSN)) + +#define TransactionIdToPage(xid) ((xid) / (TransactionId) CSN_LOG_XACTS_PER_PAGE) +#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CSN_LOG_XACTS_PER_PAGE) + +/* + * Link to shared-memory data structures for CLOG control + */ +static SlruCtlData CSNLogCtlData; +#define CsnlogCtl (&CSNLogCtlData) + +static int ZeroCSNLogPage(int pageno, bool write_xlog); +static void ZeroTruncateCSNLogPage(int pageno, bool write_xlog); +static bool CSNLogPagePrecedes(int page1, int page2); +static void CSNLogSetPageStatus(TransactionId xid, int nsubxids, + TransactionId *subxids, + CSN csn, int pageno); +static void CSNLogSetCSNInSlot(TransactionId xid, CSN csn, int slotno); + +static void WriteCSNXlogRec(TransactionId xid, int nsubxids, + TransactionId *subxids, CSN csn); +static void WriteZeroCSNPageXlogRec(int pageno); +static void WriteTruncateCSNXlogRec(int pageno); +static void set_oldest_xmin(TransactionId xid); + + +/* + * Number of shared CSNLog buffers. + */ +static Size +CSNLogShmemBuffers(void) +{ + return Min(32, Max(4, NBuffers / 512)); +} + +/* + * Reserve shared memory for CsnlogCtl. + */ +Size +CSNLogShmemSize(void) +{ + return SimpleLruShmemSize(CSNLogShmemBuffers(), 0); +} + +/* + * Initialization of shared memory for CSNLog. + */ +void +CSNLogShmemInit(void) +{ + bool found; + + CsnlogCtl->PagePrecedes = CSNLogPagePrecedes; + SimpleLruInit(CsnlogCtl, "CSNLog Ctl", CSNLogShmemBuffers(), 0, + CSNLogSLRULock, "pg_csn", LWTRANCHE_CSN_LOG_BUFFERS, + SYNC_HANDLER_CSN); + + csnShared = ShmemInitStruct("CSNlog shared", + sizeof(CSNShared), + &found); + if (!found) + { + csnShared->csnSnapshotActive = false; + pg_atomic_init_u32(&csnShared->oldestXmin, InvalidTransactionId); + csnShared->last_max_csn = InvalidCSN; + csnShared->last_csn_log_wal = InvalidCSN; + SpinLockInit(&csnShared->lock); + } +} + +/* + * CSNLogSetCSN + * + * Record CSN of transaction and its subtransaction tree. + * + * xid is a single xid to set status for. This will typically be the top level + * transactionid for a top level commit or abort. It can also be a + * subtransaction when we record transaction aborts. + * + * subxids is an array of xids of length nsubxids, representing subtransactions + * in the tree of xid. In various cases nsubxids may be zero. + * + * csn is the commit sequence number of the transaction. It should be + * AbortedCSN for abort cases. + */ +void +CSNLogSetCSN(TransactionId xid, int nsubxids, TransactionId *subxids, CSN csn, + bool write_xlog) +{ + int pageno; + int i = 0; + int offset = 0; + + Assert(TransactionIdIsValid(xid)); + + pageno = TransactionIdToPage(xid); /* get page of parent */ + + if(write_xlog) + WriteCSNXlogRec(xid, nsubxids, subxids, csn); + + for (;;) + { + int num_on_page = 0; + + /* Form subtransactions bucket that can be written on the same page */ + while (i < nsubxids && TransactionIdToPage(subxids[i]) == pageno) + { + num_on_page++; + i++; + } + + CSNLogSetPageStatus(xid, + num_on_page, subxids + offset, + csn, pageno); + if (i >= nsubxids) + break; + + offset = i; + pageno = TransactionIdToPage(subxids[offset]); + xid = InvalidTransactionId; + } +} + +/* + * Record the final state of transaction entries in the csn log for + * all entries on a single page. Atomic only on this page. + * + * Otherwise API is same as TransactionIdSetTreeStatus() + */ +static void +CSNLogSetPageStatus(TransactionId xid, int nsubxids, TransactionId *subxids, + CSN csn, int pageno) +{ + int slotno; + int i; + + LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE); + + slotno = SimpleLruReadPage(CsnlogCtl, pageno, true, xid); + + /* Subtransactions first, if needed ... */ + for (i = 0; i < nsubxids; i++) + { + Assert(CsnlogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i])); + CSNLogSetCSNInSlot(subxids[i], csn, slotno); + } + + /* ... then the main transaction */ + if (TransactionIdIsValid(xid)) + CSNLogSetCSNInSlot(xid, csn, slotno); + + CsnlogCtl->shared->page_dirty[slotno] = true; + + LWLockRelease(CSNLogSLRULock); +} + +/* + * Sets the commit status of a single transaction. + */ +static void +CSNLogSetCSNInSlot(TransactionId xid, CSN csn, int slotno) +{ + int entryno = TransactionIdToPgIndex(xid); + CSN *ptr; + + Assert(LWLockHeldByMe(CSNLogSLRULock)); + + ptr = (CSN *) (CsnlogCtl->shared->page_buffer[slotno] + + entryno * sizeof(CSN)); + *ptr = csn; +} + +/* + * Interrogate the state of a transaction in the log. + * + * NB: this is a low-level routine and is NOT the preferred entry point + * for most uses; TransactionIdGetCSN() in csn_snapshot.c is the + * intended caller. + */ +CSN +CSNLogGetCSNByXid(TransactionId xid) +{ + int pageno = TransactionIdToPage(xid); + int entryno = TransactionIdToPgIndex(xid); + int slotno; + CSN csn; + + /* lock is acquired by SimpleLruReadPage_ReadOnly */ + slotno = SimpleLruReadPage_ReadOnly(CsnlogCtl, pageno, xid); + csn = *(CSN *) (CsnlogCtl->shared->page_buffer[slotno] + + entryno * sizeof(CSN)); + LWLockRelease(CSNLogSLRULock); + + return csn; +} + +/* + * Initialize (or reinitialize) a page of CSNLog to zeroes. + * + * The page is not actually written, just set up in shared memory. + * The slot number of the new page is returned. + * + * Control lock must be held at entry, and will be held at exit. + */ +static int +ZeroCSNLogPage(int pageno, bool write_xlog) +{ + Assert(LWLockHeldByMe(CSNLogSLRULock)); + if(write_xlog) + WriteZeroCSNPageXlogRec(pageno); + return SimpleLruZeroPage(CsnlogCtl, pageno); +} + +static void +ZeroTruncateCSNLogPage(int pageno, bool write_xlog) +{ + if(write_xlog) + WriteTruncateCSNXlogRec(pageno); + SimpleLruTruncate(CsnlogCtl, pageno); +} + +void +ActivateCSNlog(void) +{ + int pageno; + TransactionId nextXid = InvalidTransactionId; + TransactionId oldest_xid = InvalidTransactionId; + + if (csnShared->csnSnapshotActive) + return; + + nextXid = XidFromFullTransactionId(ShmemVariableCache->nextXid); + pageno = TransactionIdToPage(nextXid); + + LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE); + + /* + * Create the current segment file, if necessary. + * This means that + */ + if (!SimpleLruDoesPhysicalPageExist(CsnlogCtl, pageno)) + { + int slotno; + TransactionId curxid = nextXid; + + slotno = ZeroCSNLogPage(pageno, false); + SimpleLruWritePage(CsnlogCtl, slotno); + + elog(LOG, "Create SLRU page=%d, slotno=%d for xid %u on a CSN log activation", + pageno, slotno, nextXid); + + /* + * nextXid isn't first xid on the page. It is the first page in the CSN + * log. Set UnclearCSN value into all previous slots on this page. + * This xid value can be used as an oldest xid in the CSN log. + */ + if (TransactionIdToPgIndex(nextXid) > 0) + { + /* Cleaning procedure. Can be optimized. */ + do + { + curxid--; + CSNLogSetCSNInSlot(curxid, UnclearCSN, slotno); + } while (TransactionIdToPgIndex(curxid) > 0); + + elog(LOG, + "Set UnclearCSN values for %d xids in the range [%u,%u]", + nextXid - curxid, curxid, nextXid-1); + + /* Oldest XID found on this page */ + oldest_xid = nextXid; + } + } + LWLockRelease(CSNLogSLRULock); + + if (!TransactionIdIsValid(oldest_xid)) + { + TransactionId curxid; + + elog(LOG, "Search for the oldest xid across previous pages"); + + /* Need to scan previous pages for an oldest xid. */ + while (pageno > 0 && SimpleLruDoesPhysicalPageExist(CsnlogCtl, pageno - 1)) + pageno--; + + /* look up for the first clear xid value. */ + curxid = pageno * (TransactionId) CSN_LOG_XACTS_PER_PAGE; + while(CSNLogGetCSNByXid(curxid) == UnclearCSN) + curxid++; + oldest_xid = curxid; + } + + set_oldest_xmin(oldest_xid); + csnShared->csnSnapshotActive = true; +} + +bool +get_csnlog_status(void) +{ + return csnShared->csnSnapshotActive; +} + +void +DeactivateCSNlog(void) +{ + csnShared->csnSnapshotActive = false; + set_oldest_xmin(InvalidTransactionId); + LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE); + (void) SlruScanDirectory(CsnlogCtl, SlruScanDirCbDeleteAll, NULL); + LWLockRelease(CSNLogSLRULock); + elog(LOG, "CSN log has deactivated"); +} + +void +StartupCSN(void) +{ + ActivateCSNlog(); +} + +void +CompleteCSNInitialization(void) +{ + /* + * If the feature is not enabled, turn it off for good. This also removes + * any leftover data. + * + * Conversely, we activate the module if the feature is enabled. This is + * necessary for primary and standby as the activation depends on the + * control file contents at the beginning of recovery or when a + * XLOG_PARAMETER_CHANGE is replayed. + */ + if (!enable_csn_snapshot) + DeactivateCSNlog(); + else + ActivateCSNlog(); +} + +void +CSNlogParameterChange(bool newvalue, bool oldvalue) +{ + if (newvalue) + { + if (!csnShared->csnSnapshotActive) + ActivateCSNlog(); + } + else if (csnShared->csnSnapshotActive) + DeactivateCSNlog(); +} + +/* + * Perform a checkpoint --- either during shutdown, or on-the-fly + */ +void +CheckPointCSNLog(void) +{ + if (!get_csnlog_status()) + return; + + /* + * Flush dirty CSNLog pages to disk. + * + * This is not actually necessary from a correctness point of view. We do + * it merely to improve the odds that writing of dirty pages is done by + * the checkpoint process and not by backends. + */ + TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_START(true); + SimpleLruWriteAll(CsnlogCtl, true); + TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_DONE(true); +} + +/* + * Make sure that CSNLog has room for a newly-allocated XID. + * + * NB: this is called while holding XidGenLock. We want it to be very fast + * most of the time; even when it's not so fast, no actual I/O need happen + * unless we're forced to write out a dirty clog or xlog page to make room + * in shared memory. + */ +void +ExtendCSNLog(TransactionId newestXact) +{ + int pageno; + + if (!get_csnlog_status()) + return; + + /* + * No work except at first XID of a page. But beware: just after + * wraparound, the first XID of page zero is FirstNormalTransactionId. + */ + if (TransactionIdToPgIndex(newestXact) != 0 && + !TransactionIdEquals(newestXact, FirstNormalTransactionId)) + return; + + pageno = TransactionIdToPage(newestXact); + + LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE); + + /* Zero the page and make an XLOG entry about it */ + ZeroCSNLogPage(pageno, !InRecovery); + + LWLockRelease(CSNLogSLRULock); +} + +/* + * Remove all CSNLog segments before the one holding the passed + * transaction ID. + * + * This is normally called during checkpoint, with oldestXact being the + * oldest TransactionXmin of any running transaction. + */ +void +TruncateCSNLog(TransactionId oldestXact) +{ + int cutoffPage; + TransactionId oldestXmin; + + /* Can't do truncation because WAL messages isn't allowed during recovery */ + if (RecoveryInProgress() || !get_csnlog_status()) + return; + + /* + * The cutoff point is the start of the segment containing oldestXact. We + * pass the *page* containing oldestXact to SimpleLruTruncate. We step + * back one transaction to avoid passing a cutoff page that hasn't been + * created yet in the rare case that oldestXact would be the first item on + * a page and oldestXact == next XID. In that case, if we didn't subtract + * one, we'd trigger SimpleLruTruncate's wraparound detection. + */ + TransactionIdRetreat(oldestXact); + cutoffPage = TransactionIdToPage(oldestXact); + + /* Detect, that we really need to cut CSN log. */ + oldestXmin = pg_atomic_read_u32(&csnShared->oldestXmin); + + if (TransactionIdToPage(oldestXmin) < cutoffPage) + { + /* OldestXact is located in the same page as oldestXmin. No actions needed. */ + return; + } + + /* + * Shift oldestXmin to the start of new first page. Use first position + * on the page because all transactions on this page is created with enabled + * CSN snapshot machinery. + */ + pg_atomic_write_u32(&csnShared->oldestXmin, + oldestXact - TransactionIdToPgIndex(oldestXact)); + + SpinLockRelease(&csnShared->lock); + ZeroTruncateCSNLogPage(cutoffPage, true); +} + +/* + * Decide which of two CSNLog page numbers is "older" for truncation + * purposes. + * + * We need to use comparison of TransactionIds here in order to do the right + * thing with wraparound XID arithmetic. However, if we are asked about + * page number zero, we don't want to hand InvalidTransactionId to + * TransactionIdPrecedes: it'll get weird about permanent xact IDs. So, + * offset both xids by FirstNormalTransactionId to avoid that. + */ +static bool +CSNLogPagePrecedes(int page1, int page2) +{ + TransactionId xid1; + TransactionId xid2; + + xid1 = ((TransactionId) page1) * CSN_LOG_XACTS_PER_PAGE; + xid1 += FirstNormalTransactionId; + xid2 = ((TransactionId) page2) * CSN_LOG_XACTS_PER_PAGE; + xid2 += FirstNormalTransactionId; + + return TransactionIdPrecedes(xid1, xid2); +} + +void +WriteAssignCSNXlogRec(CSN csn) +{ + Assert(enable_csn_wal && csn <= csnShared->last_csn_log_wal); + + XLogBeginInsert(); + XLogRegisterData((char *) (&csn), sizeof(CSN)); + XLogInsert(RM_CSNLOG_ID, XLOG_CSN_ASSIGNMENT); +} + +static void +WriteCSNXlogRec(TransactionId xid, int nsubxids, + TransactionId *subxids, CSN csn) +{ + xl_csn_set xlrec; + + if(!enable_csn_wal) + return; + + xlrec.xtop = xid; + xlrec.nsubxacts = nsubxids; + xlrec.csn = csn; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, MinSizeOfCSNSet); + XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId)); + XLogInsert(RM_CSNLOG_ID, XLOG_CSN_SETCSN); +} + +/* + * Write a ZEROPAGE xlog record + */ +static void +WriteZeroCSNPageXlogRec(int pageno) +{ + if(!enable_csn_wal) + { + return; + } + XLogBeginInsert(); + XLogRegisterData((char *) (&pageno), sizeof(int)); + (void) XLogInsert(RM_CSNLOG_ID, XLOG_CSN_ZEROPAGE); +} + +/* + * Write a TRUNCATE xlog record + */ +static void +WriteTruncateCSNXlogRec(int pageno) +{ + if(!enable_csn_wal) + { + return; + } + XLogBeginInsert(); + XLogRegisterData((char *) (&pageno), sizeof(int)); + XLogInsert(RM_CSNLOG_ID, XLOG_CSN_TRUNCATE); +} + + +void +csnlog_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + /* Backup blocks are not used in csnlog records */ + Assert(!XLogRecHasAnyBlockRefs(record)); + + if (info == XLOG_CSN_ASSIGNMENT) + { + CSN csn; + + memcpy(&csn, XLogRecGetData(record), sizeof(CSN)); + /* XXX: Do we really not needed to acquire the lock here? */ + csnShared->last_max_csn = csn; + } + else if (info == XLOG_CSN_SETCSN) + { + xl_csn_set *xlrec = (xl_csn_set *) XLogRecGetData(record); + CSNLogSetCSN(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub, xlrec->csn, false); + } + else if (info == XLOG_CSN_ZEROPAGE) + { + int pageno; + int slotno; + + memcpy(&pageno, XLogRecGetData(record), sizeof(int)); + LWLockAcquire(CSNLogSLRULock, LW_EXCLUSIVE); + slotno = ZeroCSNLogPage(pageno, false); + SimpleLruWritePage(CsnlogCtl, slotno); + LWLockRelease(CSNLogSLRULock); + Assert(!CsnlogCtl->shared->page_dirty[slotno]); + + } + else if (info == XLOG_CSN_TRUNCATE) + { + int pageno; + + memcpy(&pageno, XLogRecGetData(record), sizeof(int)); + CsnlogCtl->shared->latest_page_number = pageno; + ZeroTruncateCSNLogPage(pageno, false); + } + else + elog(PANIC, "csnlog_redo: unknown op code %u", info); +} + +/* + * Entrypoint for sync.c to sync members files. + */ +int +csnsyncfiletag(const FileTag *ftag, char *path) +{ + return SlruSyncFileTag(&CSNLogCtlData, ftag, path); +} + +/* + * GenerateCSN + * + * Generate CSN which is actually a local time. Also we are forcing + * this time to be always increasing. Since now it is not uncommon to have + * millions of read transactions per second we are trying to use nanoseconds + * if such time resolution is available. + */ +CSN +GenerateCSN(bool locked, CSN assign) +{ + instr_time current_time; + CSN csn; + CSN log_csn = InvalidCSN; + + Assert(get_csnlog_status() || csn_snapshot_defer_time > 0); + + /* TODO: create some macro that add small random shift to current time. */ + INSTR_TIME_SET_CURRENT(current_time); + csn = (CSN) INSTR_TIME_GET_NANOSEC(current_time) + (int64) (csn_time_shift * 1E9); + + if(assign != InvalidCSN && csn < assign) + csn = assign; + + /* TODO: change to atomics? */ + if (!locked) + SpinLockAcquire(&csnShared->lock); + + if (csn <= csnShared->last_max_csn) + csn = csnShared->last_max_csn + 1; + csnShared->last_max_csn = csn; + + if (enable_csn_wal && csn > csnShared->last_csn_log_wal) + { + /* + * We log the CSN 5s greater than generated, you can see comments on + * the CSN_ASSIGN_TIME_INTERVAL. + */ + log_csn = CSNAddByNanosec(csn, CSN_ASSIGN_TIME_INTERVAL); + csnShared->last_csn_log_wal = log_csn; + } + + if (!locked) + SpinLockRelease(&csnShared->lock); + + if (log_csn != InvalidCSN) + WriteAssignCSNXlogRec(csn); + + return csn; +} + +CSN +GetLastGeneratedCSN(void) +{ + CSN csn; + + SpinLockAcquire(&csnShared->lock); + csn = csnShared->last_max_csn; + SpinLockRelease(&csnShared->lock); + return csn; +} + +/* + * Mostly for debug purposes. + */ +static void +set_oldest_xmin(TransactionId xid) +{ + elog(LOG, "Oldest Xmin for CSN will be changed from %u to %u", + pg_atomic_read_u32(&csnShared->oldestXmin), xid); + + pg_atomic_write_u32(&csnShared->oldestXmin, xid); +} + +TransactionId +GetOldestXmin(void) +{ + Assert(get_csnlog_status()); + return pg_atomic_read_u32(&csnShared->oldestXmin); +} diff --git a/src/backend/access/transam/csn_snapshot.c b/src/backend/access/transam/csn_snapshot.c new file mode 100644 index 00000000000..b1c051bae9b --- /dev/null +++ b/src/backend/access/transam/csn_snapshot.c @@ -0,0 +1,687 @@ +/*------------------------------------------------------------------------- + * + * csn_snapshot.c + * Support for cross-node snapshot isolation. + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/csn_snapshot.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/csn_snapshot.h" +#include "access/subtrans.h" +#include "access/transam.h" +#include "access/twophase.h" +#include "access/xact.h" +#include "portability/instr_time.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/snapmgr.h" +#include "miscadmin.h" + +/* Raise a warning if imported snapshot_csn exceeds ours by this value. */ +#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */ + +static TransactionId xmin_for_csn = InvalidTransactionId; + + +/* + * GUC to delay advance of oldestXid for this amount of time. Also determines + * the size CSNSnapshotXidMap circular buffer. + */ +int csn_snapshot_defer_time; + +int csn_time_shift; + +/* + * CSNSnapshotXidMap + * + * To be able to install csn snapshot that points to past we need to keep + * old versions of tuples and therefore delay advance of oldestXid. Here we + * keep track of correspondence between snapshot's snapshot_csn and oldestXid + * that was set at the time when the snapshot was taken. Much like the + * snapshot too old's OldSnapshotControlData does, but with finer granularity + * to seconds. + * + * Different strategies can be employed to hold oldestXid (e.g. we can track + * oldest csn-based snapshot among cluster nodes and map it oldestXid + * on each node). + * + * On each snapshot acquisition CSNSnapshotMapXmin() is called and stores + * correspondence between current snapshot_csn and oldestXmin in a sparse way: + * snapshot_csn is rounded to seconds (and here we use the fact that snapshot_csn + * is just a timestamp) and oldestXmin is stored in the circular buffer where + * rounded snapshot_csn acts as an offset from current circular buffer head. + * Size of the circular buffer is controlled by csn_snapshot_defer_time GUC. + * + * When csn snapshot arrives we check that its + * snapshot_csn is still in our map, otherwise we'll error out with "snapshot too + * old" message. If snapshot_csn is successfully mapped to oldestXid we move + * backend's pgxact->xmin to proc->originalXmin and fill pgxact->xmin to + * mapped oldestXid. That way GetOldestXmin() can take into account backends + * with imported csn snapshot and old tuple versions will be preserved. + * + * Also while calculating oldestXmin for our map in presence of imported + * csn snapshots we should use proc->originalXmin instead of pgxact->xmin + * that was set during import. Otherwise, we can create a feedback loop: + * xmin's of imported csn snapshots were calculated using our map and new + * entries in map going to be calculated based on that xmin's, and there is + * a risk to stuck forever with one non-increasing oldestXmin. All other + * callers of GetOldestXmin() are using pgxact->xmin so the old tuple versions + * are preserved. + */ +typedef struct CSNSnapshotXidMap +{ + int head; /* offset of current freshest value */ + int size; /* total size of circular buffer */ + CSN_atomic last_csn_seconds; /* last rounded csn that changed + * xmin_by_second[] */ + TransactionId *xmin_by_second; /* circular buffer of oldestXmin's */ +} +CSNSnapshotXidMap; + +static CSNSnapshotXidMap *csnXidMap; + + +/* Estimate shared memory space needed */ +Size +CSNSnapshotShmemSize(void) +{ + Size size = 0; + + if (csn_snapshot_defer_time > 0) + { + size += sizeof(CSNSnapshotXidMap); + size += csn_snapshot_defer_time*sizeof(TransactionId); + size = MAXALIGN(size); + } + + return size; +} + +/* Init shared memory structures */ +void +CSNSnapshotShmemInit() +{ + bool found; + + if (csn_snapshot_defer_time > 0) + { + csnXidMap = ShmemInitStruct("csnXidMap", + sizeof(CSNSnapshotXidMap), + &found); + if (!found) + { + int i; + + pg_atomic_init_u64(&csnXidMap->last_csn_seconds, 0); + csnXidMap->head = 0; + csnXidMap->size = csn_snapshot_defer_time; + csnXidMap->xmin_by_second = + ShmemAlloc(sizeof(TransactionId)*csnXidMap->size); + + for (i = 0; i < csnXidMap->size; i++) + csnXidMap->xmin_by_second[i] = InvalidTransactionId; + } + } +} + +/* + * CSNSnapshotStartup + * + * Set csnXidMap entries to oldestActiveXID during startup. + */ +void +CSNSnapshotStartup(TransactionId oldestActiveXID) +{ + /* + * Run only if we have initialized shared memory and csnXidMap + * is enabled. + */ + if (IsNormalProcessingMode() && + enable_csn_snapshot && csn_snapshot_defer_time > 0) + { + int i; + + Assert(TransactionIdIsValid(oldestActiveXID)); + for (i = 0; i < csnXidMap->size; i++) + csnXidMap->xmin_by_second[i] = oldestActiveXID; + ProcArraySetCSNSnapshotXmin(oldestActiveXID); + + elog(LOG, "CSN map initialized with oldest active xid %u", oldestActiveXID); + } +} + +/* + * CSNSnapshotMapXmin + * + * Maintain circular buffer of oldestXmins for several seconds in past. This + * buffer allows to shift oldestXmin in the past when backend is importing + * CSN snapshot. Otherwise old versions of tuples that were needed for + * this transaction can be recycled by other processes (vacuum, HOT, etc). + * + * Locking here is not trivial. Called upon each snapshot creation after + * ProcArrayLock is released. Such usage creates several race conditions. It + * is possible that backend who got csn called CSNSnapshotMapXmin() + * only after other backends managed to get snapshot and complete + * CSNSnapshotMapXmin() call, or even committed. This is safe because + * + * * We already hold our xmin in MyPgXact, so our snapshot will not be + * harmed even though ProcArrayLock is released. + * + * * snapshot_csn is always pessmistically rounded up to the next + * second. + * + * * For performance reasons, xmin value for particular second is filled + * only once. Because of that instead of writing to buffer just our + * xmin (which is enough for our snapshot), we bump oldestXmin there -- + * it mitigates the possibility of damaging someone else's snapshot by + * writing to the buffer too advanced value in case of slowness of + * another backend who generated csn earlier, but didn't manage to + * insert it before us. + * + * * if CSNSnapshotMapXmin() founds a gap in several seconds between + * current call and latest completed call then it should fill that gap + * with latest known values instead of new one. Otherwise it is + * possible (however highly unlikely) that this gap also happend + * between taking snapshot and call to CSNSnapshotMapXmin() for some + * backend. And we are at risk to fill circullar buffer with + * oldestXmin's that are bigger then they actually were. + */ +void +CSNSnapshotMapXmin(SnapshotCSN snapshot_csn) +{ + int offset, gap, i; + SnapshotCSN csn_seconds; + SnapshotCSN last_csn_seconds; + volatile TransactionId oldest_deferred_xmin; + TransactionId current_oldest_xmin, previous_oldest_xmin; + TransactionId ImportedXmin; + + /* Callers should check config values */ + Assert(csn_snapshot_defer_time > 0); + Assert(csnXidMap != NULL); + /* + * Round up snapshot_csn to the next second -- pessimistically and safely. + */ + csn_seconds = (snapshot_csn / NSECS_PER_SEC + 1); + + /* + * Fast-path check. Avoid taking exclusive CSNSnapshotXidMapLock lock + * if oldestXid was already written to xmin_by_second[] for this rounded + * snapshot_csn. + */ + if (pg_atomic_read_u64(&csnXidMap->last_csn_seconds) >= csn_seconds) + return; + + /* Ok, we have new entry (or entries) */ + LWLockAcquire(CSNSnapshotXidMapLock, LW_EXCLUSIVE); + + /* Re-check last_csn_seconds under lock */ + last_csn_seconds = pg_atomic_read_u64(&csnXidMap->last_csn_seconds); + if (last_csn_seconds >= csn_seconds) + { + LWLockRelease(CSNSnapshotXidMapLock); + return; + } + pg_atomic_write_u64(&csnXidMap->last_csn_seconds, csn_seconds); + + /* + * Count oldest_xmin. + * + * It was possible to calculate oldest_xmin during corresponding snapshot + * creation, but GetSnapshotData() intentionally reads only PgXact, but not + * PgProc. And we need info about originalXmin (see comment to csnXidMap) + * which is stored in PgProc because of threats in comments around PgXact + * about extending it with new fields. So just calculate oldest_xmin again, + * that anyway happens quite rarely. + */ + + /* + * Don't afraid here because csn_snapshot_xmin will hold border of + * minimal non-removable from vacuuming. + */ + ImportedXmin = MyProc->xmin; + MyProc->xmin = MyProc->originalXmin; + current_oldest_xmin = GetOldestNonRemovableTransactionId(NULL); + MyProc->xmin = ImportedXmin; + Assert(TransactionIdIsNormal(current_oldest_xmin)); + + previous_oldest_xmin = csnXidMap->xmin_by_second[csnXidMap->head]; + Assert(TransactionIdIsNormal(previous_oldest_xmin) || !enable_csn_snapshot); + + gap = csn_seconds - last_csn_seconds; + offset = csn_seconds % csnXidMap->size; + + /* Sanity check before we update head and gap */ + Assert( gap >= 1 ); + Assert( (csnXidMap->head + gap) % csnXidMap->size == offset ); + + gap = gap > csnXidMap->size ? csnXidMap->size : gap; + csnXidMap->head = offset; + + /* Fill new entry with current_oldest_xmin */ + csnXidMap->xmin_by_second[offset] = current_oldest_xmin; + + /* + * If we have gap then fill it with previous_oldest_xmin for reasons + * outlined in comment above this function. + */ + for (i = 1; i < gap; i++) + { + offset = (offset + csnXidMap->size - 1) % csnXidMap->size; + csnXidMap->xmin_by_second[offset] = previous_oldest_xmin; + } + + oldest_deferred_xmin = + csnXidMap->xmin_by_second[ (csnXidMap->head + 1) % csnXidMap->size ]; + + LWLockRelease(CSNSnapshotXidMapLock); + + elog(DEBUG5, "Advance xmin for CSN. Oldest deferred xmin = %u", + oldest_deferred_xmin); + + /* + * Advance procArray->csn_snapshot_xmin after we released + * CSNSnapshotXidMapLock. Since we gather not xmin but oldestXmin, it + * never goes backwards regardless of how slow we can do that. + */ + /*Assert(TransactionIdFollowsOrEquals(oldest_deferred_xmin, + ProcArrayGetCSNSnapshotXmin()));*/ + ProcArraySetCSNSnapshotXmin(oldest_deferred_xmin); +} + + +/* + * CSNSnapshotToXmin + * + * Get oldestXmin that took place when snapshot_csn was taken. + */ +TransactionId +CSNSnapshotToXmin(SnapshotCSN snapshot_csn) +{ + TransactionId xmin; + SnapshotCSN csn_seconds; + volatile SnapshotCSN last_csn_seconds; + + /* Callers should check config values */ + Assert(csn_snapshot_defer_time > 0); + Assert(csnXidMap != NULL); + + /* Round down to get conservative estimates */ + csn_seconds = (snapshot_csn / NSECS_PER_SEC); + + LWLockAcquire(CSNSnapshotXidMapLock, LW_SHARED); + last_csn_seconds = pg_atomic_read_u64(&csnXidMap->last_csn_seconds); + if (csn_seconds > last_csn_seconds) + { + /* we don't have entry for this snapshot_csn yet, return latest known */ + xmin = csnXidMap->xmin_by_second[csnXidMap->head]; + } + else if (last_csn_seconds - csn_seconds < csnXidMap->size) + { + /* we are good, retrieve value from our map */ + Assert(last_csn_seconds % csnXidMap->size == csnXidMap->head); + xmin = csnXidMap->xmin_by_second[csn_seconds % csnXidMap->size]; + } + else + { + /* requested snapshot_csn is too old, let caller know */ + xmin = InvalidTransactionId; + } + LWLockRelease(CSNSnapshotXidMapLock); + + return xmin; +} + +/* + * CSNSnapshotPrepareCurrent + * + * Set InDoubt state for currently active transaction and return commit's + * global snapshot. + */ +SnapshotCSN +CSNSnapshotPrepareCurrent(void) +{ + TransactionId xid = GetCurrentTransactionIdIfAny(); + + if (!enable_csn_snapshot) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + if (TransactionIdIsValid(xid)) + { + TransactionId *subxids; + int nsubxids = xactGetCommittedChildren(&subxids); + CSNLogSetCSN(xid, nsubxids, subxids, InDoubtCSN, true); + } + + /* Nothing to write if we don't have xid */ + + return GenerateCSN(false, InvalidCSN); +} + + +/* + * CSNSnapshotAssignCurrent + * + * Assign SnapshotCSN to the currently active transaction. SnapshotCSN is supposedly + * maximal among of values returned by CSNSnapshotPrepareCurrent and + * pg_csn_snapshot_prepare. + */ +void +CSNSnapshotAssignCurrent(SnapshotCSN snapshot_csn) +{ + if (!enable_csn_snapshot) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + if (!CSNIsNormal(snapshot_csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_csn_snapshot_assign expects normal snapshot_csn"))); + + Assert(snapshot_csn != InvalidCSN); + /* We do not care the Generate result, we just want to make sure max + * csnShared->last_max_csn value. + */ + GenerateCSN(false, snapshot_csn); + + /* Set csn and defuse ProcArrayEndTransaction from assigning one */ + pg_atomic_write_u64(&MyProc->assignedCSN, snapshot_csn); +} + +/* + * CSNSnapshotSync + * + * Due to time desynchronization on different nodes we can receive snapshot_csn + * which is greater than snapshot_csn on this node. To preserve proper isolation + * this node needs to wait when such snapshot_csn comes on local clock. + * + * This should happend relatively rare if nodes have running NTP/PTP/etc. + * Complain if wait time is more than SNAP_SYNC_COMPLAIN. + */ +void +CSNSnapshotSync(SnapshotCSN remote_csn) +{ + SnapshotCSN local_csn; + SnapshotCSN delta; + + Assert(enable_csn_snapshot); + + for(;;) + { + if (GetLastGeneratedCSN() > remote_csn) + return; + + local_csn = GenerateCSN(true, InvalidCSN); + + if (local_csn >= remote_csn) + /* + * Everything is fine too, but last_max_csn wasn't updated for + * some time. + */ + return; + + /* Okay we need to sleep now */ + delta = remote_csn - local_csn; + if (delta > SNAP_DESYNC_COMPLAIN) + ereport(WARNING, + (errmsg("remote global snapshot exceeds ours by more than a second"), + errhint("Consider running NTPd on servers participating in global transaction"))); + + /* TODO: report this sleeptime somewhere? */ + pg_usleep((long) (delta/NSECS_PER_USEC)); + + /* + * Loop that checks to ensure that we actually slept for specified + * amount of time. + */ + } + + Assert(false); /* Should not happend */ + return; +} + +/* + * TransactionIdGetCSN + * + * Get CSN for specified TransactionId taking care about special xids, + * xids beyond TransactionXmin and InDoubt states. + */ +CSN +TransactionIdGetCSN(TransactionId xid) +{ + CSN csn; + + /* Handle permanent TransactionId's for which we don't have mapping */ + if (!TransactionIdIsNormal(xid)) + { + if (xid == InvalidTransactionId) + return AbortedCSN; + if (xid == FrozenTransactionId || xid == BootstrapTransactionId) + return FrozenCSN; + Assert(false); /* Should not happend */ + } + + /* + * If we just switch a xid-snapsot to a csn_snapshot, we should handle a start + * xid for csn base check. Just in case we have prepared transaction which + * hold the TransactionXmin but without CSN. + */ + xmin_for_csn = GetOldestXmin(); + + /* + * For the xid with 'xid >= TransactionXmin and xid < xmin_for_csn', + * it defined as unclear csn which follow xid-snapshot result. + */ + if(!TransactionIdPrecedes(xid, TransactionXmin) && + TransactionIdPrecedes(xid, xmin_for_csn)) + { + elog(LOG, "UnclearCSN was returned. xid=%u, TransactionXmin=%u, xmin_for_csn=%u", + xid, TransactionXmin, xmin_for_csn); + return UnclearCSN; + } + /* + * For xids which less then TransactionXmin CSNLog can be already + * trimmed but we know that such transaction is definitely not concurrently + * running according to any snapshot including timetravel ones. Callers + * should check TransactionDidCommit after. + */ + if (TransactionIdPrecedes(xid, TransactionXmin)) + return FrozenCSN; + + /* Read CSN from SLRU */ + csn = CSNLogGetCSNByXid(xid); + + /* + * If we faced InDoubt state then transaction is being committed and we + * should wait until CSN will be assigned so that visibility check + * could decide whether tuple is in snapshot. See also comments in + * CSNSnapshotPrecommit(). + */ + if (CSNIsInDoubt(csn)) + { + XactLockTableWait(SubTransGetTopmostTransaction(xid), NULL, NULL, XLTW_None); + csn = CSNLogGetCSNByXid(xid); + Assert(CSNIsNormal(csn) || CSNIsAborted(csn)); + } + + Assert(CSNIsNormal(csn) || CSNIsInProgress(csn) || CSNIsAborted(csn)); + return csn; +} + +/* + * XidInCSNSnapshot + * + * Version of XidInMVCCSnapshot for transactions. For non-imported + * csn snapshots this should give same results as XidInLocalMVCCSnapshot + * (except that aborts will be shown as invisible without going to clog) and to + * ensure such behaviour XidInMVCCSnapshot is coated with asserts that checks + * identicalness of XidInCSNSnapshot/XidInLocalMVCCSnapshot in + * case of ordinary snapshot. + */ +bool +XidInCSNSnapshot(TransactionId xid, Snapshot snapshot) +{ + CSN csn; + + csn = TransactionIdGetCSN(xid); + + if (CSNIsNormal(csn)) + return (csn >= snapshot->snapshot_csn); + else if (CSNIsFrozen(csn)) + { + /* It is bootstrap or frozen transaction */ + return false; + } + else if(CSNIsUnclear(csn)) + { + /* + * Some xid can not figure out csn because of snapshot switch, + * and we can follow xid-base result. + */ + return true; + } + else + { + /* It is aborted or in-progress */ + Assert(CSNIsAborted(csn) || CSNIsInProgress(csn)); + if (CSNIsAborted(csn)) + Assert(TransactionIdDidAbort(xid)); + return true; + } +} + + +/***************************************************************************** + * Functions to handle transactions commit. + * + * For local transactions CSNSnapshotPrecommit sets InDoubt state before + * ProcArrayEndTransaction is called and transaction data potetntially becomes + * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in + * twophase case) then acquires csn under ProcArray lock and stores it + * in proc->assignedCSN. It's important that csn for commit is + * generated under ProcArray lock, otherwise snapshots won't + * be equivalent. Consequent call to CSNSnapshotCommit will write + * proc->assignedCSN to CSNLog. + * + * + * CSNSnapshotAbort is slightly different comparing to commit because abort + * can skip InDoubt phase and can be called for transaction subtree. + *****************************************************************************/ + + +/* + * CSNSnapshotAbort + * + * Abort transaction in CsnLog. We can skip InDoubt state for aborts + * since no concurrent transactions allowed to see aborted data anyway. + */ +void +CSNSnapshotAbort(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + if (!get_csnlog_status()) + return; + + CSNLogSetCSN(xid, nsubxids, subxids, AbortedCSN, true); + + /* + * Clean assignedCSN anyway, as it was possibly set in + * XidSnapshotAssignCsnCurrent. + */ + pg_atomic_write_u64(&proc->assignedCSN, InProgressCSN); +} + +/* + * CSNSnapshotPrecommit + * + * Set InDoubt status for local transaction that we are going to commit. + * This step is needed to achieve consistency between local snapshots and + * csn-based snapshots. We don't hold ProcArray lock while writing + * csn for transaction in SLRU but instead we set InDoubt status before + * transaction is deleted from ProcArray so the readers who will read csn + * in the gap between ProcArray removal and CSN assignment can wait + * until CSN is finally assigned. See also TransactionIdGetCSN(). + * + * This should be called only from parallel group leader before backend is + * deleted from ProcArray. + */ +void +CSNSnapshotPrecommit(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + CSN oldassignedCSN = InProgressCSN; + bool in_progress; + + if (!get_csnlog_status()) + return; + + /* Set InDoubt status if it is local transaction */ + in_progress = pg_atomic_compare_exchange_u64(&proc->assignedCSN, + &oldassignedCSN, + InDoubtCSN); + if (in_progress) + { + Assert(CSNIsInProgress(oldassignedCSN)); + CSNLogSetCSN(xid, nsubxids, subxids, InDoubtCSN, true); + } + else + { + /* Otherwise we should have valid CSN by this time */ + Assert(CSNIsNormal(oldassignedCSN)); + Assert(CSNIsInDoubt(CSNLogGetCSNByXid(xid))); + } +} + +/* + * CSNSnapshotCommit + * + * Write CSN that were acquired earlier to CsnLog. Should be + * preceded by CSNSnapshotPrecommit() so readers can wait until we finally + * finished writing to SLRU. + * + * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks, so that TransactionIdGetCSN can wait on this + * lock for CSN. + */ +void +CSNSnapshotCommit(PGPROC *proc, TransactionId xid, + int nsubxids, TransactionId *subxids) +{ + volatile CSN assignedCSN; + + if (!get_csnlog_status()) + return; + + if (!TransactionIdIsValid(xid)) + { + assignedCSN = pg_atomic_read_u64(&proc->assignedCSN); + Assert(CSNIsInProgress(assignedCSN)); + return; + } + + /* Finally write resulting CSN in SLRU */ + assignedCSN = pg_atomic_read_u64(&proc->assignedCSN); + Assert(CSNIsNormal(assignedCSN)); + CSNLogSetCSN(xid, nsubxids, subxids, assignedCSN, true); + + /* Reset for next transaction */ + pg_atomic_write_u64(&proc->assignedCSN, InProgressCSN); +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 7d67eda5f79..89c61834ad3 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -10,6 +10,7 @@ #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" +#include "access/csn_log.h" #include "access/generic_xlog.h" #include "access/ginxlog.h" #include "access/gistxlog.h" diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c6af8cfd7e2..7e8e67d68d1 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -77,6 +77,7 @@ #include #include "access/commit_ts.h" +#include "access/csn_snapshot.h" #include "access/htup_details.h" #include "access/subtrans.h" #include "access/transam.h" @@ -1565,8 +1566,33 @@ FinishPreparedTransaction(const char *gid, bool isCommit) abortstats, gid); + /* + * CSNSnapshot callbacks that should be called right before we are + * going to become visible. Details in comments to this functions. + */ + if (isCommit) + CSNSnapshotPrecommit(proc, xid, hdr->nsubxacts, children); + else + CSNSnapshotAbort(proc, xid, hdr->nsubxacts, children); + ProcArrayRemove(proc, latestXid); + /* + * Stamp our transaction with CSN in CSNLog. + * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks, since TransactionIdGetCSN relies on + * XactLockTableWait to await csn. + */ + if (isCommit) + { + CSNSnapshotCommit(proc, xid, hdr->nsubxacts, children); + } + else + { + Assert(CSNIsInProgress( + pg_atomic_read_u64(&proc->assignedCSN))); + } + /* * In case we fail while running the callbacks, mark the gxact invalid so * no one else will try to commit/rollback, and so it will be recycled if @@ -2658,3 +2684,130 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, LWLockRelease(TwoPhaseStateLock); return found; } + +/* + * CSNSnapshotPrepareTwophase + * + * Set InDoubt state for currently active transaction and return commit's + * global snapshot. + */ +static SnapshotCSN +CSNSnapshotPrepareTwophase(const char *gid) +{ + GlobalTransaction gxact; + PGPROC *proc; + char *buf; + TransactionId xid; + xl_xact_parsed_prepare parsed; + + if (!enable_csn_snapshot) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once. + */ + gxact = LockGXact(gid, GetUserId()); + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + xid = proc->xid; + + if (gxact->ondisk) + buf = ReadTwoPhaseFile(xid, true); + else + XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL); + + ParsePrepareRecord(0, (xl_xact_prepare *)buf, &parsed); + + CSNLogSetCSN(xid, parsed.nsubxacts, + parsed.subxacts, InDoubtCSN, true); + + /* Unlock our GXACT */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); + + pfree(buf); + return GenerateCSN(false, InvalidCSN); +} + +/* + * CSNSnapshotAssignTwoPhase + * + * Asign SnapshotCSN for currently active transaction. SnapshotCSN is supposedly + * maximal among of values returned by CSNSnapshotPrepareCurrent and + * pg_csn_snapshot_prepare. + * + * This function is a counterpart of CSNSnapshotAssignCurrent() for + * twophase transactions. + */ +static void +CSNSnapshotAssignTwoPhase(const char *gid, SnapshotCSN csn) +{ + GlobalTransaction gxact; + PGPROC *proc; + + if (!enable_csn_snapshot) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not prepare transaction for global commit"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + if (!CSNIsNormal(csn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_csn_snapshot_assign expects normal snapshot_csn"))); + + /* + * Validate the GID, and lock the GXACT to ensure that two backends do not + * try to access the same GID at once. + */ + gxact = LockGXact(gid, GetUserId()); + proc = &ProcGlobal->allProcs[gxact->pgprocno]; + + Assert(csn != InvalidCSN); + /* We do not care the Generate result, we just want to make sure max + * csnShared->last_max_csn value. + */ + GenerateCSN(false, csn); + /* Set snapshot_csn and defuse ProcArrayRemove from assigning one. */ + pg_atomic_write_u64(&proc->assignedCSN, csn); + + /* Unlock our GXACT */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + gxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); +} + +/* + * SQL interface to CSNSnapshotPrepareTwophase() + * + * TODO: Rewrite this as PREPARE TRANSACTION 'gid' RETURNING SNAPSHOT + */ +Datum +pg_csn_snapshot_prepare(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + SnapshotCSN csn = CSNSnapshotPrepareTwophase(gid); + + PG_RETURN_INT64(csn); +} + +/* + * SQL interface to CSNSnapshotAssignTwoPhase() + * + * TODO: Rewrite this as COMMIT PREPARED 'gid' SNAPSHOT 'csn' + */ +Datum +pg_csn_snapshot_assign(PG_FUNCTION_ARGS) +{ + const char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0)); + SnapshotCSN csn = PG_GETARG_INT64(1); + + CSNSnapshotAssignTwoPhase(gid, csn); + PG_RETURN_VOID(); +} diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 334adac09e8..8708c90c950 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -15,6 +15,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/csn_log.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" @@ -175,6 +176,7 @@ GetNewTransactionId(bool isSubXact) * Extend pg_subtrans and pg_commit_ts too. */ ExtendCLOG(xid); + ExtendCSNLog(xid); ExtendCommitTs(xid); ExtendSUBTRANS(xid); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 489c3942fee..efb7fd15b37 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include #include "access/commit_ts.h" +#include "access/csn_snapshot.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -1427,6 +1428,12 @@ RecordTransactionCommit(void) TransactionTreeSetCommitTsData(xid, nchildren, children, replorigin_session_origin_timestamp, replorigin_session_origin); + + /* + * Mark our transaction as InDoubt in CsnLog and get ready for + * commit. + */ + CSNSnapshotPrecommit(MyProc, xid, nchildren, children); } /* @@ -1799,6 +1806,9 @@ RecordTransactionAbort(bool isSubXact) */ TransactionIdAbortTree(xid, nchildren, children); + /* Mark our transaction as Aborted in CSN Log. */ + CSNSnapshotAbort(MyProc, xid, nchildren, children); + END_CRIT_SECTION(); /* Compute latestXid while we have the child XIDs handy */ @@ -2143,6 +2153,13 @@ StartTransaction(void) ShowTransactionState("StartTransaction"); } +Datum +pg_current_csn(PG_FUNCTION_ARGS) +{ + SnapshotCSN csn = GenerateCSN(false, InvalidCSN); + + PG_RETURN_INT64(csn); +} /* * CommitTransaction @@ -2291,6 +2308,21 @@ CommitTransaction(void) */ ProcArrayEndTransaction(MyProc, latestXid); + /* + * Stamp our transaction with CSN in CsnLog. + * Should be called after ProcArrayEndTransaction, but before releasing + * transaction locks. + */ + if (!is_parallel_worker) + { + TransactionId xid = GetTopTransactionIdIfAny(); + TransactionId *subxids; + int nsubxids; + + nsubxids = xactGetCommittedChildren(&subxids); + CSNSnapshotCommit(MyProc, xid, nsubxids, subxids); + } + /* * This is all post-commit cleanup. Note that if an error is raised here, * it's too late to abort the transaction. This should be just diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8b0710abe60..bd46feb2bab 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -48,6 +48,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/csn_log.h" #include "access/heaptoast.h" #include "access/multixact.h" #include "access/rewriteheap.h" @@ -3882,6 +3883,7 @@ InitControlFile(uint64 sysidentifier) ControlFile->wal_level = wal_level; ControlFile->wal_log_hints = wal_log_hints; ControlFile->track_commit_timestamp = track_commit_timestamp; + ControlFile->enable_csn_snapshot = enable_csn_snapshot; ControlFile->data_checksum_version = bootstrap_data_checksum_version; } @@ -5229,6 +5231,9 @@ StartupXLOG(void) if (ControlFile->track_commit_timestamp) StartupCommitTs(); + if(ControlFile->enable_csn_snapshot) + StartupCSN(); + /* * Recover knowledge about replay progress of known replication partners. */ @@ -5407,6 +5412,8 @@ StartupXLOG(void) */ StartupSUBTRANS(oldestActiveXID); + CSNSnapshotStartup(oldestActiveXID); + /* * If we're beginning at a shutdown checkpoint, we know that * nothing was running on the primary at this point. So fake-up an @@ -5681,7 +5688,10 @@ StartupXLOG(void) * timestamps are started below, if necessary.) */ if (standbyState == STANDBY_DISABLED) + { StartupSUBTRANS(oldestActiveXID); + CSNSnapshotStartup(oldestActiveXID); + } /* * Perform end of recovery actions for any SLRUs that need it. @@ -5738,6 +5748,7 @@ StartupXLOG(void) * commit timestamp. */ CompleteCommitTsInitialization(); + CompleteCSNInitialization(); /* * All done with end-of-recovery actions. @@ -7024,6 +7035,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags); CheckpointStats.ckpt_write_t = GetCurrentTimestamp(); CheckPointCLOG(); + CheckPointCSNLog(); CheckPointCommitTs(); CheckPointSUBTRANS(); CheckPointMultiXact(); @@ -7314,7 +7326,10 @@ CreateRestartPoint(int flags) * this because StartupSUBTRANS hasn't been called yet. */ if (EnableHotStandby) + { TruncateSUBTRANS(GetOldestTransactionIdConsideredRunning()); + TruncateCSNLog(GetOldestTransactionIdConsideredRunning()); + } /* Real work is done; log and update stats. */ LogCheckpointEnd(true); @@ -7594,7 +7609,8 @@ XLogReportParameters(void) max_wal_senders != ControlFile->max_wal_senders || max_prepared_xacts != ControlFile->max_prepared_xacts || max_locks_per_xact != ControlFile->max_locks_per_xact || - track_commit_timestamp != ControlFile->track_commit_timestamp) + track_commit_timestamp != ControlFile->track_commit_timestamp || + enable_csn_snapshot != ControlFile->enable_csn_snapshot) { /* * The change in number of backend slots doesn't need to be WAL-logged @@ -7616,6 +7632,7 @@ XLogReportParameters(void) xlrec.wal_level = wal_level; xlrec.wal_log_hints = wal_log_hints; xlrec.track_commit_timestamp = track_commit_timestamp; + xlrec.enable_csn_snapshot = enable_csn_snapshot; XLogBeginInsert(); XLogRegisterData((char *) &xlrec, sizeof(xlrec)); @@ -7634,6 +7651,7 @@ XLogReportParameters(void) ControlFile->wal_level = wal_level; ControlFile->wal_log_hints = wal_log_hints; ControlFile->track_commit_timestamp = track_commit_timestamp; + ControlFile->enable_csn_snapshot = enable_csn_snapshot; UpdateControlFile(); LWLockRelease(ControlFileLock); @@ -8034,6 +8052,9 @@ xlog_redo(XLogReaderState *record) CommitTsParameterChange(xlrec.track_commit_timestamp, ControlFile->track_commit_timestamp); ControlFile->track_commit_timestamp = xlrec.track_commit_timestamp; + CSNlogParameterChange(xlrec.enable_csn_snapshot, + ControlFile->enable_csn_snapshot); + ControlFile->enable_csn_snapshot = xlrec.enable_csn_snapshot; UpdateControlFile(); LWLockRelease(ControlFileLock); diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 258cbd70355..ca42d1367d2 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -87,6 +87,8 @@ typedef struct char compressed_page[COMPRESS_BUFSIZE]; } registered_buffer; +bool enable_csn_wal = true; + static registered_buffer *registered_buffers; static int max_registered_buffers; /* allocated size */ static int max_registered_block_id = 0; /* highest block_id + 1 currently diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 69ac276687b..ef76d1770d9 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -34,6 +34,7 @@ #include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" +#include "access/csn_log.h" #include "catalog/namespace.h" #include "catalog/index.h" #include "catalog/pg_database.h" @@ -62,7 +63,6 @@ #include "utils/snapmgr.h" #include "utils/syscache.h" - /* * GUC parameters */ @@ -1932,6 +1932,7 @@ vac_truncate_clog(TransactionId frozenXID, */ TruncateCLOG(frozenXID, oldestxid_datoid); TruncateCommitTs(frozenXID); + TruncateCSNLog(frozenXID); TruncateMultiXact(minMulti, minmulti_datoid); /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 7a7aba33e16..76bff0e4650 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -120,6 +120,7 @@ #include #include +#include "access/csn_log.h" #include "access/heapam_xlog.h" #include "access/transam.h" #include "access/xact.h" @@ -524,6 +525,9 @@ SnapBuildBuildSnapshot(SnapBuild *builder) snapshot->xmin = builder->xmin; snapshot->xmax = builder->xmax; + snapshot->snapshot_csn = FrozenCSN; + snapshot->imported_csn = false; + /* store all transactions to be treated as committed by this snapshot */ snapshot->xip = (TransactionId *) ((char *) snapshot + sizeof(SnapshotData)); diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 8f1ded7338f..14dd9598028 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -16,6 +16,8 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/csn_log.h" +#include "access/csn_snapshot.h" #include "access/heapam.h" #include "access/multixact.h" #include "access/nbtree.h" @@ -118,6 +120,8 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, XLOGShmemSize()); size = add_size(size, XLogRecoveryShmemSize()); size = add_size(size, CLOGShmemSize()); + size = add_size(size, CSNLogShmemSize()); + size = add_size(size, CSNSnapshotShmemSize()); size = add_size(size, CommitTsShmemSize()); size = add_size(size, SUBTRANSShmemSize()); size = add_size(size, TwoPhaseShmemSize()); @@ -242,6 +246,8 @@ CreateSharedMemoryAndSemaphores(void) XLogPrefetchShmemInit(); XLogRecoveryShmemInit(); CLOGShmemInit(); + CSNLogShmemInit(); + CSNSnapshotShmemInit(); CommitTsShmemInit(); SUBTRANSShmemInit(); MultiXactShmemInit(); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index eaceefa0571..963642adc15 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -48,6 +48,7 @@ #include #include "access/clog.h" +#include "access/csn_snapshot.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" @@ -97,6 +98,8 @@ typedef struct ProcArrayStruct TransactionId replication_slot_xmin; /* oldest catalog xmin of any replication slot */ TransactionId replication_slot_catalog_xmin; + /* xmin of oldest active csn snapshot */ + TransactionId csn_snapshot_xmin; /* indexes into allProcs[], has PROCARRAY_MAXPROCS entries */ int pgprocnos[FLEXIBLE_ARRAY_MEMBER]; @@ -445,6 +448,7 @@ CreateSharedProcArray(void) procArray->lastOverflowedXid = InvalidTransactionId; procArray->replication_slot_xmin = InvalidTransactionId; procArray->replication_slot_catalog_xmin = InvalidTransactionId; + procArray->csn_snapshot_xmin = InvalidTransactionId; ShmemVariableCache->xactCompletionCount = 1; } @@ -593,6 +597,14 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid) /* Advance global latestCompletedXid while holding the lock */ MaintainLatestCompletedXid(latestXid); + /* + * Assign xid csn while holding ProcArrayLock for non-distributed + * COMMIT PREPARED. After lock is released consequent + * CSNSnapshotCommit() will write this value to CsnLog. + */ + if (CSNIsInDoubt(pg_atomic_read_u64(&proc->assignedCSN))) + pg_atomic_write_u64(&proc->assignedCSN, GenerateCSN(false, InvalidCSN)); + /* Same with xactCompletionCount */ ShmemVariableCache->xactCompletionCount++; @@ -711,6 +723,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) proc->recoveryConflictPending = false; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ /* avoid unnecessarily dirtying shared cachelines */ if (proc->statusFlags & PROC_VACUUM_STATE_MASK) @@ -753,6 +767,8 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid) proc->recoveryConflictPending = false; + proc->originalXmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ /* avoid unnecessarily dirtying shared cachelines */ if (proc->statusFlags & PROC_VACUUM_STATE_MASK) @@ -775,6 +791,16 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid) /* Also advance global latestCompletedXid while holding the lock */ MaintainLatestCompletedXid(latestXid); + /* + * Assign xid csn while holding ProcArrayLock for + * COMMIT. + * + * TODO: in case of group commit we can generate one CSNSnapshot for + * whole group to save time on timestamp aquisition. + */ + if (CSNIsInDoubt(pg_atomic_read_u64(&proc->assignedCSN))) + pg_atomic_write_u64(&proc->assignedCSN, GenerateCSN(false, InvalidCSN)); + /* Same with xactCompletionCount */ ShmemVariableCache->xactCompletionCount++; } @@ -934,6 +960,7 @@ ProcArrayClearTransaction(PGPROC *proc) proc->lxid = InvalidLocalTransactionId; proc->xmin = InvalidTransactionId; proc->recoveryConflictPending = false; + proc->originalXmin = InvalidTransactionId; Assert(!(proc->statusFlags & PROC_VACUUM_STATE_MASK)); Assert(!proc->delayChkptFlags); @@ -1231,6 +1258,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) while (TransactionIdPrecedes(latestObservedXid, running->nextXid)) { ExtendSUBTRANS(latestObservedXid); + ExtendCSNLog(latestObservedXid); TransactionIdAdvance(latestObservedXid); } TransactionIdRetreat(latestObservedXid); /* = running->nextXid - 1 */ @@ -1722,6 +1750,7 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) TransactionId kaxmin; bool in_recovery = RecoveryInProgress(); TransactionId *other_xids = ProcGlobal->xids; + TransactionId csn_snapshot_xmin = InvalidTransactionId; /* inferred after ProcArrayLock is released */ h->catalog_oldest_nonremovable = InvalidTransactionId; @@ -1855,6 +1884,10 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) if (in_recovery) kaxmin = KnownAssignedXidsGetOldestXmin(); + /* Get value of xmin, delayed by a CSN snapshot settings. */ + if (get_csnlog_status() && csn_snapshot_defer_time > 0 && IsUnderPostmaster) + csn_snapshot_xmin = ProcArrayGetCSNSnapshotXmin(); + /* * No other information from shared state is needed, release the lock * immediately. The rest of the computations can be done without a lock. @@ -1885,6 +1918,15 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h) h->data_oldest_nonremovable = TransactionIdOlder(h->data_oldest_nonremovable, h->slot_xmin); + /* + * Hold non-removable border because distributed transactions + * can wish to see old data. + */ + h->shared_oldest_nonremovable = + TransactionIdOlder(h->shared_oldest_nonremovable, csn_snapshot_xmin); + h->data_oldest_nonremovable = + TransactionIdOlder(h->data_oldest_nonremovable, csn_snapshot_xmin); + /* * The only difference between catalog / data horizons is that the slot's * catalog xmin is applied to the catalog one (so catalogs can be accessed @@ -2117,6 +2159,9 @@ GetSnapshotDataReuse(Snapshot snapshot) if (curXactCompletionCount != snapshot->snapXactCompletionCount) return false; + if (get_csnlog_status()) + return false; + /* * If the current xactCompletionCount is still the same as it was at the * time the snapshot was built, we can be sure that rebuilding the @@ -2196,6 +2241,8 @@ GetSnapshotData(Snapshot snapshot) int count = 0; int subcount = 0; bool suboverflowed = false; + CSN csn = FrozenCSN; + TransactionId csn_snapshot_xmin = InvalidTransactionId; FullTransactionId latest_completed; TransactionId oldestxid; int mypgxactoff; @@ -2428,6 +2475,20 @@ GetSnapshotData(Snapshot snapshot) if (!TransactionIdIsValid(MyProc->xmin)) MyProc->xmin = TransactionXmin = xmin; + /* Take CSN under ProcArrayLock so the snapshot stays synchronized. */ + if (!snapshot->takenDuringRecovery && get_csnlog_status()) + csn = GenerateCSN(false, InvalidCSN); + + if (get_csnlog_status() && csn_snapshot_defer_time > 0 && IsUnderPostmaster) + { + CSNSnapshotMapXmin(snapshot->snapshot_csn); + + /* Get value of xmin, delayed by a CSN snapshot settings. */ + csn_snapshot_xmin = ProcArrayGetCSNSnapshotXmin(); + /* Adjust an oldest xid value with a xmin, delayed by CSN options. */ + oldestxid = TransactionIdOlder(oldestxid, csn_snapshot_xmin); + } + LWLockRelease(ProcArrayLock); /* maintain state for GlobalVis* */ @@ -2449,6 +2510,10 @@ GetSnapshotData(Snapshot snapshot) def_vis_xid_data = TransactionIdOlder(xmin, replication_slot_xmin); + /* The csn-related settings can require an older xmin. */ + def_vis_xid_data = + TransactionIdOlder(def_vis_xid_data, csn_snapshot_xmin); + /* * Rows in non-shared, non-catalog tables possibly could be vacuumed * if older than this xid. @@ -2529,6 +2594,8 @@ GetSnapshotData(Snapshot snapshot) snapshot->active_count = 0; snapshot->regd_count = 0; snapshot->copied = false; + snapshot->imported_csn = false; + snapshot->snapshot_csn = csn; GetSnapshotDataInitOldSnapshot(snapshot); @@ -3914,6 +3981,25 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin, LWLockRelease(ProcArrayLock); } +/* + * ProcArraySetCSNSnapshotXmin + */ +void +ProcArraySetCSNSnapshotXmin(TransactionId xmin) +{ + /* We rely on atomic fetch/store of xid */ + procArray->csn_snapshot_xmin = xmin; +} + +/* + * ProcArrayGetCSNSnapshotXmin + */ +TransactionId +ProcArrayGetCSNSnapshotXmin(void) +{ + return procArray->csn_snapshot_xmin; +} + /* * XidCacheRemoveRunningXids * @@ -4396,6 +4482,7 @@ RecordKnownAssignedTransactionIds(TransactionId xid) while (TransactionIdPrecedes(next_expected_xid, xid)) { TransactionIdAdvance(next_expected_xid); + ExtendCSNLog(next_expected_xid); ExtendSUBTRANS(next_expected_xid); } Assert(next_expected_xid == xid); diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 01d738f306b..b1481715f99 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -138,6 +138,8 @@ static const char *const BuiltinTrancheNames[] = { "CommitTsBuffer", /* LWTRANCHE_SUBTRANS_BUFFER: */ "SubtransBuffer", + /* LWTRANCHE_CSN_LOG_BUFFERS */ + "CSNLogBuffer", /* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */ "MultiXactOffsetBuffer", /* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */ diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index 6c7cf6c2956..e8ca3936118 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -53,3 +53,5 @@ XactTruncationLock 44 # 45 was XactTruncationLock until removal of BackendRandomLock WrapLimitsVacuumLock 46 NotifyQueueTailLock 47 +CSNLogSLRULock 48 +CSNSnapshotXidMapLock 49 diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index e9e445bb216..53bae04f5a3 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -33,9 +33,11 @@ #include #include +#include "access/csn_snapshot.h" #include "access/transam.h" #include "access/twophase.h" #include "access/xlogutils.h" +#include "access/xact.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -432,6 +434,9 @@ InitProcess(void) MyProc->clogGroupMemberLsn = InvalidXLogRecPtr; Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO); + MyProc->originalXmin = InvalidTransactionId; + pg_atomic_init_u64(&MyProc->assignedCSN, InProgressCSN); + /* * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch * on it. That allows us to repoint the process latch, which so far @@ -577,6 +582,7 @@ InitAuxiliaryProcess(void) MyProc->lwWaitMode = 0; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + MyProc->originalXmin = InvalidTransactionId; pg_atomic_write_u64(&MyProc->waitStart, 0); #ifdef USE_ASSERT_CHECKING { diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c index 04fcb06056d..0b4dcb2eb33 100644 --- a/src/backend/storage/sync/sync.c +++ b/src/backend/storage/sync/sync.c @@ -20,6 +20,7 @@ #include "access/commit_ts.h" #include "access/clog.h" +#include "access/csn_log.h" #include "access/multixact.h" #include "access/xlog.h" #include "access/xlogutils.h" @@ -119,6 +120,10 @@ static const SyncOps syncsw[] = { /* pg_multixact/members */ [SYNC_HANDLER_MULTIXACT_MEMBER] = { .sync_syncfiletag = multixactmemberssyncfiletag + }, + /* pg_multixact/members */ + [SYNC_HANDLER_CSN] = { + .sync_syncfiletag = csnsyncfiletag } }; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index e0291d3a009..42b925237b7 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -1057,6 +1057,24 @@ struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"enable_csn_snapshot", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Enable csn-base snapshot."), + gettext_noop("Used to achieve REPEATABLE READ isolation level for postgres_fdw transactions.") + }, + &enable_csn_snapshot, + true, + NULL, NULL, NULL + }, + { + {"enable_csn_wal", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Enable csn-wal record."), + gettext_noop("Used to enable csn-wal record") + }, + &enable_csn_wal, + true, + NULL, NULL, NULL + }, { {"ssl", PGC_SIGHUP, CONN_AUTH_SSL, gettext_noop("Enables SSL connections."), @@ -3118,6 +3136,25 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"csn_snapshot_defer_time", PGC_POSTMASTER, REPLICATION_PRIMARY, + gettext_noop("Minimal age of records which allowed to be vacuumed, in seconds."), + NULL + }, + &csn_snapshot_defer_time, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + { + {"csn_time_shift", PGC_USERSET, RESOURCES_MEM, + gettext_noop("Do the time shift in the CSN generator."), + gettext_noop("Used for debug purposes.") + }, + &csn_time_shift, + 0, INT_MIN, INT_MAX, + NULL, NULL, NULL + }, + { {"block_size", PGC_INTERNAL, PRESET_OPTIONS, gettext_noop("Shows the size of a disk block."), diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index 0af275587b7..27bb9859b6f 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -77,6 +77,8 @@ provider postgresql { probe clog__checkpoint__done(bool); probe subtrans__checkpoint__start(bool); probe subtrans__checkpoint__done(bool); + probe csnlog__checkpoint__start(bool); + probe csnlog__checkpoint__done(bool); probe multixact__checkpoint__start(bool); probe multixact__checkpoint__done(bool); probe twophase__checkpoint__start(); diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 3a419e348fa..1fa1cff97f2 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -48,6 +48,7 @@ #include #include +#include "access/csn_log.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/xact.h" @@ -78,6 +79,8 @@ */ int old_snapshot_threshold; /* number of minutes, -1 disables */ +bool enable_csn_snapshot; + volatile OldSnapshotControlData *oldSnapshotControl; @@ -174,6 +177,7 @@ static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts); static Snapshot CopySnapshot(Snapshot snapshot); static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); +static bool XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot); /* * Snapshot fields to be serialized. @@ -192,6 +196,8 @@ typedef struct SerializedSnapshotData CommandId curcid; TimestampTz whenTaken; XLogRecPtr lsn; + CSN csn; + bool imported_csn; } SerializedSnapshotData; Size @@ -547,6 +553,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, sourcesnap->subxcnt * sizeof(TransactionId)); CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed; CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery; + CurrentSnapshot->snapshot_csn = sourcesnap->snapshot_csn; + CurrentSnapshot->imported_csn = sourcesnap->imported_csn; /* NB: curcid should NOT be copied, it's a local matter */ CurrentSnapshot->snapXactCompletionCount = 0; @@ -1213,6 +1221,10 @@ ExportSnapshot(Snapshot snapshot) appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin); appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax); + appendStringInfo(&buf, "snapshot_csn:"UINT64_FORMAT"\n", + snapshot->snapshot_csn); + appendStringInfo(&buf, "imported_csn:%u\n", snapshot->imported_csn); + /* * We must include our own top transaction ID in the top-xid data, since * by definition we will still be running when the importing transaction @@ -1337,6 +1349,31 @@ parseIntFromText(const char *prefix, char **s, const char *filename) return val; } +static CSN +parseCSNFromText(const char *prefix, char **s, const char *filename) +{ + char *ptr = *s; + int prefixlen = strlen(prefix); + uint64 val; + + if (strncmp(ptr, prefix, prefixlen) != 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr += prefixlen; + if (sscanf(ptr, UINT64_FORMAT, &val) != 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr = strchr(ptr, '\n'); + if (!ptr) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + *s = ptr + 1; + return val; +} + static TransactionId parseXidFromText(const char *prefix, char **s, const char *filename) { @@ -1478,6 +1515,9 @@ ImportSnapshot(const char *idstr) snapshot.xmin = parseXidFromText("xmin:", &filebuf, path); snapshot.xmax = parseXidFromText("xmax:", &filebuf, path); + snapshot.snapshot_csn = parseCSNFromText("snapshot_csn:", &filebuf, path); + snapshot.imported_csn = parseIntFromText("imported_csn:", &filebuf, path); + snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path); /* sanity-check the xid count before palloc */ @@ -2160,6 +2200,8 @@ SerializeSnapshot(Snapshot snapshot, char *start_address) serialized_snapshot.curcid = snapshot->curcid; serialized_snapshot.whenTaken = snapshot->whenTaken; serialized_snapshot.lsn = snapshot->lsn; + serialized_snapshot.csn = snapshot->snapshot_csn; + serialized_snapshot.imported_csn = snapshot->imported_csn; /* * Ignore the SubXID array if it has overflowed, unless the snapshot was @@ -2234,6 +2276,8 @@ RestoreSnapshot(char *start_address) snapshot->curcid = serialized_snapshot.curcid; snapshot->whenTaken = serialized_snapshot.whenTaken; snapshot->lsn = serialized_snapshot.lsn; + snapshot->snapshot_csn = serialized_snapshot.csn; + snapshot->imported_csn = serialized_snapshot.imported_csn; snapshot->snapXactCompletionCount = 0; /* Copy XIDs, if present. */ @@ -2275,6 +2319,44 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc) /* * XidInMVCCSnapshot + * + * Check whether this xid is in snapshot. When enable_csn_snapshot is + * switched off just call XidInLocalMVCCSnapshot(). + */ +bool +XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) +{ + bool in_snapshot; + + if (snapshot->imported_csn) + { + Assert(enable_csn_snapshot); + /* No point to using snapshot info except CSN */ + return XidInCSNSnapshot(xid, snapshot); + } + + in_snapshot = XidInLocalMVCCSnapshot(xid, snapshot); + + if (!get_csnlog_status()) + { + Assert(CSNIsFrozen(snapshot->snapshot_csn)); + return in_snapshot; + } + + if (in_snapshot) + { + /* + * This xid may be already in unknown state and in that case + * we must wait and recheck. + */ + return XidInCSNSnapshot(xid, snapshot); + } + else + return false; +} + +/* + * XidInLocalMVCCSnapshot * Is the given XID still-in-progress according to the snapshot? * * Note: GetSnapshotData never stores either top xid or subxids of our own @@ -2283,8 +2365,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc) * TransactionIdIsCurrentTransactionId first, except when it's known the * XID could not be ours anyway. */ -bool -XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) +static bool +XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot) { /* * Make a quick range check to eliminate most XIDs without looking at the @@ -2379,3 +2461,100 @@ XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot) return false; } + + +/* + * ExportCSNSnapshot + * + * Export snapshot_csn so that caller can expand this transaction to other + * nodes. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction did not yet acquired xid, but + * for current iteration of this patch I don't want to hack on parser. + */ +SnapshotCSN +ExportCSNSnapshot() +{ + if (!get_csnlog_status()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not export csn snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + elog(DEBUG5, "Export CSN Snapshot: csn = %lu", + CurrentSnapshot->snapshot_csn); + return CurrentSnapshot->snapshot_csn; +} + +/* SQL accessor to ExportCSNSnapshot() */ +Datum +pg_csn_snapshot_export(PG_FUNCTION_ARGS) +{ + SnapshotCSN csn = ExportCSNSnapshot(); + + PG_RETURN_UINT64(csn); +} + +/* + * ImportCSNSnapshot + * + * Import csn and retract this backends xmin to the value that was + * actual when we had such csn. + * + * TODO: it's better to do this through EXPORT/IMPORT SNAPSHOT syntax and + * add some additional checks that transaction did not yet acquired xid, but + * for current iteration of this patch I don't want to hack on parser. + */ +void +ImportCSNSnapshot(SnapshotCSN snapshot_csn) +{ + volatile TransactionId xmin; + + if (!get_csnlog_status()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import csn snapshot"), + errhint("Make sure the configuration parameter \"%s\" is enabled.", + "enable_csn_snapshot"))); + + if (csn_snapshot_defer_time <= 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import csn snapshot"), + errhint("Make sure the configuration parameter \"%s\" is positive.", + "csn_snapshot_defer_time"))); + + /* + * Call CSNSnapshotToXmin under ProcArrayLock to avoid situation that + * resulting xmin will be evicted from map before we will set it into our + * backend's xmin. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + xmin = CSNSnapshotToXmin(snapshot_csn); + if (!TransactionIdIsValid(xmin)) + { + LWLockRelease(ProcArrayLock); + elog(ERROR, "CSNSnapshotToXmin: csn snapshot too old"); + } + + MyProc->originalXmin = MyProc->xmin; + MyProc->xmin = TransactionXmin = xmin; + LWLockRelease(ProcArrayLock); + + CurrentSnapshot->xmin = xmin; /* defuse SnapshotResetXmin() */ + CurrentSnapshot->snapshot_csn = snapshot_csn; + CurrentSnapshot->imported_csn = true; + CSNSnapshotSync(snapshot_csn); +} + +/* SQL accessor to ImportCSNSnapshot() */ +Datum +pg_csn_snapshot_import(PG_FUNCTION_ARGS) +{ + SnapshotCSN csn = PG_GETARG_UINT64(0); + + ImportCSNSnapshot(csn); + PG_RETURN_VOID(); +} diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 8b84e230f1c..4967c2024dc 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -245,7 +245,8 @@ static const char *const subdirs[] = { "pg_xact", "pg_logical", "pg_logical/snapshots", - "pg_logical/mappings" + "pg_logical/mappings", + "pg_csn" }; diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index c390ec51ce9..10d2cde8c5a 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -300,6 +300,8 @@ main(int argc, char *argv[]) ControlFile->max_locks_per_xact); printf(_("track_commit_timestamp setting: %s\n"), ControlFile->track_commit_timestamp ? _("on") : _("off")); + printf(_("enable_csn_snapshot setting: %s\n"), + ControlFile->enable_csn_snapshot ? _("on") : _("off")); printf(_("Maximum data alignment: %u\n"), ControlFile->maxAlign); /* we don't print floatFormat since can't say much useful about it */ diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 4562dafcff5..e9c0b520a55 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -732,6 +732,11 @@ copy_xact_xlog_xid(void) check_ok(); } + if(old_cluster.controldata.cat_ver > CSN_BASE_SNAPSHOT_ADD_VER) + { + copy_subdir_files("pg_csn", "pg_csn"); + } + /* now reset the wal archives in the new cluster */ prep_status("Resetting WAL archives"); exec_prog(UTILITY_LOG_FILE, NULL, true, true, diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 7afa96716ec..5667b7f5471 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -126,6 +126,8 @@ extern char *output_files[]; */ #define JSONB_FORMAT_CHANGE_CAT_VER 201409291 +#define CSN_BASE_SNAPSHOT_ADD_VER 202002010 + /* * Each relation is represented by a relinfo structure. diff --git a/src/bin/pg_waldump/csnlogdesc.c b/src/bin/pg_waldump/csnlogdesc.c new file mode 120000 index 00000000000..dcde44b3ee7 --- /dev/null +++ b/src/bin/pg_waldump/csnlogdesc.c @@ -0,0 +1 @@ +../../../src/backend/access/rmgrdesc/csnlogdesc.c \ No newline at end of file diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6b8c17bb4c4..7ee9ccb8f55 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -11,6 +11,7 @@ #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" +#include "access/csn_log.h" #include "access/generic_xlog.h" #include "access/ginxlog.h" #include "access/gistxlog.h" diff --git a/src/include/access/csn_log.h b/src/include/access/csn_log.h new file mode 100644 index 00000000000..5ec14cff6c1 --- /dev/null +++ b/src/include/access/csn_log.h @@ -0,0 +1,99 @@ +/* + * csn_log.h + * + * Commit-Sequence-Number log. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/csn_log.h + */ +#ifndef CSNLOG_H +#define CSNLOG_H + +#include "access/xlog.h" +#include "access/xlogreader.h" +#include "utils/snapshot.h" +#include "storage/sync.h" + + +#define InProgressCSN UINT64CONST(0x0) +#define AbortedCSN UINT64CONST(0x1) +#define FrozenCSN UINT64CONST(0x2) +#define InDoubtCSN UINT64CONST(0x3) +#define UnclearCSN UINT64CONST(0x4) +#define FirstNormalCSN UINT64CONST(0x5) + +#define CSNIsInProgress(csn) ((csn) == InProgressCSN) +#define CSNIsAborted(csn) ((csn) == AbortedCSN) +#define CSNIsFrozen(csn) ((csn) == FrozenCSN) +#define CSNIsInDoubt(csn) ((csn) == InDoubtCSN) +#define CSNIsUnclear(csn) ((csn) == UnclearCSN) +#define CSNIsNormal(csn) ((csn) >= FirstNormalCSN) + +/* XLOG stuff */ +#define XLOG_CSN_ASSIGNMENT 0x00 +#define XLOG_CSN_SETCSN 0x10 +#define XLOG_CSN_ZEROPAGE 0x20 +#define XLOG_CSN_TRUNCATE 0x30 + +/* + * We should log MAX generated CSN to wal, so that database will not generate + * a historical CSN after database restart. This may appear when system time + * turned back. + * + * However we can not log the MAX CSN every time it generated, if so it will + * cause too many wal expend, so we log it 5s more in the future. + * + * As a trade off, when this database restart, there will be 5s bad performance + * for time synchronization among sharding nodes. + * + * It looks like we can redefine this as a configure parameter, and the user + * can decide which way they prefer. + * + */ +#define CSN_ASSIGN_TIME_INTERVAL 5 + +typedef struct xl_csn_set +{ + CSN csn; + TransactionId xtop; /* XID's top-level XID */ + int nsubxacts; /* number of subtransaction XIDs */ + TransactionId xsub[FLEXIBLE_ARRAY_MEMBER]; /* assigned subxids */ +} xl_csn_set; + +#define MinSizeOfCSNSet offsetof(xl_csn_set, xsub) +#define CSNAddByNanosec(csn,second) (csn + second * 1000000000L) + +/* Main functions */ +extern void CSNLogSetCSN(TransactionId xid, int nsubxids, + TransactionId *subxids, CSN csn, bool write_xlog); +extern CSN CSNLogGetCSNByXid(TransactionId xid); + +/* Infrastructure functions */ +extern Size CSNLogShmemSize(void); +extern void CSNLogShmemInit(void); +extern void ActivateCSNlog(void); +extern void ExtendCSNLog(TransactionId newestXact); +extern void DeactivateCSNlog(void); + +extern void CheckPointCSNLog(void); +extern void TruncateCSNLog(TransactionId oldestXact); + +extern void csnlog_redo(XLogReaderState *record); +extern void csnlog_desc(StringInfo buf, XLogReaderState *record); +extern const char *csnlog_identify(uint8 info); +extern void WriteAssignCSNXlogRec(CSN csn); +extern void CatchCSNLog(void); +extern void StartupCSN(void); +extern void CompleteCSNInitialization(void); +extern void CSNlogParameterChange(bool newvalue, bool oldvalue); +extern bool get_csnlog_status(void); +extern int csnsyncfiletag(const FileTag *ftag, char *path); + +extern CSN GenerateCSN(bool locked, CSN assign); +extern CSN GetLastGeneratedCSN(void); + +extern TransactionId GetOldestXmin(void); + +#endif /* CSNLOG_H */ \ No newline at end of file diff --git a/src/include/access/csn_snapshot.h b/src/include/access/csn_snapshot.h new file mode 100644 index 00000000000..916603af0ce --- /dev/null +++ b/src/include/access/csn_snapshot.h @@ -0,0 +1,54 @@ +/*------------------------------------------------------------------------- + * + * csn_snapshot.h + * Support for cross-node snapshot isolation. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/csn_snapshot.h + * + *------------------------------------------------------------------------- + */ +#ifndef CSN_SNAPSHOT_H +#define CSN_SNAPSHOT_H + +#include "access/csn_log.h" +#include "port/atomics.h" +#include "storage/lock.h" +#include "utils/snapshot.h" +#include "utils/guc.h" + +/* + * snapshot.h is used in frontend code so atomic variant of SnapshotCSN type + * is defined here. + */ +typedef pg_atomic_uint64 CSN_atomic; + + +extern int csn_snapshot_defer_time; +extern int csn_time_shift; + + +extern Size CSNSnapshotShmemSize(void); +extern void CSNSnapshotShmemInit(void); +extern void CSNSnapshotStartup(TransactionId oldestActiveXID); + +extern void CSNSnapshotMapXmin(SnapshotCSN snapshot_csn); +extern TransactionId CSNSnapshotToXmin(SnapshotCSN snapshot_csn); + +extern bool XidInCSNSnapshot(TransactionId xid, Snapshot snapshot); + +extern CSN TransactionIdGetCSN(TransactionId xid); + +extern void CSNSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); +extern void CSNSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); +extern void CSNSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids, + TransactionId *subxids); +extern void CSNSnapshotAssignCurrent(SnapshotCSN snapshot_csn); +extern SnapshotCSN CSNSnapshotPrepareCurrent(void); +extern void CSNSnapshotSync(SnapshotCSN remote_csn); + +#endif /* CSN_SNAPSHOT_H */ diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 463bcb67c57..c11ddcc624f 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +PG_RMGR(RM_CSNLOG_ID, "CSN", csnlog_redo, csnlog_desc, csnlog_identify, NULL, NULL, NULL, NULL) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index b0fd338a00c..cd9d2a3c0cc 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -280,6 +280,7 @@ typedef struct xl_parameter_change int wal_level; bool wal_log_hints; bool track_commit_timestamp; + bool enable_csn_snapshot; } xl_parameter_change; /* logs restore point */ @@ -400,5 +401,6 @@ extern PGDLLIMPORT bool ArchiveRecoveryRequested; extern PGDLLIMPORT bool InArchiveRecovery; extern PGDLLIMPORT bool StandbyMode; extern PGDLLIMPORT char *recoveryRestoreCommand; +extern PGDLLIMPORT bool enable_csn_wal; #endif /* XLOG_INTERNAL_H */ diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index dc953977c5d..72a0c5ac0b6 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -181,6 +181,7 @@ typedef struct ControlFileData int max_prepared_xacts; int max_locks_per_xact; bool track_commit_timestamp; + bool enable_csn_snapshot; /* * This data is used to check for hardware-architecture compatibility of diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 6996073989a..e6df523fa7b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12043,4 +12043,21 @@ proname => 'any_value_transfn', prorettype => 'anyelement', proargtypes => 'anyelement anyelement', prosrc => 'any_value_transfn' }, + # csn shnapshot handling +{ oid => '10001', descr => 'export csn snapshot', + proname => 'pg_csn_snapshot_export', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => '', prosrc => 'pg_csn_snapshot_export' }, +{ oid => '10002', descr => 'import csn snapshot', + proname => 'pg_csn_snapshot_import', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'int8', prosrc => 'pg_csn_snapshot_import' }, +{ oid => '10003', descr => 'prepare distributed transaction for commit, get csn', + proname => 'pg_csn_snapshot_prepare', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => 'text', prosrc => 'pg_csn_snapshot_prepare' }, +{ oid => '10004', descr => 'assign csn to distributed transaction', + proname => 'pg_csn_snapshot_assign', provolatile => 'v', proparallel => 'u', + prorettype => 'void', proargtypes => 'text int8', prosrc => 'pg_csn_snapshot_assign' }, +{ oid => '10005', descr => 'get current CSN', + proname => 'pg_current_csn', provolatile => 'v', proparallel => 'u', + prorettype => 'int8', proargtypes => '', prosrc => 'pg_current_csn' }, + ] diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h index ab8ccf89ca9..2cab63bbec7 100644 --- a/src/include/datatype/timestamp.h +++ b/src/include/datatype/timestamp.h @@ -132,6 +132,9 @@ struct pg_itm_in #define USECS_PER_MINUTE INT64CONST(60000000) #define USECS_PER_SEC INT64CONST(1000000) +#define NSECS_PER_SEC INT64CONST(1000000000) +#define NSECS_PER_USEC INT64CONST(1000) + /* * We allow numeric timezone offsets up to 15:59:59 either way from Greenwich. * Currently, the record holders for wackiest offsets in actual use are zones diff --git a/src/include/fmgr.h b/src/include/fmgr.h index b120f5e7fef..92933173969 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -281,6 +281,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum); #define PG_GETARG_FLOAT4(n) DatumGetFloat4(PG_GETARG_DATUM(n)) #define PG_GETARG_FLOAT8(n) DatumGetFloat8(PG_GETARG_DATUM(n)) #define PG_GETARG_INT64(n) DatumGetInt64(PG_GETARG_DATUM(n)) +#define PG_GETARG_UINT64(n) DatumGetUInt64(PG_GETARG_DATUM(n)) /* use this if you want the raw, possibly-toasted input datum: */ #define PG_GETARG_RAW_VARLENA_P(n) ((struct varlena *) PG_GETARG_POINTER(n)) /* use this if you want the input datum de-toasted: */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 34169e5889e..ff886365138 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -181,6 +181,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_XACT_BUFFER = NUM_INDIVIDUAL_LWLOCKS, LWTRANCHE_COMMITTS_BUFFER, LWTRANCHE_SUBTRANS_BUFFER, + LWTRANCHE_CSN_LOG_BUFFERS, LWTRANCHE_MULTIXACTOFFSET_BUFFER, LWTRANCHE_MULTIXACTMEMBER_BUFFER, LWTRANCHE_NOTIFY_BUFFER, diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index ef74f326932..c372076f3c4 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -15,12 +15,14 @@ #define _PROC_H_ #include "access/clog.h" +#include "access/csn_snapshot.h" #include "access/xlogdefs.h" #include "lib/ilist.h" #include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" #include "storage/proclist_types.h" +#include "utils/snapshot.h" /* * Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds @@ -301,6 +303,18 @@ struct PGPROC PGPROC *lockGroupLeader; /* lock group leader, if I'm a member */ dlist_head lockGroupMembers; /* list of members, if I'm a leader */ dlist_node lockGroupLink; /* my member link, if I'm a member */ + + /* + * assignedCSN holds CSN for this transaction. It is generated + * under a ProcArray lock and later is written to a CSNLog. This + * variable defined as atomic only for case of group commit, in all other + * scenarios only backend responsible for this proc entry is working with + * this variable. + */ + CSN_atomic assignedCSN; + + /* Original xmin of this backend before csn snapshot was imported */ + TransactionId originalXmin; }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index d8cae3ce1c5..87e63813d1b 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -20,6 +20,10 @@ #include "utils/snapshot.h" +#define PROCARRAY_NON_IMPORTED_XMIN 0x80 /* use originalXmin instead + * of xmin to properly + * maintain csnXidMap */ + extern Size ProcArrayShmemSize(void); extern void CreateSharedProcArray(void); extern void ProcArrayAdd(PGPROC *proc); @@ -96,4 +100,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin); +extern void ProcArraySetCSNSnapshotXmin(TransactionId xmin); + +extern TransactionId ProcArrayGetCSNSnapshotXmin(void); #endif /* PROCARRAY_H */ diff --git a/src/include/storage/sync.h b/src/include/storage/sync.h index cfbcfa6797d..4233c24ad30 100644 --- a/src/include/storage/sync.h +++ b/src/include/storage/sync.h @@ -39,6 +39,7 @@ typedef enum SyncRequestHandler SYNC_HANDLER_COMMIT_TS, SYNC_HANDLER_MULTIXACT_OFFSET, SYNC_HANDLER_MULTIXACT_MEMBER, + SYNC_HANDLER_CSN, SYNC_HANDLER_NONE } SyncRequestHandler; diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 980d37a1947..6e5905631c8 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -41,10 +41,11 @@ && !RelationIsAccessibleInLogicalDecoding(rel) \ ) -#define EarlyPruningEnabled(rel) (old_snapshot_threshold >= 0 && RelationAllowsEarlyPruning(rel)) +#define EarlyPruningEnabled(rel) (old_snapshot_threshold >= 0 && !enable_csn_snapshot && RelationAllowsEarlyPruning(rel)) /* GUC variables */ extern PGDLLIMPORT int old_snapshot_threshold; +extern PGDLLIMPORT bool enable_csn_snapshot; extern Size SnapMgrShmemSize(void); @@ -101,7 +102,7 @@ extern PGDLLIMPORT SnapshotData CatalogSnapshotData; static inline bool OldSnapshotThresholdActive(void) { - return old_snapshot_threshold >= 0; + return (old_snapshot_threshold >= 0) && (!enable_csn_snapshot); } #endif @@ -132,6 +133,8 @@ extern void AtSubCommit_Snapshot(int level); extern void AtSubAbort_Snapshot(int level); extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin); +extern SnapshotCSN ExportCSNSnapshot(void); +extern void ImportCSNSnapshot(SnapshotCSN snapshot_csn); extern void ImportSnapshot(const char *idstr); extern bool XactHasExportedSnapshots(void); extern void DeleteAllExportedSnapshotFiles(void); diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 583a667a40a..fc65242b7ae 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -121,6 +121,9 @@ typedef enum SnapshotType typedef struct SnapshotData *Snapshot; #define InvalidSnapshot ((Snapshot) NULL) +#define InvalidCSN ((CSN) 0) +typedef uint64 CSN; +typedef uint64 SnapshotCSN; /* * Struct representing all kind of possible snapshots. @@ -214,6 +217,14 @@ typedef struct SnapshotData * transactions completed since the last GetSnapshotData(). */ uint64 snapXactCompletionCount; + + /* + * SnapshotCSN for snapshot isolation support. + * Will be used only if enable_csn_snapshot is enabled. + */ + SnapshotCSN snapshot_csn; + /* Did we have our own snapshot_csn or imported one from different node */ + bool imported_csn; } SnapshotData; #endif /* SNAPSHOT_H */ diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 6331c976dcb..0241fa96d7b 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -7,6 +7,7 @@ include $(top_builddir)/src/Makefile.global SUBDIRS = \ brin \ commit_ts \ + csnsnapshot \ delay_execution \ dummy_index_am \ dummy_seclabel \ diff --git a/src/test/modules/csnsnapshot/Makefile b/src/test/modules/csnsnapshot/Makefile new file mode 100644 index 00000000000..15a07f88469 --- /dev/null +++ b/src/test/modules/csnsnapshot/Makefile @@ -0,0 +1,22 @@ +# src/test/modules/csnsnapshot/Makefile + +NO_INSTALLCHECK = 1 + +TAP_TESTS = 1 + +# Doesn't support full consistency of distributed commit in READ COMMITTED +# transactions. +PROVE_TESTS = t/001_base.pl \ + t/002_standby.pl \ + t/003_parallel_safe.pl + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/csnsnapshot +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/csnsnapshot/expected/csnsnapshot.out b/src/test/modules/csnsnapshot/expected/csnsnapshot.out new file mode 100644 index 00000000000..ac28e417b67 --- /dev/null +++ b/src/test/modules/csnsnapshot/expected/csnsnapshot.out @@ -0,0 +1 @@ +create table t1(i int, j int, k varchar); diff --git a/src/test/modules/csnsnapshot/t/001_base.pl b/src/test/modules/csnsnapshot/t/001_base.pl new file mode 100644 index 00000000000..b81419512e0 --- /dev/null +++ b/src/test/modules/csnsnapshot/t/001_base.pl @@ -0,0 +1,100 @@ +use strict; +use warnings; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 5; + +my ($node, $test_snapshot, $count1, $count2); +$node = PostgreSQL::Test::Cluster->new('csntest'); +$node->init; +$node->append_conf('postgresql.conf', qq{ + enable_csn_snapshot = on + csn_snapshot_defer_time = 10 + max_prepared_transactions = 10 + }); +$node->start; + +# Create a table +$node->safe_psql('postgres', 'create table t1(i int, j int)'); + +# insert test record +$node->safe_psql('postgres', 'insert into t1 values(1,1)'); +# export csn snapshot +$test_snapshot = $node->safe_psql('postgres', 'select pg_csn_snapshot_export()'); +# insert test record +$node->safe_psql('postgres', 'insert into t1 values(2,1)'); + +$count1 = $node->safe_psql('postgres', "select count(*) from t1"); +is($count1, '2', 'Get right number in normal query'); +$count2 = $node->safe_psql('postgres', " + begin transaction isolation level repeatable read; + select pg_csn_snapshot_import($test_snapshot); + select count(*) from t1; + commit;" + ); + +is($count2, ' +1', 'Get right number in csn import query'); + +#prepare transaction test +$node->safe_psql('postgres', " + begin; + insert into t1 values(3,1); + insert into t1 values(3,2); + prepare transaction 'pt3'; + "); +$node->safe_psql('postgres', " + begin; + insert into t1 values(4,1); + insert into t1 values(4,2); + prepare transaction 'pt4'; + "); +$node->safe_psql('postgres', " + begin; + insert into t1 values(5,1); + insert into t1 values(5,2); + prepare transaction 'pt5'; + "); +$node->safe_psql('postgres', " + begin; + insert into t1 values(6,1); + insert into t1 values(6,2); + prepare transaction 'pt6'; + "); +$node->safe_psql('postgres', "commit prepared 'pt4';"); + +# restart with enable_csn_snapshot off +$node->append_conf('postgresql.conf', "enable_csn_snapshot = off"); +$node->restart; +$node->safe_psql('postgres', " + insert into t1 values(7,1); + insert into t1 values(7,2); + "); +$node->safe_psql('postgres', "commit prepared 'pt3';"); +$count1 = $node->safe_psql('postgres', "select count(*) from t1"); +is($count1, '8', 'Get right number in normal query'); + + +# restart with enable_csn_snapshot on +$node->append_conf('postgresql.conf', "enable_csn_snapshot = on"); +$node->restart; +$node->safe_psql('postgres', " + insert into t1 values(8,1); + insert into t1 values(8,2); + "); +$node->safe_psql('postgres', "commit prepared 'pt5';"); +$count1 = $node->safe_psql('postgres', "select count(*) from t1"); +is($count1, '12', 'Get right number in normal query'); + +# restart with enable_csn_snapshot off +$node->append_conf('postgresql.conf', "enable_csn_snapshot = on"); +$node->restart; +$node->safe_psql('postgres', " + insert into t1 values(9,1); + insert into t1 values(9,2); + "); +$node->safe_psql('postgres', "commit prepared 'pt6';"); + +$count1 = $node->safe_psql('postgres', "select count(*) from t1"); +is($count1, '16', 'Get right number in normal query'); diff --git a/src/test/modules/csnsnapshot/t/002_standby.pl b/src/test/modules/csnsnapshot/t/002_standby.pl new file mode 100644 index 00000000000..27fcbb8f8ab --- /dev/null +++ b/src/test/modules/csnsnapshot/t/002_standby.pl @@ -0,0 +1,68 @@ +# Test simple scenario involving a standby + +use strict; +use warnings; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 6; + +my ($master, $bkplabel, $standby, $guc_on_master, $guc_on_standby); + +$bkplabel = 'backup'; +$master = PostgreSQL::Test::Cluster->new('master'); +$master->init(allows_streaming => 1); + +$master->append_conf( + 'postgresql.conf', qq{ + enable_csn_snapshot = on + max_wal_senders = 5 + }); +$master->start; +$master->backup($bkplabel); + +$standby = PostgreSQL::Test::Cluster->new('standby'); +$standby->init_from_backup($master, $bkplabel, has_streaming => 1); +$standby->start; + +$master->safe_psql('postgres', "create table t1(i int, j int)"); + +$guc_on_master = $master->safe_psql('postgres', 'show enable_csn_snapshot'); +is($guc_on_master, 'on', "GUC on master"); + +$guc_on_standby = $standby->safe_psql('postgres', 'show enable_csn_snapshot'); +is($guc_on_standby, 'on', "GUC on standby"); + +$master->append_conf('postgresql.conf', 'enable_csn_snapshot = off'); +$master->restart; + +$guc_on_master = $master->safe_psql('postgres', 'show enable_csn_snapshot'); +is($guc_on_master, 'off', "GUC off master"); + +$guc_on_standby = $standby->safe_psql('postgres', 'show enable_csn_snapshot'); +is($guc_on_standby, 'on', "GUC on standby"); + +# We consume a large number of transaction,for skip page +for my $i (1 .. 4096) #4096 +{ + $master->safe_psql('postgres', "insert into t1 values(1,$i)"); +} +$master->safe_psql('postgres', "select pg_sleep(2)"); +$master->append_conf('postgresql.conf', 'enable_csn_snapshot = on'); +$master->restart; + +my $count_standby = $standby->safe_psql('postgres', 'select count(*) from t1'); +is($count_standby, '4096', "Ok for siwtch xid-base > csn-base"); #4096 + +# We consume a large number of transaction,for skip page +for my $i (1 .. 4096) #4096 +{ + $master->safe_psql('postgres', "insert into t1 values(1,$i)"); +} +$master->safe_psql('postgres', "select pg_sleep(2)"); + +$master->append_conf('postgresql.conf', 'enable_csn_snapshot = off'); +$master->restart; + +$count_standby = $standby->safe_psql('postgres', 'select count(*) from t1'); +is($count_standby, '8192', "Ok for switch csn-base > xid-base"); #8192 \ No newline at end of file diff --git a/src/test/modules/csnsnapshot/t/003_parallel_safe.pl b/src/test/modules/csnsnapshot/t/003_parallel_safe.pl new file mode 100644 index 00000000000..e303e3f1a6b --- /dev/null +++ b/src/test/modules/csnsnapshot/t/003_parallel_safe.pl @@ -0,0 +1,67 @@ +# Check safety of CSN machinery for parallel mode. + +use strict; +use warnings; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 2; + +my ($node, $updScr, $selScr, $started, $pgb_handle1, $result, $errors); + +$node = PostgreSQL::Test::Cluster->new('csntest'); +$node->init; +$node->append_conf('postgresql.conf', qq{ + enable_csn_snapshot = on + csn_snapshot_defer_time = 10 + default_transaction_isolation = 'REPEATABLE READ' + + # force parallel mode. + max_worker_processes = 64 + max_parallel_workers_per_gather = 16 + max_parallel_workers = 32 + parallel_setup_cost = 1 + parallel_tuple_cost = 0.05 + min_parallel_table_scan_size = 0 +}); +$node->start; + +$node->command_ok([ 'pgbench', '-i', '-s', '1' ], "pgbench initialization ok"); +$node->safe_psql('postgres', qq{ + CREATE OR REPLACE FUNCTION cnt() RETURNS integer AS ' + SELECT sum(abalance) FROM pgbench_accounts; + ' LANGUAGE SQL PARALLEL SAFE COST 100000.; +}); + + +$updScr = File::Temp->new(); +append_to_file($updScr, q{ + UPDATE pgbench_accounts SET abalance = abalance + 1 WHERE aid = 1; +}); + +$selScr = ' + SELECT count(*) AS res FROM ( + SELECT cnt() AS y FROM pgbench_accounts WHERE aid < 20 + GROUP BY (y) + ) AS q; +'; + +# Launch updates +$pgb_handle1 = $node->pgbench_async(-n, -T => 10, -f => $updScr, 'postgres' ); + +$errors = 0; +$started = time(); +while (time() - $started < 10) +{ + # Check that each worker returns the same sum on balance column. + $result = $node->safe_psql('postgres', $selScr); + if ($result ne 1) + { + $errors++; + diag("Workers returned different sums: $result"); + } +} +is($errors, 0, 'isolation between UPDATE and concurrent SELECT workers.'); + +$node->pgbench_await($pgb_handle1); +$node->stop(); \ No newline at end of file diff --git a/src/test/modules/snapshot_too_old/sto.conf b/src/test/modules/snapshot_too_old/sto.conf index 7eeaeeb0dc3..3177cc0e15e 100644 --- a/src/test/modules/snapshot_too_old/sto.conf +++ b/src/test/modules/snapshot_too_old/sto.conf @@ -1,2 +1,3 @@ autovacuum = off old_snapshot_threshold = 0 +enable_csn_snapshot = false diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index 5e161dbee60..2ecb6160dbf 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -2166,6 +2166,34 @@ sub pgbench $self->command_checks_all(\@cmd, $stat, $out, $err, $name); } +sub pgbench_async() +{ + my ($self, @args) = @_; + + my ($in, $out, $err, $rc); + $in = ''; + $out = ''; + + my @pgbench_command = ( + 'pgbench', + -h => $self->host, + -p => $self->port, + @args + ); + my $handle = IPC::Run::start(\@pgbench_command, $in, $out); + return $handle; +} + +sub pgbench_await() +{ + my ($self, $pgbench_handle) = @_; + + # During run some pgbench threads can exit (for example due to + # serialization error). That will set non-zero returning code. + # So don't check return code here and leave it to a caller. + my $rc = IPC::Run::finish($pgbench_handle); +} + =pod =item $node->connect_ok($connstr, $test_name, %params) diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 001c6e7eb9d..7dd48bf016e 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -113,6 +113,8 @@ select name, setting from pg_settings where name like 'enable%'; --------------------------------+--------- enable_async_append | on enable_bitmapscan | on + enable_csn_snapshot | on + enable_csn_wal | on enable_gathermerge | on enable_hashagg | on enable_hashjoin | on @@ -132,7 +134,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(21 rows) +(23 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. We can't test their contents in any great detail