From 0285f2200f24b72e72336a6e6fcbcecf9e6e6322 Mon Sep 17 00:00:00 2001 From: wangjingyuan8 <1577039175@qq.com> Date: Mon, 24 Jun 2024 15:51:15 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=99=AE=E9=80=9A=E5=9B=9E=E6=94=BE?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E9=97=B4=E5=90=8C=E6=AD=A5+bugfix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/backend/catalog/storage.cpp | 9 + .../backend/pgxc_single/barrier/barrier.cpp | 18 + src/common/backend/utils/cache/relmapper.cpp | 3 + .../ddes/adapter/ss_dms_callback.cpp | 5 + .../ddes/adapter/ss_transaction.cpp | 32 ++ .../optimizer/commands/dbcommands.cpp | 18 + .../optimizer/commands/sequence/sequence.cpp | 12 + .../commands/sequence/sequence_util.cpp | 3 + .../optimizer/commands/tablecmds.cpp | 12 + .../optimizer/commands/tablespace.cpp | 6 + .../optimizer/commands/vacuumlazy.cpp | 9 + .../process/threadpool/knl_instance.cpp | 3 + .../process/threadpool/knl_thread.cpp | 2 +- .../storage/access/gin/ginbtree.cpp | 6 + .../storage/access/gin/gindatapage.cpp | 6 + .../storage/access/gin/ginfast.cpp | 9 + .../storage/access/gin/gininsert.cpp | 3 + .../storage/access/gin/ginutil.cpp | 3 + .../storage/access/gin/ginvacuum.cpp | 6 + src/gausskernel/storage/access/gist/gist.cpp | 6 + .../storage/access/gist/gistbuild.cpp | 3 + .../storage/access/gist/gistvacuum.cpp | 3 + src/gausskernel/storage/access/hash/hash.cpp | 6 + .../storage/access/hash/hashinsert.cpp | 6 + .../storage/access/hash/hashovfl.cpp | 9 + .../storage/access/hash/hashpage.cpp | 27 +- .../storage/access/heap/heapam.cpp | 50 ++- .../storage/access/heap/pruneheap.cpp | 3 + .../storage/access/heap/rewriteheap.cpp | 9 + .../storage/access/heap/visibilitymap.cpp | 3 + .../storage/access/nbtree/nbtdedup.cpp | 5 +- .../storage/access/nbtree/nbtinsert.cpp | 11 +- .../storage/access/nbtree/nbtpage.cpp | 21 ++ .../storage/access/nbtree/nbtree.cpp | 3 + .../storage/access/nbtree/nbtsort.cpp | 6 + .../storage/access/spgist/spgdoinsert.cpp | 18 + .../storage/access/spgist/spginsert.cpp | 14 +- .../storage/access/spgist/spgvacuum.cpp | 9 + .../storage/access/transam/clog.cpp | 6 + .../storage/access/transam/multi_redo_api.cpp | 14 +- .../storage/access/transam/multixact.cpp | 8 +- .../storage/access/transam/twophase.cpp | 9 + .../storage/access/transam/xact.cpp | 12 + .../storage/access/transam/xlog.cpp | 308 ++++++++++-------- .../storage/access/transam/xloginsert.cpp | 4 + src/gausskernel/storage/buffer/bufmgr.cpp | 3 + src/gausskernel/storage/ipc/standby.cpp | 3 + src/gausskernel/storage/lmgr/lwlock.cpp | 8 +- src/gausskernel/storage/replication/bcm.cpp | 6 + .../storage/replication/slotfuncs.cpp | 15 + src/gausskernel/storage/smgr/cfs/cfs_md.cpp | 3 + .../storage/smgr/segment/extent_group.cpp | 13 + .../storage/smgr/segment/space.cpp | 9 + .../storage/smgr/segment/xlog_atomic_op.cpp | 3 + src/gausskernel/storage/smgr/segstore.cpp | 3 + src/include/access/multixact.h | 2 +- src/include/access/xlog.h | 8 +- src/include/access/xlog_basic.h | 5 - src/include/ddes/dms/ss_common_attr.h | 3 + src/include/ddes/dms/ss_dms_recovery.h | 20 +- src/include/ddes/dms/ss_transaction.h | 4 + src/include/knl/knl_thread.h | 3 +- 62 files changed, 688 insertions(+), 161 deletions(-) diff --git a/src/common/backend/catalog/storage.cpp b/src/common/backend/catalog/storage.cpp index 922cbda6b..ede441c0d 100644 --- a/src/common/backend/catalog/storage.cpp +++ b/src/common/backend/catalog/storage.cpp @@ -327,6 +327,9 @@ void log_smgrcreate(RelFileNode* rnode, ForkNumber forkNum) XLogBeginInsert(); XLogRegisterData((char*)&xlrec, (int)size); XLogInsert(RM_SMGR_ID, info, rnode->bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } static void CStoreRelDropStorage(Relation rel, RelFileNode* rnode, Oid ownerid) @@ -692,6 +695,9 @@ void RelationTruncate(Relation rel, BlockNumber nblocks, TransactionId latest_re XLogRegisterData((char*)&xlrec, (int)size); lsn = XLogInsert(RM_SMGR_ID, info, rel->rd_node.bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * Flush, because otherwise the truncation of the main relation might @@ -781,6 +787,9 @@ void PartitionTruncate(Relation parent, Partition part, BlockNumber nblocks, Tra XLogRegisterData((char*)&xlrec, redoSize); lsn = XLogInsert(RM_SMGR_ID, info, part->pd_node.bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * Flush, because otherwise the truncation of the main relation might diff --git a/src/common/backend/pgxc_single/barrier/barrier.cpp b/src/common/backend/pgxc_single/barrier/barrier.cpp index eef9408f1..43a35f223 100755 --- a/src/common/backend/pgxc_single/barrier/barrier.cpp +++ b/src/common/backend/pgxc_single/barrier/barrier.cpp @@ -150,6 +150,9 @@ void ProcessCreateBarrierCommit(const char* id) XLogRegisterData((char*)id, strlen(id) + 1); XLogRecPtr recptr = XLogInsert(RM_BARRIER_ID, XLOG_BARRIER_COMMIT, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); @@ -217,6 +220,9 @@ void ProcessCreateBarrierExecute(const char* id, bool isSwitchoverBarrier) } recptr = XLogInsert(RM_BARRIER_ID, isSwitchoverBarrier? XLOG_BARRIER_SWITCHOVER : XLOG_BARRIER_CREATE, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (IS_CSN_BARRIER(id) && !isSwitchoverBarrier) { @@ -400,6 +406,9 @@ void DisasterRecoveryRequestBarrier(const char* id, bool isSwitchoverBarrier) XLogRegisterData((char*)id, strlen(id) + 1); recptr = XLogInsert(RM_BARRIER_ID, XLOG_BARRIER_CREATE, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); #ifndef ENABLE_LITE_MODE if (t_thrd.role == BARRIER_CREATOR) { @@ -441,6 +450,9 @@ void CreateHadrSwitchoverBarrier() XLogRegisterData((char*)barrier_id, strlen(barrier_id) + 1); recptr = XLogInsert(RM_BARRIER_ID, XLOG_BARRIER_SWITCHOVER, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); g_instance.streaming_dr_cxt.switchoverBarrierLsn = recptr; @@ -860,6 +872,9 @@ static void ExecuteBarrier(const char* id, bool isSwitchoverBarrier) } recptr = XLogInsert(RM_BARRIER_ID, isSwitchoverBarrier? XLOG_BARRIER_SWITCHOVER : XLOG_BARRIER_CREATE, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (IS_CSN_BARRIER(id) && !isSwitchoverBarrier) { @@ -932,6 +947,9 @@ static void CommitBarrier(PGXCNodeAllHandles* prepared_handles, const char* id) XLogRegisterData((char*)id, strlen(id) + 1); XLogRecPtr recptr = XLogInsert(RM_BARRIER_ID, XLOG_BARRIER_COMMIT, InvalidBktId); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); } diff --git a/src/common/backend/utils/cache/relmapper.cpp b/src/common/backend/utils/cache/relmapper.cpp index f8e482370..c9f8bc682 100644 --- a/src/common/backend/utils/cache/relmapper.cpp +++ b/src/common/backend/utils/cache/relmapper.cpp @@ -824,6 +824,9 @@ static void write_relmap_file(bool shared, RelMapFile* newmap, bool write_wal, b XLogRegisterData((char*)(&xlrec), MinSizeOfRelmapUpdate); RegistRelMapWal(newmap); lsn = XLogInsert(RM_RELMAP_ID, XLOG_RELMAP_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* As always, WAL must hit the disk before the data update does */ XLogWaitFlush(lsn); diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index e0c1f2b64..f60b68d64 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -1255,6 +1255,11 @@ static int32 CBProcessBroadcast(void *db_handle, dms_broadcast_context_t *broad_ case BCAST_RELOAD_REFORM_CTRL_PAGE: ret = SSReloadReformCtrlPage(len); break; +#ifdef ENABLE_SS_MULTIMASTER + case BCAST_REDO_DONE: + ret = SSUpdateArgs(len); + break; +#endif default: ereport(WARNING, (errmodule(MOD_DMS), errmsg("invalid broadcast operate type"))); ret = DMS_ERROR; diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index ff5892125..c2fb99c89 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -653,6 +653,18 @@ int SSReloadReformCtrlPage(uint32 len) return DMS_SUCCESS; } +#ifdef ENABLE_SS_MULTIMASTER +int SSUpdateArgs(uint32 len) +{ + if (unlikely(len != sizeof(SSBroadcastCmdOnly))) { + return DMS_ERROR; + } + + g_instance.dms_cxt.SSRecoveryInfo.redo_done = true; + return DMS_SUCCESS; +} +#endif + int SSCheckDbBackends(char *data, uint32 len, char *output_msg, uint32 *output_msg_len) { if (unlikely(len != sizeof(SSBroadcastDbBackends))) { @@ -722,6 +734,26 @@ void SSRequestAllStandbyReloadReformCtrlPage() } while (ret != DMS_SUCCESS); } +#ifdef ENABLE_SS_MULTIMASTER +void SSRequestAllNodeUpdateArgs() +{ + dms_context_t dms_ctx; + InitDmsContext(&dms_ctx); + int ret; + SSBroadcastCmdOnly ssmsg; + ssmsg.type = BCAST_REDO_DONE; + do { + ret = dms_broadcast_msg(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastCmdOnly), + (unsigned char)false, SS_BROADCAST_WAIT_ONE_SECOND); + + if (ret == DMS_SUCCESS) { + return; + } + pg_usleep(5000L); + } while (ret != DMS_SUCCESS); +} +#endif + void SSSendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n) { dms_context_t dms_ctx; diff --git a/src/gausskernel/optimizer/commands/dbcommands.cpp b/src/gausskernel/optimizer/commands/dbcommands.cpp index 45d0a25bc..dc6938aa9 100644 --- a/src/gausskernel/optimizer/commands/dbcommands.cpp +++ b/src/gausskernel/optimizer/commands/dbcommands.cpp @@ -686,6 +686,9 @@ Oid createdb(const CreatedbStmt* stmt) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_create_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } @@ -709,6 +712,9 @@ Oid createdb(const CreatedbStmt* stmt) XLogBeginInsert(); XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_create_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif pfree_ext(srcpath); pfree_ext(dstpath); @@ -1030,6 +1036,9 @@ static void DropdbXactCallback(bool isCommit, const void* arg) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_drop_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } pfree_ext(dstpath); @@ -1588,6 +1597,9 @@ static void movedb(const char* dbname, const char* tblspcname) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_create_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* @@ -1711,6 +1723,9 @@ static void movedb_success_callback(Oid db_id, Oid src_tblspcoid) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_drop_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* Now it's safe to release the database lock */ @@ -2217,6 +2232,9 @@ static void remove_dbtablespaces(Oid db_id) XLogRegisterData((char*)&xlrec, sizeof(xl_dbase_drop_rec)); (void)XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } pfree_ext(dstpath); diff --git a/src/gausskernel/optimizer/commands/sequence/sequence.cpp b/src/gausskernel/optimizer/commands/sequence/sequence.cpp index 36543f4a0..72eb7a540 100644 --- a/src/gausskernel/optimizer/commands/sequence/sequence.cpp +++ b/src/gausskernel/optimizer/commands/sequence/sequence.cpp @@ -648,6 +648,9 @@ static int128 GetNextvalLocal(SeqTable elm, Relation seqrel) recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, seqrel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* Now update sequence tuple to the intended final state */ @@ -1677,6 +1680,9 @@ void autoinc_setval(Oid relid, int128 next, bool iscalled) recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, seqrel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1843,6 +1849,9 @@ static void do_setval(Oid relid, int128 next, bool iscalled) recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, seqrel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2938,6 +2947,9 @@ static void updateNextValForSequence(Buffer buf, Form_pg_sequence seq, HeapTuple XLogRegisterData((char*)seqtuple.t_data, seqtuple.t_len); recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, seqrel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* * We must mark the buffer dirty before doing XLogInsert(); see notes in diff --git a/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp b/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp index d3c095982..1d72bafb9 100644 --- a/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp +++ b/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp @@ -125,6 +125,9 @@ void fill_seq_with_data(Relation rel, HeapTuple tuple) recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rel->rd_node.bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/optimizer/commands/tablecmds.cpp b/src/gausskernel/optimizer/commands/tablecmds.cpp index e411cb485..5ea09df82 100755 --- a/src/gausskernel/optimizer/commands/tablecmds.cpp +++ b/src/gausskernel/optimizer/commands/tablecmds.cpp @@ -19682,6 +19682,9 @@ static void copy_relation_data(Relation rel, SMgrRelation* dstptr, ForkNumber fo errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); @@ -19693,6 +19696,9 @@ static void copy_relation_data(Relation rel, SMgrRelation* dstptr, ForkNumber fo */ if (use_wal) { log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page, false, &tde_info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (RelationisEncryptEnable(rel)) { @@ -19915,6 +19921,9 @@ static void mergeHeapBlock(Relation src, Relation dest, ForkNumber forkNum, char errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); } else { @@ -19925,6 +19934,9 @@ static void mergeHeapBlock(Relation src, Relation dest, ForkNumber forkNum, char RelationOpenSmgr(dest); if (use_wal) { log_newpage(&dest->rd_smgr->smgr_rnode.node, forkNum, dest_blkno, page, true, &tde_info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (RelationisEncryptEnable(src)) { diff --git a/src/gausskernel/optimizer/commands/tablespace.cpp b/src/gausskernel/optimizer/commands/tablespace.cpp index a953be97a..d7e0fcbba 100644 --- a/src/gausskernel/optimizer/commands/tablespace.cpp +++ b/src/gausskernel/optimizer/commands/tablespace.cpp @@ -793,6 +793,9 @@ Oid CreateTableSpace(CreateTableSpaceStmt* stmt) * So We use different xlog info to mark relative */ (void)XLogInsert(RM_TBLSPC_ID, relative ? XLOG_TBLSPC_RELATIVE_CREATE : XLOG_TBLSPC_CREATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* @@ -1026,6 +1029,9 @@ void DropTableSpace(DropTableSpaceStmt* stmt) XLogRegisterData((char*)&xlrec, sizeof(xl_tblspc_drop_rec)); (void)XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* diff --git a/src/gausskernel/optimizer/commands/vacuumlazy.cpp b/src/gausskernel/optimizer/commands/vacuumlazy.cpp index c34218a1f..5d86b0bed 100644 --- a/src/gausskernel/optimizer/commands/vacuumlazy.cpp +++ b/src/gausskernel/optimizer/commands/vacuumlazy.cpp @@ -1505,6 +1505,9 @@ static IndexBulkDeleteResult** lazy_scan_heap( changedMultiXid ? u_sess->cmd_cxt.MultiXactFrzLimit : InvalidMultiXactId, frozen, nfrozen); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); if (TransactionIdPrecedes(((HeapPageHeader)page)->pd_xid_base, u_sess->utils_cxt.RecentXmin)) { @@ -1522,6 +1525,9 @@ static IndexBulkDeleteResult** lazy_scan_heap( recptr = log_heap_invalid(onerel, buf, u_sess->cmd_cxt.FreezeLimit, invalid, ninvalid); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); if (TransactionIdPrecedes(((HeapPageHeader)page)->pd_xid_base, u_sess->utils_cxt.RecentXmin)) { @@ -1833,6 +1839,9 @@ static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, i recptr = log_heap_clean(onerel, buffer, NULL, 0, NULL, 0, unused, uncnt, vacrelstats->latestRemovedXid, true); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index afa7cdbdf..cb8729315 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -210,6 +210,9 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt) dms_cxt->SSRecoveryInfo.recovery_trapped_in_page_request = false; dms_cxt->SSRecoveryInfo.dorado_sharestorage_inited = false; dms_cxt->SSRecoveryInfo.ondemand_recovery_pause_status = NOT_PAUSE; +#ifdef ENABLE_SS_MULTIMASTER + dms_cxt->SSRecoveryInfo.redo_done = false; +#endif dms_cxt->log_timezone = NULL; pg_atomic_init_u32(&dms_cxt->inDmsThreShmemInitCnt, 0); pg_atomic_init_u32(&dms_cxt->inProcExitCnt, 0); diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index a01fefe75..1a95ee6d1 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -437,7 +437,7 @@ static void knl_t_xlog_init(knl_t_xlog_context* xlog_cxt) xlog_cxt->XactLastCommitEnd = InvalidXLogRecPtr; xlog_cxt->RedoRecPtr = InvalidXLogRecPtr; #ifdef ENABLE_SS_MULTIMASTER - xlog_cxt->RedoLogicLSN = 0; + xlog_cxt->logicRedoRecPtr = 0; #endif xlog_cxt->doPageWrites = false; xlog_cxt->RedoStartLSN = InvalidXLogRecPtr; diff --git a/src/gausskernel/storage/access/gin/ginbtree.cpp b/src/gausskernel/storage/access/gin/ginbtree.cpp index bb2a323e1..dbe050392 100644 --- a/src/gausskernel/storage/access/gin/ginbtree.cpp +++ b/src/gausskernel/storage/access/gin/ginbtree.cpp @@ -392,6 +392,9 @@ static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, void *insertdat if (BufferIsValid(childbuf)) { PageSetLSN(childpage, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -546,6 +549,9 @@ static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack, void *insertdat if (BufferIsValid(childbuf)) { PageSetLSN(childpage, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/gin/gindatapage.cpp b/src/gausskernel/storage/access/gin/gindatapage.cpp index 842999f73..32e8e6ff1 100644 --- a/src/gausskernel/storage/access/gin/gindatapage.cpp +++ b/src/gausskernel/storage/access/gin/gindatapage.cpp @@ -780,6 +780,9 @@ void ginVacuumPostingTreeLeaf(Relation indexrel, Buffer buffer, GinVacuumState * XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen); recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_DATA_LEAF_PAGE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1665,6 +1668,9 @@ BlockNumber createPostingTree(Relation index, ItemPointerData *items, uint32 nit recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } UnlockReleaseBuffer(buffer); diff --git a/src/gausskernel/storage/access/gin/ginfast.cpp b/src/gausskernel/storage/access/gin/ginfast.cpp index 0c2b6352c..76e7197aa 100644 --- a/src/gausskernel/storage/access/gin/ginfast.cpp +++ b/src/gausskernel/storage/access/gin/ginfast.cpp @@ -125,6 +125,9 @@ static int32 writeListPage(Relation index, Buffer buffer, IndexTuple *tuples, in recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* get free space before releasing buffer */ @@ -387,6 +390,9 @@ void ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) if (buffer != InvalidBuffer) { PageSetLSN(page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (buffer != InvalidBuffer) @@ -557,6 +563,9 @@ static void shiftList(Relation index, Buffer metabuffer, BlockNumber newHead, bo page = BufferGetPage(buffers[i]); PageSetLSN(page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } for (i = 0; i < data.ndeleted; i++) diff --git a/src/gausskernel/storage/access/gin/gininsert.cpp b/src/gausskernel/storage/access/gin/gininsert.cpp index fcfa4e0d9..b95fafa60 100644 --- a/src/gausskernel/storage/access/gin/gininsert.cpp +++ b/src/gausskernel/storage/access/gin/gininsert.cpp @@ -335,6 +335,9 @@ static void buildInitialize(Relation index, GinBuildState *buildstate) page = BufferGetPage(MetaBuffer); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } UnlockReleaseBuffer(MetaBuffer); diff --git a/src/gausskernel/storage/access/gin/ginutil.cpp b/src/gausskernel/storage/access/gin/ginutil.cpp index 45fb22595..37542611a 100644 --- a/src/gausskernel/storage/access/gin/ginutil.cpp +++ b/src/gausskernel/storage/access/gin/ginutil.cpp @@ -590,6 +590,9 @@ void ginUpdateStats(Relation index, const GinStatsData *stats) XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT); recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE); PageSetLSN(metapage, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } UnlockReleaseBuffer(metabuffer); END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/gin/ginvacuum.cpp b/src/gausskernel/storage/access/gin/ginvacuum.cpp index a7a0d3ddd..84d36f922 100644 --- a/src/gausskernel/storage/access/gin/ginvacuum.cpp +++ b/src/gausskernel/storage/access/gin/ginvacuum.cpp @@ -98,6 +98,9 @@ static void xlogVacuumPage(Relation index, Buffer buffer) recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_VACUUM_PAGE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } static bool ginVacuumPostingTreeLeaves(GinVacuumState *gvs, BlockNumber blkno, bool isRoot, Buffer *rootBuffer) @@ -246,6 +249,9 @@ static void ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNum PageSetLSN(page, recptr); PageSetLSN(parentPage, recptr); PageSetLSN(BufferGetPage(lBuffer), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (!isParentRoot) diff --git a/src/gausskernel/storage/access/gist/gist.cpp b/src/gausskernel/storage/access/gist/gist.cpp index 4ab541080..26c57ff43 100644 --- a/src/gausskernel/storage/access/gist/gist.cpp +++ b/src/gausskernel/storage/access/gist/gist.cpp @@ -412,6 +412,9 @@ bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, Buffer for (ptr = dist; ptr; ptr = ptr->next) { PageSetLSN(ptr->page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * Return the new child buffers to the caller. @@ -454,6 +457,9 @@ bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate, Buffer recptr = gistXLogUpdate(buffer, deloffs, ndeloffs, itup, ntup, leftchildbuf); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } else { recptr = GetXLogRecPtrForTemp(); PageSetLSN(page, recptr); diff --git a/src/gausskernel/storage/access/gist/gistbuild.cpp b/src/gausskernel/storage/access/gist/gistbuild.cpp index 9d5e118c7..80ebec2c5 100644 --- a/src/gausskernel/storage/access/gist/gistbuild.cpp +++ b/src/gausskernel/storage/access/gist/gistbuild.cpp @@ -183,6 +183,9 @@ Datum gistbuild(PG_FUNCTION_ARGS) recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } else PageSetLSN(page, GetXLogRecPtrForTemp()); diff --git a/src/gausskernel/storage/access/gist/gistvacuum.cpp b/src/gausskernel/storage/access/gist/gistvacuum.cpp index 5763df2d8..4c19457f7 100644 --- a/src/gausskernel/storage/access/gist/gistvacuum.cpp +++ b/src/gausskernel/storage/access/gist/gistvacuum.cpp @@ -203,6 +203,9 @@ Datum gistbulkdelete(PG_FUNCTION_ARGS) recptr = gistXLogUpdate(buffer, todelete, ntodelete, NULL, 0, InvalidBuffer); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } else PageSetLSN(page, GetXLogRecPtrForTemp()); diff --git a/src/gausskernel/storage/access/hash/hash.cpp b/src/gausskernel/storage/access/hash/hash.cpp index e7cfe74d7..5f68e91e3 100644 --- a/src/gausskernel/storage/access/hash/hash.cpp +++ b/src/gausskernel/storage/access/hash/hash.cpp @@ -656,6 +656,9 @@ loop_top: recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -860,6 +863,9 @@ void hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf, PageSetLSN(BufferGetPage(bucket_buf), recptr); } PageSetLSN(BufferGetPage(buf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/hash/hashinsert.cpp b/src/gausskernel/storage/access/hash/hashinsert.cpp index 6c28075a2..fec4dc812 100644 --- a/src/gausskernel/storage/access/hash/hashinsert.cpp +++ b/src/gausskernel/storage/access/hash/hashinsert.cpp @@ -216,6 +216,9 @@ restart_insert: PageSetLSN(BufferGetPage(buf), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -384,6 +387,9 @@ static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf, RelF PageSetLSN(BufferGetPage(buf), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/hash/hashovfl.cpp b/src/gausskernel/storage/access/hash/hashovfl.cpp index acaa4ee62..2848a69d7 100644 --- a/src/gausskernel/storage/access/hash/hashovfl.cpp +++ b/src/gausskernel/storage/access/hash/hashovfl.cpp @@ -397,6 +397,9 @@ found: PageSetLSN(BufferGetPage(newmapbuf), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -678,6 +681,9 @@ BlockNumber _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf, if (update_metap) PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -936,6 +942,9 @@ readpage: PageSetLSN(BufferGetPage(wbuf), recptr); PageSetLSN(BufferGetPage(rbuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/hash/hashpage.cpp b/src/gausskernel/storage/access/hash/hashpage.cpp index c8baba488..3f00702ff 100644 --- a/src/gausskernel/storage/access/hash/hashpage.cpp +++ b/src/gausskernel/storage/access/hash/hashpage.cpp @@ -385,6 +385,9 @@ uint32 _hash_init(Relation rel, double num_tuples, ForkNumber forkNum) recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_META_PAGE); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } num_buckets = metap->hashm_maxbucket + 1; @@ -411,12 +414,16 @@ uint32 _hash_init(Relation rel, double num_tuples, ForkNumber forkNum) _hash_initbuf(buf, metap->hashm_maxbucket, i, LH_BUCKET_PAGE, false); MarkBufferDirty(buf); - if (use_wal) + if (use_wal) { log_newpage(&rel->rd_node, forkNum, blkno, BufferGetPage(buf), true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + } _hash_relbuf(rel, buf); } @@ -465,6 +472,9 @@ uint32 _hash_init(Relation rel, double num_tuples, ForkNumber forkNum) PageSetLSN(BufferGetPage(bitmapbuf), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* all done */ @@ -903,6 +913,9 @@ restart_expand: PageSetLSN(BufferGetPage(buf_oblkno), recptr); PageSetLSN(BufferGetPage(buf_nblkno), recptr); PageSetLSN(BufferGetPage(metabuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -987,12 +1000,16 @@ static bool _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nbl ovflopaque->hasho_page_id = HASHO_PAGE_ID; PageSetChecksumInplace(zerobuf, lastblock); - if (RelationNeedsWAL(rel)) + if (RelationNeedsWAL(rel)) { log_newpage(&rel->rd_node, MAIN_FORKNUM, lastblock, zerobuf, true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + } if (IsSegmentFileNode(rel->rd_node)) { Buffer buf; @@ -1267,6 +1284,9 @@ static void _hash_splitbucket(Relation rel, Buffer metabuf, Bucket obucket, Buck PageSetLSN(BufferGetPage(bucket_obuf), recptr); PageSetLSN(BufferGetPage(bucket_nbuf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1427,6 +1447,9 @@ static void log_split_page(Relation rel, Buffer buf) recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_PAGE); PageSetLSN(BufferGetPage(buf), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } diff --git a/src/gausskernel/storage/access/heap/heapam.cpp b/src/gausskernel/storage/access/heap/heapam.cpp index 333c41f9d..1ca7c3ff3 100755 --- a/src/gausskernel/storage/access/heap/heapam.cpp +++ b/src/gausskernel/storage/access/heap/heapam.cpp @@ -3103,6 +3103,9 @@ Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, Bu recptr = XLogInsert(RM_HEAP_ID, info, InvalidBktId, istoast); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (tdeinfo != NULL) { pfree_ext(tdeinfo); @@ -3313,6 +3316,9 @@ void heap_abort_speculative(Relation relation, HeapTuple tuple) XLOG_HEAP_DELETE | XLOG_TUPLE_LOCK_UPGRADE_FLAG); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -3479,6 +3485,9 @@ static void HeapPageShiftBase(Buffer buffer, Page page, bool multi, int64 delta) recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_BASE_SHIFT); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); } @@ -3625,6 +3634,9 @@ static int freeze_single_heap_page(Relation relation, Buffer buffer) changedMultiXid ? freeze_mxid : InvalidMultiXactId, frozen, nfrozen); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -3639,6 +3651,9 @@ static int freeze_single_heap_page(Relation relation, Buffer buffer) XLogRecPtr recptr = log_heap_invalid(relation, buffer, freeze_xid, invalid, ninvalid); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -4328,6 +4343,9 @@ int heap_multi_insert(Relation relation, Relation parent, HeapTuple* tuples, int recptr = XLogInsert(RM_HEAP2_ID, info); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -4853,6 +4871,9 @@ l1: XLOG_HEAP_DELETE | XLOG_TUPLE_LOCK_UPGRADE_FLAG); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -5638,7 +5659,6 @@ l2: newbuf = buffer; heaptup = newtup; } - /* * We're about to create the new tuple -- check for conflict first, to * avoid possibly having to roll back work we've just done. @@ -5784,6 +5804,9 @@ l2: PageSetLSN(BufferGetPage(newbuf), recptr); } PageSetLSN(BufferGetPage(buffer), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -5856,6 +5879,9 @@ static XLogRecPtr log_heap_new_cid_insert(xl_heap_new_cid *xlrec, int bucketid) XLogRegisterData((char *) xlrec, SizeOfHeapNewCid); /* will be looked at irrespective of origin */ recptr = XLogInsert(RM_HEAP3_ID, XLOG_HEAP3_NEW_CID, bucketid); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif return recptr; } @@ -6887,6 +6913,9 @@ failed: recptr = XLogInsert(RM_HEAP_ID, useOldXlog ? XLOG_HEAP_LOCK : XLOG_HEAP_LOCK | XLOG_TUPLE_LOCK_UPGRADE_FLAG); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -7444,6 +7473,9 @@ l4: recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK | XLOG_TUPLE_LOCK_UPGRADE_FLAG); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -7611,6 +7643,9 @@ void heap_inplace_update(Relation relation, HeapTuple tuple, bool waitFlush) recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -8068,6 +8103,9 @@ XLogRecPtr log_heap_cleanup_info(const RelFileNode* rnode, TransactionId latest_ XLogRegisterData((char*)&xlrec, SizeOfHeapCleanupInfo); recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_CLEANUP_INFO, rnode->bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif return recptr; } @@ -8698,6 +8736,9 @@ XLogRecPtr log_logical_newpage(RelFileNode* rnode, ForkNumber forkNum, BlockNumb recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOGICAL_NEWPAGE, rnode->bucketNode); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif PageSetLogical(page); END_CRIT_SECTION(); @@ -8756,13 +8797,18 @@ XLogRecPtr log_newpage_buffer(Buffer buffer, bool page_std, TdeInfo* tde_info) RelFileNode rnode; ForkNumber forkNum; BlockNumber blkno; + XLogRecPtr recptr; /* We should be in a critical section. */ Assert(t_thrd.int_cxt.CritSectionCount > 0); BufferGetTag(buffer, &rnode, &forkNum, &blkno); - return log_newpage(&rnode, forkNum, blkno, page, page_std, tde_info); + recptr = log_newpage(&rnode, forkNum, blkno, page, page_std, tde_info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + return recptr; } /* diff --git a/src/gausskernel/storage/access/heap/pruneheap.cpp b/src/gausskernel/storage/access/heap/pruneheap.cpp index 915b7d3ed..7990714f7 100644 --- a/src/gausskernel/storage/access/heap/pruneheap.cpp +++ b/src/gausskernel/storage/access/heap/pruneheap.cpp @@ -260,6 +260,9 @@ int heap_page_prune(Relation relation, Buffer buffer, TransactionId oldest_xmin, repair_fragmentation); PageSetLSN(BufferGetPage(buffer), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } else { /* diff --git a/src/gausskernel/storage/access/heap/rewriteheap.cpp b/src/gausskernel/storage/access/heap/rewriteheap.cpp index 5dbfce0d9..5114ea9a4 100644 --- a/src/gausskernel/storage/access/heap/rewriteheap.cpp +++ b/src/gausskernel/storage/access/heap/rewriteheap.cpp @@ -360,6 +360,9 @@ static void rewrite_write_one_page(RewriteState state, Page page) errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); } else { @@ -368,6 +371,9 @@ static void rewrite_write_one_page(RewriteState state, Page page) if (state->rs_use_wal) { log_newpage(&state->rs_new_rel->rd_node, MAIN_FORKNUM, state->rs_blockno, page, true, &tde_info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } RelationOpenSmgr(state->rs_new_rel); @@ -735,6 +741,9 @@ static void prepare_cmpr_buffer(RewriteState state, Size meta_size, const char * errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); diff --git a/src/gausskernel/storage/access/heap/visibilitymap.cpp b/src/gausskernel/storage/access/heap/visibilitymap.cpp index f5c1a66d4..ae64a97ee 100644 --- a/src/gausskernel/storage/access/heap/visibilitymap.cpp +++ b/src/gausskernel/storage/access/heap/visibilitymap.cpp @@ -244,6 +244,9 @@ void visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf, XLogRe } } PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/nbtree/nbtdedup.cpp b/src/gausskernel/storage/access/nbtree/nbtdedup.cpp index 9e8a9daf1..013d22613 100644 --- a/src/gausskernel/storage/access/nbtree/nbtdedup.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtdedup.cpp @@ -191,6 +191,9 @@ void btree_dedup_write_wal(BTDedupState state, Buffer buf) Page page = BufferGetPage(buf); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } void btree_dedup_single_value_fillfactor(Page page, BTDedupState state, Size newitemsz) @@ -476,4 +479,4 @@ static bool btree_dedup_posting_valid(IndexTuple posting) return true; } -#endif \ No newline at end of file +#endif diff --git a/src/gausskernel/storage/access/nbtree/nbtinsert.cpp b/src/gausskernel/storage/access/nbtree/nbtinsert.cpp index e65beddd7..0d288679c 100644 --- a/src/gausskernel/storage/access/nbtree/nbtinsert.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtinsert.cpp @@ -1198,6 +1198,9 @@ static void _bt_insertonpg(Relation rel, BTScanInsert itup_key, Buffer buf, Buff } } PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1704,6 +1707,9 @@ static Buffer _bt_split(Relation rel, BTScanInsert itup_key, Buffer buf, Buffer PageSetLSN(BufferGetPage(cbuf), recptr); } } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2445,6 +2451,9 @@ static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) } PageSetLSN(rootpage, recptr); PageSetLSN(metapg, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2605,4 +2614,4 @@ bool CheckPartitionIsInvisible(GPIScanDesc gpiScan) } return false; -} \ No newline at end of file +} diff --git a/src/gausskernel/storage/access/nbtree/nbtpage.cpp b/src/gausskernel/storage/access/nbtree/nbtpage.cpp index 33fae0154..01fc0da05 100644 --- a/src/gausskernel/storage/access/nbtree/nbtpage.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtpage.cpp @@ -302,6 +302,9 @@ Buffer _bt_getroot(Relation rel, int access) PageSetLSN(rootpage, recptr); PageSetLSN(metapg, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -582,6 +585,9 @@ static void _bt_log_reuse_page(const Relation rel, BlockNumber blkno, Transactio XLogRegisterData((char *)&xlrec_reuse, SizeOfBtreeReusePage); (void)XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE, rel->rd_node.bucketNode); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* @@ -899,6 +905,9 @@ void _bt_delitems_vacuum(const Relation rel, Buffer buf, OffsetNumber *deletable } PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -980,6 +989,9 @@ void _bt_delitems_delete(const Relation rel, Buffer buf, OffsetNumber *itemnos, recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_DELETE, bucket_id); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1688,6 +1700,9 @@ int _bt_pagedel_old(Relation rel, Buffer buf, BTStack stack) page = BufferGetPage(lbuf); PageSetLSN(page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2153,6 +2168,9 @@ static bool _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack) PageSetLSN(page, recptr); page = BufferGetPage(leafbuf); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -2532,6 +2550,9 @@ static bool _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsi page = BufferGetPage(leafbuf); PageSetLSN(page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/nbtree/nbtree.cpp b/src/gausskernel/storage/access/nbtree/nbtree.cpp index 3171da388..63b916100 100644 --- a/src/gausskernel/storage/access/nbtree/nbtree.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtree.cpp @@ -227,6 +227,9 @@ void btbuildempty_internal(Relation index) smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE, (char *)metapage, true); log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, BTREE_METAPAGE, metapage, false); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * An immediate sync is require even if we xlog'd the page, because the diff --git a/src/gausskernel/storage/access/nbtree/nbtsort.cpp b/src/gausskernel/storage/access/nbtree/nbtsort.cpp index c6ef7953f..62cf3af1e 100644 --- a/src/gausskernel/storage/access/nbtree/nbtsort.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtsort.cpp @@ -319,6 +319,9 @@ static void _bt_segment_blwritepage(BTWriteState *wstate, Page page, BlockNumber errno_t rc = memcpy_s(BufferGetBlock(buf), BLCKSZ, page, BLCKSZ); securec_check(rc, "\0", "\0"); PageSetLSN(BufferGetPage(buf), xlog_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif MarkBufferDirty(buf); UnlockReleaseBuffer(buf); pfree(page); @@ -343,6 +346,9 @@ static void _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno) if (wstate->btws_use_wal) { /* We use the heap NEWPAGE record type for this */ log_newpage(&wstate->index->rd_node, MAIN_FORKNUM, blkno, page, true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (blkno >= wstate->btws_pages_written) { diff --git a/src/gausskernel/storage/access/spgist/spgdoinsert.cpp b/src/gausskernel/storage/access/spgist/spgdoinsert.cpp index 16f31a78f..61adfc6ae 100644 --- a/src/gausskernel/storage/access/spgist/spgdoinsert.cpp +++ b/src/gausskernel/storage/access/spgist/spgdoinsert.cpp @@ -291,6 +291,9 @@ static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple lea if (xlrec.offnumParent != InvalidOffsetNumber) { PageSetLSN(parent->page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -496,6 +499,9 @@ static void moveLeafs(Relation index, SpGistState *state, SPPageDesc *current, S PageSetLSN(current->page, recptr); PageSetLSN(npage, recptr); PageSetLSN(parent->page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1256,6 +1262,9 @@ static bool doPickSplit(Relation index, SpGistState *state, SPPageDesc *current, if (parent->buffer != InvalidBuffer) { PageSetLSN(parent->page, recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1377,6 +1386,9 @@ static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTupl recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); PageSetLSN(current->page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1502,6 +1514,9 @@ static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTupl PageSetLSN(current->page, recptr); PageSetLSN(parent->page, recptr); PageSetLSN(saveCurrent.page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -1652,6 +1667,9 @@ static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTu if (newBuffer != InvalidBuffer) { PageSetLSN(BufferGetPage(newBuffer), recptr); } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/spgist/spginsert.cpp b/src/gausskernel/storage/access/spgist/spginsert.cpp index 5fd04c511..9f0a3e5d9 100644 --- a/src/gausskernel/storage/access/spgist/spginsert.cpp +++ b/src/gausskernel/storage/access/spgist/spginsert.cpp @@ -106,6 +106,9 @@ Datum spgbuild(PG_FUNCTION_ARGS) PageSetLSN(BufferGetPage(metabuffer), recptr); PageSetLSN(BufferGetPage(rootbuffer), recptr); PageSetLSN(BufferGetPage(nullbuffer), recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -171,6 +174,9 @@ Datum spgbuildempty(PG_FUNCTION_ARGS) PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO); smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO, (char *)page, true); log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO, page, false); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* Likewise for the root page. */ SpGistInitPage(page, SPGIST_LEAF); @@ -178,6 +184,9 @@ Datum spgbuildempty(PG_FUNCTION_ARGS) PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO); smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO, (char *)page, true); log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, SPGIST_ROOT_BLKNO, page, true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* Likewise for the null-tuples root page. */ SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS); @@ -185,6 +194,9 @@ Datum spgbuildempty(PG_FUNCTION_ARGS) PageSetChecksumInplace(page, SPGIST_NULL_BLKNO); smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO, (char *)page, true); log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, SPGIST_NULL_BLKNO, page, true); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * An immediate sync is required even if we xlog'd the pages, because the @@ -247,4 +259,4 @@ Datum spgmerge(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("spgmerge: unimplemented"))); PG_RETURN_POINTER(result); -} \ No newline at end of file +} diff --git a/src/gausskernel/storage/access/spgist/spgvacuum.cpp b/src/gausskernel/storage/access/spgist/spgvacuum.cpp index ceda3ceaa..c5f21124a 100644 --- a/src/gausskernel/storage/access/spgist/spgvacuum.cpp +++ b/src/gausskernel/storage/access/spgist/spgvacuum.cpp @@ -359,6 +359,9 @@ static void vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffe recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -430,6 +433,9 @@ static void vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffe recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -541,6 +547,9 @@ static void vacuumRedirectAndPlaceholder(Relation index, Buffer buffer) recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/access/transam/clog.cpp b/src/gausskernel/storage/access/transam/clog.cpp index 4109ef785..00e2d024c 100644 --- a/src/gausskernel/storage/access/transam/clog.cpp +++ b/src/gausskernel/storage/access/transam/clog.cpp @@ -1090,6 +1090,9 @@ static void WriteZeroPageXlogRec(int64 pageno) XLogBeginInsert(); XLogRegisterData((char *)(&pageno), sizeof(int64)); (void)XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } /* @@ -1105,6 +1108,9 @@ static void WriteTruncateXlogRec(int64 pageno) XLogBeginInsert(); XLogRegisterData((char *)(&pageno), sizeof(int64)); recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); } diff --git a/src/gausskernel/storage/access/transam/multi_redo_api.cpp b/src/gausskernel/storage/access/transam/multi_redo_api.cpp index c451266fe..cc674ca1c 100644 --- a/src/gausskernel/storage/access/transam/multi_redo_api.cpp +++ b/src/gausskernel/storage/access/transam/multi_redo_api.cpp @@ -411,14 +411,6 @@ void DiagLogRedoRecord(XLogReaderState *record, const char *funcName) void ApplyRedoRecord(XLogReaderState *record) { -#ifdef ENABLE_SS_MULTIMASTER - XLogReaderArgs args; - if (record->args) { - memset_s(&args, sizeof(args), 0, sizeof(args)); - CurrentArgsCopy(&args); - NodeArgsSwitch(record->args); - } -#endif ErrorContextCallback errContext; errContext.callback = rm_redo_error_callback; errContext.arg = (void *)record; @@ -428,11 +420,7 @@ void ApplyRedoRecord(XLogReaderState *record) DiagLogRedoRecord(record, "ApplyRedoRecord"); } RmgrTable[XLogRecGetRmid(record)].rm_redo(record); -#ifdef ENABLE_SS_MULTIMASTER - if (record->args) { - NodeArgsSwitch(&args); - } -#endif + t_thrd.log_cxt.error_context_stack = errContext.previous; } diff --git a/src/gausskernel/storage/access/transam/multixact.cpp b/src/gausskernel/storage/access/transam/multixact.cpp index fa2fb0c59..e0bdea8f5 100644 --- a/src/gausskernel/storage/access/transam/multixact.cpp +++ b/src/gausskernel/storage/access/transam/multixact.cpp @@ -740,6 +740,9 @@ static MultiXactId CreateMultiXactId(int nmembers, MultiXactMember *members) XLogRegisterData((char *)xidsWithStatus, (unsigned)nmembers * sizeof(TransactionId)); (void)XLogInsert(RM_MULTIXACT_ID, XLOG_MULTIXACT_CREATE_ID); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* Now enter the information into the OFFSETs and MEMBERs logs */ RecordNewMultiXact(multi, offset, nmembers, xidsWithStatus); @@ -2118,6 +2121,9 @@ static void WriteMZeroPageXlogRec(int64 pageno, uint8 info) XLogRegisterData((char *)(&pageno), sizeof(int)); } (void)XLogInsert(RM_MULTIXACT_ID, info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } void get_multixact_pageno(uint8 info, int64 *pageno, XLogReaderState *record) @@ -2227,7 +2233,7 @@ void SSMultiXactShmemClear(void) } #ifdef ENABLE_SS_MULTIMASTER -Size SSMultiXactSize(void) +Size SSGetMultiXactSize(void) { return sizeof(MultiXactStateData); } diff --git a/src/gausskernel/storage/access/transam/twophase.cpp b/src/gausskernel/storage/access/transam/twophase.cpp index 8f2b68aea..e5ae7f72d 100644 --- a/src/gausskernel/storage/access/transam/twophase.cpp +++ b/src/gausskernel/storage/access/transam/twophase.cpp @@ -1966,6 +1966,9 @@ void EndPrepare(GlobalTransaction gxact) } gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(gxact->prepare_end_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ @@ -3544,6 +3547,9 @@ static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren, Tr info |= XLR_REL_COMPRESS; } recptr = XLogInsert(RM_XACT_ID, (uint8)info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (nrels > 0) { globalDelayDDLLSN = GetDDLDelayStartPtr(); @@ -3642,6 +3648,9 @@ static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, Tra info |= XLR_REL_COMPRESS; } recptr = XLogInsert(RM_XACT_ID, (uint8)info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (nrels > 0) { globalDelayDDLLSN = GetDDLDelayStartPtr(); diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index f1cba2d9d..841b098e6 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -880,6 +880,9 @@ static void AssignTransactionId(TransactionState s) t_thrd.xact_cxt.nUnreportedXids = 0; /* mark top, not current xact as having been logged */ TopTransactionStateData.didLogXid = true; +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } } @@ -1703,6 +1706,9 @@ static TransactionId RecordTransactionCommit(void) info |= XLR_REL_COMPRESS; } commitRecLSN = XLogInsert(RM_XACT_ID, (uint8)info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (nrels > 0) { globalDelayDDLLSN = GetDDLDelayStartPtr(); @@ -1743,6 +1749,9 @@ static TransactionId RecordTransactionCommit(void) info |= XLR_REL_COMPRESS; } (void)XLogInsert(RM_XACT_ID, (uint8)info); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } } @@ -2142,6 +2151,9 @@ static TransactionId RecordTransactionAbort(bool isSubXact) #else abortRecLSN = XLogInsert(RM_XACT_ID, (uint8)(XLOG_XACT_ABORT | info)); #endif +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (nrels > 0) { globalDelayDDLLSN = GetDDLDelayStartPtr(); diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 54ed742cf..8392efe7f 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -193,6 +193,7 @@ #define SS_STANDBY_INST_SKIP_SHARED_FILE false #define SS_REDO_ID 0 #define SS_REDO_MODE (ENABLE_DMS && (SS_MY_INST_ID == SS_REDO_ID)) +#define SS_FORK_MODE (ENABLE_DMS && (SS_MY_INST_ID != SS_REDO_ID)) #define SS_IS_FORK_RECORD(state) (state->args != NULL && state->instId != SS_MY_INST_ID) #endif @@ -346,10 +347,6 @@ typedef struct XLogSwitchInfo { volatile bool IsPendingXactsRecoveryDone = false; -#ifdef ENABLE_SS_MULTIMASTER -static uint32 GetXlogRecordLogicLSN(XLogRecPtr RecPtr); -#endif - static void XLogFlushCore(XLogRecPtr writeRqstPtr); static void XLogSelfFlush(void); static void XLogSelfFlushWithoutStatus(int numHitsOnStartPage, XLogRecPtr currPos, int currLRC); @@ -517,7 +514,7 @@ static void XLogInsertRecordGroupLeader(PGPROC *leader, uint64 *end_byte_pos_ptr uint32 logic_lsn = 0; #ifdef ENABLE_SS_MULTIMASTER - *leader->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoLogicLSN; + *leader->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.logicRedoRecPtr; #else *leader->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoRecPtr; #endif @@ -597,7 +594,7 @@ static void XLogInsertRecordGroupFollowers(PGPROC *leader, const uint32 head, ui while (nextidx != (uint32)(leader->pgprocno)) { follower = g_instance.proc_base_all_procs[nextidx]; #ifdef ENABLE_SS_MULTIMASTER - *follower->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoLogicLSN; + *follower->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.logicRedoRecPtr; #else *follower->xlogGroupRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoRecPtr; #endif @@ -735,7 +732,7 @@ static XLogRecPtr XLogInsertRecordGroup(XLogRecData *rdata, XLogRecPtr fpw_lsn) proc->xlogGroupXactLastRecEnd = &t_thrd.xlog_cxt.XactLastRecEnd; proc->xlogGroupCurrentTransactionState = GetCurrentTransactionState(); #ifdef ENABLE_SS_MULTIMASTER - proc->xlogGroupRedoRecPtr = &t_thrd.xlog_cxt.RedoLogicLSN; + proc->xlogGroupRedoRecPtr = &t_thrd.xlog_cxt.logicRedoRecPtr; #else proc->xlogGroupRedoRecPtr = &t_thrd.xlog_cxt.RedoRecPtr; #endif @@ -1317,13 +1314,13 @@ static XLogRecPtr XLogInsertRecordSingle(XLogRecData *rdata, XLogRecPtr fpw_lsn) Assert(t_thrd.xlog_cxt.RedoRecPtr < Insert->RedoRecPtr); t_thrd.xlog_cxt.RedoRecPtr = Insert->RedoRecPtr; #ifdef ENABLE_SS_MULTIMASTER - t_thrd.xlog_cxt.RedoLogicLSN = Insert->RedoLogicLSN; + t_thrd.xlog_cxt.logicRedoRecPtr = Insert->logicRedoRecPtr; #endif } t_thrd.xlog_cxt.doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites); #ifdef ENABLE_SS_MULTIMASTER - if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= t_thrd.xlog_cxt.RedoLogicLSN && t_thrd.xlog_cxt.doPageWrites) { + if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= t_thrd.xlog_cxt.logicRedoRecPtr && t_thrd.xlog_cxt.doPageWrites) { #else if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= t_thrd.xlog_cxt.RedoRecPtr && t_thrd.xlog_cxt.doPageWrites) { #endif @@ -9737,15 +9734,8 @@ void NodeArgsSwitch(XLogReaderArgs *args) t_thrd.shemem_ptr_cxt.MultiXactState = args->MultiXactState; t_thrd.shemem_ptr_cxt.ClogCtl = args->ExtendClogCtl; t_thrd.shemem_ptr_cxt.CsnlogCtlPtr = args->ExtendCsnlogCtlPtr; -} - -void CurrentArgsCopy(XLogReaderArgs *args) -{ - args->ControlFile = t_thrd.shemem_ptr_cxt.ControlFile; - args->ShmemVariableCache = t_thrd.xact_cxt.ShmemVariableCache; - args->MultiXactState = t_thrd.shemem_ptr_cxt.MultiXactState; - args->ExtendClogCtl = t_thrd.shemem_ptr_cxt.ClogCtl; - args->ExtendCsnlogCtlPtr = t_thrd.shemem_ptr_cxt.CsnlogCtlPtr; + t_thrd.shemem_ptr_cxt.MultiXactOffsetCtl = args->ExtendMultiXactOffsetCtl; + t_thrd.shemem_ptr_cxt.MultiXactMemberCtl = args->ExtendMultiXactMemberCtl; } static int get_min_xlogreader(XLogReaderState **xlogreaderList, int instanceNum) @@ -9773,6 +9763,8 @@ void SlruInit(XLogReaderArgs *args, int id) args->ExtendClogCtl = (SlruCtlData*)palloc0(NUM_CLOG_PARTITIONS * sizeof(SlruCtlData)); args->ExtendCsnlogCtlPtr = (SlruCtlData*)palloc0(NUM_CSNLOG_PARTITIONS * sizeof(SlruCtlData)); + args->ExtendMultiXactOffsetCtl = (SlruCtlData*)palloc0(sizeof(SlruCtlData)); + args->ExtendMultiXactMemberCtl = (SlruCtlData*)palloc0(sizeof(SlruCtlData)); rc = snprintf_s(dir, MAXPGPATH, MAXPGPATH - 1, "%s/%s%d", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_clog", id); @@ -9789,6 +9781,28 @@ void SlruInit(XLogReaderArgs *args, int id) ExtendSimpleLruInit(&args->ExtendCsnlogCtlPtr[i], LWTRANCHE_CSNLOG_CTL, CSNLOGShmemBuffers(), 0, CSNBufMappingPartitionLockByIndex(i), dir); } + + rc = snprintf_s(dir, MAXPGPATH, MAXPGPATH - 1, + "%s/%s/offsets", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_multixact", id); + securec_check_ss(rc, "\0", "\0"); + if (ENABLE_DSS) { + ExtendSimpleLruInit(args->ExtendMultiXactOffsetCtl, LWTRANCHE_MULTIXACTOFFSET_CTL, DSS_MAX_MXACTOFFSET, 0, + MultiXactOffsetControlLock, dir); + } else { + ExtendSimpleLruInit(args->ExtendMultiXactOffsetCtl, LWTRANCHE_MULTIXACTOFFSET_CTL, NUM_MXACTOFFSET_BUFFERS, 0, + MultiXactOffsetControlLock, dir); + } + + rc = snprintf_s(dir, MAXPGPATH, MAXPGPATH - 1, + "%s/%s/members", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_multixact", id); + securec_check_ss(rc, "\0", "\0"); + if (ENABLE_DSS) { + ExtendSimpleLruInit(args->ExtendMultiXactMemberCtl, LWTRANCHE_MULTIXACTMEMBER_CTL, DSS_MAX_MXACTMEMBER, 0, + MultiXactMemberControlLock, dir); + } else { + ExtendSimpleLruInit(args->ExtendMultiXactMemberCtl, LWTRANCHE_MULTIXACTMEMBER_CTL, NUM_MXACTMEMBER_BUFFERS, 0, + MultiXactMemberControlLock, dir); + } } #endif /* @@ -9796,6 +9810,7 @@ void SlruInit(XLogReaderArgs *args, int id) */ void StartupXLOG(void) { + uint64 redoTime = GetCurrentTimestamp(); XLogCtlInsert *Insert = NULL; CheckPoint checkPoint; CheckPointNew checkPointNew; /* to adapt update and not to modify the storage format */ @@ -9820,6 +9835,7 @@ void StartupXLOG(void) int instanceNum = 0; XLogReaderState **xlogreaderList = NULL; XLogReaderArgs **xlogReaderArgsList = NULL; + uint64 checkpointRedoLogicLSN = 0; #endif XLogPageReadPrivate readprivate; bool RecoveryByPending = false; /* recovery caused by pending mode */ @@ -9923,11 +9939,13 @@ void StartupXLOG(void) xlogReaderArgsList[i]->MultiXactState = t_thrd.shemem_ptr_cxt.MultiXactState; xlogReaderArgsList[i]->ExtendClogCtl = t_thrd.shemem_ptr_cxt.ClogCtl; xlogReaderArgsList[i]->ExtendCsnlogCtlPtr = t_thrd.shemem_ptr_cxt.CsnlogCtlPtr; + xlogReaderArgsList[i]->ExtendMultiXactOffsetCtl = t_thrd.shemem_ptr_cxt.MultiXactOffsetCtl; + xlogReaderArgsList[i]->ExtendMultiXactMemberCtl = t_thrd.shemem_ptr_cxt.MultiXactMemberCtl; } else { xlogReaderArgsList[i]->ControlFile = (ControlFileData *)palloc(sizeof(ControlFileData)); xlogReaderArgsList[i]->checkPointUndo = (CheckPointUndo *)palloc(sizeof(CheckPointUndo)); xlogReaderArgsList[i]->ShmemVariableCache = (VariableCacheData *)palloc(sizeof(VariableCacheData)); - xlogReaderArgsList[i]->MultiXactState = (MultiXactStateData *)palloc(SSMultiXactSize()); + xlogReaderArgsList[i]->MultiXactState = (MultiXactStateData *)palloc(SSGetMultiXactSize()); SSReadControlFile(i, false, xlogReaderArgsList[i]->ControlFile); SlruInit(xlogReaderArgsList[i], i); @@ -9937,16 +9955,13 @@ void StartupXLOG(void) xlogReaderArgsList[i]->checkPointUndo->globalRecycleXid = InvalidTransactionId; errno_t rc = memset_s(xlogReaderArgsList[i]->ShmemVariableCache,sizeof(VariableCacheData),0,sizeof(VariableCacheData)); securec_check(rc, "\0", "\0"); - rc = memset_s(xlogReaderArgsList[i]->MultiXactState, sizeof(SSMultiXactSize()), 0, sizeof(SSMultiXactSize())); + rc = memset_s(xlogReaderArgsList[i]->MultiXactState, SSGetMultiXactSize(), 0, SSGetMultiXactSize()); securec_check(rc, "", ""); } xlogReaderArgsList[i]->readFile = -1; xlogReaderArgsList[i]->readSegNo = 0; } } else { - SSCSNLOGShmemClear(); - SSCLOGShmemClear(); - SSMultiXactShmemClear(); src_id = SS_MY_INST_ID; SSReadControlFile(SS_MY_INST_ID); } @@ -10181,7 +10196,7 @@ void StartupXLOG(void) if (ENABLE_DMS && ENABLE_DSS) { SSGetRecoveryXlogPath(); #ifdef ENABLE_SS_MULTIMASTER - if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD || !SS_REDO_MODE) { + if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD || SS_FORK_MODE) { xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); close_readFile_if_open(); } else { @@ -10471,6 +10486,12 @@ void StartupXLOG(void) wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN); wasCheckpoint = wasShutdown || (record->xl_info == XLOG_CHECKPOINT_ONLINE); +#ifdef ENABLE_SS_MULTIMASTER + if (XLByteLT(checkPoint.redo, checkPointLoc)) { + record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false); + } + checkpointRedoLogicLSN = record->logic_lsn; +#endif } /* @@ -10711,6 +10732,10 @@ void StartupXLOG(void) t_thrd.xlog_cxt.lastFullPageWrites = checkPoint.fullPageWrites; t_thrd.xlog_cxt.RedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoRecPtr = checkPoint.redo; +#ifdef ENABLE_SS_MULTIMASTER + t_thrd.xlog_cxt.logicRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr = + t_thrd.shemem_ptr_cxt.XLogCtl->Insert.logicRedoRecPtr = checkpointRedoLogicLSN; +#endif } if (ENABLE_INCRE_CKPT) { @@ -10752,14 +10777,33 @@ void StartupXLOG(void) } #ifdef ENABLE_SS_MULTIMASTER - if (!SS_REDO_MODE && t_thrd.xlog_cxt.InRecovery == true && SS_ONDEMAND_REALTIME_BUILD_DISABLED) { + if (SS_FORK_MODE && t_thrd.xlog_cxt.InRecovery == true && SS_ONDEMAND_REALTIME_BUILD_DISABLED) { + /* do not need replay anything in SS_FORK_MODE */ + ereport(LOG, (errmsg("[SS] Skip redo replay in standby mode"))); + t_thrd.xlog_cxt.InRecovery = false; + do { + record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); + } while (record != NULL); + t_thrd.xlog_cxt.LastRec = t_thrd.xlog_cxt.ReadRecPtr; + /* but need to wait SS_REDO_MODE */ + while (!g_instance.dms_cxt.SSRecoveryInfo.redo_done) { + pg_usleep(5000L); + } + SSCSNLOGShmemClear(); + SSCLOGShmemClear(); + SSMultiXactShmemClear(); + errno_t rc = memcpy_s(t_thrd.xact_cxt.ShmemVariableCache, sizeof(VariableCacheData), g_instance.dms_cxt.SSReformerControl.argsList[SS_MY_INST_ID].ShmemVariableCache, sizeof(VariableCacheData)); + securec_check(rc, "", ""); + rc = memcpy_s(t_thrd.shemem_ptr_cxt.MultiXactState, MUXACT_SIZE, g_instance.dms_cxt.SSReformerControl.argsList[SS_MY_INST_ID].PartMultiXactState, MUXACT_SIZE); + securec_check(rc, "", ""); + } #else if (SS_STANDBY_MODE && t_thrd.xlog_cxt.InRecovery == true && SS_ONDEMAND_REALTIME_BUILD_DISABLED) { -#endif /* do not need replay anything in SS standby mode */ ereport(LOG, (errmsg("[SS] Skip redo replay in standby mode"))); t_thrd.xlog_cxt.InRecovery = false; } +#endif g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery = false; SetDefaultExtremeRtoMode(); @@ -10774,11 +10818,19 @@ void StartupXLOG(void) } } +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE || SS_DISASTER_MAIN_STANDBY_NODE) { + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + } +#else if (SS_PRIMARY_MODE || SS_DISASTER_MAIN_STANDBY_NODE) { LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); SSUpdateReformerCtrl(); LWLockRelease(ControlFileLock); } +#endif if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD) { t_thrd.xlog_cxt.InRecovery = true; @@ -11081,6 +11133,7 @@ void StartupXLOG(void) record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } + XLogReaderState *oldXlogReader = xlogreader; #ifdef ENABLE_SS_MULTIMASTER if (SS_REDO_MODE) { if (record == NULL) { @@ -11097,7 +11150,7 @@ void StartupXLOG(void) } if (XLByteLT(tmpRedoPointLoc, tmpCheckPointLoc)) { t_thrd.xlog_cxt.InRecovery = true; - record = ReadRecord(xlogreaderList[i], checkPoint.redo, PANIC, false); + record = ReadRecord(xlogreaderList[i], tmpRedoPointLoc, PANIC, false); } else { record = ReadRecord(xlogreaderList[i], InvalidXLogRecPtr, LOG, false); if (record == NULL) { @@ -11112,9 +11165,6 @@ void StartupXLOG(void) xlogctl->lastReplayedLogicLSN = record->logic_lsn; } } - XLogReaderState *oldXlogReader = xlogreaderList[SS_MY_INST_ID]; -#else - XLogReaderState *oldXlogReader = xlogreader; #endif if (record != NULL) { @@ -11183,6 +11233,11 @@ void StartupXLOG(void) pfree_ext(buf.data); } #endif +#ifdef ENABLE_SS_MULTIMASTER + if (xlogreader->args) { + NodeArgsSwitch(xlogreader->args); + } +#endif /* Handle interrupt signals of startup process */ RedoInterruptCallBack(); @@ -11350,6 +11405,9 @@ void StartupXLOG(void) #ifdef ENABLE_SS_MULTIMASTER if (SS_REDO_MODE) { xlogreader = xlogreaderList[SS_MY_INST_ID]; + if (xlogreader->args) { + NodeArgsSwitch(xlogreader->args); + } for (int i = 0; i < instanceNum; i++) { if (xlogreaderList[i]->instId == SS_MY_INST_ID) { continue; @@ -11358,6 +11416,9 @@ void StartupXLOG(void) (void)SimpleLruFlush(&xlogreaderList[i]->args->ExtendClogCtl[j], false); for (int j = 0; j < NUM_CSNLOG_PARTITIONS; j++) (void)SimpleLruFlush(&xlogreaderList[i]->args->ExtendCsnlogCtlPtr[j], false); + (void)SimpleLruFlush(xlogreaderList[i]->args->ExtendMultiXactOffsetCtl, false); + (void)SimpleLruFlush(xlogreaderList[i]->args->ExtendMultiXactMemberCtl, false); + } } #endif @@ -11504,14 +11565,6 @@ void StartupXLOG(void) * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. */ -#ifdef ENABLE_SS_MULTIMASTER - do { - record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); - t_thrd.xlog_cxt.LastRec = t_thrd.xlog_cxt.ReadRecPtr; - } while (record != NULL); - record = ReadRecord(xlogreader, t_thrd.xlog_cxt.ReadRecPtr, LOG, false); - EndOfLog = t_thrd.xlog_cxt.EndRecPtr; -#else xlogreader->readSegNo = 0; xlogreader->readOff = 0; xlogreader->readLen = 0; @@ -11519,7 +11572,6 @@ void StartupXLOG(void) UpdateTermFromXLog(record->xl_term); EndOfLog = t_thrd.xlog_cxt.EndRecPtr; -#endif XLByteToPrevSeg(EndOfLog, endLogSegNo); if (!SS_DISASTER_STANDBY_CLUSTER && ((ENABLE_DMS && SS_STANDBY_FAILOVER) || SS_STANDBY_PROMOTING)) { @@ -11606,11 +11658,6 @@ void StartupXLOG(void) Insert->LogicLSN = record->logic_lsn + 1; } g_instance.wal_cxt.prevValidPtr = t_thrd.xlog_cxt.LastRec; - - if (!IsInitdb) { - record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false); - t_thrd.xlog_cxt.RedoLogicLSN = t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoLogicLSN = record->logic_lsn; - } #else Insert->PrevByteSize = XLogRecPtrToBytePos(EndOfLog) - XLogRecPtrToBytePos(t_thrd.xlog_cxt.LastRec); #endif @@ -11781,12 +11828,7 @@ void StartupXLOG(void) #ifndef ENABLE_MULTIPLE_NODES StartupReplicationOrigin(); #endif -#ifdef ENABLE_SS_MULTIMASTER - if (SS_REDO_MODE) - TrimCLOG(); -#else TrimCLOG(); -#endif /* Reload shared-memory state for prepared transactions */ RecoverPreparedTransactions(); @@ -11811,6 +11853,10 @@ void StartupXLOG(void) /* Shut down the xlog reader facility. */ #ifdef ENABLE_SS_MULTIMASTER for (int i = 0; i < instanceNum; i++) { + errno_t rc = memcpy_s(g_instance.dms_cxt.SSReformerControl.argsList[i].ShmemVariableCache, sizeof(VariableCacheData), xlogReaderArgsList[i]->ShmemVariableCache, sizeof(VariableCacheData)); + securec_check(rc, "", ""); + rc = memcpy_s(g_instance.dms_cxt.SSReformerControl.argsList[i].PartMultiXactState, MUXACT_SIZE, xlogReaderArgsList[i]->MultiXactState, MUXACT_SIZE); + securec_check(rc, "", ""); XLogReaderFree(xlogreaderList[i]); xlogreaderList[i] = NULL; } @@ -11955,7 +12001,19 @@ void StartupXLOG(void) } g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery = false; } - +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE) { + g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status = CLUSTER_NORMAL; + g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status = DISABLED; + /* for other nodes in cluster */ + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + g_instance.dms_cxt.SSReformerControl.clusterStatus = CLUSTER_NORMAL; + SSUpdateReformerCtrl(); + LWLockRelease(ControlFileLock); + SSRequestAllStandbyReloadReformCtrlPage(); + SSRequestAllNodeUpdateArgs(); + } +#else if (SS_PRIMARY_MODE) { g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status = CLUSTER_NORMAL; g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status = DISABLED; @@ -11966,6 +12024,7 @@ void StartupXLOG(void) LWLockRelease(ControlFileLock); SSRequestAllStandbyReloadReformCtrlPage(); } +#endif ereport(LOG, (errmsg("redo done, nextXid: " XID_FMT ", startupMaxXid: " XID_FMT ", recentLocalXmin: " XID_FMT ", recentGlobalXmin: %lu, PendingPreparedXacts: %d" @@ -12599,10 +12658,9 @@ XLogRecPtr GetRedoRecPtr(void) */ void GetFullPageWriteInfo(XLogFPWInfo *fpwInfo_p) { -#ifdef ENABLE_SS_MULTIMASTER - fpwInfo_p->redoRecPtr = t_thrd.xlog_cxt.RedoLogicLSN; -#else fpwInfo_p->redoRecPtr = t_thrd.xlog_cxt.RedoRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + fpwInfo_p->logicRedoRecPtr = t_thrd.xlog_cxt.logicRedoRecPtr; #endif fpwInfo_p->doPageWrites = t_thrd.xlog_cxt.doPageWrites && !ENABLE_INCRE_CKPT; @@ -12914,6 +12972,40 @@ static void LogCheckpointEnd(bool restartpoint) } } +#ifdef ENABLE_SS_MULTIMASTER +uint64 GetCheckPointRedoLogicLSN(XLogRecPtr redo) { + uint64 logicLSN = 0; + TimeLineID tli = 0; + XLogReaderState *xlogreader = NULL; + XLogPageReadPrivate readprivate; + + if (!t_thrd.xlog_cxt.expectedTLIs && t_thrd.shemem_ptr_cxt.XLogCtl && t_thrd.shemem_ptr_cxt.ControlFile) { + tli = t_thrd.shemem_ptr_cxt.XLogCtl->ThisTimeLineID; + t_thrd.xlog_cxt.expectedTLIs = list_make1_int((int)tli); + + tli = t_thrd.shemem_ptr_cxt.ControlFile->checkPointCopy.ThisTimeLineID; + /* Build list with newest item first */ + if (!list_member_int(t_thrd.xlog_cxt.expectedTLIs, (int)tli)) { + t_thrd.xlog_cxt.expectedTLIs = lcons_int((int)tli, t_thrd.xlog_cxt.expectedTLIs); + } + } + + errno_t rc = memset_s(&readprivate, sizeof(XLogPageReadPrivate), 0, sizeof(XLogPageReadPrivate)); + securec_check(rc, "\0", "\0"); + if (ENABLE_DMS && ENABLE_DSS) { + xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); + } else { + xlogreader = XLogReaderAllocate(&XLogPageRead, &readprivate); + } + XLogRecord *record = ReadRecord(xlogreader, redo, LOG, false); + logicLSN = record->logic_lsn; + ShutdownReadFileFacility(); + XLogReaderFree(xlogreader); + xlogreader = NULL; + return logicLSN; +} +#endif + /* * Perform a checkpoint --- either during shutdown, or on-the-fly * @@ -13463,6 +13555,10 @@ void CreateCheckPoint(int flags) recptr = XLogInsert(RM_XLOG_ID, shutdown ? XLOG_CHECKPOINT_SHUTDOWN : XLOG_CHECKPOINT_ONLINE); XLogWaitFlush(recptr); +#ifdef ENABLE_SS_MULTIMASTER + t_thrd.xlog_cxt.logicRedoRecPtr = xlogctl->Insert.logicRedoRecPtr = GetCheckPointRedoLogicLSN(checkPoint.redo); + g_curr_lsn = 0; +#endif /* * We mustn't write any new WAL after a shutdown checkpoint, or it will be @@ -13540,15 +13636,6 @@ void CreateCheckPoint(int flags) */ END_CRIT_SECTION(); -#ifdef ENABLE_SS_MULTIMASTER - if (!IsInitdb) { - uint32 logic_lsn = GetXlogRecordLogicLSN(checkPoint.redo); - if (logic_lsn) { - t_thrd.xlog_cxt.RedoLogicLSN = xlogctl->Insert.RedoLogicLSN = logic_lsn; - } - } -#endif - SyncPostCheckpoint(); if (!t_thrd.cbm_cxt.XlogCbmSys->firstCPCreated) { @@ -13988,6 +14075,10 @@ bool CreateRestartPoint(int flags) */ StartSuspendWalInsert(&lastlrc); xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo; +#ifdef ENABLE_SS_MULTIMASTER + uint64 lastRedoLSN = GetCheckPointRedoLogicLSN(lastCheckPoint.redo); + xlogctl->Insert.logicRedoRecPtr = lastRedoLSN; +#endif #ifdef ENABLE_MOT CallCheckpointCallback(EVENT_CHECKPOINT_SNAPSHOT_READY, lastCheckPointRecPtr); #endif @@ -13996,6 +14087,9 @@ bool CreateRestartPoint(int flags) /* Also update the info_lck-protected copy */ SpinLockAcquire(&xlogctl->info_lck); xlogctl->RedoRecPtr = lastCheckPoint.redo; +#ifdef ENABLE_SS_MULTIMASTER + xlogctl->logicRedoRecPtr = lastRedoLSN; +#endif SpinLockRelease(&xlogctl->info_lck); /* @@ -14623,6 +14717,9 @@ void XLogPutNextOid(Oid nextOid) XLogRegisterData((char *)(&nextOid), sizeof(Oid)); (void)XLogInsert(RM_XLOG_ID, XLOG_NEXTOID); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* * We need not flush the NEXTOID record immediately, because any of the @@ -14659,7 +14756,11 @@ XLogRecPtr RequestXLogSwitch(void) /* XLOG SWITCH has no data */ XLogBeginInsert(); - return XLogInsert(RM_XLOG_ID, XLOG_SWITCH); + XLogRecPtr recPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + return recPtr; } /* @@ -14679,6 +14780,9 @@ XLogRecPtr XLogRestorePoint(const char *rpName) XLogRegisterData((char *)&xlrec, sizeof(xl_restore_point)); RecPtr = XLogInsert(RM_XLOG_ID, XLOG_RESTORE_POINT); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif ereport(LOG, (errmsg("restore point \"%s\" created at %X/%X", rpName, (uint32)(RecPtr >> 32), (uint32)RecPtr))); @@ -14716,6 +14820,9 @@ static void XLogReportParameters(void) XLogRegisterData((char *)&xlrec, sizeof(xlrec)); recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); } @@ -14779,6 +14886,9 @@ void UpdateFullPageWrites(void) XLogRegisterData((char *)(&u_sess->attr.attr_storage.fullPageWrites), sizeof(bool)); (void)XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } if (!u_sess->attr.attr_storage.fullPageWrites) { @@ -16162,6 +16272,9 @@ XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive, unsigned long XLogBeginInsert(); XLogRegisterData((char *)(&startpoint), sizeof(startpoint)); stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (strcmp(u_sess->attr.attr_common.application_name, "gs_roach") == 0) { char *fileName = BACKUP_LABEL_FILE_ROACH; struct stat statbuf; @@ -19730,6 +19843,9 @@ void SetDelayXlogRecycle(bool toDelay, bool isRedo) XLogBeginInsert(); XLogRegisterData((char *)(&toDelay), sizeof(toDelay)); recptr = XLogInsert(RM_XLOG_ID, XLOG_DELAY_XLOG_RECYCLE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); END_CRIT_SECTION(); } @@ -21712,69 +21828,3 @@ void XlogArchUwal(XLogRecPtr archRqstPtr) return; } -#ifdef ENABLE_SS_MULTIMASTER -/* - * get the LogicLSN of the xlog record - */ -uint32 GetXlogRecordLogicLSN(XLogRecPtr RecPtr) -{ - XLogReaderState *xlogreader = NULL; - XLogPageReadPrivate readprivate; - uint32 res = 0; - TimeLineID tli = 0; - XLogRecord *rec = NULL; - char *errorMsg = NULL; - errno_t rc = EOK; - - if (!t_thrd.xlog_cxt.expectedTLIs && t_thrd.shemem_ptr_cxt.XLogCtl && t_thrd.shemem_ptr_cxt.ControlFile) { - tli = t_thrd.shemem_ptr_cxt.XLogCtl->ThisTimeLineID; - t_thrd.xlog_cxt.expectedTLIs = list_make1_int((int)tli); - - tli = t_thrd.shemem_ptr_cxt.ControlFile->checkPointCopy.ThisTimeLineID; - /* Build list with newest item first */ - if (!list_member_int(t_thrd.xlog_cxt.expectedTLIs, (int)tli)) { - t_thrd.xlog_cxt.expectedTLIs = lcons_int((int)tli, t_thrd.xlog_cxt.expectedTLIs); - } - } - - if (!XRecOffIsValid(RecPtr)) { - ereport(ERROR, (errcode(ERRCODE_CASE_NOT_FOUND), - errmsg("invalid record offset at %X/%X.", (uint32)(RecPtr >> 32), (uint32)RecPtr))); - } - - /* Set up XLOG reader facility */ - rc = memset_s(&readprivate, sizeof(XLogPageReadPrivate), 0, sizeof(XLogPageReadPrivate)); - securec_check(rc, "\0", "\0"); - - xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); - - if (xlogreader == NULL) { - ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), - errdetail("Failed while allocating an XLog reading processor"))); - } - xlogreader->system_identifier = GetSystemIdentifier(); - - if (dummyStandbyMode) { - rec = ReadRecord(xlogreader, RecPtr, LOG, false); - } else { - rec = XLogReadRecord(xlogreader, RecPtr, &errorMsg, true); - } - - if (rec != NULL) { - res = rec->logic_lsn; - - if (dummyStandbyMode) { - SetDummyStandbyEndRecPtr(xlogreader); - } - } - - /* Shut down readFile facility, free space. */ - ShutdownReadFileFacility(); - - /* Shut down the xlog reader facility. */ - XLogReaderFree(xlogreader); - xlogreader = NULL; - - return res; -} -#endif diff --git a/src/gausskernel/storage/access/transam/xloginsert.cpp b/src/gausskernel/storage/access/transam/xloginsert.cpp index 4e27cda69..d540fbdd8 100755 --- a/src/gausskernel/storage/access/transam/xloginsert.cpp +++ b/src/gausskernel/storage/access/transam/xloginsert.cpp @@ -765,7 +765,11 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogFPWInfo fpw_ */ XLogRecPtr page_lsn = PageGetLSN(regbuf->page); +#ifdef ENABLE_SS_MULTIMASTER + needs_backup = XLByteLE(page_lsn, fpw_info.logicRedoRecPtr); +#else needs_backup = XLByteLE(page_lsn, fpw_info.redoRecPtr); +#endif if (!needs_backup) { if (XLByteEQ(*fpw_lsn, InvalidXLogRecPtr) || XLByteLT(page_lsn, *fpw_lsn)) *fpw_lsn = page_lsn; diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index c8f317cef..6217b3e26 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -6124,6 +6124,9 @@ void MarkBufferDirtyHint(Buffer buffer, bool buffer_std) t_thrd.vacuum_cxt.VacuumCostBalance += u_sess->attr.attr_storage.VacuumCostPageDirty; } } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif buf_state = old_buf_state | (BM_DIRTY | BM_JUST_DIRTIED); diff --git a/src/gausskernel/storage/ipc/standby.cpp b/src/gausskernel/storage/ipc/standby.cpp index cbab224f7..beb5bb055 100755 --- a/src/gausskernel/storage/ipc/standby.cpp +++ b/src/gausskernel/storage/ipc/standby.cpp @@ -1200,6 +1200,9 @@ void CleanUpMakeCommitAbort(List* committingCsnList) XLogBeginInsert(); XLogRegisterData((char *)action, sizeof(TransactionId)); XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_CSN_ABORTED); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } MemoryContext oldCtx = NULL; if (IsExtremeRtoRunning()) { diff --git a/src/gausskernel/storage/lmgr/lwlock.cpp b/src/gausskernel/storage/lmgr/lwlock.cpp index f2b5a1abb..8070ab919 100644 --- a/src/gausskernel/storage/lmgr/lwlock.cpp +++ b/src/gausskernel/storage/lmgr/lwlock.cpp @@ -396,6 +396,12 @@ int NumLWLocks(void) /* clog.c needs one per CLOG buffer */ numLocks += DMS_MAX_INSTANCES * NUM_CLOG_PARTITIONS * CLOGShmemBuffers(); + + if (ENABLE_DSS) { + numLocks += DMS_MAX_INSTANCES * (DSS_MAX_MXACTOFFSET + DSS_MAX_MXACTMEMBER); + } else { + numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS; + } #else /* clog.c needs one per CLOG buffer */ numLocks += CLOGShmemBuffers(); @@ -405,7 +411,6 @@ int NumLWLocks(void) /* clog.c needs one per CLOG buffer */ numLocks += NUM_CLOG_PARTITIONS * CLOGShmemBuffers(); -#endif /* multixact.c needs two SLRU areas */ if (ENABLE_DSS) { @@ -413,6 +418,7 @@ int NumLWLocks(void) } else { numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS; } +#endif /* async.c needs one per Async buffer */ numLocks += NUM_ASYNC_BUFFERS; diff --git a/src/gausskernel/storage/replication/bcm.cpp b/src/gausskernel/storage/replication/bcm.cpp index 4c00338f0..c14284fdb 100644 --- a/src/gausskernel/storage/replication/bcm.cpp +++ b/src/gausskernel/storage/replication/bcm.cpp @@ -193,6 +193,9 @@ void BCMLogCU(Relation rel, uint64 offset, int col, BCMBitStatus status, int cou cuBlock = cstore_offset_to_cstoreblock(offset, align_size); recptr = log_cu_bcm(&(rel->rd_node), col, cuBlock, status, count); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); @@ -427,6 +430,9 @@ void BCMSetStatusBit(Relation rel, uint64 heapBlk, Buffer buf, BCMBitStatus stat recptr = log_heap_bcm(&(rel->rd_node), 0, heapBlk, status); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } END_CRIT_SECTION(); } diff --git a/src/gausskernel/storage/replication/slotfuncs.cpp b/src/gausskernel/storage/replication/slotfuncs.cpp index e26518054..db67cbb81 100755 --- a/src/gausskernel/storage/replication/slotfuncs.cpp +++ b/src/gausskernel/storage/replication/slotfuncs.cpp @@ -65,6 +65,9 @@ void log_slot_create(const ReplicationSlotPersistentData *slotInfo, char* extra_ XLogRegisterData(extra_content, strlen(extra_content) + 1); } recptr = XLogInsert(RM_SLOT_ID, XLOG_SLOT_CREATE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) WalSndWakeup(); @@ -99,6 +102,9 @@ void log_slot_advance(const ReplicationSlotPersistentData *slotInfo, char* extra XLogRegisterData(extra_content, strlen(extra_content) + 1); } Ptr = XLogInsert(RM_SLOT_ID, XLOG_SLOT_ADVANCE); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(Ptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) WalSndWakeup(); @@ -132,6 +138,9 @@ void log_slot_drop(const char *name) XLogRegisterData((char *)&xlrec, ReplicationSlotPersistentDataConstSize); Ptr = XLogInsert(RM_SLOT_ID, XLOG_SLOT_DROP); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(Ptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) WalSndWakeup(); @@ -160,6 +169,9 @@ void LogCheckSlot() XLogRegisterData((char *)LogicalSlot, size); recptr = XLogInsert(RM_SLOT_ID, XLOG_SLOT_CHECK); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) WalSndWakeup(); @@ -1154,6 +1166,9 @@ void write_term_log(uint32 term) XLogRegisterData((char *)&term, sizeof(uint32)); recptr = XLogInsert(RM_SLOT_ID, XLOG_TERM_LOG); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif XLogWaitFlush(recptr); if (g_instance.attr.attr_storage.max_wal_senders > 0) { WalSndWakeup(); diff --git a/src/gausskernel/storage/smgr/cfs/cfs_md.cpp b/src/gausskernel/storage/smgr/cfs/cfs_md.cpp index 96568c942..db2a88476 100644 --- a/src/gausskernel/storage/smgr/cfs/cfs_md.cpp +++ b/src/gausskernel/storage/smgr/cfs/cfs_md.cpp @@ -773,6 +773,9 @@ void CfsShrinkRecord(const RelFileNode &node, ForkNumber forknum) XLogBeginInsert(); XLogRegisterData((char *)&data, sizeof(CfsShrink_t)); XLogRecPtr lsn = XLogInsert((RmgrId)RM_COMPRESSION_REL_ID, XLOG_CFS_SHRINK_OPERATION); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); XLogWaitFlush(lsn); diff --git a/src/gausskernel/storage/smgr/segment/extent_group.cpp b/src/gausskernel/storage/smgr/segment/extent_group.cpp index ddfb334a2..4ffd3bcac 100644 --- a/src/gausskernel/storage/smgr/segment/extent_group.cpp +++ b/src/gausskernel/storage/smgr/segment/extent_group.cpp @@ -475,6 +475,9 @@ void eg_init_bitmap_page(SegExtentGroup *seg, BlockNumber pageno, BlockNumber fi XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT); XLogRecPtr rec_ptr = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_INIT_MAPPAGE, SegmentBktId); PageSetLSN(BufferGetPage(buffer), rec_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); @@ -495,6 +498,9 @@ void eg_init_invrsptr_page(SegExtentGroup *seg, BlockNumber pageno) XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT); XLogRecPtr rec_ptr = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_INIT_INVRSPTR_PAGE, SegmentBktId); PageSetLSN(page, rec_ptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); SegUnlockReleaseBuffer(buffer); @@ -562,6 +568,9 @@ void eg_add_map_group(SegExtentGroup *seg, BlockNumber pageno, uint8 group_size, XLogRecPtr recptr = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_ADD_NEW_GROUP, SegmentBktId); PageSetLSN(page, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); } SEGMENTTEST(EXTENT_GROUP_ADD_NEW_GROUP_XLOG, (errmsg("EXTENT_GROUP_ADD_NEW_GROUP_XLOG %s: add new group xlog success!\n", @@ -785,6 +794,10 @@ void eg_create_if_necessary(SegExtentGroup *seg) SEGMENTTEST(EXTENT_GROUP_CREATE_EXTENT, (errmsg("EXTENT_GROUP_CREATE_EXTENT %s: create segment file success!\n", g_instance.attr.attr_common.PGXCNodeName))); t_thrd.pgxact->delayChkpt = false; +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif + END_CRIT_SECTION(); } } diff --git a/src/gausskernel/storage/smgr/segment/space.cpp b/src/gausskernel/storage/smgr/segment/space.cpp index c34489e3c..46bcdeaf6 100644 --- a/src/gausskernel/storage/smgr/segment/space.cpp +++ b/src/gausskernel/storage/smgr/segment/space.cpp @@ -366,6 +366,9 @@ SegSpace *spc_drop(Oid spcNode, Oid dbNode, bool redo) XLogRegisterData((char *)&spcNode, sizeof(Oid)); XLogRegisterData((char *)&dbNode, sizeof(Oid)); XLogRecPtr lsn = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_SPACE_DROP); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif END_CRIT_SECTION(); XLogWaitFlush(lsn); @@ -582,6 +585,9 @@ static void copy_extent(SegExtentGroup *seg, RelFileNode logic_rnode, uint32 log XLogRegisterData(pagedata, BLCKSZ); XLogRecPtr recptr = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_NEW_PAGE); PageSetLSN(pagedata, recptr); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* 2. double write */ if (dw_enabled() && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 0) { @@ -1123,6 +1129,9 @@ void spc_shrink_files(SegExtentGroup *seg, BlockNumber target_size, bool redo) xlog_data.forknum = seg->forknum; XLogRegisterData((char *)&xlog_data, sizeof(XLogDataSpaceShrink)); XLogRecPtr lsn = XLogInsert(RM_SEGPAGE_ID, XLOG_SEG_SPACE_SHRINK); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif /* Standby's log is reported in "redo_space_shrink" */ ereport(LOG, (errmsg("call space shrink files, filename: %s, xlog lsn: %lX", diff --git a/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp b/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp index c37285cdd..40fc88c91 100644 --- a/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp +++ b/src/gausskernel/storage/smgr/segment/xlog_atomic_op.cpp @@ -352,6 +352,9 @@ void XLogAtomicOperation::XLogCommit(RmgrId rmid, uint8 info, int bucket_id) } } } +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif if (critical_section) { END_CRIT_SECTION(); diff --git a/src/gausskernel/storage/smgr/segstore.cpp b/src/gausskernel/storage/smgr/segstore.cpp index 1a20f8ccb..2b0ca9e2f 100755 --- a/src/gausskernel/storage/smgr/segstore.cpp +++ b/src/gausskernel/storage/smgr/segstore.cpp @@ -1497,6 +1497,9 @@ void seg_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, cha PageSetLSN(BufferGetPage(seg_buffer), xlog_rec); PageSetLSN(buffer, xlog_rec); +#ifdef ENABLE_SS_MULTIMASTER + g_curr_lsn = 0; +#endif } SegUnlockReleaseBuffer(seg_buffer); diff --git a/src/include/access/multixact.h b/src/include/access/multixact.h index a9e0e0c89..90bb9764d 100644 --- a/src/include/access/multixact.h +++ b/src/include/access/multixact.h @@ -147,7 +147,7 @@ extern void SSMultiXactShmemClear(void); extern void SSGetMultiXactIdMembers(MultiXactId multi, int *nmembers, MultiXactMember **members, int owner); #ifdef ENABLE_SS_MULTIMASTER -extern Size SSMultiXactSize(void); +extern Size SSGetMultiXactSize(void); #endif #endif /* MULTIXACT_H */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 823c5aa26..fe782dff4 100755 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -296,6 +296,9 @@ typedef struct CheckpointStatsData { */ typedef struct XLogFPWInfo { XLogRecPtr redoRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr logicRedoRecPtr; +#endif bool doPageWrites; bool forcePageWrites; } XLogFPWInfo; @@ -455,7 +458,7 @@ typedef struct XLogCtlInsert { */ XLogRecPtr RedoRecPtr; /* current redo point for insertions */ #ifdef ENABLE_SS_MULTIMASTER - uint32 RedoLogicLSN; + XLogRecPtr logicRedoRecPtr; #endif bool forcePageWrites; /* forcing full-page writes for PITR? */ bool fullPageWrites; @@ -482,6 +485,9 @@ typedef struct XLogCtlData { /* Protected by info_lck: */ XLogwrtRqst LogwrtRqst; XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */ +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr logicRedoRecPtr; +#endif TransactionId ckptXid; XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */ diff --git a/src/include/access/xlog_basic.h b/src/include/access/xlog_basic.h index 03cdb8897..4596c4bf5 100644 --- a/src/include/access/xlog_basic.h +++ b/src/include/access/xlog_basic.h @@ -504,10 +504,5 @@ typedef struct ShareStorageOperateCtl_ { extern List *readTimeLineHistory(TimeLineID targetTLI); -#ifdef ENABLE_SS_MULTIMASTER -extern void NodeArgsSwitch(XLogReaderArgs *args); -extern void CurrentArgsCopy(XLogReaderArgs *args); -#endif - #endif /* XLOG_BASIC_H */ diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index b552b5207..182944b90 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -180,6 +180,9 @@ typedef enum SSBroadcastOp { BCAST_CHECK_DB_BACKENDS, BCAST_SEND_SNAPSHOT, BCAST_RELOAD_REFORM_CTRL_PAGE, +#ifdef ENABLE_SS_MULTIMASTER + BCAST_REDO_DONE, +#endif BCAST_END } SSBroadcastOp; diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h index bb5034444..bc1619581 100644 --- a/src/include/ddes/dms/ss_dms_recovery.h +++ b/src/include/ddes/dms/ss_dms_recovery.h @@ -59,6 +59,17 @@ g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status == PAUSE_FOR_PRUNE_TRXN_QUEUE) #define REFORM_CTRL_VERSION 1 + +#ifdef ENABLE_SS_MULTIMASTER +#define SHEMVARCACHE_SIZE 200 +#define MUXACT_SIZE 16 + +typedef struct reformArgs { + char ShmemVariableCache[SHEMVARCACHE_SIZE]; + char PartMultiXactState[MUXACT_SIZE]; +} reformArgs; +#endif + typedef struct st_reformer_ctrl { uint32 version; uint64 list_stable; // stable instances list @@ -66,6 +77,10 @@ typedef struct st_reformer_ctrl { int recoveryInstId; SSGlobalClusterState clusterStatus; ClusterRunMode clusterRunMode; +#ifdef ENABLE_SS_MULTIMASTER + reformArgs argsList[DMS_MAX_INSTANCE]; + +#endif pg_crc32c crc; } ss_reformer_ctrl_t; @@ -145,6 +160,9 @@ typedef struct ss_recovery_info { volatile ondemand_realtime_build_status_t ondemand_realtime_build_status; bool dorado_sharestorage_inited; // used in dorado mode volatile ondemand_recovery_pause_status_t ondemand_recovery_pause_status; +#ifdef ENABLE_SS_MULTIMASTER + bool redo_done; +#endif } ss_recovery_info_t; typedef struct ondemand_htab_ctrl { @@ -167,4 +185,4 @@ void StartupOndemandRecovery(); void OndemandRealtimeBuildHandleFailover(); -#endif \ No newline at end of file +#endif diff --git a/src/include/ddes/dms/ss_transaction.h b/src/include/ddes/dms/ss_transaction.h index ba4d62b1c..222a04c48 100644 --- a/src/include/ddes/dms/ss_transaction.h +++ b/src/include/ddes/dms/ss_transaction.h @@ -127,6 +127,10 @@ void SSSendLatestSnapshotToStandby(TransactionId xmin, TransactionId xmax, Commi int SSUpdateLatestSnapshotOfStandby(char *data, uint32 len); int SSReloadReformCtrlPage(uint32 len); void SSRequestAllStandbyReloadReformCtrlPage(); +#ifdef ENABLE_SS_MULTIMASTER +void SSRequestAllNodeUpdateArgs(); +int SSUpdateArgs(uint32 len); +#endif bool SSCanFetchLocalSnapshotTxnRelatedInfo(); void SSGetMultiXactIdMembers(MultiXactId multi, int *nmembers, MultiXactMember **members, int owner); diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 30c3c3408..cedb76ff0 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -608,8 +608,9 @@ typedef struct knl_t_xlog_context { */ XLogRecPtr RedoRecPtr; #ifdef ENABLE_SS_MULTIMASTER - uint32 RedoLogicLSN; + XLogRecPtr logicRedoRecPtr; #endif + /* * doPageWrites is this backend's local copy of (forcePageWrites || * fullPageWrites). It is used together with RedoRecPtr to decide whether -- Gitee From 597473f3c8a5b7679d496a2929c736afcc0fdc0f Mon Sep 17 00:00:00 2001 From: wangjingyuan8 <1577039175@qq.com> Date: Mon, 24 Jun 2024 15:52:02 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=8C=89=E9=9C=80=E5=9B=9E=E6=94=BEbugfix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../optimizer/commands/sequence/sequence.cpp | 8 ++++ .../storage/access/redo/redo_xlogutils.cpp | 10 +++++ .../storage/access/transam/multi_redo_api.cpp | 41 +++++++++++++++++++ .../storage/access/transam/xlog.cpp | 25 +++++++++-- .../storage/access/transam/xloginsert.cpp | 4 ++ .../storage/replication/logical/logical.cpp | 11 ++++- .../storage/replication/logical/snapbuild.cpp | 4 ++ src/include/access/multi_redo_api.h | 1 + src/include/access/xlog.h | 4 +- src/include/storage/buf/bufpage.h | 1 - 10 files changed, 100 insertions(+), 9 deletions(-) diff --git a/src/gausskernel/optimizer/commands/sequence/sequence.cpp b/src/gausskernel/optimizer/commands/sequence/sequence.cpp index 72eb7a540..02012ce6d 100644 --- a/src/gausskernel/optimizer/commands/sequence/sequence.cpp +++ b/src/gausskernel/optimizer/commands/sequence/sequence.cpp @@ -542,7 +542,11 @@ static int64 GetNextvalGlobal(SeqTable sess_elm, Relation seqrel) /* forced log to satisfy local demand for values */ log = fetch + SEQ_LOG_VALS; } else { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr redoptr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else XLogRecPtr redoptr = GetRedoRecPtr(); +#endif if (XLByteLE(PageGetLSN(page), redoptr)) { /* last update of seq was before checkpoint */ @@ -601,7 +605,11 @@ static int128 GetNextvalLocal(SeqTable elm, Relation seqrel) fetch = log = fetch + SEQ_LOG_VALS; logit = true; } else { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr redoptr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else XLogRecPtr redoptr = GetRedoRecPtr(); +#endif if (XLByteLE(PageGetLSN(page), redoptr)) { /* last update of seq was before checkpoint */ diff --git a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp index 2aa458b12..ccb0d153a 100644 --- a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp +++ b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp @@ -1805,9 +1805,15 @@ bool XLogBlockRedoForExtremeRTO(XLogRecParseState *redoblocktate, RedoBufferInfo return false; } +#ifdef ENABLE_SS_MULTIMASTER + if ((block_valid != BLOCK_DATA_UNDO_TYPE) && g_instance.attr.attr_storage.EnableHotStandby && + IsDefaultExtremeRtoMode() && XLByteLT(PageGetLSN(bufferinfo->pageinfo.page), blockhead->logic_lsn) && + !IsSegmentFileNode(bufferinfo->blockinfo.rnode)) { +#else if ((block_valid != BLOCK_DATA_UNDO_TYPE) && g_instance.attr.attr_storage.EnableHotStandby && IsDefaultExtremeRtoMode() && XLByteLT(PageGetLSN(bufferinfo->pageinfo.page), blockhead->end_ptr) && !IsSegmentFileNode(bufferinfo->blockinfo.rnode)) { +#endif if (unlikely(bufferinfo->blockinfo.forknum >= EXRTO_FORK_NUM)) { ereport(PANIC, (errmsg("forknum is illegal: %d", bufferinfo->blockinfo.forknum))); } @@ -2044,7 +2050,11 @@ void redo_target_page(const BufferTag &buf_tag, StandbyReadLsnInfoArray *lsn_inf ereport(ERROR, (errmsg("redo_target_page: internal error, xlog in lsn %X/%X doesn't contain target block.", (uint32)(lsn_info->lsn_array[i] >> LSN_MOVE32), (uint32)(lsn_info->lsn_array[i])))); } +#ifdef ENABLE_SS_MULTIMASTER + buf_info.lsn = state_iter->blockparse.blockhead.logic_lsn; +#else buf_info.lsn = state_iter->blockparse.blockhead.end_ptr; +#endif buf_info.blockinfo.pblk = state_iter->blockparse.blockhead.pblk; wal_block_redo_for_extreme_rto_read(state_iter, &buf_info); XLogBlockParseStateRelease(state); diff --git a/src/gausskernel/storage/access/transam/multi_redo_api.cpp b/src/gausskernel/storage/access/transam/multi_redo_api.cpp index cc674ca1c..e357bc5a0 100644 --- a/src/gausskernel/storage/access/transam/multi_redo_api.cpp +++ b/src/gausskernel/storage/access/transam/multi_redo_api.cpp @@ -57,6 +57,47 @@ void StartUpMultiRedo(XLogReaderState *xlogreader, XLogReaderState **xlogreaderL parallel_recovery::StartRecoveryWorkers(xlogreader->ReadRecPtr); } } + +void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *oldState) +{ + newState->read_page = oldState->read_page; + newState->system_identifier = oldState->system_identifier; + newState->private_data = oldState->private_data; + newState->errormsg_buf = oldState->errormsg_buf; + newState->isPRProcess = oldState->isPRProcess; + + newState->ReadRecPtr = oldState->ReadRecPtr; + newState->EndRecPtr = oldState->EndRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + newState->xlogPath = oldState->xlogPath; + newState->isEnd = oldState->isEnd; + newState->instId = oldState->instId; + newState->logicLSN = oldState->logicLSN; + newState->args = oldState->args; +#endif + newState->readSegNo = oldState->readSegNo; + newState->readOff = oldState->readOff; + newState->readPageTLI = oldState->readPageTLI; + newState->curReadSegNo = oldState->curReadSegNo; + newState->curReadOff = oldState->curReadOff; + newState->latestPagePtr = oldState->latestPagePtr; + newState->latestPageTLI = oldState->latestPageTLI; + newState->currRecPtr = oldState->currRecPtr; + newState->readBuf = oldState->readBuf; + newState->readLen = oldState->readLen; + newState->preReadStartPtr = oldState->preReadStartPtr; + newState->preReadBuf = oldState->preReadBuf; + + newState->decoded_record = NULL; + newState->main_data = NULL; + newState->main_data_len = 0; + + newState->max_block_id = -1; + newState->readblocks = 0; + /* move block clear to FreeRedoItem because we used MCXT_ALLOC_ZERO to alloc buf, if the variable is not init to 0, + you should put it here. */ + +} #endif bool IsMultiThreadRedoRunning() diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 8392efe7f..111bd8d1c 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -10197,7 +10197,11 @@ void StartupXLOG(void) SSGetRecoveryXlogPath(); #ifdef ENABLE_SS_MULTIMASTER if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD || SS_FORK_MODE) { + SSDisasterGetXlogPathList(); xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); + xlogreader->xlogPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[SS_MY_INST_ID]; + xlogreader->instId = SS_MY_INST_ID; + xlogreader->isEnd = false; close_readFile_if_open(); } else { xlogreaderList = (XLogReaderState **)palloc(instanceNum * sizeof(XLogReaderState *)); @@ -10490,7 +10494,11 @@ void StartupXLOG(void) if (XLByteLT(checkPoint.redo, checkPointLoc)) { record = ReadRecord(xlogreader, checkPoint.redo, PANIC, false); } - checkpointRedoLogicLSN = record->logic_lsn; + if (record != NULL) { + checkpointRedoLogicLSN = record->logic_lsn; + } else { + checkpointRedoLogicLSN = InvalidXLogRecPtr; + } #endif } @@ -11378,6 +11386,9 @@ void StartupXLOG(void) GetRedoStartTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_1]); if (xlogreader->isPRProcess && IsExtremeRedo()) { record = ExtremeReadNextXLogRecord(&xlogreader, LOG); +#ifdef ENABLE_SS_MULTIMASTER + InitReaderStateByOld(xlogreaderList[xlogreader->instId], xlogreader); +#endif } else { xlogreader = newXlogReader; record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); @@ -11396,7 +11407,6 @@ void StartupXLOG(void) } CountRedoTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_1]); } while (record != NULL); // end of main redo apply loop - if (SS_IN_ONDEMAND_RECOVERY) { OnDemandSendRecoveryEndMarkToWorkersAndWaitForReach(0); } else { @@ -12997,8 +13007,14 @@ uint64 GetCheckPointRedoLogicLSN(XLogRecPtr redo) { } else { xlogreader = XLogReaderAllocate(&XLogPageRead, &readprivate); } + xlogreader->xlogPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[SS_MY_INST_ID]; + xlogreader->instId = SS_MY_INST_ID; XLogRecord *record = ReadRecord(xlogreader, redo, LOG, false); - logicLSN = record->logic_lsn; + if (record != NULL) { + logicLSN = record->logic_lsn; + } else { + logicLSN = InvalidXLogRecPtr; + } ShutdownReadFileFacility(); XLogReaderFree(xlogreader); xlogreader = NULL; @@ -13556,7 +13572,7 @@ void CreateCheckPoint(int flags) XLogWaitFlush(recptr); #ifdef ENABLE_SS_MULTIMASTER - t_thrd.xlog_cxt.logicRedoRecPtr = xlogctl->Insert.logicRedoRecPtr = GetCheckPointRedoLogicLSN(checkPoint.redo); + t_thrd.xlog_cxt.logicRedoRecPtr = xlogctl->Insert.logicRedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr = GetCheckPointRedoLogicLSN(checkPoint.redo); g_curr_lsn = 0; #endif @@ -14078,6 +14094,7 @@ bool CreateRestartPoint(int flags) #ifdef ENABLE_SS_MULTIMASTER uint64 lastRedoLSN = GetCheckPointRedoLogicLSN(lastCheckPoint.redo); xlogctl->Insert.logicRedoRecPtr = lastRedoLSN; + t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr = lastRedoLSN; #endif #ifdef ENABLE_MOT CallCheckpointCallback(EVENT_CHECKPOINT_SNAPSHOT_READY, lastCheckPointRecPtr); diff --git a/src/gausskernel/storage/access/transam/xloginsert.cpp b/src/gausskernel/storage/access/transam/xloginsert.cpp index d540fbdd8..c5be1a0ac 100755 --- a/src/gausskernel/storage/access/transam/xloginsert.cpp +++ b/src/gausskernel/storage/access/transam/xloginsert.cpp @@ -1115,7 +1115,11 @@ XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std) /* * Update RedoRecPtr so that we can make the right decision */ +#ifdef ENABLE_SS_MULTIMASTER + RedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else RedoRecPtr = GetRedoRecPtr(); +#endif /* * We assume page LSN is first data on *every* page that can be passed to diff --git a/src/gausskernel/storage/replication/logical/logical.cpp b/src/gausskernel/storage/replication/logical/logical.cpp index 211690175..1f1d3f7b0 100644 --- a/src/gausskernel/storage/replication/logical/logical.cpp +++ b/src/gausskernel/storage/replication/logical/logical.cpp @@ -377,15 +377,24 @@ LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, List *outp XLogRecPtr flushptr; /* start at current insert position */ +#ifdef ENABLE_SS_MULTIMASTER + slot->data.restart_lsn = GetXLogInsertLogicLSN(); +#else slot->data.restart_lsn = GetXLogInsertRecPtr(); +#endif /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); /* and make sure it's fsynced to disk */ XLogWaitFlush(flushptr); - } else + } else { +#ifdef ENABLE_SS_MULTIMASTER + slot->data.restart_lsn = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else slot->data.restart_lsn = GetRedoRecPtr(); +#endif + } /* prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(NULL); diff --git a/src/gausskernel/storage/replication/logical/snapbuild.cpp b/src/gausskernel/storage/replication/logical/snapbuild.cpp index 2efcda067..c97e169c7 100644 --- a/src/gausskernel/storage/replication/logical/snapbuild.cpp +++ b/src/gausskernel/storage/replication/logical/snapbuild.cpp @@ -1768,7 +1768,11 @@ void CheckPointSnapBuild(void) * We start of with a minimum of the last redo pointer. No new replication * slot will start before that, so that's a safe upper bound for removal. */ +#ifdef ENABLE_SS_MULTIMASTER + redo = t_thrd.shemem_ptr_cxt.XLogCtl->logicRedoRecPtr; +#else redo = GetRedoRecPtr(); +#endif /* now check for the restart ptrs from existing slots */ cutoff = ReplicationSlotsComputeLogicalRestartLSN(); diff --git a/src/include/access/multi_redo_api.h b/src/include/access/multi_redo_api.h index 3a827824b..aad30c1fd 100644 --- a/src/include/access/multi_redo_api.h +++ b/src/include/access/multi_redo_api.h @@ -126,6 +126,7 @@ void MultiRedoMain(); void StartUpMultiRedo(XLogReaderState* xlogreader, uint32 privateLen); #else void StartUpMultiRedo(XLogReaderState *xlogreader, XLogReaderState **xlogreaderList, int instanceNum, uint32 privateLen); +void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *oldState); #endif void ProcTxnWorkLoad(bool force); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index fe782dff4..00578b479 100755 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -485,9 +485,6 @@ typedef struct XLogCtlData { /* Protected by info_lck: */ XLogwrtRqst LogwrtRqst; XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */ -#ifdef ENABLE_SS_MULTIMASTER - XLogRecPtr logicRedoRecPtr; -#endif TransactionId ckptXid; XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */ @@ -599,6 +596,7 @@ typedef struct XLogCtlData { #ifdef ENABLE_SS_MULTIMASTER uint32 lastReplayedLogicLSN; uint32 RedoStartLogicLSN; + XLogRecPtr logicRedoRecPtr; #endif /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; diff --git a/src/include/storage/buf/bufpage.h b/src/include/storage/buf/bufpage.h index ccebc6f3f..51e38f536 100644 --- a/src/include/storage/buf/bufpage.h +++ b/src/include/storage/buf/bufpage.h @@ -403,7 +403,6 @@ inline OffsetNumber PageGetMaxOffsetNumber(char* pghr) #define PageGetLSN(page) (((uint64)((PageHeader)(page))->pd_lsn.xlogid << 32) | ((PageHeader)(page))->pd_lsn.xrecoff) #define PageSetLSNInternal(page, lsn) \ (((PageHeader)(page))->pd_lsn.xlogid = (uint32)((lsn) >> 32), ((PageHeader)(page))->pd_lsn.xrecoff = (uint32)(lsn)) - #ifndef FRONTEND extern thread_local uint32 g_curr_lsn; inline void PageSetLSN(Page page, XLogRecPtr LSN, bool check = true) -- Gitee